Skip to content

Commit b2b06c0

Browse files
committed
Simple produce checkpoint
1 parent 5283ac0 commit b2b06c0

File tree

7 files changed

+97
-24
lines changed

7 files changed

+97
-24
lines changed

lib/datadog/tracing/contrib/waterdrop/middleware.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ def call(message)
1919
WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {})
2020
end
2121

22+
if Datadog::DataStreams.enabled?
23+
Datadog::DataStreams.set_produce_checkpoint(
24+
type: 'kafka',
25+
destination: message[:topic],
26+
auto_instrumentation: true
27+
) do |key, value|
28+
message[:headers] ||= {}
29+
message[:headers][key] = value
30+
end
31+
end
32+
2233
message
2334
end
2435

lib/datadog/tracing/contrib/waterdrop/patcher.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ def patch
2828

2929
included_middlewares = producer.middleware.instance_variable_get(:@steps)
3030
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
31+
32+
producer.monitor.subscribe('message.acknowledged') do |ack_event|
33+
if Datadog::DataStreams.enabled?
34+
payload = ack_event.payload
35+
Datadog::DataStreams.track_kafka_produce(payload[:topic], payload[:partition], payload[:offset])
36+
end
37+
end
3138
end
3239
end
3340
end

spec/datadog/tracing/contrib/karafka/monitor_spec.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22

33
require 'datadog/tracing/contrib/support/spec_helper'
44

5-
# FFI::Function background native thread
6-
ThreadHelpers.with_leaky_thread_creation(:rdkafka) do
7-
require 'karafka'
8-
end
5+
require 'karafka'
96
require 'datadog'
107

118
RSpec.describe 'Karafka monitor' do

spec/datadog/tracing/contrib/waterdrop/distributed/propagation_spec.rb

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'datadog/tracing/contrib/support/spec_helper'
4-
5-
# FFI::Function background native thread
6-
ThreadHelpers.with_leaky_thread_creation(:rdkafka) do
7-
require 'waterdrop'
8-
end
4+
require 'waterdrop'
95
require 'datadog'
106

117
RSpec.describe Datadog::Tracing::Contrib::WaterDrop::Distributed::Propagation do

spec/datadog/tracing/contrib/waterdrop/middleware_spec.rb

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'datadog/tracing/contrib/support/spec_helper'
4-
5-
# FFI::Function background native thread
6-
ThreadHelpers.with_leaky_thread_creation(:rdkafka) do
7-
require 'waterdrop'
8-
end
4+
require 'waterdrop'
95
require 'datadog'
106

117
RSpec.describe 'WaterDrop middleware' do
@@ -60,5 +56,65 @@
6056
)
6157
end
6258
end
59+
60+
context 'when DataStreams is enabled' do
61+
before do
62+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)
63+
allow(Datadog::DataStreams).to receive(:set_produce_checkpoint) do |**_kwargs, &block|
64+
block.call('data_streams_key', 'data_streams_value')
65+
end
66+
end
67+
68+
it 'calls set_produce_checkpoint and injects headers' do
69+
message = {topic: 'some_topic', payload: 'hello'}
70+
71+
middleware.call(message)
72+
73+
expect(Datadog::DataStreams).to have_received(:set_produce_checkpoint).with(
74+
type: 'kafka',
75+
destination: 'some_topic',
76+
auto_instrumentation: true
77+
)
78+
expect(message[:headers]).to include('data_streams_key' => 'data_streams_value')
79+
end
80+
81+
it 'initializes headers if not present' do
82+
message = {topic: 'some_topic', payload: 'hello'}
83+
84+
middleware.call(message)
85+
86+
expect(Datadog::DataStreams).to have_received(:set_produce_checkpoint).with(
87+
type: 'kafka',
88+
destination: 'some_topic',
89+
auto_instrumentation: true
90+
)
91+
end
92+
93+
it 'preserves existing headers' do
94+
message = {topic: 'some_topic', payload: 'hello', headers: {'existing' => 'header'}}
95+
96+
middleware.call(message)
97+
98+
expect(message[:headers]).to include(
99+
'data_streams_key' => 'data_streams_value',
100+
'existing' => 'header'
101+
)
102+
end
103+
end
104+
105+
context 'when DataStreams is disabled' do
106+
before do
107+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(false)
108+
allow(Datadog::DataStreams).to receive(:set_produce_checkpoint)
109+
end
110+
111+
it 'does not call set_produce_checkpoint' do
112+
message = {topic: 'some_topic', payload: 'hello'}
113+
114+
middleware.call(message)
115+
116+
expect(Datadog::DataStreams).not_to have_received(:set_produce_checkpoint)
117+
end
118+
end
63119
end
64120
end

spec/datadog/tracing/contrib/waterdrop/patcher_spec.rb

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'datadog/tracing/contrib/support/spec_helper'
4-
5-
# FFI::Function background native thread
6-
ThreadHelpers.with_leaky_thread_creation(:rdkafka) do
7-
require 'waterdrop'
8-
end
4+
require 'waterdrop'
95
require 'datadog'
106

117
RSpec.describe 'Waterdrop patcher' do
@@ -72,5 +68,19 @@
7268
)
7369
end
7470
end
71+
72+
context 'when DataStreams is enabled' do
73+
before do
74+
allow(Datadog::DataStreams).to receive(:enabled?).and_return(true)
75+
end
76+
77+
it 'patches without errors' do
78+
expect do
79+
WaterDrop::Producer.new do |config|
80+
config.client_class = WaterDrop::Clients::Buffered
81+
end
82+
end.not_to raise_error
83+
end
84+
end
7585
end
7686
end

spec/datadog/tracing/contrib/waterdrop/producer_spec.rb

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'datadog/tracing/contrib/support/spec_helper'
4-
5-
# FFI::Function background native thread
6-
ThreadHelpers.with_leaky_thread_creation(:rdkafka) do
7-
require 'waterdrop'
8-
end
4+
require 'waterdrop'
95
require 'datadog'
106

117
RSpec.describe 'Waterdrop monitor' do

0 commit comments

Comments
 (0)