Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@
<type fullname="System.Threading.ParameterizedThreadStart" />
<type fullname="System.Threading.Thread" />
<type fullname="System.Threading.ThreadAbortException" />
<type fullname="System.Threading.ThreadStart" />
<type fullname="System.Threading.ThreadState" />
</assembly>
<assembly fullname="System.Threading.ThreadPool">
Expand Down
132 changes: 53 additions & 79 deletions tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ internal class DataStreamsWriter : IDataStreamsWriter
private readonly long _bucketDurationMs;
private readonly BoundedConcurrentQueue<StatsPoint> _buffer = new(queueLimit: 10_000);
private readonly BoundedConcurrentQueue<BacklogPoint> _backlogBuffer = new(queueLimit: 10_000);
private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10);
private readonly DataStreamsAggregator _aggregator;
private readonly IDiscoveryService _discoveryService;
private readonly IDataStreamsApi _api;
private readonly bool _isInDefaultState;

private readonly TaskCompletionSource<bool> _processExit = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<bool> _tcsBackgroundThread = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
private readonly TimeSpan _flushTimeout = TimeSpan.FromSeconds(5);
private TaskCompletionSource<bool>? _currentFlushTcs;
private MemoryStream? _serializationBuffer;
private long _pointsDropped;
private int _flushRequested;
private Task? _processTask;
private long _flushRequested;
private Timer? _flushTimer;
private TaskCompletionSource<bool>? _currentFlushTcs;
private Thread? _backgroundProcessingThread;

private int _isSupported = SupportState.Unknown;
private bool _isInitialized;
Expand Down Expand Up @@ -89,13 +90,15 @@ private void Initialize()
{
lock (_initLock)
{
if (_processTask != null)
if (_backgroundProcessingThread != null)
{
return;
}

_processTask = Task.Run(ProcessQueueLoopAsync);
_processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);
_backgroundProcessingThread = new Thread(ProcessQueueLoop);
_backgroundProcessingThread.IsBackground = true;
_backgroundProcessingThread.Start();

_flushTimer = new Timer(
x => ((DataStreamsWriter)x!).RequestFlush(),
this,
Expand Down Expand Up @@ -153,53 +156,36 @@ public async Task DisposeAsync()
#else
_flushTimer?.Dispose();
#endif
await FlushAndCloseAsync().ConfigureAwait(false);
_flushSemaphore.Dispose();
}

private async Task FlushAndCloseAsync()
{
// signal exit
if (!_processExit.TrySetResult(true))
{
return;
}

// nothing else to do, since the writer was not fully initialized
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
// wait for thread to finish
if (_backgroundProcessingThread != null)
{
return;
await _tcsBackgroundThread.Task.WaitAsync(_flushTimeout).ConfigureAwait(false);
}

// request a final flush - as the _processExit flag is now set
// this ensures we will definitely flush all the stats
// (and sets the mutex if it isn't already set)
RequestFlush();
// trigger one final manual flush
await Task.Run(() => ProcessQueue(true, null)).ConfigureAwait(false);

// wait for the processing loop to complete
var completedTask = await Task.WhenAny(
_processTask,
Task.Delay(TimeSpan.FromSeconds(20)))
.ConfigureAwait(false);

if (completedTask != _processTask)
{
Log.Error("Could not flush all data streams stats before process exit");
}
// dispose
_flushSemaphore.Dispose();
}

public async Task FlushAsync()
{
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var timeout = TimeSpan.FromSeconds(5);

if (_processExit.Task.IsCompleted)
{
return;
}

if (!Volatile.Read(ref _isInitialized) || _processTask == null)
if (!Volatile.Read(ref _isInitialized) || _backgroundProcessingThread == null)
{
return;
}
Expand All @@ -212,11 +198,11 @@ public async Task FlushAsync()
var completedTask = await Task.WhenAny(
tcs.Task,
_processExit.Task,
Task.Delay(timeout)).ConfigureAwait(false);
Task.Delay(_flushTimeout)).ConfigureAwait(false);

if (completedTask != tcs.Task)
{
Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
Log.Error("Data streams flush timeout after {Timeout}ms", _flushTimeout.TotalMilliseconds);
}
}
finally
Expand All @@ -231,7 +217,7 @@ private void RequestFlush()
Interlocked.Exchange(ref _flushRequested, 1);
}

