Skip to content

Commit 4d6369e

Browse files
authored
Merge pull request #4901 from DataDog/eric.firth/dsm-ruby
DSM Implementation for Ruby with Karafka instrumented
2 parents 333959d + 728b9c7 commit 4d6369e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2835
-5
lines changed

Rakefile

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ CORE_WITH_LIBDATADOG_API = [
2727
'spec/datadog/core/process_discovery_spec.rb',
2828
'spec/datadog/core/configuration/stable_config_spec.rb',
2929
'spec/datadog/core/ddsketch_spec.rb',
30+
'spec/datadog/data_streams/**/*_spec.rb',
31+
].freeze
32+
33+
# Data Streams Monitoring (DSM) requires libdatadog_api for DDSketch
34+
# Add new instrumentation libraries here as they gain DSM support
35+
DSM_ENABLED_LIBRARIES = [
36+
:kafka,
37+
:karafka
3038
].freeze
3139

3240
# rubocop:disable Metrics/BlockLength
@@ -82,8 +90,8 @@ namespace :spec do
8290
desc '' # "Explicitly hiding from `rake -T`"
8391
RSpec::Core::RakeTask.new(:main) do |t, args|
8492
t.pattern = 'spec/**/*_spec.rb'
85-
t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop}/**/*_spec.rb,' \
86-
' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}_spec.rb,' \
93+
t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop,data_streams}/**/*_spec.rb,' \
94+
' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}*_spec.rb,' \
8795
' spec/datadog/gem_packaging_spec.rb'
8896
t.rspec_opts = args.to_a.join(' ')
8997
end
@@ -295,6 +303,22 @@ namespace :spec do
295303
end
296304
end
297305

