diff --git a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs index 35dc1cedd..0bbef38e9 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs @@ -30,7 +30,6 @@ //--------------------------------------------------------------------------- using System; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -280,10 +279,20 @@ protected override async Task DoHandleCommandAsync(IncomingCommand cmd) { if (cmd.CommandId == ProtocolCommandId.BasicCancelOk) { - Debug.Assert(_consumerTag == new BasicCancelOk(cmd.MethodSpan)._consumerTag); - await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) - .ConfigureAwait(false); - _tcs.SetResult(true); + var result = new BasicCancelOk(cmd.MethodSpan); + if (_consumerTag == result._consumerTag) + { + await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) + .ConfigureAwait(false); + _tcs.SetResult(true); + } + else + { + string msg = string.Format("Consumer tag '{0}' does not match expected consumer tag for basic.cancel operation {1}", + result._consumerTag, _consumerTag); + var ex = new InvalidOperationException(msg); + _tcs.SetException(ex); + } } else { diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index d2ca45247..b22625510 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -32,7 +32,6 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -148,13 +147,14 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken) enqueued = Enqueue(k); var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); - - return; + if (false == await k) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } } finally { @@ -180,12 +180,14 @@ private void HandleAck(ulong deliveryTag, bool multiple) { if (multiple) { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources.ToArray()) { if (pair.Key <= deliveryTag) { - pair.Value.SetResult(true); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); + if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource? tcs)) + { + tcs.SetResult(true); + } } } } @@ -206,18 +208,20 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (multiple) { - foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources.ToArray()) { if (pair.Key <= deliveryTag) { - pair.Value.SetException(new PublishException(pair.Key, isReturn)); - _confirmsTaskCompletionSources.Remove(pair.Key, out _); + if (_confirmsTaskCompletionSources.TryRemove(pair.Key, out TaskCompletionSource? tcs)) + { + tcs.SetException(new PublishException(pair.Key, isReturn)); + } } } } else { - if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) + if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) { tcs.SetException(new PublishException(deliveryTag, isReturn)); } @@ -380,6 +384,11 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) .ConfigureAwait(false); } + catch (OperationCanceledException) + { + _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); + throw; + } finally { publisherConfirmationInfo.Dispose(); diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index b573d30e2..0bfc3e2b9 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -234,8 +234,7 @@ await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); } - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); await ConsumerDispatcher.WaitForShutdownAsync() .ConfigureAwait(false); @@ -387,8 +386,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); await MaybeConfirmSelect(cancellationToken) .ConfigureAwait(false); @@ -465,6 +463,14 @@ await c.HandleCommandAsync(cmd) } } + private static void AssertResultIsTrue(bool result) + { + if (false == result) + { + throw new InvalidOperationException(InternalConstants.BugFound); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] protected ValueTask ModelSendAsync(in T method, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod { @@ -978,8 +984,7 @@ await ModelSendAsync(in method, k.CancellationToken) await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch { @@ -1108,9 +1113,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1143,9 +1146,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1187,9 +1188,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1240,9 +1239,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1286,8 +1283,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1332,8 +1328,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1456,8 +1451,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1587,9 +1581,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1621,9 +1613,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1655,9 +1645,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { @@ -1689,9 +1677,7 @@ await ModelSendAsync(in method, k.CancellationToken) try { - bool result = await k; - Debug.Assert(result); - return; + AssertResultIsTrue(await k); } catch (OperationCanceledException) { diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 5b700ed51..53723e85a 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Threading; @@ -43,7 +44,7 @@ namespace Test.Integration { public class TestFloodPublishing : IntegrationFixture { - private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5); + private static readonly TimeSpan ElapsedMax = TimeSpan.FromSeconds(10); private readonly byte[] _body = GetRandomBody(2048); public TestFloodPublishing(ITestOutputHelper output) : base(output) @@ -92,31 +93,68 @@ public async Task TestUnthrottledFloodPublishing() return Task.CompletedTask; }; - var publishTasks = new List(); + var queueArguments = new Dictionary + { + ["x-max-length"] = 131072, + ["x-overflow"] = "reject-publish" + }; + + QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, + passive: false, durable: false, exclusive: true, autoDelete: true, arguments: queueArguments); + string queueName = q.QueueName; + + var exceptions = new ConcurrentBag(); + + async Task WaitPublishTasksAsync(ICollection publishTasks) + { + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + exceptions.Add(ex); + } + } + + publishTasks.Clear(); + } + var stopwatch = Stopwatch.StartNew(); - int i = 0; int publishCount = 0; try { - for (i = 0; i < 65535 * 64; i++) + var tasks = new List(); + for (int j = 0; j < 8; j++) { - if (i % 65536 == 0) + tasks.Add(Task.Run(async () => { - if (stopwatch.Elapsed > FiveSeconds) + var publishTasks = new List(); + for (int i = 0; i < 65536; i++) { - break; - } - } + if (stopwatch.Elapsed > ElapsedMax) + { + await WaitPublishTasksAsync(publishTasks); + return; + } - publishCount++; - publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask()); + Interlocked.Increment(ref publishCount); + publishTasks.Add(_channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true, + body: _body)); - if (i % 500 == 0) - { - await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); - publishTasks.Clear(); - } + if (i % 128 == 0) + { + await WaitPublishTasksAsync(publishTasks); + } + } + + await WaitPublishTasksAsync(publishTasks); + })); } + + await Task.WhenAll(tasks).WaitAsync(WaitSpan); } finally { @@ -127,7 +165,8 @@ public async Task TestUnthrottledFloodPublishing() Assert.False(sawUnexpectedShutdown); if (IsVerbose) { - _output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed); + _output.WriteLine("[INFO] published {0} messages in {1}, exceptions: {2}", + publishCount, stopwatch.Elapsed, exceptions.Count); } }