Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ internal class DataStreamsWriter : IDataStreamsWriter

private readonly object _initLock = new();
private readonly long _bucketDurationMs;
private readonly BoundedConcurrentQueue<StatsPoint> _buffer = new(queueLimit: 10_000);
private readonly BoundedConcurrentQueue<BacklogPoint> _backlogBuffer = new(queueLimit: 10_000);
private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10);
private readonly BoundedConcurrentQueue<StatsPoint> _buffer = new(queueLimit: 25_000);
private readonly BoundedConcurrentQueue<BacklogPoint> _backlogBuffer = new(queueLimit: 25_000);
private readonly DataStreamsAggregator _aggregator;
private readonly IDiscoveryService _discoveryService;
private readonly IDataStreamsApi _api;
Expand Down Expand Up @@ -313,13 +312,7 @@ private async Task ProcessQueueLoopAsync()
continue;
}

// The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
// and modified to avoid dealing with exceptions
var tcs = new TaskCompletionSource<bool>();
using (new Timer(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
{
await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
}
await Task.Delay(25).ConfigureAwait(false);
}
}

Expand Down
Loading