Skip to content

Commit 86a8d1f

Browse files
committed
Misc changes from lukebakken/amqp-string
* Add `RunTestsUntilFailure` to `build.ps1` * Add dedicated `Create` static methods for `WorkStruct` * Improve "Pipelining of requests forbidden" exception message. * Move event handler recovery tests to their own classes to enable parallel execution. * Use separate publish and consume connections. * Fix detection of unexpected exception initiator. * Ensure flood publishing test does not use exclusive queue.
1 parent 7057fd5 commit 86a8d1f

13 files changed

+418
-197
lines changed

build.ps1

+22-13
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,45 @@
11
[CmdletBinding(PositionalBinding=$false)]
22
param(
3-
[switch]$RunTests
3+
[switch]$RunTests,
4+
[switch]$RunTestsUntilFailure
45
)
56

7+
$ErrorActionPreference = 'Stop'
8+
Set-StrictMode -Version Latest
9+
$PSNativeCommandUseErrorActionPreference = $true
10+
611
Write-Host "Run Parameters:" -ForegroundColor Cyan
712
Write-Host "`tPSScriptRoot: $PSScriptRoot"
813
Write-Host "`tRunTests: $RunTests"
14+
Write-Host "`tRunTestsUntilFailure: $RunTestsUntilFailure"
915
Write-Host "`tdotnet --version: $(dotnet --version)"
1016

1117
Write-Host "[INFO] building all projects (Build.csproj traversal)..." -ForegroundColor "Magenta"
1218
dotnet build "$PSScriptRoot\Build.csproj"
1319
Write-Host "[INFO] done building." -ForegroundColor "Green"
1420

15-
if ($RunTests)
21+
if ($RunTests -or $RunTestsUntilFailure)
1622
{
1723
$tests_dir = Join-Path -Path $PSScriptRoot -ChildPath 'projects' | Join-Path -ChildPath 'Test'
1824
$unit_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Unit' | Join-Path -ChildPath 'Unit.csproj')
1925
$integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Integration' | Join-Path -ChildPath 'Integration.csproj')
2026
$sequential_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'SequentialIntegration' | Join-Path -ChildPath 'SequentialIntegration.csproj')
2127

22-
foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
28+
Do
2329
{
24-
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
25-
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
26-
if ($LASTEXITCODE -ne 0)
27-
{
28-
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
29-
Exit 1
30-
}
31-
else
30+
foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file)
3231
{
33-
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
32+
Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta"
33+
& dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
34+
if ($LASTEXITCODE -ne 0)
35+
{
36+
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
37+
Exit 1
38+
}
39+
else
40+
{
41+
Write-Host "[INFO] tests passed" -ForegroundColor "Green"
42+
}
3443
}
35-
}
44+
} While ($RunTestsUntilFailure)
3645
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

