Skip to content

Investigate & fix SemaphoreFullException #1819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -280,10 +279,20 @@ protected override async Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.BasicCancelOk)
{
Debug.Assert(_consumerTag == new BasicCancelOk(cmd.MethodSpan)._consumerTag);
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
.ConfigureAwait(false);
_tcs.SetResult(true);
var result = new BasicCancelOk(cmd.MethodSpan);
if (_consumerTag == result._consumerTag)
{
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
.ConfigureAwait(false);
_tcs.SetResult(true);
}
else
{
string msg = string.Format("Consumer tag '{0}' does not match expected consumer tag for basic.cancel operation {1}",
result._consumerTag, _consumerTag);
var ex = new InvalidOperationException(msg);
_tcs.SetException(ex);
}
}
else
{
Expand Down
33 changes: 21 additions & 12 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -148,13 +147,14 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
enqueued = Enqueue(k);

var method = new ConfirmSelect(false);

await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);

return;
if (false == await k)
{
throw new InvalidOperationException(InternalConstants.BugFound);
}
}
finally
{
Expand All @@ -180,12 +180,14 @@ private void HandleAck(ulong deliveryTag, bool multiple)
{
if (multiple)
{
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources.ToArray())
{
if (pair.Key <= deliveryTag)
{
pair.Value.SetResult(true);
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource<bool>? tcs))
{
tcs.SetResult(true);
}
}
}
}
Expand All @@ -206,18 +208,20 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
{
if (multiple)
{
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources.ToArray())
{
if (pair.Key <= deliveryTag)
{
pair.Value.SetException(new PublishException(pair.Key, isReturn));
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource<bool>? tcs))
{
tcs.SetException(new PublishException(pair.Key, isReturn));
}
}
}
}
else
{
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
tcs.SetException(new PublishException(deliveryTag, isReturn));
}
Expand Down Expand Up @@ -380,6 +384,11 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
throw;
}
finally
{
publisherConfirmationInfo.Dispose();
Expand Down
58 changes: 22 additions & 36 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
}

bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);

await ConsumerDispatcher.WaitForShutdownAsync()
.ConfigureAwait(false);
Expand Down Expand Up @@ -387,8 +386,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);

await MaybeConfirmSelect(cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -465,6 +463,14 @@ await c.HandleCommandAsync(cmd)
}
}

private static void AssertResultIsTrue(bool result)
{
if (false == result)
{
throw new InvalidOperationException(InternalConstants.BugFound);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod
{
Expand Down Expand Up @@ -978,8 +984,7 @@ await ModelSendAsync(in method, k.CancellationToken)
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);
}
catch
{
Expand Down Expand Up @@ -1108,9 +1113,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1143,9 +1146,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1187,9 +1188,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1240,9 +1239,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1286,8 +1283,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1332,8 +1328,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1456,8 +1451,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1587,9 +1581,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1621,9 +1613,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1655,9 +1645,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -1689,9 +1677,7 @@ await ModelSendAsync(in method, k.CancellationToken)

try
{
bool result = await k;
Debug.Assert(result);
return;
AssertResultIsTrue(await k);
}
catch (OperationCanceledException)
{
Expand Down
73 changes: 56 additions & 17 deletions projects/Test/Integration/TestFloodPublishing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
Expand All @@ -43,7 +44,7 @@ namespace Test.Integration
{
public class TestFloodPublishing : IntegrationFixture
{
private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5);
private static readonly TimeSpan ElapsedMax = TimeSpan.FromSeconds(10);
private readonly byte[] _body = GetRandomBody(2048);

public TestFloodPublishing(ITestOutputHelper output) : base(output)
Expand Down Expand Up @@ -92,31 +93,68 @@ public async Task TestUnthrottledFloodPublishing()
return Task.CompletedTask;
};

var publishTasks = new List<Task>();
var queueArguments = new Dictionary<string, object>
{
["x-max-length"] = 131072,
["x-overflow"] = "reject-publish"
};

QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty,
passive: false, durable: false, exclusive: true, autoDelete: true, arguments: queueArguments);
string queueName = q.QueueName;

var exceptions = new ConcurrentBag<Exception>();

async Task WaitPublishTasksAsync(ICollection<ValueTask> publishTasks)
{
foreach (ValueTask pt in publishTasks)
{
try
{
await pt;
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}

publishTasks.Clear();
}

var stopwatch = Stopwatch.StartNew();
int i = 0;
int publishCount = 0;
try
{
for (i = 0; i < 65535 * 64; i++)
var tasks = new List<Task>();
for (int j = 0; j < 8; j++)
{
if (i % 65536 == 0)
tasks.Add(Task.Run(async () =>
{
if (stopwatch.Elapsed > FiveSeconds)
var publishTasks = new List<ValueTask>();
for (int i = 0; i < 65536; i++)
{
break;
}
}
if (stopwatch.Elapsed > ElapsedMax)
{
await WaitPublishTasksAsync(publishTasks);
return;
}

publishCount++;
publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask());
Interlocked.Increment(ref publishCount);
publishTasks.Add(_channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true,
body: _body));

if (i % 500 == 0)
{
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
publishTasks.Clear();
}
if (i % 128 == 0)
{
await WaitPublishTasksAsync(publishTasks);
}
}

await WaitPublishTasksAsync(publishTasks);
}));
}

await Task.WhenAll(tasks).WaitAsync(WaitSpan);
}
finally
{
Expand All @@ -127,7 +165,8 @@ public async Task TestUnthrottledFloodPublishing()
Assert.False(sawUnexpectedShutdown);
if (IsVerbose)
{
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
_output.WriteLine("[INFO] published {0} messages in {1}, exceptions: {2}",
publishCount, stopwatch.Elapsed, exceptions.Count);
}
}

Expand Down