Skip to content

Commit b618b67

Browse files
psteinroeclaude
andcommitted
refactor: update NATS sink for dynamic routing and payload-only output
- Make subject config optional (can come from event metadata key "topic") - Add resolve_subject() for dynamic routing from metadata - Send only event.payload instead of full event envelope - Remove info! logging - Use DestinationError for publish failures, ConfigError for missing config - Update tests to verify payload-only content and metadata-based routing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 545b604 commit b618b67

2 files changed

Lines changed: 112 additions & 50 deletions

File tree

src/sink/nats.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
//! NATS sink for publishing events to a NATS subject.
22
//!
3-
//! Publishes each event as a JSON message to the configured subject.
4-
//! The sink maintains a persistent connection with automatic reconnection.
3+
//! Publishes each event's payload as a JSON message to a subject determined by:
4+
//! 1. `topic` key in event metadata (from subscription's metadata/metadata_extensions)
5+
//! 2. Fallback to `subject` in sink config
56
//!
6-
//! # Payload Extensions
7+
//! # Dynamic Routing
78
//!
8-
//! This sink does not require any specific payload extensions. However, you can
9-
//! use payload extensions to add routing metadata for NATS subject hierarchies:
9+
//! The target subject can be configured per-event using metadata_extensions:
1010
//!
1111
//! ```sql
12-
//! payload_extensions = '[
13-
//! {"key": "subject_suffix", "value": "orders.created"}
12+
//! metadata_extensions = '[
13+
//! {"json_path": "topic", "expression": "''events.'' || table_name"}
1414
//! ]'
1515
//! ```
1616
1717
use async_nats::Client;
1818
use etl::error::EtlResult;
1919
use serde::{Deserialize, Serialize};
2020
use std::sync::Arc;
21-
use tracing::info;
2221

2322
use crate::sink::Sink;
2423
use crate::types::TriggeredEvent;
@@ -33,17 +32,18 @@ pub struct NatsSinkConfig {
3332
/// Contains credentials and should be treated as sensitive.
3433
pub url: String,
3534

36-
/// Subject to publish messages to.
37-
pub subject: String,
35+
/// Subject to publish messages to. Optional if provided via event metadata.
36+
#[serde(default)]
37+
pub subject: Option<String>,
3838
}
3939

4040
/// Configuration for the NATS sink without sensitive data.
4141
///
4242
/// Safe to serialize and log. Use this for debugging and metrics.
4343
#[derive(Clone, Debug, Serialize, Deserialize)]
4444
pub struct NatsSinkConfigWithoutSecrets {
45-
/// Subject to publish messages to.
46-
pub subject: String,
45+
/// Subject to publish messages to (if configured).
46+
pub subject: Option<String>,
4747
}
4848

4949
impl From<NatsSinkConfig> for NatsSinkConfigWithoutSecrets {
@@ -64,15 +64,15 @@ impl From<&NatsSinkConfig> for NatsSinkConfigWithoutSecrets {
6464

6565
/// Sink that publishes events to a NATS subject.
6666
///
67-
/// Each event is serialized as JSON and published to the configured subject.
67+
/// Each event's payload is serialized as JSON and published to the subject.
6868
/// The NATS client handles connection pooling and automatic reconnection.
6969
#[derive(Clone)]
7070
pub struct NatsSink {
7171
/// Shared NATS client connection.
7272
client: Arc<Client>,
7373

74-
/// Subject to publish messages to.
75-
subject: String,
74+
/// Default subject to publish messages to. Can be overridden per-event via metadata.
75+
subject: Option<String>,
7676
}
7777

7878
impl NatsSink {
@@ -91,6 +91,18 @@ impl NatsSink {
9191
subject: config.subject,
9292
})
9393
}
94+
95+
/// Resolves the subject for an event from metadata or config.
96+
fn resolve_subject<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
97+
// First check event metadata for dynamic subject (using generic "topic" key).
98+
if let Some(ref metadata) = event.metadata {
99+
if let Some(topic) = metadata.get("topic").and_then(|v| v.as_str()) {
100+
return Some(topic);
101+
}
102+
}
103+
// Fall back to config subject.
104+
self.subject.as_deref()
105+
}
94106
}
95107