private async Task WriteToApiAsync()
private void WriteToApi()
{
// This method blocks ingestion of new stats points into the aggregator,
// but they will continue to be added to the queue, and will be processed later
Expand All @@ -255,8 +241,7 @@ private async Task WriteToApiAsync()
// This flushes on the same thread as the processing loop
var data = new ArraySegment<byte>(_serializationBuffer.GetBuffer(), offset: 0, (int)_serializationBuffer.Length);

var success = await _api.SendAsync(data).ConfigureAwait(false);

var success = _api.SendAsync(data).ConfigureAwait(false).GetAwaiter().GetResult();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely can't do GetAwaiter().GetResult(); 🙁

var dropCount = Interlocked.Exchange(ref _pointsDropped, 0);
if (success)
{
Expand All @@ -269,58 +254,47 @@ private async Task WriteToApiAsync()
}
}

private async Task ProcessQueueLoopAsync()
private void ProcessQueue(bool flush, TaskCompletionSource<bool>? tcs)
{
var isFinalFlush = false;
while (true)
try
{
try
while (_buffer.TryDequeue(out var statsPoint))
{
while (_buffer.TryDequeue(out var statsPoint))
{
_aggregator.Add(in statsPoint);
}

while (_backlogBuffer.TryDequeue(out var backlogPoint))
{
_aggregator.AddBacklog(in backlogPoint);
}

var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
if (flushRequested == 1)
{
await WriteToApiAsync().ConfigureAwait(false);
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
currentFlushTcs?.TrySetResult(true);
FlushComplete?.Invoke(this, EventArgs.Empty);
}
}
catch (Exception ex)
{
Log.Error(ex, "An error occured in the processing thread");
_aggregator.Add(in statsPoint);
}

if (_processExit.Task.IsCompleted)
while (_backlogBuffer.TryDequeue(out var backlogPoint))
{
if (isFinalFlush)
{
return;
}

// do one more loop to make sure everything is flushed
RequestFlush();
isFinalFlush = true;
continue;
_aggregator.AddBacklog(in backlogPoint);
}

// The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
// and modified to avoid dealing with exceptions
var tcs = new TaskCompletionSource<bool>();
using (new Timer(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
if (flush)
{
await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't responding / reacting to _processExit.Task anymore but I don't think a big deal

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't responding / reacting to _processExit.Task anymore but I don't think a big deal

Yeah, given it's only a 15ms wait, I think that's fine, but it does make me wonder... a 15ms wait period makes this is a pretty fast spinning loop, no? 😕For context, we flush traces every 1000ms (1s)... On low-core (1) machines would this not potentially be taking up a lot of thread/CPU time?

What's more, the _resetEvent.Wait() that's never triggered is effectively a Thread.Sleep(). And doing a Thread.Sleep on a thread pool thread is generally not a good idea, as it blocks anything else doing work. @tonyredondo did a bunch of work to remove all the Thread.Sleep() from our async code for exactly this reason. If we do want to dedicate a thread to this, I think we need to turn this loop into a sync loop and create a dedicated long-running thread, but that might be more work than we'd like.

Overall, I don't know if we have any evidence that the TCS and Tasks here are the source of the perf issue yet? and I wonder if it's the potentially small value of _waitTimeSpan that's causing the issue due to this loop spinning too fast? 🤔 Have we considered alternatives like increasing the buffering wait time? Or taking an approach similar to the agent writer where we have a longer buffering period, with dedicated flushing when the buffers get to a certain "fullness"? It'll be a trade-off obviously, but I'm not sure which is best

WriteToApi();
FlushComplete?.Invoke(this, EventArgs.Empty);
}
}
catch (Exception ex)
{
Log.Error(ex, "An error occured in the processing thread");
}

tcs?.TrySetResult(true);
}

private void ProcessQueueLoop()
{
while (!_processExit.Task.IsCompleted)
{
var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);

ProcessQueue(flushRequested == 1, currentFlushTcs);
Thread.Sleep(20);
}

_tcsBackgroundThread.TrySetResult(true);
Log.Debug("Data Streams background thread has been completed");
}

private void HandleConfigUpdate(AgentConfiguration config)
Expand Down
Loading