Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 293 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.1.0"

[features]
default = []
sink-nats = ["dep:async-nats"]
sink-redis-streams = ["dep:redis"]
sink-redis-strings = ["dep:redis"]
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
Expand Down Expand Up @@ -49,6 +50,7 @@ etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a
uuid = { version = "1.19.0", default-features = false, features = ["v4"] }

# Optional sink dependencies.
async-nats = { version = "0.38", optional = true }
redis = { version = "0.27", default-features = false, features = [
"tokio-comp",
"connection-manager",
Expand All @@ -65,7 +67,7 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [

ctor = { version = "0.4", optional = true }
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "nats", "blocking"] }

[dev-dependencies]
temp-env = "0.3"
Expand Down
8 changes: 8 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use serde::Deserialize;

#[cfg(feature = "sink-nats")]
use crate::sink::nats::NatsSinkConfig;

#[cfg(feature = "sink-redis-strings")]
use crate::sink::redis_strings::RedisStringsSinkConfig;

Expand All @@ -24,4 +27,9 @@ pub enum SinkConfig {
#[cfg(feature = "sink-redis-streams")]
#[serde(rename = "redis-streams")]
RedisStreams(RedisStreamsSinkConfig),

/// NATS sink for pub/sub messaging.
#[cfg(feature = "sink-nats")]
#[serde(rename = "nats")]
Nats(NatsSinkConfig),
}
18 changes: 18 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
})?;
AnySink::RedisStreams(s)
}

#[cfg(feature = "sink-nats")]
SinkConfig::Nats(cfg) => {
use crate::sink::nats::NatsSink;
let s = NatsSink::new(cfg.clone()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to create NATS sink",
e.to_string()
)
})?;
AnySink::Nats(s)
}
};

// Create PgStream as an ETL destination
Expand Down Expand Up @@ -158,6 +171,11 @@ fn log_sink_config(config: &SinkConfig) {
SinkConfig::RedisStreams(_cfg) => {
debug!("using redis-streams sink");
}

#[cfg(feature = "sink-nats")]
SinkConfig::Nats(_cfg) => {
debug!("using nats sink");
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
mod base;
pub mod memory;

#[cfg(feature = "sink-nats")]
pub mod nats;

#[cfg(feature = "sink-redis-strings")]
pub mod redis_strings;

Expand All @@ -12,6 +15,9 @@ pub use base::Sink;
use etl::error::EtlResult;
use memory::MemorySink;

#[cfg(feature = "sink-nats")]
use nats::NatsSink;

#[cfg(feature = "sink-redis-strings")]
use redis_strings::RedisStringsSink;

Expand All @@ -36,6 +42,10 @@ pub enum AnySink {
/// Redis streams sink for append-only log storage.
#[cfg(feature = "sink-redis-streams")]
RedisStreams(RedisStreamsSink),

/// NATS sink for pub/sub messaging.
#[cfg(feature = "sink-nats")]
Nats(NatsSink),
}

impl Sink for AnySink {
Expand All @@ -52,6 +62,9 @@ impl Sink for AnySink {

#[cfg(feature = "sink-redis-streams")]
AnySink::RedisStreams(sink) => sink.publish_events(events).await,

#[cfg(feature = "sink-nats")]
AnySink::Nats(sink) => sink.publish_events(events).await,
}
}
}
172 changes: 172 additions & 0 deletions src/sink/nats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//! NATS sink for publishing events to a NATS subject.
//!
//! Publishes each event's payload as a JSON message to a subject determined by:
//! 1. `topic` key in event metadata (from subscription's metadata/metadata_extensions)
//! 2. Fallback to `subject` in sink config
//!
//! # Dynamic Routing
//!
//! The target subject can be configured per-event using metadata_extensions:
//!
//! ```sql
//! metadata_extensions = '[
//! {"json_path": "topic", "expression": "''events.'' || table_name"}
//! ]'
//! ```

use async_nats::Client;
use etl::error::EtlResult;
use futures::future::try_join_all;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use crate::sink::Sink;
use crate::types::TriggeredEvent;

/// Configuration for the NATS sink.
///
/// This intentionally does not implement [`Serialize`] to avoid accidentally
/// leaking secrets (URL credentials) in serialized forms.
#[derive(Clone, Debug, Deserialize)]
pub struct NatsSinkConfig {
/// NATS server URL (e.g., "nats://localhost:4222").
/// Contains credentials and should be treated as sensitive.
pub url: String,

/// Subject to publish messages to. Optional if provided via event metadata.
#[serde(default)]
pub subject: Option<String>,
}

/// Configuration for the NATS sink without sensitive data.
///
/// Safe to serialize and log. Use this for debugging and metrics.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NatsSinkConfigWithoutSecrets {
/// Subject to publish messages to (if configured).
pub subject: Option<String>,
}

