diff --git a/Cargo.lock b/Cargo.lock index 13992a1..80e39d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,42 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" +[[package]] +name = "async-nats" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76433c4de73442daedb3a59e991d94e85c14ebfc33db53dfcd347a21cd6ef4f8" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.5", + "regex", + "ring", + "rustls-native-certs 0.7.3", + "rustls-pemfile", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -187,7 +223,7 @@ dependencies = [ "log", "pin-project-lite", "rustls", - "rustls-native-certs", + "rustls-native-certs 0.8.2", "rustls-pemfile", "rustls-pki-types", "serde", @@ -195,7 +231,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-util", "tower-service", @@ -231,6 +267,9 @@ name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -313,6 +352,16 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -403,6 +452,32 @@ version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2931af7e13dc045d8e9d26afccc6fa115d64e115c9c84b1166288b46f6782c2" +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "darling" version = "0.21.3" @@ -438,6 +513,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -539,6 +620,28 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "either" version = "1.15.0" @@ -622,7 +725,7 @@ dependencies = [ "secrecy", "serde", "sqlx", - "thiserror", + "thiserror 2.0.17", "tokio-postgres", ] @@ -638,7 +741,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-postgres", "tracing", @@ -678,6 +781,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "filetime" version = "0.2.26" @@ -1441,7 +1550,7 @@ dependencies = [ "metrics", "metrics-util", "quanta", - "thiserror", + "thiserror 2.0.17", "tokio", "tracing", ] @@ -1479,6 +1588,21 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.16", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -1489,6 +1613,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -1695,6 +1828,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1777,6 +1930,7 @@ name = "postgres-stream" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", "chrono", "config", "const-oid", @@ -1800,7 +1954,7 @@ dependencies = [ "tempfile", "testcontainers", "testcontainers-modules", - "thiserror", + "thiserror 2.0.17", "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", @@ -2091,6 +2245,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "1.1.2" @@ -2115,11 +2278,24 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.2" @@ -2129,7 +2305,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.5.1", ] [[package]] @@ -2150,6 +2326,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -2223,6 +2410,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.10.0", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.5.1" @@ -2230,7 +2430,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ "bitflags 2.10.0", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -2246,6 +2446,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -2289,6 +2495,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -2402,6 +2617,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -2520,7 +2747,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-stream", "tracing", @@ -2605,7 +2832,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -2644,7 +2871,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -2670,7 +2897,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror", + "thiserror 2.0.17", "tracing", "url", "uuid", @@ -2793,7 +3020,7 @@ dependencies = [ "serde", "serde_json", "serde_with", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-stream", "tokio-tar", @@ -2810,13 +3037,33 @@ dependencies = [ "testcontainers", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.17", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -3027,6 +3274,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http", + "httparse", + "rand 0.8.5", + "ring", + "rustls-native-certs 0.8.2", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", +] + [[package]] name = "tower-service" version = "0.3.3" @@ -3086,6 +3354,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 5469273..5f9b585 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ version = "0.1.0" [features] default = [] +sink-nats = ["dep:async-nats"] sink-redis-streams = ["dep:redis"] sink-redis-strings = ["dep:redis"] test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"] @@ -49,6 +50,7 @@ etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a uuid = { version = "1.19.0", default-features = false, features = ["v4"] } # Optional sink dependencies. +async-nats = { version = "0.38", optional = true } redis = { version = "0.27", default-features = false, features = [ "tokio-comp", "connection-manager", @@ -65,7 +67,7 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [ ctor = { version = "0.4", optional = true } testcontainers = { version = "0.23", optional = true, features = ["blocking"] } -testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "blocking"] } +testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "nats", "blocking"] } [dev-dependencies] temp-env = "0.3" diff --git a/src/config/sink.rs b/src/config/sink.rs index de87aa3..e1de301 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -1,5 +1,8 @@ use serde::Deserialize; +#[cfg(feature = "sink-nats")] +use crate::sink::nats::NatsSinkConfig; + #[cfg(feature = "sink-redis-strings")] use crate::sink::redis_strings::RedisStringsSinkConfig; @@ -24,4 +27,9 @@ pub enum SinkConfig { #[cfg(feature = "sink-redis-streams")] #[serde(rename = "redis-streams")] RedisStreams(RedisStreamsSinkConfig), + + /// NATS sink for pub/sub messaging. + #[cfg(feature = "sink-nats")] + #[serde(rename = "nats")] + Nats(NatsSinkConfig), } diff --git a/src/core.rs b/src/core.rs index d9c2a62..62a90b7 100644 --- a/src/core.rs +++ b/src/core.rs @@ -103,6 +103,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> { })?; AnySink::RedisStreams(s) } + + #[cfg(feature = "sink-nats")] + SinkConfig::Nats(cfg) => { + use crate::sink::nats::NatsSink; + let s = NatsSink::new(cfg.clone()).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to create NATS sink", + e.to_string() + ) + })?; + AnySink::Nats(s) + } }; // Create PgStream as an ETL destination @@ -158,6 +171,11 @@ fn log_sink_config(config: &SinkConfig) { SinkConfig::RedisStreams(_cfg) => { debug!("using redis-streams sink"); } + + #[cfg(feature = "sink-nats")] + SinkConfig::Nats(_cfg) => { + debug!("using nats sink"); + } } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 0adcb79..0882026 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,6 +1,9 @@ mod base; pub mod memory; +#[cfg(feature = "sink-nats")] +pub mod nats; + #[cfg(feature = "sink-redis-strings")] pub mod redis_strings; @@ -12,6 +15,9 @@ pub use base::Sink; use etl::error::EtlResult; use memory::MemorySink; +#[cfg(feature = "sink-nats")] +use nats::NatsSink; + #[cfg(feature = "sink-redis-strings")] use redis_strings::RedisStringsSink; @@ -36,6 +42,10 @@ pub enum AnySink { /// Redis streams sink for append-only log storage. #[cfg(feature = "sink-redis-streams")] RedisStreams(RedisStreamsSink), + + /// NATS sink for pub/sub messaging. + #[cfg(feature = "sink-nats")] + Nats(NatsSink), } impl Sink for AnySink { @@ -52,6 +62,9 @@ impl Sink for AnySink { #[cfg(feature = "sink-redis-streams")] AnySink::RedisStreams(sink) => sink.publish_events(events).await, + + #[cfg(feature = "sink-nats")] + AnySink::Nats(sink) => sink.publish_events(events).await, } } } diff --git a/src/sink/nats.rs b/src/sink/nats.rs new file mode 100644 index 0000000..0667433 --- /dev/null +++ b/src/sink/nats.rs @@ -0,0 +1,172 @@ +//! NATS sink for publishing events to a NATS subject. +//! +//! Publishes each event's payload as a JSON message to a subject determined by: +//! 1. `topic` key in event metadata (from subscription's metadata/metadata_extensions) +//! 2. Fallback to `subject` in sink config +//! +//! # Dynamic Routing +//! +//! The target subject can be configured per-event using metadata_extensions: +//! +//! ```sql +//! metadata_extensions = '[ +//! {"json_path": "topic", "expression": "''events.'' || table_name"} +//! ]' +//! ``` + +use async_nats::Client; +use etl::error::EtlResult; +use futures::future::try_join_all; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use crate::sink::Sink; +use crate::types::TriggeredEvent; + +/// Configuration for the NATS sink. +/// +/// This intentionally does not implement [`Serialize`] to avoid accidentally +/// leaking secrets (URL credentials) in serialized forms. +#[derive(Clone, Debug, Deserialize)] +pub struct NatsSinkConfig { + /// NATS server URL (e.g., "nats://localhost:4222"). + /// Contains credentials and should be treated as sensitive. + pub url: String, + + /// Subject to publish messages to. Optional if provided via event metadata. + #[serde(default)] + pub subject: Option, +} + +/// Configuration for the NATS sink without sensitive data. +/// +/// Safe to serialize and log. Use this for debugging and metrics. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NatsSinkConfigWithoutSecrets { + /// Subject to publish messages to (if configured). + pub subject: Option, +} + +impl From for NatsSinkConfigWithoutSecrets { + fn from(config: NatsSinkConfig) -> Self { + Self { + subject: config.subject, + } + } +} + +impl From<&NatsSinkConfig> for NatsSinkConfigWithoutSecrets { + fn from(config: &NatsSinkConfig) -> Self { + Self { + subject: config.subject.clone(), + } + } +} + +/// Sink that publishes events to a NATS subject. +/// +/// Each event's payload is serialized as JSON and published to the subject. +/// The NATS client handles connection pooling and automatic reconnection. +#[derive(Clone)] +pub struct NatsSink { + /// Shared NATS client connection. + client: Arc, + + /// Default subject to publish messages to. Can be overridden per-event via metadata. + subject: Option, +} + +impl NatsSink { + /// Creates a new NATS sink from configuration. + /// + /// # Errors + /// + /// Returns an error if the NATS connection cannot be established. + pub async fn new( + config: NatsSinkConfig, + ) -> Result> { + let client = async_nats::connect(&config.url).await?; + + Ok(Self { + client: Arc::new(client), + subject: config.subject, + }) + } + + /// Resolves the subject for an event from metadata or config. + fn resolve_subject<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> { + // First check event metadata for dynamic subject (using generic "topic" key). + if let Some(ref metadata) = event.metadata { + if let Some(topic) = metadata.get("topic").and_then(|v| v.as_str()) { + return Some(topic); + } + } + // Fall back to config subject. + self.subject.as_deref() + } +} + +impl Sink for NatsSink { + fn name() -> &'static str { + "nats" + } + + async fn publish_events(&self, events: Vec) -> EtlResult<()> { + if events.is_empty() { + return Ok(()); + } + + // Publish all messages concurrently in a single pass. + try_join_all(events.into_iter().map(|event| { + let client = &self.client; + let subject_opt = self.resolve_subject(&event).map(|s| s.to_string()); + async move { + let subject = subject_opt.ok_or_else(|| { + etl::etl_error!( + etl::error::ErrorKind::ConfigError, + "No subject configured", + "Subject must be provided in sink config or event metadata (topic key)" + ) + })?; + + let bytes = serde_json::to_vec(&event.payload).map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to serialize payload to JSON", + e.to_string() + ) + })?; + + client.publish(subject, bytes.into()).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to publish event to NATS", + e.to_string() + ) + }) + } + })) + .await?; + + // Flush to ensure all messages are sent. + self.client.flush().await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to flush NATS messages", + e.to_string() + ) + })?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sink_name() { + assert_eq!(NatsSink::name(), "nats"); + } +} diff --git a/src/test_utils/container.rs b/src/test_utils/container.rs index ff9bbed..3e7ead6 100644 --- a/src/test_utils/container.rs +++ b/src/test_utils/container.rs @@ -2,17 +2,20 @@ use ctor::dtor; use etl::config::{PgConnectionConfig, TlsConfig}; use std::sync::{Mutex, OnceLock}; use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner}; +use testcontainers_modules::nats::Nats; use testcontainers_modules::postgres::Postgres; use testcontainers_modules::redis::Redis; use uuid::Uuid; static POSTGRES_PORT: OnceLock = OnceLock::new(); static REDIS_PORT: OnceLock = OnceLock::new(); +static NATS_PORT: OnceLock = OnceLock::new(); // Using Mutex> so we can take ownership for cleanup. static POSTGRES_CONTAINER: OnceLock>>> = OnceLock::new(); static REDIS_CONTAINER: OnceLock>>> = OnceLock::new(); +static NATS_CONTAINER: OnceLock>>> = OnceLock::new(); /// Cleanup function that runs at program exit to stop and remove the postgres container. #[dtor] @@ -38,6 +41,18 @@ fn cleanup_redis_container() { } } +/// Cleanup function that runs at program exit to stop and remove the NATS container. +#[dtor] +fn cleanup_nats_container() { + if let Some(mutex) = NATS_CONTAINER.get() { + if let Ok(mut guard) = mutex.lock() { + if let Some(container) = guard.take() { + let _ = container.rm(); + } + } + } +} + pub async fn ensure_postgres() -> u16 { // Use get_or_init to handle concurrent initialization attempts *POSTGRES_PORT.get_or_init(|| { @@ -112,3 +127,26 @@ pub async fn ensure_redis() -> u16 { .expect("Failed to join redis container startup thread") }) } + +/// Ensures a NATS container is running and returns its port. +/// +/// Uses singleton pattern to reuse the same container across tests. +pub async fn ensure_nats() -> u16 { + *NATS_PORT.get_or_init(|| { + std::thread::spawn(|| { + let container: ContainerRequest = Nats::default().into(); + + let container = container.start().expect("Failed to start nats container"); + + let port = container + .get_host_port_ipv4(4222) + .expect("Failed to get nats container port"); + + let _ = NATS_CONTAINER.set(Mutex::new(Some(container))); + + port + }) + .join() + .expect("Failed to join nats container startup thread") + }) +} diff --git a/tests/nats_sink_tests.rs b/tests/nats_sink_tests.rs new file mode 100644 index 0000000..e4fdee4 --- /dev/null +++ b/tests/nats_sink_tests.rs @@ -0,0 +1,175 @@ +//! Integration tests for the NATS sink. + +#![cfg(feature = "sink-nats")] + +use postgres_stream::sink::Sink; +use postgres_stream::sink::nats::{NatsSink, NatsSinkConfig}; +use postgres_stream::test_utils::ensure_nats; +use postgres_stream::types::{EventIdentifier, StreamId, TriggeredEvent}; + +use chrono::Utc; +use futures::StreamExt; + +/// Creates a test event with the given ID. +fn make_test_event(id: &str) -> TriggeredEvent { + TriggeredEvent { + id: EventIdentifier::new(id.to_string(), Utc::now()), + payload: serde_json::json!({ + "test_id": id, + "message": format!("Test event {}", id), + }), + metadata: Some(serde_json::json!({ "source": "test" })), + stream_id: StreamId::from(1u64), + lsn: Some("0/16B3748".parse().unwrap()), + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_nats_sink_publishes_events() { + let nats_port = ensure_nats().await; + let nats_url = format!("nats://127.0.0.1:{nats_port}"); + let subject = "pgstream.test-events"; + + let config = NatsSinkConfig { + url: nats_url.clone(), + subject: Some(subject.to_string()), + }; + + let sink = NatsSink::new(config) + .await + .expect("Failed to create NATS sink"); + + // Subscribe before publishing to verify messages. + let client = async_nats::connect(&nats_url) + .await + .expect("Failed to connect to NATS"); + + let mut subscriber = client + .subscribe(subject.to_string()) + .await + .expect("Failed to subscribe"); + + // Give the subscription time to fully establish. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Publish test events. + let events = vec![ + make_test_event("nats-event-1"), + make_test_event("nats-event-2"), + ]; + sink.publish_events(events) + .await + .expect("Failed to publish events"); + + // Verify events were received. + let msg1 = tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()) + .await + .expect("Timeout waiting for first message") + .expect("Expected first message"); + + let msg2 = tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()) + .await + .expect("Timeout waiting for second message") + .expect("Expected second message"); + + // Parse and verify message content - only payload is sent. + let payload1: serde_json::Value = + serde_json::from_slice(&msg1.payload).expect("Failed to parse first message"); + let payload2: serde_json::Value = + serde_json::from_slice(&msg2.payload).expect("Failed to parse second message"); + + // Only payload fields should be present. + assert!(payload1.get("test_id").is_some()); + assert!(payload1.get("message").is_some()); + assert!(payload2.get("test_id").is_some()); + assert!(payload2.get("message").is_some()); + // No envelope fields. + assert!(payload1.get("id").is_none()); + assert!(payload1.get("metadata").is_none()); + assert!(payload1.get("lsn").is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_nats_sink_empty_batch() { + let nats_port = ensure_nats().await; + let nats_url = format!("nats://127.0.0.1:{nats_port}"); + + let config = NatsSinkConfig { + url: nats_url, + subject: Some("pgstream.empty-test".to_string()), + }; + + let sink = NatsSink::new(config) + .await + .expect("Failed to create NATS sink"); + + // Empty batch should succeed without error. + sink.publish_events(vec![]) + .await + .expect("Empty batch should succeed"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_nats_sink_uses_topic_from_metadata() { + let nats_port = ensure_nats().await; + let nats_url = format!("nats://127.0.0.1:{nats_port}"); + let subject = "pgstream.metadata-topic"; + + // Create sink without subject - will get it from metadata. + let config = NatsSinkConfig { + url: nats_url.clone(), + subject: None, + }; + + let sink = NatsSink::new(config) + .await + .expect("Failed to create NATS sink"); + + // Subscribe before publishing to verify messages. + let client = async_nats::connect(&nats_url) + .await + .expect("Failed to connect to NATS"); + + let mut subscriber = client + .subscribe(subject.to_string()) + .await + .expect("Failed to subscribe"); + + // Give the subscription time to fully establish. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Create event with topic in metadata. + let event = TriggeredEvent { + id: EventIdentifier::new("nats-metadata-event".to_string(), Utc::now()), + payload: serde_json::json!({ + "test_id": "metadata-event", + "message": "Test event with metadata topic", + }), + metadata: Some(serde_json::json!({ "topic": subject })), + stream_id: StreamId::from(1u64), + lsn: Some("0/16B3748".parse().unwrap()), + }; + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish events"); + + // Verify event was received. + let msg = tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next()) + .await + .expect("Timeout waiting for message") + .expect("Expected message"); + + // Parse and verify message content - only payload is sent. + let payload: serde_json::Value = + serde_json::from_slice(&msg.payload).expect("Failed to parse message"); + + assert_eq!(payload["test_id"], "metadata-event"); + assert!(payload.get("id").is_none()); + assert!(payload.get("metadata").is_none()); +} + +#[test] +fn test_sink_name() { + assert_eq!(NatsSink::name(), "nats"); +}