Skip to content

Commit 95e3713

Browse files
committed
Rework the StatsDManager
Make sure we can't dispose a stats consumer that's in use (as it will throw) Rework to use a "lease" mechanism to track usages Make passing in a statsmanager required
1 parent 48ce4c6 commit 95e3713

File tree

47 files changed

+1354
-294
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1354
-294
lines changed

tracer/missing-nullability-files.csv

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ src/Datadog.Trace/Tracer.cs
3434
src/Datadog.Trace/TracerConstants.cs
3535
src/Datadog.Trace/TracerManager.cs
3636
src/Datadog.Trace/TracerManagerFactory.cs
37-
src/Datadog.Trace/Agent/AgentWriter.cs
3837
src/Datadog.Trace/Agent/Api.cs
3938
src/Datadog.Trace/Agent/ClientStatsPayload.cs
40-
src/Datadog.Trace/Agent/IAgentWriter.cs
4139
src/Datadog.Trace/Agent/IApi.cs
4240
src/Datadog.Trace/Agent/IApiRequest.cs
4341
src/Datadog.Trace/Agent/IApiRequestFactory.cs

tracer/src/Datadog.Trace/Agent/AgentWriter.cs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55

6+
#nullable enable
7+
68
using System;
79
using System.Collections.Concurrent;
810
using System.Collections.Generic;
@@ -12,10 +14,8 @@
1214
using Datadog.Trace.Configuration;
1315
using Datadog.Trace.DogStatsd;
1416
using Datadog.Trace.Logging;
15-
using Datadog.Trace.Tagging;
1617
using Datadog.Trace.Telemetry;
1718
using Datadog.Trace.Telemetry.Metrics;
18-
using Datadog.Trace.Vendors.StatsdClient;
1919

2020
namespace Datadog.Trace.Agent
2121
{
@@ -25,10 +25,10 @@ internal class AgentWriter : IAgentWriter
2525

2626
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<AgentWriter>();
2727

28-
private static readonly ArraySegment<byte> EmptyPayload = new(new byte[] { 0x90 });
28+
private static readonly ArraySegment<byte> EmptyPayload = new([0x90]);
2929

3030
private readonly ConcurrentQueue<WorkItem> _pendingTraces = new ConcurrentQueue<WorkItem>();
31-
private readonly IDogStatsd _statsd;
31+
private readonly IStatsdManager _statsd;
3232
private readonly Task _flushTask;
3333
private readonly Task _serializationTask;
3434
private readonly TaskCompletionSource<bool> _processExit = new TaskCompletionSource<bool>();
@@ -43,7 +43,7 @@ internal class AgentWriter : IAgentWriter
4343
private readonly int _batchInterval;
4444
private readonly IKeepRateCalculator _traceKeepRateCalculator;
4545

46-
private readonly IStatsAggregator _statsAggregator;
46+
private readonly IStatsAggregator? _statsAggregator;
4747

4848
private readonly bool _apmTracingEnabled;
4949

@@ -67,7 +67,7 @@ internal class AgentWriter : IAgentWriter
6767

6868
private bool _traceMetricsEnabled;
6969

70-
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, TracerSettings settings)
70+
public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, TracerSettings settings)
7171
: this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled)
7272
{
7373
settings.Manager.SubscribeToChanges(changes =>
@@ -76,16 +76,17 @@ public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd
7676
&& mutable.TracerMetricsEnabled != changes.PreviousMutable.TracerMetricsEnabled)
7777
{
7878
Volatile.Write(ref _traceMetricsEnabled, mutable.TracerMetricsEnabled);
79+
_statsd.SetRequired(StatsdConsumer.AgentWriter, mutable.TracerMetricsEnabled);
7980
}
8081
});
8182
}
8283

83-
public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
84+
public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
8485
: this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled, initialTracerMetricsEnabled)
8586
{
8687
}
8788

88-
internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
89+
internal AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
8990
{
9091
_statsAggregator = statsAggregator;
9192

@@ -104,6 +105,7 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat
104105

105106
_apmTracingEnabled = apmTracingEnabled;
106107
_traceMetricsEnabled = initialTracerMetricsEnabled;
108+
_statsd.SetRequired(StatsdConsumer.AgentWriter, initialTracerMetricsEnabled);
107109

108110
_serializationTask = automaticFlush ? Task.Factory.StartNew(SerializeTracesLoop, TaskCreationOptions.LongRunning) : Task.CompletedTask;
109111
_serializationTask.ContinueWith(t => Log.Error(t.Exception, "Error in serialization task"), TaskContinuationOptions.OnlyOnFaulted);
@@ -114,7 +116,7 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat
114116
_backBufferFlushTask = _frontBufferFlushTask = Task.CompletedTask;
115117
}
116118

