diff --git a/Cargo.lock b/Cargo.lock
index 769b19e..424069f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -435,6 +435,7 @@ dependencies = [
  "aws-credential-types",
  "aws-sigv4",
  "aws-smithy-async",
+ "aws-smithy-eventstream",
  "aws-smithy-http",
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
@@ -450,6 +451,29 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "aws-sdk-kinesis"
+version = "1.97.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd9a4c879bef23961715c0797b01f20d930bd958f25538191f032d74d41e18fa"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-eventstream",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "fastrand 2.3.0",
+ "http 0.2.12",
+ "regex-lite",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sdk-sns"
 version = "1.92.0"
@@ -569,6 +593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c"
 dependencies = [
  "aws-credential-types",
+ "aws-smithy-eventstream",
  "aws-smithy-http",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
@@ -595,12 +620,24 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "aws-smithy-eventstream"
+version = "0.60.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc12f8b310e38cad85cf3bef45ad236f470717393c613266ce0a89512286b650"
+dependencies = [
+ "aws-smithy-types",
+ "bytes",
+ "crc32fast",
+]
+
 [[package]]
 name = "aws-smithy-http"
 version = "0.62.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b"
 dependencies = [
+ "aws-smithy-eventstream",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
  "bytes",
@@ -1102,6 +1139,15 @@ version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
 
+[[package]]
+name = "crc32fast"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
+dependencies = [
+ "cfg-if",
+]
+
 [[package]]
 name = "crossbeam-epoch"
 version = "0.9.18"
@@ -3084,6 +3130,7 @@ dependencies = [
  "anyhow",
  "async-nats",
  "aws-config",
+ "aws-sdk-kinesis",
  "aws-sdk-sns",
  "aws-sdk-sqs",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index f2a7d0f..aab82c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ version = "0.1.0"
 [features]
 default = []
 sink-kafka = ["dep:rdkafka"]
+sink-kinesis = ["dep:aws-sdk-kinesis", "dep:aws-config"]
 sink-nats = ["dep:async-nats"]
 sink-rabbitmq = ["dep:lapin"]
 sink-redis-streams = ["dep:redis"]
@@ -57,6 +58,7 @@ uuid = { version = "1.19.0", default-features = false, features = ["v4"] }
 # Optional sink dependencies.
 async-nats = { version = "0.38", optional = true }
 aws-config = { version = "1.5", optional = true }
+aws-sdk-kinesis = { version = "1.60", optional = true }
 aws-sdk-sns = { version = "1.60", optional = true }
 aws-sdk-sqs = { version = "1.60", optional = true }
 lapin = { version = "2.5", optional = true }
diff --git a/src/config/sink.rs b/src/config/sink.rs
index 1d2ad2a..91d509d 100644
--- a/src/config/sink.rs
+++ b/src/config/sink.rs
@@ -24,6 +24,9 @@ use crate::sink::sqs::SqsSinkConfig;
 #[cfg(feature = "sink-sns")]
 use crate::sink::sns::SnsSinkConfig;
 
+#[cfg(feature = "sink-kinesis")]
+use crate::sink::kinesis::KinesisSinkConfig;
+
 /// Sink destination configuration.
 ///
 /// Determines where replicated events are sent.
@@ -72,4 +75,9 @@ pub enum SinkConfig {
     #[cfg(feature = "sink-sns")]
     #[serde(rename = "sns")]
     Sns(SnsSinkConfig),
+
+    /// AWS Kinesis sink for data stream publishing.
+    #[cfg(feature = "sink-kinesis")]
+    #[serde(rename = "kinesis")]
+    Kinesis(KinesisSinkConfig),
 }
diff --git a/src/core.rs b/src/core.rs
index ceedf88..3b3273a 100644
--- a/src/core.rs
+++ b/src/core.rs
@@ -181,6 +181,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
             })?;
             AnySink::Sns(s)
         }
+
+        #[cfg(feature = "sink-kinesis")]
+        SinkConfig::Kinesis(cfg) => {
+            use crate::sink::kinesis::KinesisSink;
+            let s = KinesisSink::new(cfg.clone()).await.map_err(|e| {
+                etl::etl_error!(
+                    etl::error::ErrorKind::InvalidData,
+                    "Failed to create Kinesis sink",
+                    e.to_string()
+                )
+            })?;
+            AnySink::Kinesis(s)
+        }
     };
 
     // Create PgStream as an ETL destination
@@ -266,6 +279,11 @@ fn log_sink_config(config: &SinkConfig) {
         SinkConfig::Sns(_cfg) => {
             debug!("using sns sink");
         }
+
+        #[cfg(feature = "sink-kinesis")]
+        SinkConfig::Kinesis(_cfg) => {
+            debug!("using kinesis sink");
+        }
     }
 }
diff --git a/src/sink/kinesis.rs b/src/sink/kinesis.rs
new file mode 100644
index 0000000..139784e
--- /dev/null
+++ b/src/sink/kinesis.rs
@@ -0,0 +1,307 @@
+//! AWS Kinesis sink for publishing events to a Kinesis data stream.
+//!
+//! Publishes each event's payload as a data record to a stream determined by:
+//! 1. `stream` key in event metadata (from subscription's metadata/metadata_extensions)
+//! 2. Fallback to `stream_name` in sink config
+//!
+//! # Dynamic Routing
+//!
+//! The target stream can be configured per-event using metadata_extensions:
+//!
+//! ```sql
+//! metadata_extensions = '[
+//!   {"json_path": "stream", "expression": "''my-stream-'' || shard_key"}
+//! ]'
+//! ```
+
+use aws_sdk_kinesis::Client;
+use aws_sdk_kinesis::types::PutRecordsRequestEntry;
+use etl::error::EtlResult;
+use futures::future::try_join_all;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use crate::sink::Sink;
+use crate::types::TriggeredEvent;
+
+/// Maximum number of records per Kinesis PutRecords request.
+const KINESIS_MAX_BATCH_SIZE: usize = 500;
+
+/// Configuration for the AWS Kinesis sink.
+///
+/// This intentionally does not implement [`Serialize`] to avoid accidentally
+/// leaking secrets (AWS credentials, endpoint URLs) in serialized forms.
+#[derive(Clone, Debug, Deserialize)]
+pub struct KinesisSinkConfig {
+    /// Kinesis stream name. Optional if provided via event metadata.
+    pub stream_name: Option<String>,
+
+    /// AWS region (e.g., "us-east-1").
+    pub region: String,
+
+    /// Optional custom endpoint URL for LocalStack testing.
+    #[serde(default)]
+    pub endpoint_url: Option<String>,
+
+    /// Optional AWS access key ID (uses default credentials chain if not set).
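+    /// Explicit credentials are only applied when both this field and
+    /// `secret_access_key` are set; otherwise the default provider chain is used.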
+    #[serde(default)]
+    pub access_key_id: Option<String>,
+
+    /// Optional AWS secret access key (uses default credentials chain if not set).
+    #[serde(default)]
+    pub secret_access_key: Option<String>,
+}
+
+/// Configuration for the AWS Kinesis sink without sensitive data.
+///
+/// Safe to serialize and log. Use this for debugging and metrics.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct KinesisSinkConfigWithoutSecrets {
+    /// Kinesis stream name (if configured).
+    pub stream_name: Option<String>,
+
+    /// AWS region.
+    pub region: String,
+
+    /// Whether a custom endpoint is configured.
+    pub has_custom_endpoint: bool,
+
+    /// Whether explicit credentials are configured.
+    pub has_explicit_credentials: bool,
+}
+
+impl From<KinesisSinkConfig> for KinesisSinkConfigWithoutSecrets {
+    fn from(config: KinesisSinkConfig) -> Self {
+        Self {
+            stream_name: config.stream_name,
+            region: config.region,
+            has_custom_endpoint: config.endpoint_url.is_some(),
+            has_explicit_credentials: config.access_key_id.is_some(),
+        }
+    }
+}
+
+impl From<&KinesisSinkConfig> for KinesisSinkConfigWithoutSecrets {
+    fn from(config: &KinesisSinkConfig) -> Self {
+        Self {
+            stream_name: config.stream_name.clone(),
+            region: config.region.clone(),
+            has_custom_endpoint: config.endpoint_url.is_some(),
+            has_explicit_credentials: config.access_key_id.is_some(),
+        }
+    }
+}
+
+/// Sink that publishes events to an AWS Kinesis data stream.
+///
+/// Each event's payload is serialized as JSON and published as a data record.
+/// The sink uses the AWS SDK with automatic retry handling.
+#[derive(Clone)]
+pub struct KinesisSink {
+    /// AWS Kinesis client.
+    client: Arc<Client>,
+
+    /// Default stream name. Can be overridden per-event via metadata.
+    stream_name: Option<String>,
+}
+
+impl KinesisSink {
+    /// Creates a new Kinesis sink from configuration.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the AWS client cannot be configured.
+    pub async fn new(
+        config: KinesisSinkConfig,
+    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
+        let region = aws_sdk_kinesis::config::Region::new(config.region);
+
+        let mut sdk_config_loader =
+            aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region);
+
+        // Configure custom endpoint if provided (for LocalStack).
+        if let Some(endpoint) = &config.endpoint_url {
+            sdk_config_loader = sdk_config_loader.endpoint_url(endpoint);
+        }
+
+        // Configure explicit credentials if provided.
+        if let (Some(access_key), Some(secret_key)) =
+            (&config.access_key_id, &config.secret_access_key)
+        {
+            sdk_config_loader =
+                sdk_config_loader.credentials_provider(aws_sdk_kinesis::config::Credentials::new(
+                    access_key.clone(),
+                    secret_key.clone(),
+                    None,
+                    None,
+                    "pgstream",
+                ));
+        }
+
+        let sdk_config = sdk_config_loader.load().await;
+        let client = Client::new(&sdk_config);
+
+        Ok(Self {
+            client: Arc::new(client),
+            stream_name: config.stream_name,
+        })
+    }
+
+    /// Resolves the stream name for an event from metadata or config.
+    fn resolve_stream_name<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
+        // First check event metadata for dynamic stream
+        if let Some(ref metadata) = event.metadata {
+            if let Some(stream) = metadata.get("stream").and_then(|v| v.as_str()) {
+                return Some(stream);
+            }
+        }
+        // Fall back to config stream name
+        self.stream_name.as_deref()
+    }
+}
+
+impl Sink for KinesisSink {
+    fn name() -> &'static str {
+        "kinesis"
+    }
+
+    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
+        if events.is_empty() {
+            return Ok(());
+        }
+
+        // Build records and group by stream in a single pass.
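+        // The event id doubles as the Kinesis partition key, so records sharing an
+        // event id hash to the same shard.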
+        let mut records_by_stream: HashMap<String, Vec<PutRecordsRequestEntry>> = HashMap::new();
+
+        for event in events {
+            // Convert to owned String to end borrow before moving event fields.
+            let stream_name = self
+                .resolve_stream_name(&event)
+                .ok_or_else(|| {
+                    etl::etl_error!(
+                        etl::error::ErrorKind::ConfigError,
+                        "No stream name configured",
+                        "Stream name must be provided in sink config or event metadata"
+                    )
+                })?
+                .to_string();
+
+            let data = serde_json::to_vec(&event.payload).map_err(|e| {
+                etl::etl_error!(
+                    etl::error::ErrorKind::InvalidData,
+                    "Failed to serialize payload to JSON",
+                    e.to_string()
+                )
+            })?;
+
+            let record = PutRecordsRequestEntry::builder()
+                .partition_key(event.id.id)
+                .data(aws_sdk_kinesis::primitives::Blob::new(data))
+                .build()
+                .map_err(|e| {
+                    etl::etl_error!(
+                        etl::error::ErrorKind::InvalidData,
+                        "Failed to build record entry",
+                        e.to_string()
+                    )
+                })?;
+
+            records_by_stream
+                .entry(stream_name)
+                .or_default()
+                .push(record);
+        }
+
+        // Create batch futures for all streams and chunks.
+        let mut batch_futures = Vec::new();
+
+        for (stream_name, mut records) in records_by_stream {
+            // Process in chunks of KINESIS_MAX_BATCH_SIZE.
+            while !records.is_empty() {
+                let chunk_size = records.len().min(KINESIS_MAX_BATCH_SIZE);
+                let chunk: Vec<_> = records.drain(..chunk_size).collect();
+                let chunk_len = chunk.len();
+
+                let client = self.client.clone();
+                let stream_name = stream_name.clone();
+                batch_futures.push(async move {
+                    let result = client
+                        .put_records()
+                        .stream_name(&stream_name)
+                        .set_records(Some(chunk))
+                        .send()
+                        .await
+                        .map_err(|e| {
+                            etl::etl_error!(
+                                etl::error::ErrorKind::DestinationError,
+                                "Failed to publish records to Kinesis",
+                                e.to_string()
+                            )
+                        })?;
+
+                    // Check for partial failures.
+                    let failed_count = result.failed_record_count.unwrap_or(0);
+                    if failed_count > 0 {
+                        let first_error = result
+                            .records
+                            .iter()
+                            .find(|r| r.error_code.is_some())
+                            .map(|r| {
+                                format!(
+                                    "{}: {}",
+                                    r.error_code.as_deref().unwrap_or("Unknown"),
+                                    r.error_message.as_deref().unwrap_or("No message")
+                                )
+                            })
+                            .unwrap_or_else(|| "Unknown error".to_string());
+
+                        return Err(etl::etl_error!(
+                            etl::error::ErrorKind::DestinationError,
+                            "Some records failed to publish to Kinesis",
+                            format!(
+                                "{} of {} records failed. First error: {}",
+                                failed_count, chunk_len, first_error
+                            )
+                        ));
+                    }
+
+                    Ok(())
+                });
+            }
+        }
+
+        // Send all batches concurrently.
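+        // try_join_all fails fast: the first batch error cancels the remaining futures,
+        // so a partial publish is possible and delivery is effectively at-least-once.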
+        try_join_all(batch_futures).await?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sink_name() {
+        assert_eq!(KinesisSink::name(), "kinesis");
+    }
+
+    #[test]
+    fn test_config_without_secrets() {
+        let config = KinesisSinkConfig {
+            stream_name: Some("my-stream".to_string()),
+            region: "us-east-1".to_string(),
+            endpoint_url: Some("http://localhost:4566".to_string()),
+            access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()),
+            secret_access_key: Some("secret".to_string()),
+        };
+
+        let without_secrets: KinesisSinkConfigWithoutSecrets = (&config).into();
+
+        assert_eq!(without_secrets.stream_name, Some("my-stream".to_string()));
+        assert_eq!(without_secrets.region, "us-east-1");
+        assert!(without_secrets.has_custom_endpoint);
+        assert!(without_secrets.has_explicit_credentials);
+    }
+}
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index 8001487..d28076f 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -25,6 +25,9 @@ pub mod sqs;
 #[cfg(feature = "sink-sns")]
 pub mod sns;
 
+#[cfg(feature = "sink-kinesis")]
+pub mod kinesis;
+
 pub use base::Sink;
 
 use etl::error::EtlResult;
@@ -54,6 +57,9 @@ use sqs::SqsSink;
 #[cfg(feature = "sink-sns")]
 use sns::SnsSink;
 
+#[cfg(feature = "sink-kinesis")]
+use kinesis::KinesisSink;
+
 use crate::types::TriggeredEvent;
 
 /// Wrapper enum for all supported sink types.
@@ -96,6 +102,10 @@ pub enum AnySink {
     /// AWS SNS sink for topic publishing.
     #[cfg(feature = "sink-sns")]
     Sns(SnsSink),
+
+    /// AWS Kinesis sink for data stream publishing.
+    #[cfg(feature = "sink-kinesis")]
+    Kinesis(KinesisSink),
 }
 
 impl Sink for AnySink {
@@ -130,6 +140,9 @@ impl Sink for AnySink {
 
             #[cfg(feature = "sink-sns")]
             AnySink::Sns(sink) => sink.publish_events(events).await,
+
+            #[cfg(feature = "sink-kinesis")]
+            AnySink::Kinesis(sink) => sink.publish_events(events).await,
         }
     }
 }
diff --git a/tests/kinesis_sink_tests.rs b/tests/kinesis_sink_tests.rs
new file mode 100644
index 0000000..afe776f
--- /dev/null
+++ b/tests/kinesis_sink_tests.rs
@@ -0,0 +1,255 @@
+//! Integration tests for AWS Kinesis sink.
+//!
+//! Uses LocalStack container to test Kinesis record publishing.
+
+#![cfg(feature = "sink-kinesis")]
+
+use chrono::Utc;
+use postgres_stream::sink::Sink;
+use postgres_stream::sink::kinesis::{KinesisSink, KinesisSinkConfig};
+use postgres_stream::test_utils::ensure_localstack;
+use postgres_stream::types::{EventIdentifier, PgLsn, StreamId, TriggeredEvent};
+use uuid::Uuid;
+
+/// Creates a test event with the given payload key.
+fn make_test_event(key: &str) -> TriggeredEvent {
+    TriggeredEvent {
+        id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()),
+        stream_id: StreamId::default(),
+        payload: serde_json::json!({ "key": key, "value": "test_data" }),
+        metadata: Some(serde_json::json!({ "source": "test" })),
+        lsn: Some(PgLsn::from(12345u64)),
+    }
+}
+
+/// Creates a Kinesis stream in LocalStack.
+async fn create_kinesis_stream(endpoint: &str, stream_name: &str) {
+    let region = aws_sdk_kinesis::config::Region::new("us-east-1");
+    let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
+        .region(region)
+        .endpoint_url(endpoint)
+        .credentials_provider(aws_sdk_kinesis::config::Credentials::new(
+            "test", "test", None, None, "test",
+        ))
+        .load()
+        .await;
+
+    let client = aws_sdk_kinesis::Client::new(&sdk_config);
+
+    client
+        .create_stream()
+        .stream_name(stream_name)
+        .shard_count(1)
+        .send()
+        .await
+        .expect("Failed to create Kinesis stream");
+
+    // Wait for stream to become active.
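+    // Poll DescribeStream up to 30 times with a 500 ms pause (~15 s) before giving up.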
+    for _ in 0..30 {
+        let desc = client
+            .describe_stream()
+            .stream_name(stream_name)
+            .send()
+            .await
+            .expect("Failed to describe stream");
+
+        if let Some(stream_desc) = desc.stream_description() {
+            if stream_desc.stream_status() == &aws_sdk_kinesis::types::StreamStatus::Active {
+                return;
+            }
+        }
+
+        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+    }
+
+    panic!("Stream did not become active in time");
+}
+
+/// Gets records from a Kinesis stream.
+async fn get_kinesis_records(endpoint: &str, stream_name: &str, count: usize) -> Vec<String> {
+    let region = aws_sdk_kinesis::config::Region::new("us-east-1");
+    let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
+        .region(region)
+        .endpoint_url(endpoint)
+        .credentials_provider(aws_sdk_kinesis::config::Credentials::new(
+            "test", "test", None, None, "test",
+        ))
+        .load()
+        .await;
+
+    let client = aws_sdk_kinesis::Client::new(&sdk_config);
+
+    // Get shard iterator.
+    let shards = client
+        .list_shards()
+        .stream_name(stream_name)
+        .send()
+        .await
+        .expect("Failed to list shards");
+
+    let shard_id = shards.shards().first().expect("No shards found").shard_id();
+
+    let iterator_response = client
+        .get_shard_iterator()
+        .stream_name(stream_name)
+        .shard_id(shard_id)
+        .shard_iterator_type(aws_sdk_kinesis::types::ShardIteratorType::TrimHorizon)
+        .send()
+        .await
+        .expect("Failed to get shard iterator");
+
+    let mut shard_iterator = iterator_response.shard_iterator().unwrap().to_string();
+    let mut records = Vec::new();
+
+    // Poll for records.
+    for _ in 0..10 {
+        let response = client
+            .get_records()
+            .shard_iterator(&shard_iterator)
+            .limit(100)
+            .send()
+            .await
+            .expect("Failed to get records");
+
+        for record in response.records() {
+            let data = String::from_utf8(record.data().as_ref().to_vec())
+                .expect("Invalid UTF-8 in record");
+            records.push(data);
+        }
+
+        if records.len() >= count {
+            break;
+        }
+
+        if let Some(next_iterator) = response.next_shard_iterator() {
+            shard_iterator = next_iterator.to_string();
+        } else {
+            break;
+        }
+
+        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+    }
+
+    records
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_kinesis_sink_publishes_events() {
+    let port = ensure_localstack().await;
+    let endpoint = format!("http://127.0.0.1:{port}");
+
+    let stream_name = format!("test-stream-{}", Uuid::new_v4());
+    create_kinesis_stream(&endpoint, &stream_name).await;
+
+    // Create sink.
+    let config = KinesisSinkConfig {
+        stream_name: Some(stream_name.clone()),
+        region: "us-east-1".to_string(),
+        endpoint_url: Some(endpoint.clone()),
+        access_key_id: Some("test".to_string()),
+        secret_access_key: Some("test".to_string()),
+    };
+
+    let sink = KinesisSink::new(config)
+        .await
+        .expect("Failed to create sink");
+
+    // Publish events.
+    let events = vec![make_test_event("event1"), make_test_event("event2")];
+    sink.publish_events(events)
+        .await
+        .expect("Failed to publish");
+
+    // Verify records arrived - only payload is sent.
+    let records = get_kinesis_records(&endpoint, &stream_name, 2).await;
+    assert_eq!(records.len(), 2, "Expected 2 records");
+
+    for record in &records {
+        let payload: serde_json::Value =
+            serde_json::from_str(record).expect("Failed to parse record");
+        // Only payload fields should be present.
+        assert!(payload["key"].is_string());
+        assert!(payload["value"].is_string());
+        // No envelope fields.
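+        // publish_events serializes only `event.payload`, so id, metadata, and lsn
+        // must not appear in the record.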
+        assert!(payload.get("id").is_none());
+        assert!(payload.get("metadata").is_none());
+        assert!(payload.get("lsn").is_none());
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_kinesis_sink_handles_empty_batch() {
+    let port = ensure_localstack().await;
+    let endpoint = format!("http://127.0.0.1:{port}");
+
+    let stream_name = format!("test-stream-{}", Uuid::new_v4());
+    create_kinesis_stream(&endpoint, &stream_name).await;
+
+    let config = KinesisSinkConfig {
+        stream_name: Some(stream_name),
+        region: "us-east-1".to_string(),
+        endpoint_url: Some(endpoint),
+        access_key_id: Some("test".to_string()),
+        secret_access_key: Some("test".to_string()),
+    };
+
+    let sink = KinesisSink::new(config)
+        .await
+        .expect("Failed to create sink");
+
+    // Publishing empty batch should succeed without making any API calls.
+    sink.publish_events(vec![])
+        .await
+        .expect("Empty batch should succeed");
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_kinesis_sink_uses_stream_from_metadata() {
+    let port = ensure_localstack().await;
+    let endpoint = format!("http://127.0.0.1:{port}");
+
+    let stream_name = format!("test-stream-{}", Uuid::new_v4());
+    create_kinesis_stream(&endpoint, &stream_name).await;
+
+    // Create sink without stream_name - will get it from metadata.
+    let config = KinesisSinkConfig {
+        stream_name: None,
+        region: "us-east-1".to_string(),
+        endpoint_url: Some(endpoint.clone()),
+        access_key_id: Some("test".to_string()),
+        secret_access_key: Some("test".to_string()),
+    };
+
+    let sink = KinesisSink::new(config)
+        .await
+        .expect("Failed to create sink");
+
+    // Create event with stream in metadata.
+    let event = TriggeredEvent {
+        id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()),
+        stream_id: StreamId::default(),
+        payload: serde_json::json!({ "action": "created" }),
+        metadata: Some(serde_json::json!({ "stream": stream_name })),
+        lsn: Some(PgLsn::from(99999u64)),
+    };
+
+    sink.publish_events(vec![event])
+        .await
+        .expect("Failed to publish");
+
+    let records = get_kinesis_records(&endpoint, &stream_name, 1).await;
+    assert_eq!(records.len(), 1);
+
+    let payload: serde_json::Value =
+        serde_json::from_str(&records[0]).expect("Failed to parse record");
+
+    // Only payload is sent.
+    assert_eq!(payload["action"], "created");
+    assert!(payload.get("id").is_none());
+    assert!(payload.get("metadata").is_none());
+}
+
+#[test]
+fn test_sink_name() {
+    assert_eq!(KinesisSink::name(), "kinesis");
+}
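For reference, a minimal usage sketch of the new sink outside the test harness. It is not part of the diff: the stream name, the LocalStack endpoint on port 4566, the credential values, and the `tokio::main` entry point are illustrative assumptions; only the types and methods introduced above are relied on, and a real deployment would normally route through `SinkConfig::Kinesis` instead of constructing the sink directly.

```rust
use postgres_stream::sink::Sink;
use postgres_stream::sink::kinesis::{KinesisSink, KinesisSinkConfig};
use postgres_stream::types::{EventIdentifier, PgLsn, StreamId, TriggeredEvent};

#[tokio::main]
async fn main() {
    // Static default stream; a "stream" key in event metadata would override it per event.
    let config = KinesisSinkConfig {
        stream_name: Some("example-stream".to_string()),
        region: "us-east-1".to_string(),
        // Assumed LocalStack endpoint; omit to target AWS via the default credential chain.
        endpoint_url: Some("http://localhost:4566".to_string()),
        access_key_id: Some("test".to_string()),
        secret_access_key: Some("test".to_string()),
    };

    let sink = KinesisSink::new(config)
        .await
        .expect("failed to build Kinesis sink");

    // Only `payload` is written to the stream; id, metadata, and lsn stay local.
    let event = TriggeredEvent {
        id: EventIdentifier::new(uuid::Uuid::new_v4().to_string(), chrono::Utc::now()),
        stream_id: StreamId::default(),
        payload: serde_json::json!({ "action": "created" }),
        metadata: None,
        lsn: Some(PgLsn::from(1u64)),
    };

    sink.publish_events(vec![event])
        .await
        .expect("failed to publish to Kinesis");
}
```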