Skip to content

Commit cd833a4

Browse files
committed
Simple produce checkpoint
1 parent 5283ac0 commit cd833a4

File tree

4 files changed

+92
-0
lines changed

4 files changed

+92
-0
lines changed

lib/datadog/tracing/contrib/waterdrop/middleware.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ def call(message)
1919
WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {})
2020
end
2121

22+
if Datadog::DataStreams.enabled?
23+
Datadog::DataStreams.set_produce_checkpoint(
24+
type: 'kafka',
25+
destination: message[:topic],
26+
auto_instrumentation: true
27+
) do |key, value|
28+
message[:headers] ||= {}
29+
message[:headers][key] = value
30+
end
31+
end
32+
2233
message
2334
end
2435

lib/datadog/tracing/contrib/waterdrop/patcher.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ def patch
2828

2929
included_middlewares = producer.middleware.instance_variable_get(:@steps)
3030
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
31+
32+
producer.monitor.subscribe('message.acknowledged') do |ack_event|
33+
if Datadog::DataStreams.enabled?
34+
payload = ack_event.payload
35+
Datadog::DataStreams.track_kafka_produce(payload[:topic], payload[:partition], payload[:offset])
36+
end
37+
end
3138
end
3239
end
3340
end

spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,65 @@
6060
)
6161
end
6262
end
63+
64+
context 'when DataStreams is enabled' do
65+
before do
66+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)
67+
allow(Datadog::DataStreams).to receive(:set_produce_checkpoint) do |**_kwargs, &block|
68+
block.call('data_streams_key', 'data_streams_value')
69+
end
70+
end
71+
72+
it 'calls set_produce_checkpoint and injects headers' do
73+
message = {topic: 'some_topic', payload: 'hello'}
74+
75+
middleware.call(message)
76+
77+
expect(Datadog::DataStreams).to have_received(:set_produce_checkpoint).with(
78+
type: 'kafka',
79+
destination: 'some_topic',
80+
auto_instrumentation: true
81+
)
82+
expect(message[:headers]).to include('data_streams_key' => 'data_streams_value')
83+
end
84+
85+
it 'initializes headers if not present' do
86+
message = {topic: 'some_topic', payload: 'hello'}
87+
88+
middleware.call(message)
89+
90+
expect(Datadog::DataStreams).to have_received(:set_produce_checkpoint).with(
91+
type: 'kafka',
92+
destination: 'some_topic',
93+
auto_instrumentation: true
94+
)
95+
end
96+
97+
it 'preserves existing headers' do
98+
message = {topic: 'some_topic', payload: 'hello', headers: {'existing' => 'header'}}
99+
100+
middleware.call(message)
101+
102+
expect(message[:headers]).to include(
103+
'data_streams_key' => 'data_streams_value',
104+
'existing' => 'header'
105+
)
106+
end
107+
end
108+
109+
context 'when DataStreams is disabled' do
110+
before do
111+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(false)
112+
allow(Datadog::DataStreams).to receive(:set_produce_checkpoint)
113+
end
114+
115+
it 'does not call set_produce_checkpoint' do
116+
message = {topic: 'some_topic', payload: 'hello'}
117+
118+
middleware.call(message)
119+
120+
expect(Datadog::DataStreams).not_to have_received(:set_produce_checkpoint)
121+
end
122+
end
63123
end
64124
end

spec/datadog/tracing/contrib/waterdrop/patcher_spec.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,19 @@
7272
)
7373
end
7474
end
75+
76+
context 'when DataStreams is enabled' do
77+
before do
78+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)
79+
end
80+
81+
it 'patches without errors' do
82+
expect do
83+
WaterDrop::Producer.new do |config|
84+
config.client_class = WaterDrop::Clients::Buffered
85+
end
86+
end.not_to raise_error
87+
end
88+
end
7589
end
7690
end

0 commit comments

Comments
 (0)