306+
# Ensure DSM-enabled contrib tests compile libdatadog_api before running (MRI Ruby only)
307+
# If compilation fails (e.g., new Ruby version without prebuilt extension), tests will skip via DDSketch.supported?
308+
unless RUBY_PLATFORM == 'java'
309+
task :compile_libdatadog_for_dsm do
310+
Rake::Task["compile:libdatadog_api.#{RUBY_VERSION[/\d+.\d+/]}_#{RUBY_PLATFORM}"].invoke
311+
rescue => e
312+
# Compilation failed (likely unsupported Ruby version) - tests will skip gracefully
313+
puts "Warning: libdatadog_api compilation failed: #{e.class}: #{e}"
314+
puts "DSM tests will be skipped for this Ruby version"
315+
end
316+
317+
DSM_ENABLED_LIBRARIES.each do |task_name|
318+
Rake::Task["spec:#{task_name}"].enhance([:compile_libdatadog_for_dsm])
319+
end
320+
end
321+
298322
namespace :appsec do
299323
task all: [
300324
:main,

Steepfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ target :datadog do
136136
ignore 'lib/datadog/core/utils/time.rb'
137137
ignore 'lib/datadog/core/vendor/multipart-post/multipart/post/multipartable.rb'
138138
ignore 'lib/datadog/core/worker.rb'
139+
ignore 'lib/datadog/data_streams/configuration/settings.rb'
139140
ignore 'lib/datadog/core/workers/async.rb'
140141
ignore 'lib/datadog/core/workers/interval_loop.rb'
141142
ignore 'lib/datadog/core/workers/polling.rb'

lib/datadog.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
require_relative 'datadog/profiling'
99
require_relative 'datadog/appsec'
1010
require_relative 'datadog/di'
11+
require_relative 'datadog/data_streams'
1112

1213
# Line probes will not work on Ruby < 2.6 because of lack of :script_compiled
1314
# trace point. Activate DI automatically on supported Ruby versions but

lib/datadog/core/configuration/components.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
require_relative '../crashtracking/component'
2020
require_relative '../environment/agent_info'
2121
require_relative '../process_discovery'
22+
require_relative '../../data_streams/processor'
2223

2324
module Datadog
2425
module Core
@@ -75,6 +76,20 @@ def build_crashtracker(settings, agent_settings, logger:)
7576

7677
Datadog::Core::Crashtracking::Component.build(settings, agent_settings, logger: logger)
7778
end
79+
80+
# Builds the Data Streams Monitoring (DSM) processor component.
#
# Returns nil without building anything when `settings.data_streams.enabled`
# is falsy. If constructing the processor raises a StandardError, the error is
# logged as a warning and nil is returned instead of propagating to the caller.
#
# @param settings configuration settings; reads `data_streams.enabled` and
#   `data_streams.interval`, and is also passed through to the processor
# @param agent_settings passed through to the processor
# @param logger passed to the processor and used for the failure warning
# @return [Datadog::DataStreams::Processor, nil] the processor, or nil when
#   disabled or when initialization failed
def build_data_streams(settings, agent_settings, logger)
81+
return unless settings.data_streams.enabled
82+
83+
Datadog::DataStreams::Processor.new(
84+
interval: settings.data_streams.interval,
85+
logger: logger,
86+
settings: settings,
87+
agent_settings: agent_settings
88+
)
89+
rescue => e
90+
logger.warn("Failed to initialize Data Streams Monitoring: #{e.class}: #{e}")
91+
nil
92+
end
7893
end
7994

8095
attr_reader \
@@ -90,7 +105,8 @@ def build_crashtracker(settings, agent_settings, logger:)
90105
:error_tracking,
91106
:dynamic_instrumentation,
92107
:appsec,
93-
:agent_info
108+
:agent_info,
109+
:data_streams
94110

95111
def initialize(settings)
96112
@settings = settings
@@ -126,6 +142,7 @@ def initialize(settings)
126142
@appsec = Datadog::AppSec::Component.build_appsec_component(settings, telemetry: telemetry)
127143
@dynamic_instrumentation = Datadog::DI::Component.build(settings, agent_settings, @logger, telemetry: telemetry)
128144
@error_tracking = Datadog::ErrorTracking::Component.build(settings, @tracer, @logger)
145+
@data_streams = self.class.build_data_streams(settings, agent_settings, @logger)
129146
@environment_logger_extra[:dynamic_instrumentation_enabled] = !!@dynamic_instrumentation
130147

131148
# Configure non-privileged components.
@@ -195,6 +212,9 @@ def shutdown!(replacement = nil)
195212
# Shutdown workers
196213
runtime_metrics.stop(true, close_metrics: false)
197214

215+
# Shutdown Data Streams Monitoring processor
216+
data_streams&.stop(true)
217+
198218
# Shutdown the old metrics, unless they are still being used.
199219
# (e.g. custom Statsd instances.)
200220
#

lib/datadog/core/configuration/supported_configurations.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ module Configuration
3333
"DD_APPSEC_WAF_DEBUG" => {version: ["A"]},
3434
"DD_APPSEC_WAF_TIMEOUT" => {version: ["A"]},
3535
"DD_CRASHTRACKING_ENABLED" => {version: ["A"]},
36+
"DD_DATA_STREAMS_ENABLED" => {version: ["A"]},
3637
"DD_DBM_PROPAGATION_MODE" => {version: ["A"]},
3738
"DD_DISABLE_DATADOG_RAILS" => {version: ["A"]},
3839
"DD_DYNAMIC_INSTRUMENTATION_ENABLED" => {version: ["A"]},

lib/datadog/core/ddsketch.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# frozen_string_literal: true
22

3-
require 'datadog/core'
4-
53
module Datadog
64
module Core
75
# Used to access ddsketch APIs.

lib/datadog/data_streams.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'data_streams/processor'
4+
require_relative 'data_streams/pathway_context'
5+
require_relative 'data_streams/configuration/settings'
6+
require_relative 'data_streams/extensions'
7+
require_relative 'core/utils/time'
8+
9+
module Datadog
10+
# Datadog Data Streams Monitoring public API.
11+
#
12+
# The Datadog team ensures that public methods in this module
13+
# only receive backwards compatible changes, and breaking changes
14+
# will only occur in new major version releases.
15+
# @public_api
16+
module DataStreams
17+
class << self
18+
# Set a produce checkpoint for Data Streams Monitoring
19+
#
20+
# @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')
21+
# @param destination [String] The destination (e.g., topic, exchange, stream name)
22+
# @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false)
23+
# @param tags [Hash] Additional tags to include
24+
# @yield [key, value] Block to inject context into carrier
25+
# @return [String, nil] Base64 encoded pathway context or nil if disabled
26+
# @public_api
27+
def set_produce_checkpoint(type:, destination:, auto_instrumentation: false, tags: {}, &block)
28+
processor&.set_produce_checkpoint(
29+
type: type,
30+
destination: destination,
31+
manual_checkpoint: !auto_instrumentation,
32+
tags: tags,
33+
&block
34+
)
35+
end
36+
37+
# Set a consume checkpoint for Data Streams Monitoring
38+
#
39+
# @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')
40+
# @param source [String] The source (e.g., topic, exchange, stream name)
41+
# @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false)
42+
# @param tags [Hash] Additional tags to include
43+
# @yield [key] Block to extract context from carrier
44+
# @return [String, nil] Base64 encoded pathway context or nil if disabled
45+
# @public_api
46+
def set_consume_checkpoint(type:, source:, auto_instrumentation: false, tags: {}, &block)
47+
processor&.set_consume_checkpoint(
48+
type: type,
49+
source: source,
50+
manual_checkpoint: !auto_instrumentation,
51+
tags: tags,
52+
&block
53+
)
54+
end
55+
56+
# Track Kafka produce offset for lag monitoring
57+
#
58+
# @param topic [String] The Kafka topic name
59+
# @param partition [Integer] The partition number
60+
# @param offset [Integer] The offset of the produced message
61+
# @return [Boolean, nil] true if tracking succeeded, nil if disabled
62+
# @!visibility private
63+
def track_kafka_produce(topic, partition, offset)
64+
processor&.track_kafka_produce(topic, partition, offset, Core::Utils::Time.now)
65+
end
66+
67+
# Track Kafka message consumption for consumer lag monitoring
68+
#
69+
# @param topic [String] The Kafka topic name
70+
# @param partition [Integer] The partition number
71+
# @param offset [Integer] The offset of the consumed message
72+
# @return [Boolean, nil] true if tracking succeeded, nil if disabled
73+
# @!visibility private
74+
def track_kafka_consume(topic, partition, offset)
75+
processor&.track_kafka_consume(topic, partition, offset, Core::Utils::Time.now)
76+
end
77+
78+
# Check if Data Streams Monitoring is enabled and available
79+
#
80+
# @return [Boolean] true if the processor is available
81+
# @public_api
82+
def enabled?
83+
!processor.nil?
84+
end
85+
86+
private
87+
88+
# Returns the `data_streams` component of the current components.
# May be nil; callers in this module guard with `&.` / `nil?`.
def processor
89+
components.data_streams
90+
end
91+
92+
# Returns the global Datadog components object.
# Uses `send`, which invokes the method regardless of its visibility.
# NOTE(review): presumably `Datadog.components` is not public — confirm.
def components
93+
Datadog.send(:components)
94+
end
95+
end
96+
97+
# Expose Data Streams to global shared objects
98+
Extensions.activate!
99+
end
100+
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'configuration/settings'
4+
5+
module Datadog
6+
module DataStreams
7+
# Configuration for Data Streams Monitoring
8+
module Configuration
9+
end
10+
end
11+
end
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../../core/environment/variable_helpers'
4+
require_relative '../ext'
5+
6+
module Datadog
7+
module DataStreams
8+
module Configuration
9+
# Configuration settings for Data Streams Monitoring.
10+
module Settings
11+
# Ruby hook invoked when this module is `extend`ed onto `base`.
# Settings are added to `base` itself when it is a Class; otherwise they are
# added to its singleton class.
def self.extended(base)
12+
base = base.singleton_class unless base.is_a?(Class)
13+
add_settings!(base)
14+
end
15+
16+
def self.add_settings!(base)
17+
base.class_eval do
18+
# Data Streams Monitoring configuration
19+
# @public_api
20+
settings :data_streams do
21+
# Whether Data Streams Monitoring is enabled. When enabled, the library will
22+
# collect and report data lineage information for messaging systems.
23+
#
24+
# @default `DD_DATA_STREAMS_ENABLED` environment variable, otherwise `false`.
25+
# @return [Boolean]
26+
option :enabled do |o|
27+
o.type :bool
28+
o.env Ext::ENV_ENABLED
29+
o.default false
30+
end
31+
32+
# The interval (in seconds) at which Data Streams Monitoring stats are flushed.
33+
#
34+
# @default 10.0
35+
# @env '_DD_TRACE_STATS_WRITER_INTERVAL'
36+
# @return [Float]
37+
# @!visibility private
38+
option :interval do |o|
39+
o.type :float
40+
o.env '_DD_TRACE_STATS_WRITER_INTERVAL'
41+
o.default 10.0
42+
end
43+
end
44+
end
45+
end
46+
end
47+
end
48+
end
49+
end

lib/datadog/data_streams/ext.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# frozen_string_literal: true
2+
3+
module Datadog
4+
module DataStreams
5+
# Constants for Data Streams Monitoring configuration
6+
# @public_api
7+
module Ext
8+
ENV_ENABLED = 'DD_DATA_STREAMS_ENABLED'
9+
end
10+
end
11+
end

0 commit comments

Comments
 (0)