impl From<NatsSinkConfig> for NatsSinkConfigWithoutSecrets {
fn from(config: NatsSinkConfig) -> Self {
Self {
subject: config.subject,
}
}
}

impl From<&NatsSinkConfig> for NatsSinkConfigWithoutSecrets {
fn from(config: &NatsSinkConfig) -> Self {
Self {
subject: config.subject.clone(),
}
}
}

/// Sink that publishes events to a NATS subject.
///
/// Each event's payload is serialized as JSON and published to the subject.
/// The NATS client handles connection pooling and automatic reconnection.
#[derive(Clone)]
pub struct NatsSink {
/// Shared NATS client connection.
client: Arc<Client>,

/// Default subject to publish messages to. Can be overridden per-event via metadata.
subject: Option<String>,
}

impl NatsSink {
/// Creates a new NATS sink from configuration.
///
/// # Errors
///
/// Returns an error if the NATS connection cannot be established.
pub async fn new(
config: NatsSinkConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = async_nats::connect(&config.url).await?;

Ok(Self {
client: Arc::new(client),
subject: config.subject,
})
}

/// Resolves the subject for an event from metadata or config.
fn resolve_subject<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
// First check event metadata for dynamic subject (using generic "topic" key).
if let Some(ref metadata) = event.metadata {
if let Some(topic) = metadata.get("topic").and_then(|v| v.as_str()) {
return Some(topic);
}
}
// Fall back to config subject.
self.subject.as_deref()
}
}

impl Sink for NatsSink {
fn name() -> &'static str {
"nats"
}

async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
if events.is_empty() {
return Ok(());
}

// Publish all messages concurrently in a single pass.
try_join_all(events.into_iter().map(|event| {
let client = &self.client;
let subject_opt = self.resolve_subject(&event).map(|s| s.to_string());
async move {
let subject = subject_opt.ok_or_else(|| {
etl::etl_error!(
etl::error::ErrorKind::ConfigError,
"No subject configured",
"Subject must be provided in sink config or event metadata (topic key)"
)
})?;

let bytes = serde_json::to_vec(&event.payload).map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to serialize payload to JSON",
e.to_string()
)
})?;

client.publish(subject, bytes.into()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Failed to publish event to NATS",
e.to_string()
)
})
}
}))
.await?;

// Flush to ensure all messages are sent.
self.client.flush().await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::DestinationError,
"Failed to flush NATS messages",
e.to_string()
)
})?;

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_sink_name() {
assert_eq!(NatsSink::name(), "nats");
}
}
38 changes: 38 additions & 0 deletions src/test_utils/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use ctor::dtor;
use etl::config::{PgConnectionConfig, TlsConfig};
use std::sync::{Mutex, OnceLock};
use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner};
use testcontainers_modules::nats::Nats;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::redis::Redis;
use uuid::Uuid;

static POSTGRES_PORT: OnceLock<u16> = OnceLock::new();
static REDIS_PORT: OnceLock<u16> = OnceLock::new();
static NATS_PORT: OnceLock<u16> = OnceLock::new();

// Using Mutex<Option<...>> so we can take ownership for cleanup.
static POSTGRES_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<Postgres>>>> =
OnceLock::new();
static REDIS_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<Redis>>>> = OnceLock::new();
static NATS_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<Nats>>>> = OnceLock::new();

/// Cleanup function that runs at program exit to stop and remove the postgres container.
#[dtor]
Expand All @@ -38,6 +41,18 @@ fn cleanup_redis_container() {
}
}

/// Cleanup function that runs at program exit to stop and remove the NATS container.
#[dtor]
fn cleanup_nats_container() {
if let Some(mutex) = NATS_CONTAINER.get() {
if let Ok(mut guard) = mutex.lock() {
if let Some(container) = guard.take() {
let _ = container.rm();
}
}
}
}

pub async fn ensure_postgres() -> u16 {
// Use get_or_init to handle concurrent initialization attempts
*POSTGRES_PORT.get_or_init(|| {
Expand Down Expand Up @@ -112,3 +127,26 @@ pub async fn ensure_redis() -> u16 {
.expect("Failed to join redis container startup thread")
})
}

/// Ensures a NATS container is running and returns its port.
///
/// Uses singleton pattern to reuse the same container across tests.
pub async fn ensure_nats() -> u16 {
*NATS_PORT.get_or_init(|| {
std::thread::spawn(|| {
let container: ContainerRequest<Nats> = Nats::default().into();

let container = container.start().expect("Failed to start nats container");

let port = container
.get_host_port_ipv4(4222)
.expect("Failed to get nats container port");

let _ = NATS_CONTAINER.set(Mutex::new(Some(container)));

port
})
.join()
.expect("Failed to join nats container startup thread")
})
}
Loading