96108
impl Sink for NatsSink {
@@ -103,44 +115,32 @@ impl Sink for NatsSink {
103115
return Ok(());
104116
}
105117

106-
info!(
107-
"publishing {} events to NATS subject '{}'",
108-
events.len(),
109-
self.subject
110-
);
111-
112118
for event in &events {
113-
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
114-
let mut json_obj = serde_json::json!({
115-
"id": event.id.id,
116-
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
117-
"payload": event.payload,
118-
"stream_id": format!("{:?}", event.stream_id),
119-
});
120-
121-
// Add optional fields.
122-
if let Some(ref metadata) = event.metadata {
123-
json_obj["metadata"] = metadata.clone();
124-
}
125-
if let Some(lsn) = event.lsn {
126-
json_obj["lsn"] = serde_json::json!(lsn.to_string());
127-
}
119+
// Resolve subject from event metadata or config.
120+
let subject = self.resolve_subject(event).ok_or_else(|| {
121+
etl::etl_error!(
122+
etl::error::ErrorKind::ConfigError,
123+
"No subject configured",
124+
"Subject must be provided in sink config or event metadata (topic key)"
125+
)
126+
})?;
128127

129-
let payload = serde_json::to_vec(&json_obj).map_err(|e| {
128+
// Serialize payload to JSON.
129+
let payload = serde_json::to_vec(&event.payload).map_err(|e| {
130130
etl::etl_error!(
131131
etl::error::ErrorKind::InvalidData,
132-
"Failed to serialize event to JSON",
132+
"Failed to serialize payload to JSON",
133133
e.to_string()
134134
)
135135
})?;
136136

137-
// Publish to the configured subject.
137+
// Publish to the resolved subject.
138138
self.client
139-
.publish(self.subject.clone(), payload.into())
139+
.publish(subject.to_string(), payload.into())
140140
.await
141141
.map_err(|e| {
142142
etl::etl_error!(
143-
etl::error::ErrorKind::InvalidData,
143+
etl::error::ErrorKind::DestinationError,
144144
"Failed to publish event to NATS",
145145
e.to_string()
146146
)

tests/nats_sink_tests.rs

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn test_nats_sink_publishes_events() {
3232

3333
let config = NatsSinkConfig {
3434
url: nats_url.clone(),
35-
subject: subject.to_string(),
35+
subject: Some(subject.to_string()),
3636
};
3737

3838
let sink = NatsSink::new(config)
@@ -66,16 +66,21 @@ async fn test_nats_sink_publishes_events() {
6666
.expect("Timeout waiting for second message")
6767
.expect("Expected second message");
6868

69-
// Parse and verify message content.
70-
let event1: serde_json::Value =
69+
// Parse and verify message content - only payload is sent.
70+
let payload1: serde_json::Value =
7171
serde_json::from_slice(&msg1.payload).expect("Failed to parse first message");
72-
let event2: serde_json::Value =
72+
let payload2: serde_json::Value =
7373
serde_json::from_slice(&msg2.payload).expect("Failed to parse second message");
7474

75-
assert!(event1.get("id").is_some());
76-
assert!(event1.get("payload").is_some());
77-
assert!(event2.get("id").is_some());
78-
assert!(event2.get("payload").is_some());
75+
// Only payload fields should be present.
76+
assert!(payload1.get("test_id").is_some());
77+
assert!(payload1.get("message").is_some());
78+
assert!(payload2.get("test_id").is_some());
79+
assert!(payload2.get("message").is_some());
80+
// No envelope fields.
81+
assert!(payload1.get("id").is_none());
82+
assert!(payload1.get("metadata").is_none());
83+
assert!(payload1.get("lsn").is_none());
7984
}
8085

8186
#[tokio::test(flavor = "multi_thread")]
@@ -85,7 +90,7 @@ async fn test_nats_sink_empty_batch() {
8590

8691
let config = NatsSinkConfig {
8792
url: nats_url,
88-
subject: "pgstream.empty-test".to_string(),
93+
subject: Some("pgstream.empty-test".to_string()),
8994
};
9095

9196
let sink = NatsSink::new(config)
@@ -98,6 +103,63 @@ async fn test_nats_sink_empty_batch() {
98103
.expect("Empty batch should succeed");
99104
}
100105

106+
#[tokio::test(flavor = "multi_thread")]
107+
async fn test_nats_sink_uses_topic_from_metadata() {
108+
let nats_port = ensure_nats().await;
109+
let nats_url = format!("nats://127.0.0.1:{}", nats_port);
110+
let subject = "pgstream.metadata-topic";
111+
112+
// Create sink without subject - will get it from metadata.
113+
let config = NatsSinkConfig {
114+
url: nats_url.clone(),
115+
subject: None,
116+
};
117+
118+
let sink = NatsSink::new(config)
119+
.await
120+
.expect("Failed to create NATS sink");
121+
122+
// Subscribe before publishing to verify messages.
123+
let client = async_nats::connect(&nats_url)
124+
.await
125+
.expect("Failed to connect to NATS");
126+
127+
let mut subscriber = client
128+
.subscribe(subject.to_string())
129+
.await
130+
.expect("Failed to subscribe");
131+
132+
// Create event with topic in metadata.
133+
let event = TriggeredEvent {
134+
id: EventIdentifier::new("nats-metadata-event".to_string(), Utc::now()),
135+
payload: serde_json::json!({
136+
"test_id": "metadata-event",
137+
"message": "Test event with metadata topic",
138+
}),
139+
metadata: Some(serde_json::json!({ "topic": subject })),
140+
stream_id: StreamId::from(1u64),
141+
lsn: Some("0/16B3748".parse().unwrap()),
142+
};
143+
144+
sink.publish_events(vec![event])
145+
.await
146+
.expect("Failed to publish events");
147+
148+
// Verify event was received.
149+
let msg = tokio::time::timeout(std::time::Duration::from_secs(5), subscriber.next())
150+
.await
151+
.expect("Timeout waiting for message")
152+
.expect("Expected message");
153+
154+
// Parse and verify message content - only payload is sent.
155+
let payload: serde_json::Value =
156+
serde_json::from_slice(&msg.payload).expect("Failed to parse message");
157+
158+
assert_eq!(payload["test_id"], "metadata-event");
159+
assert!(payload.get("id").is_none());
160+
assert!(payload.get("metadata").is_none());
161+
}
162+
101163
#[test]
102164
fn test_sink_name() {
103165
assert_eq!(NatsSink::name(), "nats");

0 commit comments

Comments
 (0)