Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.1.0"

[features]
default = []
sink-redis-streams = ["dep:redis"]
sink-redis-strings = ["dep:redis"]
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]

Expand Down Expand Up @@ -48,7 +49,11 @@ etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a
uuid = { version = "1.19.0", default-features = false, features = ["v4"] }

# Optional sink dependencies.
redis = { version = "0.27", default-features = false, features = ["tokio-comp", "connection-manager"], optional = true }
redis = { version = "0.27", default-features = false, features = [
"tokio-comp",
"connection-manager",
"streams",
], optional = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
Expand Down
8 changes: 8 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use serde::Deserialize;
#[cfg(feature = "sink-redis-strings")]
use crate::sink::redis_strings::RedisStringsSinkConfig;

#[cfg(feature = "sink-redis-streams")]
use crate::sink::redis_streams::RedisStreamsSinkConfig;

/// Sink destination configuration.
///
/// Determines where replicated events are sent.
Expand All @@ -16,4 +19,9 @@ pub enum SinkConfig {
#[cfg(feature = "sink-redis-strings")]
#[serde(rename = "redis-strings")]
RedisStrings(RedisStringsSinkConfig),

/// Redis streams sink for append-only log storage.
#[cfg(feature = "sink-redis-streams")]
#[serde(rename = "redis-streams")]
RedisStreams(RedisStreamsSinkConfig),
}
18 changes: 18 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
})?;
AnySink::RedisStrings(s)
}

#[cfg(feature = "sink-redis-streams")]
SinkConfig::RedisStreams(cfg) => {
use crate::sink::redis_streams::RedisStreamsSink;
let s = RedisStreamsSink::new(cfg.clone()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to create Redis Streams sink",
e.to_string()
)
})?;
AnySink::RedisStreams(s)
}
};

// Create PgStream as an ETL destination
Expand Down Expand Up @@ -140,6 +153,11 @@ fn log_sink_config(config: &SinkConfig) {
SinkConfig::RedisStrings(_cfg) => {
debug!("using redis-strings sink");
}

#[cfg(feature = "sink-redis-streams")]
SinkConfig::RedisStreams(_cfg) => {
debug!("using redis-streams sink");
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ pub mod memory;
#[cfg(feature = "sink-redis-strings")]
pub mod redis_strings;

#[cfg(feature = "sink-redis-streams")]
pub mod redis_streams;

pub use base::Sink;

use etl::error::EtlResult;
Expand All @@ -12,6 +15,9 @@ use memory::MemorySink;
#[cfg(feature = "sink-redis-strings")]
use redis_strings::RedisStringsSink;

#[cfg(feature = "sink-redis-streams")]
use redis_streams::RedisStreamsSink;

use crate::types::TriggeredEvent;

/// Wrapper enum for all supported sink types.
Expand All @@ -26,6 +32,10 @@ pub enum AnySink {
/// Redis strings sink for key-value storage.
#[cfg(feature = "sink-redis-strings")]
RedisStrings(RedisStringsSink),

/// Redis streams sink for append-only log storage.
#[cfg(feature = "sink-redis-streams")]
RedisStreams(RedisStreamsSink),
}

impl Sink for AnySink {
Expand All @@ -39,6 +49,9 @@ impl Sink for AnySink {

#[cfg(feature = "sink-redis-strings")]
AnySink::RedisStrings(sink) => sink.publish_events(events).await,

#[cfg(feature = "sink-redis-streams")]
AnySink::RedisStreams(sink) => sink.publish_events(events).await,
}
}
}
188 changes: 188 additions & 0 deletions src/sink/redis_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
//! Redis Streams sink for publishing events to a Redis stream.
//!
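//! Appends each event's payload to a Redis stream, resolved per event in this order:
//! 1. `stream` key in event metadata (from subscription's metadata/metadata_extensions)
//! 2. Fallback to `stream_name` in sink config
//!
//! If neither source provides a stream name, publishing the batch fails with a
//! configuration error.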
//!
//! # Message Format
//!
//! Each event is added to the stream using `XADD` with:
//! - Auto-generated stream entry ID (`*`)
//! - Single field: `payload` containing the JSON-serialized event payload
//!
//! Example entry when read with `XREAD`:
//! ```text
//! 1704067200000-0 payload {"id": 1, "name": "test"}
//! ```
//!
//! # Dynamic Routing
//!
//! The target stream can be configured per-event using metadata_extensions:
//!
//! ```sql
//! metadata_extensions = '[
//! {"json_path": "stream", "expression": "''events:'' || table_name"}
//! ]'
//! ```

use etl::error::EtlResult;
use redis::aio::ConnectionManager;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::sink::Sink;
use crate::types::TriggeredEvent;

/// Configuration for the Redis Streams sink.
///
/// This intentionally does not implement [`Serialize`] to avoid accidentally
/// leaking secrets (URL credentials) in serialized forms. For the same reason
/// `Debug` is implemented by hand and never prints the contents of `url`.
#[derive(Clone, Deserialize)]
pub struct RedisStreamsSinkConfig {
    /// Redis connection URL (e.g., "redis://localhost:6379").
    /// Contains credentials and should be treated as sensitive.
    pub url: String,

    /// Name of the Redis stream to append events to. Optional if provided via event metadata.
    #[serde(default)]
    pub stream_name: Option<String>,

    /// Maximum stream length (optional). Uses MAXLEN ~ for approximate trimming.
    /// When absent, no trimming argument is sent.
    #[serde(default)]
    pub max_len: Option<usize>,
}

impl std::fmt::Debug for RedisStreamsSinkConfig {
    /// Formats the config like a derived `Debug` would, but with the URL
    /// replaced by a placeholder so credentials cannot end up in logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RedisStreamsSinkConfig")
            .field("url", &"<redacted>")
            .field("stream_name", &self.stream_name)
            .field("max_len", &self.max_len)
            .finish()
    }
}

/// Configuration for the Redis Streams sink without sensitive data.
///
/// Mirrors [`RedisStreamsSinkConfig`] minus its `url` field, so it carries no
/// connection credentials. Safe to serialize and log. Use this for debugging
/// and metrics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RedisStreamsSinkConfigWithoutSecrets {
/// Name of the Redis stream to append events to (if configured).
/// `None` means no default stream was set in the sink config.
pub stream_name: Option<String>,

/// Maximum stream length (optional). `None` means no limit was configured.
pub max_len: Option<usize>,
}

impl From<RedisStreamsSinkConfig> for RedisStreamsSinkConfigWithoutSecrets {
fn from(config: RedisStreamsSinkConfig) -> Self {
Self {
stream_name: config.stream_name,
max_len: config.max_len,
}
}
}

impl From<&RedisStreamsSinkConfig> for RedisStreamsSinkConfigWithoutSecrets {
fn from(config: &RedisStreamsSinkConfig) -> Self {
Self {
stream_name: config.stream_name.clone(),
max_len: config.max_len,
}
}
}

/// Sink that appends events to a Redis stream.
///
/// Each event's payload is added using XADD with a single "payload" field.
/// The sink uses a connection manager for automatic reconnection handling.
///
/// Cloning the sink is cheap with respect to the connection: clones share the
/// same manager through the `Arc`, and access to it goes through an async mutex.
#[derive(Clone)]
pub struct RedisStreamsSink {
/// Shared Redis connection manager, guarded by a Tokio mutex.
connection: Arc<Mutex<ConnectionManager>>,

/// Default stream name. Can be overridden per-event via metadata.
stream_name: Option<String>,

/// Maximum stream length for approximate trimming. `None` sends no MAXLEN argument.
max_len: Option<usize>,
}

impl RedisStreamsSink {
/// Creates a new Redis Streams sink from configuration.
///
/// # Errors
///
/// Returns an error if the Redis connection cannot be established.
pub async fn new(
config: RedisStreamsSinkConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = redis::Client::open(config.url)?;
let connection = ConnectionManager::new(client).await?;

Ok(Self {
connection: Arc::new(Mutex::new(connection)),
stream_name: config.stream_name,
max_len: config.max_len,
})
}

/// Resolves the stream name for an event from metadata or config.
fn resolve_stream_name<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
// First check event metadata for dynamic stream.
if let Some(ref metadata) = event.metadata {
if let Some(stream) = metadata.get("stream").and_then(|v| v.as_str()) {
return Some(stream);
}
}
// Fall back to config stream name.
self.stream_name.as_deref()
}
}

impl Sink for RedisStreamsSink {
    /// Returns the static name of this sink: `redis-streams`.
    fn name() -> &'static str {
        "redis-streams"
    }

    /// Appends every event in `events` to its resolved Redis stream.
    ///
    /// One `XADD` command is built per event and all commands are sent in a
    /// single pipeline. An empty batch returns immediately without touching
    /// the connection.
    ///
    /// # Errors
    ///
    /// - `ConfigError` if an event has no `stream` metadata key and the sink has
    ///   no configured `stream_name`. This is detected while building the
    ///   pipeline, before anything is sent.
    /// - `DestinationError` if executing the pipeline against Redis fails.
    ///
    /// NOTE(review): the pipeline is not marked atomic, so a failure mid-batch
    /// presumably can leave some entries written — confirm against redis-rs docs.
    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
        if events.is_empty() {
            return Ok(());
        }

        // Build the whole pipeline first. This involves serializing every payload
        // and may fail validation, neither of which needs the shared connection.
        let mut pipe = redis::pipe();

        for event in events {
            let stream_name = self.resolve_stream_name(&event).ok_or_else(|| {
                etl::etl_error!(
                    etl::error::ErrorKind::ConfigError,
                    "No stream_name configured",
                    "Stream name must be provided in sink config or event metadata (stream key)"
                )
            })?;

            let mut cmd = redis::cmd("XADD");
            cmd.arg(stream_name); // Copies internally.

            if let Some(max_len) = self.max_len {
                cmd.arg("MAXLEN").arg("~").arg(max_len);
            }

            cmd.arg("*").arg("payload").arg(event.payload.to_string());
            pipe.add_command(cmd);
        }

        // Take the connection lock only for the network round trip, so other
        // users of the sink are not blocked while a batch is being prepared.
        let mut conn = self.connection.lock().await;

        pipe.query_async::<()>(&mut *conn).await.map_err(|e| {
            etl::etl_error!(
                etl::error::ErrorKind::DestinationError,
                "Failed to XADD events to Redis stream",
                e.to_string()
            )
        })?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that the sink reports the `redis-streams` name.
    #[test]
    fn test_sink_name() {
        let reported = RedisStreamsSink::name();
        assert_eq!(reported, "redis-streams");
    }
}
Loading