Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions benchmarks/di_instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
# Need to require datadog/di explicitly because dynamic instrumentation is not
# currently integrated into the Ruby tracer due to being under development.
require 'datadog/di'
require 'datadog/di/proc_responder'

class DIInstrumentBenchmark
class Target
Expand Down Expand Up @@ -106,9 +107,11 @@ def run_benchmark
calls = 0
probe = Datadog::DI::Probe.new(id: 1, type: :log,
type_name: 'DIInstrumentBenchmark::Target', method_name: 'test_method')
rv = instrumenter.hook_method(probe) do
executed_proc = lambda do |context|
calls += 1
end
responder = Datadog::DI::ProcResponder.new(executed_proc)
rv = instrumenter.hook_method(probe, responder)
unless rv
raise "Method probe was not successfully installed"
end
Expand Down Expand Up @@ -149,9 +152,7 @@ def run_benchmark
calls = 0
probe = Datadog::DI::Probe.new(id: 1, type: :log,
file: file, line_no: line + 1)
rv = instrumenter.hook_line(probe) do
calls += 1
end
rv = instrumenter.hook_line(probe, responder)
unless rv
raise "Line probe (in method) was not successfully installed"
end
Expand Down Expand Up @@ -198,9 +199,7 @@ def run_benchmark
calls = 0
probe = Datadog::DI::Probe.new(id: 1, type: :log,
file: targeted_file, line_no: targeted_line + 1)
rv = instrumenter.hook_line(probe) do
calls += 1
end
rv = instrumenter.hook_line(probe, responder)
unless rv
raise "Line probe (targeted) was not successfully installed"
end
Expand Down
100 changes: 72 additions & 28 deletions lib/datadog/di/instrumenter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,7 @@ def initialize(settings, serializer, logger, code_tracker: nil, telemetry: nil)
# from the method but from outside of the method).
Location = Struct.new(:path, :lineno, :label)

def hook_method(probe, &block)
unless block
raise ArgumentError, 'block is required'
end

def hook_method(probe, responder)
lock.synchronize do
if probe.instrumentation_module
# Already instrumented, warn?
Expand Down Expand Up @@ -130,10 +126,34 @@ def hook_method(probe, &block)
caller_locations: caller_locations,
)
continue = condition.satisfied?(context)
rescue
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
rescue => exc
# Evaluation error exception can be raised for "expected"
# errors, we probably need another setting to control whether
# these exceptions are propagated.
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions &&
!exc.is_a?(DI::Error::ExpressionEvaluationError)

if context
# We want to report evaluation errors for conditions
# as probe snapshots. However, if we failed to create
# the context, we won't be able to report anything as
# the probe notifier builder requires a context.
begin
responder.probe_condition_evaluation_failed_callback(context, exc)
rescue
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

# TODO log / report via telemetry?
end
else
_ = 42 # stop standard from wrecking this code

raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

# TODO log / report via telemetry?
# If execution gets here, there is probably a bug in the tracer.
end

# TODO log / report via telemetry?
continue = false
end
end
Expand Down Expand Up @@ -195,8 +215,7 @@ def hook_method(probe, &block)
caller_locations: caller_locs,
return_value: rv, duration: duration, exception: exc,)

# & is to stop steep complaints, block is always present here.
block&.call(context)
responder.probe_executed_callback(context)
if exc
raise exc
else
Expand Down Expand Up @@ -258,11 +277,7 @@ def unhook_method(probe)
# not for eval'd code, unless the eval'd code is associated with
# a file name and client invokes this method with the correct
# file name for the eval'd code.
def hook_line(probe, &block)
unless block
raise ArgumentError, 'No block given to hook_line'
end

def hook_line(probe, responder)
lock.synchronize do
if probe.instrumentation_trace_point
# Already instrumented, warn?
Expand Down Expand Up @@ -367,14 +382,44 @@ def hook_line(probe, &block)

if continue
if condition = probe.condition
context = Context.new(
locals: Instrumenter.get_local_variables(tp),
target_self: tp.self,
probe: probe, settings: settings, serializer: serializer,
path: tp.path,
caller_locations: caller_locations,
)
continue = condition.satisfied?(context)
begin
context = Context.new(
locals: Instrumenter.get_local_variables(tp),
target_self: tp.self,
probe: probe, settings: settings, serializer: serializer,
path: tp.path,
caller_locations: caller_locations,
)
continue = condition.satisfied?(context)
rescue => exc
# Evaluation error exception can be raised for "expected"
# errors, we probably need another setting to control whether
# these exceptions are propagated.
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions &&
!exc.is_a?(DI::Error::ExpressionEvaluationError)

continue = false
if context
# We want to report evaluation errors for conditions
# as probe snapshots. However, if we failed to create
# the context, we won't be able to report anything as
# the probe notifier builder requires a context.
begin
responder.probe_condition_evaluation_failed_callback(context, condition, exc)
rescue
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

# TODO log / report via telemetry?
end
else
_ = 42 # stop standard from wrecking this code

raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

# TODO log / report via telemetry?
# If execution gets here, there is probably a bug in the tracer.
end
end
end
end

Expand All @@ -393,8 +438,7 @@ def hook_line(probe, &block)
caller_locations: caller_locations,
)

# & is to stop steep complaints, block is always present here.
block&.call(context)
responder.probe_executed_callback(context)
end
rescue => exc
raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions
Expand Down Expand Up @@ -443,11 +487,11 @@ def unhook_line(probe)
end
end

