diff --git a/Cargo.toml b/Cargo.toml
index 38565b8..5469273 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,6 +5,7 @@ version = "0.1.0"
 
 [features]
 default = []
+sink-redis-streams = ["dep:redis"]
 sink-redis-strings = ["dep:redis"]
 test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
 
@@ -48,7 +49,11 @@ etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a
 uuid = { version = "1.19.0", default-features = false, features = ["v4"] }
 
 # Optional sink dependencies.
-redis = { version = "0.27", default-features = false, features = ["tokio-comp", "connection-manager"], optional = true }
+redis = { version = "0.27", default-features = false, features = [
+    "tokio-comp",
+    "connection-manager",
+    "streams",
+], optional = true }
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
diff --git a/src/config/sink.rs b/src/config/sink.rs
index dcdf216..de87aa3 100644
--- a/src/config/sink.rs
+++ b/src/config/sink.rs
@@ -3,6 +3,9 @@ use serde::Deserialize;
 #[cfg(feature = "sink-redis-strings")]
 use crate::sink::redis_strings::RedisStringsSinkConfig;
 
+#[cfg(feature = "sink-redis-streams")]
+use crate::sink::redis_streams::RedisStreamsSinkConfig;
+
 /// Sink destination configuration.
 ///
 /// Determines where replicated events are sent.
@@ -16,4 +19,9 @@ pub enum SinkConfig {
     #[cfg(feature = "sink-redis-strings")]
     #[serde(rename = "redis-strings")]
     RedisStrings(RedisStringsSinkConfig),
+
+    /// Redis streams sink for append-only log storage.
+    #[cfg(feature = "sink-redis-streams")]
+    #[serde(rename = "redis-streams")]
+    RedisStreams(RedisStreamsSinkConfig),
 }
diff --git a/src/core.rs b/src/core.rs
index e6670b5..d9c2a62 100644
--- a/src/core.rs
+++ b/src/core.rs
@@ -90,6 +90,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
             })?;
             AnySink::RedisStrings(s)
         }
+
+        #[cfg(feature = "sink-redis-streams")]
+        SinkConfig::RedisStreams(cfg) => {
+            use crate::sink::redis_streams::RedisStreamsSink;
+            let s = RedisStreamsSink::new(cfg.clone()).await.map_err(|e| {
+                etl::etl_error!(
+                    etl::error::ErrorKind::InvalidData,
+                    "Failed to create Redis Streams sink",
+                    e.to_string()
+                )
+            })?;
+            AnySink::RedisStreams(s)
+        }
     };
 
     // Create PgStream as an ETL destination
@@ -140,6 +153,11 @@ fn log_sink_config(config: &SinkConfig) {
         SinkConfig::RedisStrings(_cfg) => {
            debug!("using redis-strings sink");
         }
+
+        #[cfg(feature = "sink-redis-streams")]
+        SinkConfig::RedisStreams(_cfg) => {
+            debug!("using redis-streams sink");
+        }
     }
 }
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index c968b83..0adcb79 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -4,6 +4,9 @@ pub mod memory;
 #[cfg(feature = "sink-redis-strings")]
 pub mod redis_strings;
 
+#[cfg(feature = "sink-redis-streams")]
+pub mod redis_streams;
+
 pub use base::Sink;
 
 use etl::error::EtlResult;
@@ -12,6 +15,9 @@ use memory::MemorySink;
 #[cfg(feature = "sink-redis-strings")]
 use redis_strings::RedisStringsSink;
 
+#[cfg(feature = "sink-redis-streams")]
+use redis_streams::RedisStreamsSink;
+
 use crate::types::TriggeredEvent;
 
 /// Wrapper enum for all supported sink types.
@@ -26,6 +32,10 @@ pub enum AnySink {
     /// Redis strings sink for key-value storage.
     #[cfg(feature = "sink-redis-strings")]
     RedisStrings(RedisStringsSink),
+
+    /// Redis streams sink for append-only log storage.
+    #[cfg(feature = "sink-redis-streams")]
+    RedisStreams(RedisStreamsSink),
 }
 
 impl Sink for AnySink {
@@ -39,6 +49,9 @@
 
             #[cfg(feature = "sink-redis-strings")]
             AnySink::RedisStrings(sink) => sink.publish_events(events).await,
+
+            #[cfg(feature = "sink-redis-streams")]
+            AnySink::RedisStreams(sink) => sink.publish_events(events).await,
         }
     }
 }
diff --git a/src/sink/redis_streams.rs b/src/sink/redis_streams.rs
new file mode 100644
index 0000000..92dde11
--- /dev/null
+++ b/src/sink/redis_streams.rs
@@ -0,0 +1,188 @@
+//! Redis Streams sink for publishing events to a Redis stream.
+//!
+//! Appends each event's payload to a Redis stream determined by:
+//! 1. `stream` key in event metadata (from subscription's metadata/metadata_extensions)
+//! 2. Fallback to `stream_name` in sink config
+//!
+//! # Message Format
+//!
+//! Each event is added to the stream using `XADD` with:
+//! - Auto-generated stream entry ID (`*`)
+//! - Single field: `payload` containing the JSON-serialized event payload
+//!
+//! Example entry when read with `XREAD`:
+//! ```text
+//! 1704067200000-0 payload {"id": 1, "name": "test"}
+//! ```
+//!
+//! # Dynamic Routing
+//!
+//! The target stream can be configured per-event using metadata_extensions:
+//!
+//! ```sql
+//! metadata_extensions = '[
+//!   {"json_path": "stream", "expression": "''events:'' || table_name"}
+//! ]'
+//! ```
+
+use etl::error::EtlResult;
+use redis::aio::ConnectionManager;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+use crate::sink::Sink;
+use crate::types::TriggeredEvent;
+
+/// Configuration for the Redis Streams sink.
+///
+/// This intentionally does not implement [`Serialize`] to avoid accidentally
+/// leaking secrets (URL credentials) in serialized forms.
+#[derive(Clone, Debug, Deserialize)]
+pub struct RedisStreamsSinkConfig {
+    /// Redis connection URL (e.g., "redis://localhost:6379").
+    /// Contains credentials and should be treated as sensitive.
+    pub url: String,
+
+    /// Name of the Redis stream to append events to. Optional if provided via event metadata.
+    #[serde(default)]
+    pub stream_name: Option<String>,
+
+    /// Maximum stream length (optional). Uses MAXLEN ~ for approximate trimming.
+    #[serde(default)]
+    pub max_len: Option<usize>,
+}
+
+/// Configuration for the Redis Streams sink without sensitive data.
+///
+/// Safe to serialize and log. Use this for debugging and metrics.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RedisStreamsSinkConfigWithoutSecrets {
+    /// Name of the Redis stream to append events to (if configured).
+    pub stream_name: Option<String>,
+
+    /// Maximum stream length (optional).
+    pub max_len: Option<usize>,
+}
+
+impl From<RedisStreamsSinkConfig> for RedisStreamsSinkConfigWithoutSecrets {
+    fn from(config: RedisStreamsSinkConfig) -> Self {
+        Self {
+            stream_name: config.stream_name,
+            max_len: config.max_len,
+        }
+    }
+}
+
+impl From<&RedisStreamsSinkConfig> for RedisStreamsSinkConfigWithoutSecrets {
+    fn from(config: &RedisStreamsSinkConfig) -> Self {
+        Self {
+            stream_name: config.stream_name.clone(),
+            max_len: config.max_len,
+        }
+    }
+}
+
+/// Sink that appends events to a Redis stream.
+///
+/// Each event's payload is added using XADD with a single "payload" field.
+/// The sink uses a connection manager for automatic reconnection handling.
+#[derive(Clone)]
+pub struct RedisStreamsSink {
+    /// Shared Redis connection manager.
+    connection: Arc<Mutex<ConnectionManager>>,
+
+    /// Default stream name. Can be overridden per-event via metadata.
+    stream_name: Option<String>,
+
+    /// Maximum stream length for approximate trimming.
+    max_len: Option<usize>,
+}
+
+impl RedisStreamsSink {
+    /// Creates a new Redis Streams sink from configuration.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the Redis connection cannot be established.
+    pub async fn new(
+        config: RedisStreamsSinkConfig,
+    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
+        let client = redis::Client::open(config.url)?;
+        let connection = ConnectionManager::new(client).await?;
+
+        Ok(Self {
+            connection: Arc::new(Mutex::new(connection)),
+            stream_name: config.stream_name,
+            max_len: config.max_len,
+        })
+    }
+
+    /// Resolves the stream name for an event from metadata or config.
+    fn resolve_stream_name<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
+        // First check event metadata for a dynamic stream.
+        if let Some(ref metadata) = event.metadata {
+            if let Some(stream) = metadata.get("stream").and_then(|v| v.as_str()) {
+                return Some(stream);
+            }
+        }
+        // Fall back to the config stream name.
+        self.stream_name.as_deref()
+    }
+}
+
+impl Sink for RedisStreamsSink {
+    fn name() -> &'static str {
+        "redis-streams"
+    }
+
+    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
+        if events.is_empty() {
+            return Ok(());
+        }
+
+        // Build the pipeline in a single pass - no pre-collection needed.
+        let mut conn = self.connection.lock().await;
+        let mut pipe = redis::pipe();
+
+        for event in events {
+            let stream_name = self.resolve_stream_name(&event).ok_or_else(|| {
+                etl::etl_error!(
+                    etl::error::ErrorKind::ConfigError,
+                    "No stream_name configured",
+                    "Stream name must be provided in sink config or event metadata (stream key)"
+                )
+            })?;
+
+            let mut cmd = redis::cmd("XADD");
+            cmd.arg(stream_name); // Copies internally.
+
+            if let Some(max_len) = self.max_len {
+                cmd.arg("MAXLEN").arg("~").arg(max_len);
+            }
+
+            cmd.arg("*").arg("payload").arg(event.payload.to_string());
+            pipe.add_command(cmd);
+        }
+
+        pipe.query_async::<()>(&mut *conn).await.map_err(|e| {
+            etl::etl_error!(
+                etl::error::ErrorKind::DestinationError,
+                "Failed to XADD events to Redis stream",
+                e.to_string()
+            )
+        })?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sink_name() {
+        assert_eq!(RedisStreamsSink::name(), "redis-streams");
+    }
+}
diff --git a/tests/redis_streams_sink_tests.rs b/tests/redis_streams_sink_tests.rs
new file mode 100644
index 0000000..f8382f6
--- /dev/null
+++ b/tests/redis_streams_sink_tests.rs
@@ -0,0 +1,209 @@
+//! Integration tests for the Redis Streams sink.
+
+#![cfg(feature = "sink-redis-streams")]
+
+use postgres_stream::sink::Sink;
+use postgres_stream::sink::redis_streams::{RedisStreamsSink, RedisStreamsSinkConfig};
+use postgres_stream::test_utils::ensure_redis;
+use postgres_stream::types::{EventIdentifier, StreamId, TriggeredEvent};
+
+use chrono::Utc;
+use redis::AsyncCommands;
+
+/// Creates a test event with the given ID.
+fn make_test_event(id: &str) -> TriggeredEvent {
+    TriggeredEvent {
+        id: EventIdentifier::new(id.to_string(), Utc::now()),
+        payload: serde_json::json!({
+            "test_id": id,
+            "message": format!("Test event {}", id),
+        }),
+        metadata: Some(serde_json::json!({ "source": "test" })),
+        stream_id: StreamId::from(1u64),
+        lsn: Some("0/16B3748".parse().unwrap()),
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_redis_streams_sink_publishes_events() {
+    let redis_port = ensure_redis().await;
+    let redis_url = format!("redis://127.0.0.1:{redis_port}");
+    let stream_name = "pgstream:test-events";
+
+    let config = RedisStreamsSinkConfig {
+        url: redis_url.clone(),
+        stream_name: Some(stream_name.to_string()),
+        max_len: None,
+    };
+
+    let sink = RedisStreamsSink::new(config)
+        .await
+        .expect("Failed to create Redis Streams sink");
+
+    // Publish test events.
+    let events = vec![
+        make_test_event("stream-event-1"),
+        make_test_event("stream-event-2"),
+    ];
+    sink.publish_events(events)
+        .await
+        .expect("Failed to publish events");
+
+    // Verify the events are in the stream.
+    let client = redis::Client::open(redis_url).expect("Failed to open redis client");
+    let mut conn = client
+        .get_multiplexed_async_connection()
+        .await
+        .expect("Failed to get connection");
+
+    // Read all entries from the stream.
+    let result: redis::streams::StreamReadReply = conn
+        .xread(&[stream_name], &["0"])
+        .await
+        .expect("Failed to read stream");
+
+    assert_eq!(result.keys.len(), 1);
+    let entries = &result.keys[0].ids;
+    assert_eq!(entries.len(), 2);
+
+    // Verify the first entry has only a payload field.
+    let first_entry = &entries[0];
+    assert!(first_entry.map.contains_key("payload"));
+    // No envelope fields.
+    assert!(!first_entry.map.contains_key("id"));
+    assert!(!first_entry.map.contains_key("stream_id"));
+    assert!(!first_entry.map.contains_key("created_at"));
+    assert!(!first_entry.map.contains_key("metadata"));
+    assert!(!first_entry.map.contains_key("lsn"));
+
+    // Verify payload content.
+    let payload_str: String =
+        redis::from_redis_value(first_entry.map.get("payload").unwrap()).unwrap();
+    let payload: serde_json::Value =
+        serde_json::from_str(&payload_str).expect("Failed to parse payload JSON");
+    assert!(payload.get("test_id").is_some());
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_redis_streams_sink_with_max_len() {
+    let redis_port = ensure_redis().await;
+    let redis_url = format!("redis://127.0.0.1:{redis_port}");
+    let stream_name = "pgstream:maxlen-test";
+
+    let config = RedisStreamsSinkConfig {
+        url: redis_url.clone(),
+        stream_name: Some(stream_name.to_string()),
+        max_len: Some(5),
+    };
+
+    let sink = RedisStreamsSink::new(config)
+        .await
+        .expect("Failed to create Redis Streams sink");
+
+    // Publish 10 events; the stream should be trimmed to roughly 5.
+    for i in 0..10 {
+        let events = vec![make_test_event(&format!("maxlen-event-{i}"))];
+        sink.publish_events(events)
+            .await
+            .expect("Failed to publish events");
+    }
+
+    // Verify the stream length (MAXLEN ~ is approximate).
+    let client = redis::Client::open(redis_url).expect("Failed to open redis client");
+    let mut conn = client
+        .get_multiplexed_async_connection()
+        .await
+        .expect("Failed to get connection");
+
+    let len: usize = conn
+        .xlen(stream_name)
+        .await
+        .expect("Failed to get stream length");
+
+    // MAXLEN ~ trims in whole macro-node increments, so a small stream may not
+    // be trimmed at all; assert only that no entries beyond the 10 published exist.
+    assert!(len <= 10, "Stream should not grow beyond the published entries");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_redis_streams_sink_empty_batch() {
+    let redis_port = ensure_redis().await;
+    let redis_url = format!("redis://127.0.0.1:{redis_port}");
+
+    let config = RedisStreamsSinkConfig {
+        url: redis_url,
+        stream_name: Some("pgstream:empty-test".to_string()),
+        max_len: None,
+    };
+
+    let sink = RedisStreamsSink::new(config)
+        .await
+        .expect("Failed to create Redis Streams sink");
+
+    // An empty batch should succeed without error.
+    sink.publish_events(vec![])
+        .await
+        .expect("Empty batch should succeed");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_redis_streams_sink_uses_stream_from_metadata() {
+    let redis_port = ensure_redis().await;
+    let redis_url = format!("redis://127.0.0.1:{redis_port}");
+    let stream_name = "pgstream:metadata-stream";
+
+    // Create the sink without a stream_name - the stream comes from metadata.
+    let config = RedisStreamsSinkConfig {
+        url: redis_url.clone(),
+        stream_name: None,
+        max_len: None,
+    };
+
+    let sink = RedisStreamsSink::new(config)
+        .await
+        .expect("Failed to create Redis Streams sink");
+
+    // Create an event with the stream in metadata.
+    let event = TriggeredEvent {
+        id: EventIdentifier::new("metadata-stream-event".to_string(), Utc::now()),
+        payload: serde_json::json!({
+            "test_id": "metadata-stream-event",
+            "message": "Test event with metadata stream",
+        }),
+        metadata: Some(serde_json::json!({ "stream": stream_name })),
+        stream_id: StreamId::from(1u64),
+        lsn: Some("0/16B3748".parse().unwrap()),
+    };
+
+    sink.publish_events(vec![event])
+        .await
+        .expect("Failed to publish events");
+
+    // Verify the event is in the metadata-specified stream.
+    let client = redis::Client::open(redis_url).expect("Failed to open redis client");
+    let mut conn = client
+        .get_multiplexed_async_connection()
+        .await
+        .expect("Failed to get connection");
+
+    let result: redis::streams::StreamReadReply = conn
+        .xread(&[stream_name], &["0"])
+        .await
+        .expect("Failed to read stream");
+
+    assert_eq!(result.keys.len(), 1);
+    let entries = &result.keys[0].ids;
+    assert_eq!(entries.len(), 1);
+
+    // Verify payload content.
+    let first_entry = &entries[0];
+    let payload_str: String =
+        redis::from_redis_value(first_entry.map.get("payload").unwrap()).unwrap();
+    let payload: serde_json::Value =
+        serde_json::from_str(&payload_str).expect("Failed to parse payload JSON");
+    assert_eq!(payload["test_id"], "metadata-stream-event");
+}
+
+#[test]
+fn test_sink_name() {
+    assert_eq!(RedisStreamsSink::name(), "redis-streams");
+}
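
For reference, a minimal sketch of how `RedisStreamsSinkConfig` deserializes, assuming the `sink-redis-streams` feature is enabled and `serde_json` is available as a dev-dependency. The JSON shape follows the struct fields in the diff; how the enclosing `SinkConfig` enum selects the `redis-streams` variant (its serde tagging style) is not visible in the diff, so only the inner config is exercised here.

```rust
use postgres_stream::sink::redis_streams::RedisStreamsSinkConfig;

fn main() {
    // stream_name and max_len are #[serde(default)], so both may be omitted;
    // if stream_name is absent, every event must carry a "stream" metadata key.
    let raw = r#"{
        "url": "redis://localhost:6379",
        "stream_name": "events:orders",
        "max_len": 100000
    }"#;
    let cfg: RedisStreamsSinkConfig = serde_json::from_str(raw).expect("valid config");
    assert_eq!(cfg.stream_name.as_deref(), Some("events:orders"));
    assert_eq!(cfg.max_len, Some(100_000));
}
```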
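On the consuming side, entries written by this sink carry a single `payload` field, so any standard Redis Streams reader works. The sketch below shows the usual consumer-group pattern with the same `redis` crate; it is not part of this diff, and the stream, group, and consumer names (`events`, `workers`, `worker-1`) are illustrative.

```rust
use redis::AsyncCommands;
use redis::streams::{StreamReadOptions, StreamReadReply};

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    // Create the group at the start of the stream; ignore the error if the
    // group already exists (BUSYGROUP).
    let _: Result<(), _> = conn.xgroup_create_mkstream("events", "workers", "0").await;

    loop {
        let opts = StreamReadOptions::default()
            .group("workers", "worker-1")
            .count(10)
            .block(5_000);
        // ">" asks for entries never delivered to this group before.
        let reply: StreamReadReply = conn.xread_options(&["events"], &[">"], &opts).await?;

        for key in reply.keys {
            for entry in key.ids {
                if let Some(v) = entry.map.get("payload") {
                    // The sink stores the JSON-serialized payload as a string.
                    let payload: String = redis::from_redis_value(v)?;
                    println!("{} -> {}", entry.id, payload);
                }
                // Acknowledge after successful processing.
                let _: i64 = conn.xack("events", "workers", &[&entry.id]).await?;
            }
        }
    }
}
```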