+41-8
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consu
6464
if (false == _disposed && false == _quiesce)
6565
{
6666
AddConsumer(consumer, consumerTag);
67-
return _writer.WriteAsync(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag), cancellationToken);
67+
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
68+
return _writer.WriteAsync(work, cancellationToken);
6869
}
6970
else
7071
{
@@ -78,7 +79,8 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
7879
{
7980
if (false == _disposed && false == _quiesce)
8081
{
81-
var work = new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
82+
IBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
83+
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
8284
return _writer.WriteAsync(work, cancellationToken);
8385
}
8486
else
@@ -91,7 +93,9 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken
9193
{
9294
if (false == _disposed && false == _quiesce)
9395
{
94-
return _writer.WriteAsync(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
96+
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
97+
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
98+
return _writer.WriteAsync(work, cancellationToken);
9599
}
96100
else
97101
{
@@ -103,7 +107,9 @@ public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken ca
103107
{
104108
if (false == _disposed && false == _quiesce)
105109
{
106-
return _writer.WriteAsync(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken);
110+
IBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
111+
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
112+
return _writer.WriteAsync(work, cancellationToken);
107113
}
108114
else
109115
{
@@ -226,7 +232,7 @@ await _worker
226232

227233
protected sealed override void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason)
228234
{
229-
_writer.TryWrite(new WorkStruct(consumer, reason));
235+
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
230236
}
231237

232238
protected override void InternalShutdown()
@@ -258,23 +264,23 @@ protected override Task InternalShutdownAsync()
258264
public readonly ShutdownEventArgs? Reason;
259265
public readonly WorkType WorkType;
260266

261-
public WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
267+
private WorkStruct(WorkType type, IBasicConsumer consumer, string consumerTag)
262268
: this()
263269
{
264270
WorkType = type;
265271
Consumer = consumer;
266272
ConsumerTag = consumerTag;
267273
}
268274

269-
public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
275+
private WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
270276
: this()
271277
{
272278
WorkType = WorkType.Shutdown;
273279
Consumer = consumer;
274280
Reason = reason;
275281
}
276282

277-
public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
283+
private WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
278284
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
279285
{
280286
WorkType = WorkType.Deliver;
@@ -289,6 +295,33 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag
289295
Reason = default;
290296
}
291297

298+
public static WorkStruct CreateCancel(IBasicConsumer consumer, string consumerTag)
299+
{
300+
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
301+
}
302+
303+
public static WorkStruct CreateCancelOk(IBasicConsumer consumer, string consumerTag)
304+
{
305+
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
306+
}
307+
308+
public static WorkStruct CreateConsumeOk(IBasicConsumer consumer, string consumerTag)
309+
{
310+
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
311+
}
312+
313+
public static WorkStruct CreateShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
314+
{
315+
return new WorkStruct(consumer, reason);
316+
}
317+
318+
public static WorkStruct CreateDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
319+
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body)
320+
{
321+
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
322+
exchange, routingKey, basicProperties, body);
323+
}
324+
292325
public void Dispose() => Body.Dispose();
293326
}
294327

projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void Enqueue(IRpcContinuation k)
8282
IRpcContinuation result = Interlocked.CompareExchange(ref _outstandingRpc, k, s_tmp);
8383
if (!(result is EmptyRpcContinuation))
8484
{
85-
throw new NotSupportedException("Pipelining of requests forbidden");
85+
throw new NotSupportedException($"Pipelining of requests forbidden (attempted: {k.GetType()}, enqueued: {result.GetType()})");
8686
}
8787
}
8888

projects/Test/Common/IntegrationFixture.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ protected ConnectionFactory CreateConnectionFactory()
538538

539539
protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
540540
{
541-
if (args.Initiator == ShutdownInitiator.Peer)
541+
if (args.Initiator != ShutdownInitiator.Application)
542542
{
543543
IConnection conn = (IConnection)sender;
544544
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
@@ -547,7 +547,7 @@ protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
547547

548548
protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
549549
{
550-
if (args.Initiator == ShutdownInitiator.Peer)
550+
if (args.Initiator != ShutdownInitiator.Application)
551551
{
552552
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
553553
}
@@ -556,7 +556,7 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args
556556

557557
protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
558558
{
559-
if (args.Initiator == ShutdownInitiator.Peer)
559+
if (args.Initiator != ShutdownInitiator.Application)
560560
{
561561
IChannel ch = (IChannel)sender;
562562
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
@@ -565,7 +565,7 @@ protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
565565

566566
protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
567567
{
568-
if (args.Initiator == ShutdownInitiator.Peer)
568+
if (args.Initiator != ShutdownInitiator.Application)
569569
{
570570
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
571571
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Threading;
33+
using System.Threading.Tasks;
34+
using RabbitMQ.Client.Impl;
35+
using Xunit;
36+
using Xunit.Abstractions;
37+
38+
namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
39+
{
40+
public class TestRecoveryEventHandlers : TestConnectionRecoveryBase
41+
{
42+
public TestRecoveryEventHandlers(ITestOutputHelper output) : base(output)
43+
{
44+
}
45+
46+
[Fact]
47+
public async Task TestRecoveryEventHandlers_Called()
48+
{
49+
int counter = 0;
50+
((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter);
51+
52+
await CloseAndWaitForRecoveryAsync();
53+
await CloseAndWaitForRecoveryAsync();
54+
await CloseAndWaitForRecoveryAsync();
55+
await CloseAndWaitForRecoveryAsync();
56+
Assert.True(_channel.IsOpen);
57+
Assert.True(counter >= 3);
58+
}
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Threading;
33+
using System.Threading.Tasks;
34+
using Xunit;
35+
using Xunit.Abstractions;
36+
37+
namespace Test.Integration.ConnectionRecovery.EventHandlerRecovery.Channel
38+
{
39+
public class TestShutdownEventHandlers : TestConnectionRecoveryBase
40+
{
41+
public TestShutdownEventHandlers(ITestOutputHelper output) : base(output)
42+
{
43+
}
44+
45+
[Fact]
46+
public async Task TestShutdownEventHandlersOnChannel_Called()
47+
{
48+
int counter = 0;
49+
_channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter);
50+
51+
Assert.True(_channel.IsOpen);
52+
await CloseAndWaitForRecoveryAsync();
53+
await CloseAndWaitForRecoveryAsync();
54+
await CloseAndWaitForRecoveryAsync();
55+
await CloseAndWaitForRecoveryAsync();
56+
Assert.True(_channel.IsOpen);
57+
58+
Assert.True(counter >= 3);
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)