def hook(probe, &block)
def hook(probe, responder)
if probe.method?
hook_method(probe, &block)
hook_method(probe, responder)
elsif probe.line?
hook_line(probe, &block)
hook_line(probe, responder)
else
# TODO add test coverage for this path
logger.debug { "di: unknown probe type to hook: #{probe}" }
Expand Down
20 changes: 20 additions & 0 deletions lib/datadog/di/probe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ def initialize(id:, type:,
@rate_limit = rate_limit || (@capture_snapshot ? 1 : 5000)
@rate_limiter = Datadog::Core::TokenBucket.new(@rate_limit)

# At most one report per second.
# We create the rate limiter here even though it may never be used,
# to avoid having to synchronize the creation since method probes
# can be executed on multiple threads concurrently (even if line
# probes are never executed concurrently since those are done in a
# trace point).
if condition
@condition_evaluation_failed_rate_limiter = Datadog::Core::TokenBucket.new(1)
end

@emitting_notified = false
end

Expand Down Expand Up @@ -115,6 +125,16 @@ def initialize(id:, type:,
# Rate limiter object. For internal DI use only.
attr_reader :rate_limiter

# Rate limiter object for sending snapshots with evaluation errors
# for when probe condition evaluation fails.
# This rate limit is separate from the "base" rate limit for the probe
# because when the condition evaluation succeeds we want the "base"
# rate limit applied, not tainted by any evaluation errors
# (for example, the condition can be highly selective, and when it
# does not hold the evaluation may fail - we don't want to use up the
# probe rate limit for the errors).
attr_reader :condition_evaluation_failed_rate_limiter

def capture_snapshot?
@capture_snapshot
end
Expand Down
12 changes: 10 additions & 2 deletions lib/datadog/di/probe_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def add_probe(probe)
end

begin
instrumenter.hook(probe, &method(:probe_executed_callback))
instrumenter.hook(probe, self)

@installed_probes[probe.id] = probe
payload = probe_notification_builder.build_installed(probe)
Expand Down Expand Up @@ -184,7 +184,7 @@ def remove_other_probes(probe_ids)
begin
# TODO is it OK to hook from trace point handler?
# TODO the class is now defined, but can hooking still fail?
instrumenter.hook(probe, &method(:probe_executed_callback))
instrumenter.hook(probe, self)
@pending_probes.delete(probe.id)
break
rescue Error::DITargetNotDefined
Expand Down Expand Up @@ -242,6 +242,14 @@ def probe_executed_callback(context)
probe_notifier_worker.add_snapshot(payload)
end

def probe_condition_evaluation_failed_callback(context, expr, exc)
probe = context.probe
if probe.condition_evaluation_failed_rate_limiter&.allow?
payload = probe_notification_builder.build_condition_evaluation_failed(context, expr, exc)
probe_notifier_worker.add_snapshot(payload)
end
end

# Class/module definition trace point (:end type).
# Used to install hooks when the target classes/modules aren't yet
# defined when the hook request is received.
Expand Down
37 changes: 27 additions & 10 deletions lib/datadog/di/probe_notification_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ def build_snapshot(context)
end
end

message = nil
evaluation_errors = []
if segments = probe.template_segments
message, evaluation_errors = evaluate_template(segments, context)
end
build_snapshot_base(context,
evaluation_errors: evaluation_errors, message: message,
captures: captures)
end

def build_condition_evaluation_failed(context, expression, exception)
error = {
message: "#{exception.class}: #{exception}",
expr: expression.dsl_expr,
}
build_snapshot_base(context, evaluation_errors: [error])
end

private

def build_snapshot_base(context, evaluation_errors: [], captures: nil, message: nil)
probe = context.probe

timestamp = timestamp_now
duration = context.duration

location = if probe.line?
{
file: context.path,
Expand All @@ -103,13 +129,6 @@ def build_snapshot(context)
format_caller_locations(caller_locations)
end

timestamp = timestamp_now
message = nil
evaluation_errors = []
if segments = probe.template_segments
message, evaluation_errors = evaluate_template(segments, context)
end
duration = context.duration
{
service: settings.service,
"debugger.snapshot": {
Expand All @@ -132,7 +151,7 @@ def build_snapshot(context)
host: nil,
logger: {
name: probe.file,
method: probe.method_name || 'no_method',
method: probe.method_name,
thread_name: Thread.current.name,
# Dynamic instrumentation currently does not need thread_id for
# anything. It can be sent if a customer requests it at which point
Expand All @@ -150,8 +169,6 @@ def build_snapshot(context)
}
end

private

def build_status(probe, message:, status:)
{
service: settings.service,
Expand Down
32 changes: 32 additions & 0 deletions lib/datadog/di/proc_responder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Datadog
module DI
# An adapter to convert procs to responders.
#
# Used in test suite and benchmarks.
#
# @api private
class ProcResponder
def initialize(executed_proc, failed_proc = nil)
@executed_proc = executed_proc
@failed_proc = failed_proc
end

attr_reader :executed_proc
attr_reader :failed_proc

def probe_executed_callback(context)
executed_proc.call(context)
end

def probe_condition_evaluation_failed_callback(context, exc)
if failed_proc.nil?
raise NotImplementedError, "Failed proc not provided"
end

failed_proc.call(context, exc)
end
end
end
end
6 changes: 3 additions & 3 deletions sig/datadog/di/instrumenter.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ module Datadog

attr_reader telemetry: Core::Telemetry::Component?

def hook_method: (Probe probe) ?{ (?) -> untyped } -> void
def hook_method: (Probe probe, untyped responder) -> void

def unhook_method: (Probe probe) -> void
def hook_line: (Probe probe) ?{ (?) -> untyped } -> void
def hook_line: (Probe probe, untyped responder) -> void

def unhook_line: (Probe probe) -> void

def hook: (Probe probe) { (?) -> untyped } -> void
def hook: (Probe probe, untyped responder) -> void

def unhook: (Probe probe) -> void

Expand Down
Loading