diff --git a/lib/datadog/tracing/contrib/karafka/framework.rb b/lib/datadog/tracing/contrib/karafka/framework.rb index 979af04310f..aca735a6956 100644 --- a/lib/datadog/tracing/contrib/karafka/framework.rb +++ b/lib/datadog/tracing/contrib/karafka/framework.rb @@ -9,18 +9,23 @@ module Karafka # - instrument parts of the framework when needed module Framework def self.setup + karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations + Datadog.configure do |datadog_config| - karafka_config = datadog_config.tracing[:karafka] - activate_waterdrop!(datadog_config, karafka_config) + karafka_configurations.each do |config_name, karafka_config| + activate_waterdrop!(datadog_config, config_name, karafka_config) + end end end # Apply relevant configuration from Karafka to WaterDrop - def self.activate_waterdrop!(datadog_config, karafka_config) + def self.activate_waterdrop!(datadog_config, config_name, karafka_config) datadog_config.tracing.instrument( :waterdrop, + enabled: karafka_config[:enabled], service_name: karafka_config[:service_name], distributed_tracing: karafka_config[:distributed_tracing], + describes: config_name ) end end diff --git a/lib/datadog/tracing/contrib/karafka/integration.rb b/lib/datadog/tracing/contrib/karafka/integration.rb index 8c0708b029a..45417c0c7db 100644 --- a/lib/datadog/tracing/contrib/karafka/integration.rb +++ b/lib/datadog/tracing/contrib/karafka/integration.rb @@ -38,6 +38,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 38fde6115e0..da70830f1e5 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -10,14 +10,6 @@ module Contrib module Karafka # Patch to add tracing to Karafka::Messages::Messages module MessagesPatch - def configuration - Datadog.configuration.tracing[:karafka] - end - - def propagation - @propagation ||= Contrib::Karafka::Distributed::Propagation.new - end - # `each` is the most popular access point to Karafka messages, # but not the only one # Other access patterns do not have a straightforward tracing avenue @@ -28,6 +20,7 @@ def each(&block) parent_trace_digest = Datadog::Tracing.active_trace&.to_digest @messages_array.each do |message| + configuration = datadog_configuration(message.topic) trace_digest = if configuration[:distributed_tracing] headers = if message.metadata.respond_to?(:raw_headers) message.metadata.raw_headers @@ -61,6 +54,12 @@ def each(&block) end end end + + private + + def datadog_configuration(topic) + Datadog.configuration.tracing[:karafka, topic] + end end module AppPatch diff --git a/lib/datadog/tracing/contrib/waterdrop/integration.rb b/lib/datadog/tracing/contrib/waterdrop/integration.rb index 7efe7e1d507..6e776157fe5 100644 --- a/lib/datadog/tracing/contrib/waterdrop/integration.rb +++ b/lib/datadog/tracing/contrib/waterdrop/integration.rb @@ -37,6 +37,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/monitor.rb b/lib/datadog/tracing/contrib/waterdrop/monitor.rb index 480c5f8bb46..63dba40ccb0 100644 --- a/lib/datadog/tracing/contrib/waterdrop/monitor.rb +++ b/lib/datadog/tracing/contrib/waterdrop/monitor.rb @@ -18,10 +18,6 @@ module Monitor message.produced_sync ].freeze - def configuration - Datadog.configuration.tracing[:waterdrop] - end - def instrument(event_id, payload = {}, &block) return super unless TRACEABLE_EVENTS.include?(event_id) @@ -40,7 +36,7 @@ def instrument(event_id, payload = {}, &block) span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, payload[:messages].size) - payload[:messages].each { |message| inject(trace_digest, message) } if configuration[:distributed_tracing] + payload[:messages].each { |message| inject(trace_digest, message) } else action = event_id.sub('message.produced', 'produce') @@ -48,7 +44,7 @@ def instrument(event_id, payload = {}, &block) span.set_tag(Contrib::Karafka::Ext::TAG_PARTITION, payload[:message][:partition]) span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, 1) - inject(trace_digest, payload[:message]) if configuration[:distributed_tracing] + inject(trace_digest, payload[:message]) end span.resource = "waterdrop.#{action}" @@ -63,9 +59,17 @@ def instrument(event_id, payload = {}, &block) private def inject(trace_digest, message) + return unless datadog_configuration(message[:topic])[:distributed_tracing] + message[:headers] ||= {} WaterDrop.inject(trace_digest, message[:headers]) end + + # cache the configuration resolution per topic to avoid repeated lookups in message batches + def datadog_configuration(topic) + @datadog_configuration ||= {} + @datadog_configuration[topic] ||= Datadog.configuration.tracing[:waterdrop, topic] + end end end end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index 0eee615c222..f981394d750 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -17,6 +17,7 @@ before do Datadog.configure do |c| c.tracing.instrument :karafka, configuration_options + c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false end end @@ -31,16 +32,12 @@ let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - allow(message).to receive(:timestamp).and_return(Time.now) - allow(message).to receive(:topic).and_return('topic_a') - - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -55,6 +52,7 @@ end context 'when the message has tracing headers' do + let(:topic_name) { 'topic_a' } let(:message) do headers = {} producer_trace = nil @@ -64,15 +62,15 @@ producer_trace = trace Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers) end - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 - metadata[headers_accessor] = headers + metadata = ::Karafka::Messages::Metadata.new( + offset: 412, + headers_accessor => headers, + topic: topic_name, + timestamp: Time.now + ) raw_payload = rand.to_s - message = ::Karafka::Messages::Message.new(raw_payload, metadata) - allow(message).to receive(:timestamp).and_return(Time.now) - allow(message).to receive(:topic).and_return('topic_a') - message + ::Karafka::Messages::Message.new(raw_payload, metadata) end let(:headers_accessor) do ::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers' @@ -89,7 +87,7 @@ consumer_span = Datadog::Tracing.active_span consumer_trace = Datadog::Tracing.active_trace - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -113,6 +111,37 @@ end end + context 'when distributed tracing is disabled for the topic in particular' do + let(:topic_name) { 'special_topic' } + + it 'does not continue the span that produced the message' do + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + expect(spans).to have(3).items + + # assert that the message span is not continuation of the producer span + expect(span.parent_id).to eq(consumer_span.id) + expect(span.trace_id).to eq(consumer_trace.id) + + expect(span.links).to be_empty + expect(consumer_span.links).to be_empty + end + end + context 'when distributed tracing is not enabled' do let(:configuration_options) { { distributed_tracing: false } } @@ -124,7 +153,7 @@ consumer_span = Datadog::Tracing.active_span consumer_trace = Datadog::Tracing.active_trace - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -150,12 +179,11 @@ let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message]) + job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message]) Karafka.monitor.instrument('worker.processed', { job: job }) do # Noop @@ -171,4 +199,33 @@ expect(span.resource).to eq 'ABC#consume' end end + + describe 'framework auto-instrumentation' do + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:waterdrop].reset_configuration! + example.run + Datadog.registry[:waterdrop].reset_configuration! + + # reset Karafka internal state as well + Karafka::App.config.internal.status.reset! + Karafka.refresh! + end + + it 'automatically enables waterdrop instrumentation' do + Karafka::App.setup do |c| + c.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } + end + + expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true + expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true + expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:enabled]).to be true + expect(Datadog.configuration.tracing[:karafka, 'special_topic'][:distributed_tracing]).to be false + + expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true + expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop, 'special_topic'][:distributed_tracing]).to be false + end + end end diff --git a/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb b/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb index 05f53fa565c..d736a49ac81 100644 --- a/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb +++ b/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb @@ -8,8 +8,6 @@ end require 'datadog' -puts "waterdrop version: #{WaterDrop::VERSION}" - RSpec.describe 'Waterdrop monitor' do before do Datadog.configure do |c|