117-
internal event Action Flushed;
119+
internal event Action? Flushed;
118120

119121
internal SpanBuffer ActiveBuffer => _activeBuffer;
120122

@@ -151,8 +153,12 @@ public void WriteTrace(ArraySegment<Span> trace)
151153

152154
if (Volatile.Read(ref _traceMetricsEnabled))
153155
{
154-
_statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
155-
_statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
156+
using var lease = _statsd.TryGetClientLease();
157+
if (lease.Client is { } statsd)
158+
{
159+
statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
160+
statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
161+
}
156162
}
157163
}
158164

@@ -255,7 +261,7 @@ private async Task FlushBuffersTaskLoopAsync()
255261
{
256262
tasks[2] = Task.Delay(TimeSpan.FromSeconds(1));
257263
await Task.WhenAny(tasks).ConfigureAwait(false);
258-
tasks[2] = null;
264+
tasks[2] = null!;
259265

260266
if (_forceFlush.Task.IsCompleted)
261267
{
@@ -327,8 +333,12 @@ async Task InternalBufferFlush()
327333
{
328334
if (Volatile.Read(ref _traceMetricsEnabled))
329335
{
330-
_statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
331-
_statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
336+
using var lease = _statsd.TryGetClientLease();
337+
if (lease.Client is { } statsd)
338+
{
339+
statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
340+
statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
341+
}
332342
}
333343

334344
var droppedTraces = Interlocked.Exchange(ref _droppedTraces, 0);
@@ -347,7 +357,7 @@ async Task InternalBufferFlush()
347357
{
348358
droppedP0Traces = Interlocked.Exchange(ref _droppedP0Traces, 0);
349359
droppedP0Spans = Interlocked.Exchange(ref _droppedP0Spans, 0);
350-
Log.Debug<int, int, long, long>("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
360+
Log.Debug("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
351361
// Metrics for unsampled traces/spans already recorded
352362
}
353363
else
@@ -388,7 +398,7 @@ async Task InternalBufferFlush()
388398
private void SerializeTrace(ArraySegment<Span> spans)
389399
{
390400
// Declaring as inline method because only safe to invoke in the context of SerializeTrace
391-
SpanBuffer SwapBuffers()
401+
SpanBuffer? SwapBuffers()
392402
{
393403
if (_activeBuffer == _frontBuffer)
394404
{
@@ -525,8 +535,12 @@ private void DropTrace(ArraySegment<Span> spans)
525535

526536
if (Volatile.Read(ref _traceMetricsEnabled))
527537
{
528-
_statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
529-
_statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
538+
using var lease = _statsd.TryGetClientLease();
539+
if (lease.Client is { } statsd)
540+
{
541+
statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
542+
statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
543+
}
530544
}
531545
}
532546

@@ -589,7 +603,7 @@ private void SerializeTracesLoop()
589603
private readonly struct WorkItem
590604
{
591605
public readonly ArraySegment<Span> Trace;
592-
public readonly Action Callback;
606+
public readonly Action? Callback;
593607

594608
public WorkItem(ArraySegment<Span> trace)
595609
{

tracer/src/Datadog.Trace/Agent/Api.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class Api : IApi
3333

3434
private readonly IDatadogLogger _log;
3535
private readonly IApiRequestFactory _apiRequestFactory;
36-
private readonly IDogStatsd _originalStatsd;
36+
private readonly IStatsdManager _statsd;
3737
private readonly string _containerId;
3838
private readonly string _entityId;
3939
private readonly Uri _tracesEndpoint;
@@ -42,13 +42,13 @@ internal class Api : IApi
4242
private readonly bool _partialFlushEnabled;
4343
private readonly SendCallback<SendStatsState> _sendStats;
4444
private readonly SendCallback<SendTracesState> _sendTraces;
45-
private IDogStatsd _statsd;
4645
private string _cachedResponse;
4746
private string _agentVersion;
47+
private bool _healthMetricsEnabled;
4848

4949
public Api(
5050
IApiRequestFactory apiRequestFactory,
51-
IDogStatsd statsd,
51+
IStatsdManager statsd,
5252
Action<Dictionary<string, float>> updateSampleRates,
5353
bool partialFlushEnabled,
5454
bool healthMetricsEnabled,
@@ -60,12 +60,13 @@ public Api(
6060
_sendStats = SendStatsAsyncImpl;
6161
_sendTraces = SendTracesAsyncImpl;
6262
_updateSampleRates = updateSampleRates;
63-
_originalStatsd = statsd;
63+
_statsd = statsd;
6464
ToggleTracerHealthMetrics(healthMetricsEnabled);
6565
_containerId = ContainerMetadata.GetContainerId();
6666
_entityId = ContainerMetadata.GetEntityId();
6767
_apiRequestFactory = apiRequestFactory;
6868
_partialFlushEnabled = partialFlushEnabled;
69+
_healthMetricsEnabled = healthMetricsEnabled;
6970
_tracesEndpoint = _apiRequestFactory.GetEndpoint(TracesPath);
7071
_log.Debug("Using traces endpoint {TracesEndpoint}", _tracesEndpoint.ToString());
7172
_statsEndpoint = _apiRequestFactory.GetEndpoint(StatsPath);
@@ -84,7 +85,8 @@ private enum SendResult
8485
[MemberNotNull(nameof(_statsd))]
8586
public void ToggleTracerHealthMetrics(bool enabled)
8687
{
87-
Interlocked.Exchange(ref _statsd, enabled ? _originalStatsd : null);
88+
Volatile.Write(ref _healthMetricsEnabled, enabled);
89+
_statsd.SetRequired(StatsdConsumer.TraceApi, enabled);
8890
}
8991

9092
public Task<bool> SendStatsAsync(StatsBuffer stats, long bucketDuration)
@@ -309,7 +311,9 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
309311

310312
try
311313
{
312-
var healthStats = Volatile.Read(ref _statsd);
314+
var healthMetricsEnabled = Volatile.Read(ref _healthMetricsEnabled);
315+
using var lease = healthMetricsEnabled ? _statsd.TryGetClientLease() : default;
316+
var healthStats = healthMetricsEnabled ? lease.Client : null;
313317
try
314318
{
315319
TelemetryFactory.Metrics.RecordCountTraceApiRequests();
@@ -332,7 +336,7 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin
332336
string[] tags = { $"status:{response.StatusCode}" };
333337

334338
// count every response, grouped by status code
335-
healthStats?.Increment(TracerMetricNames.Api.Responses, tags: tags);
339+
healthStats.Increment(TracerMetricNames.Api.Responses, tags: tags);
336340
}
337341

338342
TelemetryFactory.Metrics.RecordCountTraceApiResponses(response.GetTelemetryStatusCodeMetricTag());

tracer/src/Datadog.Trace/Agent/IAgentWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55

6+
#nullable enable
7+
68
using System;
79
using System.Threading.Tasks;
810

tracer/src/Datadog.Trace/Agent/ManagedApi.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Threading;
1212
using System.Threading.Tasks;
1313
using Datadog.Trace.Configuration;
14+
using Datadog.Trace.DogStatsd;
1415
using Datadog.Trace.Vendors.StatsdClient;
1516

1617
namespace Datadog.Trace.Agent;
@@ -24,7 +25,7 @@ internal class ManagedApi : IApi
2425

2526
public ManagedApi(
2627
TracerSettings.SettingsManager settings,
27-
IDogStatsd statsd,
28+
IStatsdManager statsd,
2829
Action<Dictionary<string, float>> updateSampleRates,
2930
bool partialFlushEnabled)
3031
{

tracer/src/Datadog.Trace/Ci/Agent/ApmAgentWriter.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Datadog.Trace.Agent.DiscoveryService;
1212
using Datadog.Trace.Ci.EventModel;
1313
using Datadog.Trace.Configuration;
14+
using Datadog.Trace.DogStatsd;
1415

1516
namespace Datadog.Trace.Ci.Agent;
1617

@@ -30,16 +31,17 @@ public ApmAgentWriter(TracerSettings settings, Action<Dictionary<string, float>>
3031
var partialFlushEnabled = settings.PartialFlushEnabled;
3132
// CI Vis doesn't allow reconfiguration, so don't need to subscribe to changes
3233
var apiRequestFactory = TracesTransportStrategy.Get(settings.Manager.InitialExporterSettings);
33-
var api = new Api(apiRequestFactory, null, updateSampleRates, partialFlushEnabled, healthMetricsEnabled: false);
34+
var statsdManager = new StatsdManager(settings);
35+
var api = new Api(apiRequestFactory, statsdManager, updateSampleRates, partialFlushEnabled, healthMetricsEnabled: false);
3436
var statsAggregator = StatsAggregator.Create(api, settings, discoveryService);
3537

36-
_agentWriter = new AgentWriter(api, statsAggregator, null, maxBufferSize: maxBufferSize, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled);
38+
_agentWriter = new AgentWriter(api, statsAggregator, statsdManager, maxBufferSize: maxBufferSize, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled);
3739
}
3840

3941
// Internal for testing
40-
internal ApmAgentWriter(IApi api, int maxBufferSize = DefaultMaxBufferSize)
42+
internal ApmAgentWriter(IApi api, IStatsdManager statsdManager, int maxBufferSize = DefaultMaxBufferSize)
4143
{
42-
_agentWriter = new AgentWriter(api, null, null, maxBufferSize: maxBufferSize);
44+
_agentWriter = new AgentWriter(api, null, statsdManager, maxBufferSize: maxBufferSize);
4345
}
4446

4547
public void WriteEvent(IEvent @event)

tracer/src/Datadog.Trace/Ci/TestOptimizationTracerManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Datadog.Trace.Ci.EventModel;
1414
using Datadog.Trace.Configuration;
1515
using Datadog.Trace.DataStreamsMonitoring;
16+
using Datadog.Trace.DogStatsd;
1617
using Datadog.Trace.Logging;
1718
using Datadog.Trace.Logging.DirectSubmission;
1819
using Datadog.Trace.Logging.TracerFlare;
@@ -32,7 +33,7 @@ public TestOptimizationTracerManager(
3233
TracerSettings settings,
3334
IAgentWriter agentWriter,
3435
IScopeManager scopeManager,
35-
IDogStatsd statsd,
36+
IStatsdManager statsd,
3637
RuntimeMetricsWriter runtimeMetricsWriter,
3738
DirectLogSubmissionManager logSubmissionManager,
3839
ITelemetryController telemetry,
@@ -148,7 +149,7 @@ public LockedManager(
148149
TracerSettings settings,
149150
IAgentWriter agentWriter,
150151
IScopeManager scopeManager,
151-
IDogStatsd statsd,
152+
IStatsdManager statsd,
152153
RuntimeMetricsWriter runtimeMetricsWriter,
153154
DirectLogSubmissionManager logSubmissionManager,
154155
ITelemetryController telemetry,

tracer/src/Datadog.Trace/Ci/TestOptimizationTracerManagerFactory.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Datadog.Trace.Ci.Sampling;
1515
using Datadog.Trace.Configuration;
1616
using Datadog.Trace.DataStreamsMonitoring;
17+
using Datadog.Trace.DogStatsd;
1718
using Datadog.Trace.Logging.DirectSubmission;
1819
using Datadog.Trace.Logging.TracerFlare;
1920
using Datadog.Trace.RemoteConfigurationManagement;
@@ -41,7 +42,7 @@ protected override TracerManager CreateTracerManagerFrom(
4142
TracerSettings settings,
4243
IAgentWriter agentWriter,
4344
IScopeManager scopeManager,
44-
IDogStatsd statsd,
45+
IStatsdManager statsd,
4546
RuntimeMetricsWriter runtimeMetrics,
4647
DirectLogSubmissionManager logSubmissionManager,
4748
ITelemetryController telemetry,
@@ -87,7 +88,7 @@ protected override ITraceSampler GetSampler(TracerSettings settings)
8788

8889
protected override bool ShouldEnableRemoteConfiguration(TracerSettings settings) => false;
8990

90-
protected override IAgentWriter GetAgentWriter(TracerSettings settings, IDogStatsd statsd, Action<Dictionary<string, float>> updateSampleRates, IDiscoveryService discoveryService, TelemetrySettings telemetrySettings)
91+
protected override IAgentWriter GetAgentWriter(TracerSettings settings, IStatsdManager statsd, Action<Dictionary<string, float>> updateSampleRates, IDiscoveryService discoveryService, TelemetrySettings telemetrySettings)
9192
{
9293
// Check for agentless scenario
9394
if (_settings.Agentless)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// <copyright file="IStatsdManager.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
using System;
8+
using Datadog.Trace.Vendors.StatsdClient;
9+
10+
namespace Datadog.Trace.DogStatsd;
11+
12+
internal interface IStatsdManager : IDisposable
13+
{
14+
/// <summary>
15+
/// Obtain a <see cref="StatsdManager.StatsdClientLease"/> for accessing a <see cref="IDogStatsd"/> instance.
16+
/// The lease must be disposed after all references to the client have gone.
17+
/// </summary>
18+
StatsdManager.StatsdClientLease TryGetClientLease();
19+
20+
/// <summary>
21+
/// Called by users of <see cref="StatsdManager"/> to indicate that a "live" client is required.
22+
/// Each unique consumer of <see cref="StatsdManager"/> should set a different
23+
/// <see cref="StatsdConsumer"/> value.
24+
/// </summary>
25+
void SetRequired(StatsdConsumer consumer, bool enabled);
26+
}

tracer/src/Datadog.Trace/DogStatsd/NoOpStatsd.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Datadog.Trace.DogStatsd
1010
{
11-
internal class NoOpStatsd : IDogStatsd
11+
internal sealed class NoOpStatsd : IDogStatsd
1212
{
1313
public static readonly NoOpStatsd Instance = new();
1414

0 commit comments

Comments
 (0)