diff --git a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
index 6611cdd628ff..5a541892d278 100644
--- a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
+++ b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
@@ -1079,6 +1079,7 @@
+
diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
index 6c5c47b154d1..413e14587bd9 100644
--- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
+++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
@@ -27,20 +27,21 @@ internal class DataStreamsWriter : IDataStreamsWriter
private readonly long _bucketDurationMs;
private readonly BoundedConcurrentQueue _buffer = new(queueLimit: 10_000);
private readonly BoundedConcurrentQueue _backlogBuffer = new(queueLimit: 10_000);
- private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10);
private readonly DataStreamsAggregator _aggregator;
private readonly IDiscoveryService _discoveryService;
private readonly IDataStreamsApi _api;
private readonly bool _isInDefaultState;
private readonly TaskCompletionSource _processExit = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly TaskCompletionSource _tcsBackgroundThread = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
+ private readonly TimeSpan _flushTimeout = TimeSpan.FromSeconds(5);
+ private TaskCompletionSource? _currentFlushTcs;
private MemoryStream? _serializationBuffer;
private long _pointsDropped;
- private int _flushRequested;
- private Task? _processTask;
+ private long _flushRequested;
private Timer? _flushTimer;
- private TaskCompletionSource? _currentFlushTcs;
+ private Thread? _backgroundProcessingThread;
private int _isSupported = SupportState.Unknown;
private bool _isInitialized;
@@ -89,13 +90,15 @@ private void Initialize()
{
lock (_initLock)
{
- if (_processTask != null)
+ if (_backgroundProcessingThread != null)
{
return;
}
- _processTask = Task.Run(ProcessQueueLoopAsync);
- _processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);
+ _backgroundProcessingThread = new Thread(ProcessQueueLoop);
+ _backgroundProcessingThread.IsBackground = true;
+ _backgroundProcessingThread.Start();
+
_flushTimer = new Timer(
x => ((DataStreamsWriter)x!).RequestFlush(),
this,
@@ -153,38 +156,23 @@ public async Task DisposeAsync()
#else
_flushTimer?.Dispose();
#endif
- await FlushAndCloseAsync().ConfigureAwait(false);
- _flushSemaphore.Dispose();
- }
-
- private async Task FlushAndCloseAsync()
- {
+ // signal exit
if (!_processExit.TrySetResult(true))
{
return;
}
- // nothing else to do, since the writer was not fully initialized
- if (!Volatile.Read(ref _isInitialized) || _processTask == null)
+ // wait for thread to finish
+ if (_backgroundProcessingThread != null)
{
- return;
+ await _tcsBackgroundThread.Task.WaitAsync(_flushTimeout).ConfigureAwait(false);
}
- // request a final flush - as the _processExit flag is now set
- // this ensures we will definitely flush all the stats
- // (and sets the mutex if it isn't already set)
- RequestFlush();
+ // trigger one final manual flush
+ await Task.Run(() => ProcessQueue(true, null)).ConfigureAwait(false);
- // wait for the processing loop to complete
- var completedTask = await Task.WhenAny(
- _processTask,
- Task.Delay(TimeSpan.FromSeconds(20)))
- .ConfigureAwait(false);
-
- if (completedTask != _processTask)
- {
- Log.Error("Could not flush all data streams stats before process exit");
- }
+ // dispose
+ _flushSemaphore.Dispose();
}
public async Task FlushAsync()
@@ -192,14 +180,12 @@ public async Task FlushAsync()
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
try
{
- var timeout = TimeSpan.FromSeconds(5);
-
if (_processExit.Task.IsCompleted)
{
return;
}
- if (!Volatile.Read(ref _isInitialized) || _processTask == null)
+ if (!Volatile.Read(ref _isInitialized) || _backgroundProcessingThread == null)
{
return;
}
@@ -212,11 +198,11 @@ public async Task FlushAsync()
var completedTask = await Task.WhenAny(
tcs.Task,
_processExit.Task,
- Task.Delay(timeout)).ConfigureAwait(false);
+ Task.Delay(_flushTimeout)).ConfigureAwait(false);
if (completedTask != tcs.Task)
{
- Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
+ Log.Error("Data streams flush timeout after {Timeout}ms", _flushTimeout.TotalMilliseconds);
}
}
finally
@@ -231,7 +217,7 @@ private void RequestFlush()
Interlocked.Exchange(ref _flushRequested, 1);
}
- private async Task WriteToApiAsync()
+ private void WriteToApi()
{
// This method blocks ingestion of new stats points into the aggregator,
// but they will continue to be added to the queue, and will be processed later
@@ -255,8 +241,7 @@ private async Task WriteToApiAsync()
// This flushes on the same thread as the processing loop
var data = new ArraySegment(_serializationBuffer.GetBuffer(), offset: 0, (int)_serializationBuffer.Length);
- var success = await _api.SendAsync(data).ConfigureAwait(false);
-
+ var success = _api.SendAsync(data).ConfigureAwait(false).GetAwaiter().GetResult();
var dropCount = Interlocked.Exchange(ref _pointsDropped, 0);
if (success)
{
@@ -269,58 +254,47 @@ private async Task WriteToApiAsync()
}
}
- private async Task ProcessQueueLoopAsync()
+ private void ProcessQueue(bool flush, TaskCompletionSource? tcs)
{
- var isFinalFlush = false;
- while (true)
+ try
{
- try
+ while (_buffer.TryDequeue(out var statsPoint))
{
- while (_buffer.TryDequeue(out var statsPoint))
- {
- _aggregator.Add(in statsPoint);
- }
-
- while (_backlogBuffer.TryDequeue(out var backlogPoint))
- {
- _aggregator.AddBacklog(in backlogPoint);
- }
-
- var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
- if (flushRequested == 1)
- {
- await WriteToApiAsync().ConfigureAwait(false);
- var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
- currentFlushTcs?.TrySetResult(true);
- FlushComplete?.Invoke(this, EventArgs.Empty);
- }
- }
- catch (Exception ex)
- {
- Log.Error(ex, "An error occured in the processing thread");
+ _aggregator.Add(in statsPoint);
}
- if (_processExit.Task.IsCompleted)
+ while (_backlogBuffer.TryDequeue(out var backlogPoint))
{
- if (isFinalFlush)
- {
- return;
- }
-
- // do one more loop to make sure everything is flushed
- RequestFlush();
- isFinalFlush = true;
- continue;
+ _aggregator.AddBacklog(in backlogPoint);
}
- // The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
- // and modified to avoid dealing with exceptions
- var tcs = new TaskCompletionSource();
- using (new Timer(s => ((TaskCompletionSource)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
+ if (flush)
{
- await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
+ WriteToApi();
+ FlushComplete?.Invoke(this, EventArgs.Empty);
}
}
+ catch (Exception ex)
+ {
+ Log.Error(ex, "An error occured in the processing thread");
+ }
+
+ tcs?.TrySetResult(true);
+ }
+
+ private void ProcessQueueLoop()
+ {
+ while (!_processExit.Task.IsCompleted)
+ {
+ var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
+ var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
+
+ ProcessQueue(flushRequested == 1, currentFlushTcs);
+ Thread.Sleep(20);
+ }
+
+ _tcsBackgroundThread.TrySetResult(true);
+ Log.Debug("Data Streams background thread has been completed");
}
private void HandleConfigUpdate(AgentConfiguration config)