Skip to content

Commit 57bed54

Browse files
change ChangeConcurrency implementation to prevent closing connection midflight. (#1577)
Co-authored-by: Tim Bussmann <[email protected]>
1 parent c3870c3 commit 57bed54

File tree

4 files changed

+229
-31
lines changed

4 files changed

+229
-31
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
namespace NServiceBus.Transport.RabbitMQ.TransportTests
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NServiceBus.TransportTests;
8+
using NUnit.Framework;
9+
10+
public class Changing_concurrency : NServiceBusTransportTest
11+
{
12+
[TestCase(TransportTransactionMode.ReceiveOnly)]
13+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
14+
[TestCase(TransportTransactionMode.TransactionScope)]
15+
public async Task Should_complete_current_message(TransportTransactionMode transactionMode)
16+
{
17+
var triggeredChangeConcurrency = CreateTaskCompletionSource();
18+
var concurrencyChanged = CreateTaskCompletionSource();
19+
int invocationCounter = 0;
20+
21+
await StartPump(async (context, ct) =>
22+
{
23+
Interlocked.Increment(ref invocationCounter);
24+
25+
_ = Task.Run(async () =>
26+
{
27+
var task = receiver.ChangeConcurrency(new PushRuntimeSettings(1), ct);
28+
triggeredChangeConcurrency.SetResult();
29+
await task;
30+
concurrencyChanged.SetResult();
31+
}, ct);
32+
33+
await triggeredChangeConcurrency.Task;
34+
35+
}, (_, ct) =>
36+
{
37+
Assert.Fail("Message processing should not fail");
38+
return Task.FromResult(ErrorHandleResult.RetryRequired);
39+
},
40+
transactionMode);
41+
42+
await SendMessage(InputQueueName);
43+
await concurrencyChanged.Task;
44+
await StopPump();
45+
Assert.AreEqual(1, invocationCounter, "message should successfully complete on first processing attempt");
46+
}
47+
48+
[TestCase(TransportTransactionMode.ReceiveOnly)]
49+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
50+
[TestCase(TransportTransactionMode.TransactionScope)]
51+
public async Task Should_dispatch_delayed_retries(TransportTransactionMode transactionMode)
52+
{
53+
int invocationCounter = 0;
54+
55+
var triggeredChangeConcurrency = CreateTaskCompletionSource();
56+
var sentMessageReceived = CreateTaskCompletionSource();
57+
58+
await StartPump((context, _) =>
59+
{
60+
Interlocked.Increment(ref invocationCounter);
61+
if (context.Headers.TryGetValue("FromOnError", out var value) && value == bool.TrueString)
62+
{
63+
sentMessageReceived.SetResult();
64+
}
65+
else
66+
{
67+
throw new Exception("triggering recoverability pipeline");
68+
}
69+
70+
return Task.CompletedTask;
71+
}, async (context, ct) =>
72+
{
73+
// same behavior as delayed retries
74+
_ = Task.Run(async () =>
75+
{
76+
var task = receiver.ChangeConcurrency(new PushRuntimeSettings(1), ct);
77+
triggeredChangeConcurrency.SetResult();
78+
await task;
79+
}, ct);
80+
81+
await triggeredChangeConcurrency.Task;
82+
await SendMessage(InputQueueName,
83+
new Dictionary<string, string>() { { "FromOnError", bool.TrueString } },
84+
context.TransportTransaction, cancellationToken: ct);
85+
return ErrorHandleResult.Handled;
86+
},
87+
transactionMode);
88+
89+
await SendMessage(InputQueueName);
90+
91+
await sentMessageReceived.Task;
92+
await StopPump();
93+
Assert.AreEqual(2, invocationCounter, "there should be exactly 2 messages (initial message and new message from recoverability pipeline)");
94+
}
95+
}
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
namespace NServiceBus.TransportTests
2+
{
3+
using System;
4+
using System.Collections.Concurrent;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
using Transport;
9+
10+
public class When_restarting_endpoint : NServiceBusTransportTest
11+
{
12+
[TestCase(TransportTransactionMode.None)]
13+
[TestCase(TransportTransactionMode.ReceiveOnly)]
14+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
15+
[TestCase(TransportTransactionMode.TransactionScope)]
16+
public async Task Should_handle_multiple_stop_calls_to_started_receiver(TransportTransactionMode transactionMode)
17+
{
18+
await StartPump(
19+
(context, token) => Task.CompletedTask,
20+
(context, token) => Task.FromResult(ErrorHandleResult.Handled),
21+
transactionMode);
22+
23+
await receiver.StopReceive();
24+
Assert.DoesNotThrowAsync(() => receiver.StopReceive());
25+
}
26+
27+
[TestCase(TransportTransactionMode.None)]
28+
[TestCase(TransportTransactionMode.ReceiveOnly)]
29+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
30+
[TestCase(TransportTransactionMode.TransactionScope)]
31+
public async Task Should_allow_restarting_receivers(TransportTransactionMode transactionMode)
32+
{
33+
var messageReceived = CreateTaskCompletionSource();
34+
35+
await StartPump((context, token) =>
36+
{
37+
messageReceived.SetResult();
38+
return Task.CompletedTask;
39+
},
40+
(context, token) => Task.FromResult(ErrorHandleResult.Handled), transactionMode);
41+
42+
await receiver.StopReceive();
43+
await receiver.StartReceive();
44+
45+
await SendMessage(InputQueueName);
46+
await messageReceived.Task;
47+
}
48+
49+
[TestCase(TransportTransactionMode.None)]
50+
[TestCase(TransportTransactionMode.ReceiveOnly)]
51+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
52+
[TestCase(TransportTransactionMode.TransactionScope)]
53+
public async Task Should_gracefully_restart_processing(TransportTransactionMode transactionMode)
54+
{
55+
var receivedMessages = new ConcurrentQueue<string>();
56+
var pumpStopTriggered = CreateTaskCompletionSource();
57+
var followupMessageReceived = CreateTaskCompletionSource();
58+
await StartPump(async (context, token) =>
59+
{
60+
var messageType = context.Headers["Type"];
61+
receivedMessages.Enqueue(messageType);
62+
TestContext.WriteLine("Received message " + messageType);
63+
64+
switch (messageType)
65+
{
66+
case "Start":
67+
68+
// run async because the pump might block the return until all inflight messages are processed.
69+
var stopTask = Task.Run(async () =>
70+
{
71+
var t = receiver.StopReceive(token);
72+
pumpStopTriggered.SetResult();
73+
await t;
74+
TestContext.WriteLine("Stopped receiver");
75+
}, token);
76+
77+
await pumpStopTriggered.Task;
78+
await SendMessage(InputQueueName, new Dictionary<string, string>() { { "Type", "Followup" } },
79+
context.TransportTransaction, cancellationToken: token);
80+
81+
_ = stopTask.ContinueWith(async _ =>
82+
{
83+
await receiver.StartReceive(token);
84+
TestContext.WriteLine("Started receiver");
85+
}, token);
86+
87+
break;
88+
case "Followup":
89+
followupMessageReceived.SetResult();
90+
break;
91+
default:
92+
throw new ArgumentException();
93+
}
94+
},
95+
(context, token) =>
96+
{
97+
Assert.Fail($"Message failed processing: {context.Exception}");
98+
return Task.FromResult(ErrorHandleResult.Handled);
99+
}, transactionMode);
100+
101+
await SendMessage(InputQueueName, new Dictionary<string, string>() { { "Type", "Start" } });
102+
await followupMessageReceived.Task;
103+
await StopPump();
104+
105+
Assert.AreEqual(2, receivedMessages.Count);
106+
Assert.IsTrue(receivedMessages.TryDequeue(out var firstMessageType));
107+
Assert.AreEqual("Start", firstMessageType);
108+
Assert.IsTrue(receivedMessages.TryDequeue(out var secondMessageType));
109+
Assert.AreEqual("Followup", secondMessageType);
110+
}
111+
}
112+
}

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs

+3-6
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,12 @@ internal void SetupInfrastructure(string[] sendingQueues)
5454
}
5555
}
5656

57-
public override Task Shutdown(CancellationToken cancellationToken = default)
57+
public override async Task Shutdown(CancellationToken cancellationToken = default)
5858
{
59-
foreach (IMessageReceiver receiver in Receivers.Values)
60-
{
61-
((MessagePump)receiver).Dispose();
62-
}
59+
await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)))
60+
.ConfigureAwait(false);
6361

6462
channelProvider.Dispose();
65-
return Task.CompletedTask;
6663
}
6764

6865
public override string ToTransportAddress(QueueAddress address) => TranslateAddress(address);

src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs

+18-25
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
using global::RabbitMQ.Client.Exceptions;
1313
using Logging;
1414

15-
sealed class MessagePump : IMessageReceiver, IDisposable
15+
sealed class MessagePump : IMessageReceiver
1616
{
1717
static readonly ILog Logger = LogManager.GetLogger(typeof(MessagePump));
1818
static readonly TransportTransaction transportTransaction = new();
@@ -31,7 +31,6 @@ sealed class MessagePump : IMessageReceiver, IDisposable
3131
readonly FastConcurrentLru<string, int> deliveryAttempts = new(1_000);
3232
readonly FastConcurrentLru<string, bool> failedBasicAckMessages = new(1_000);
3333

34-
bool disposed;
3534
OnMessage onMessage;
3635
OnError onError;
3736
int maxConcurrency;
@@ -113,12 +112,13 @@ public Task StartReceive(CancellationToken cancellationToken = default)
113112
return Task.CompletedTask;
114113
}
115114

116-
public Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = default)
115+
public async Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = default)
117116
{
118-
maxConcurrency = limitations.MaxConcurrency;
119117
Logger.InfoFormat("Calling a change concurrency and reconnecting with new value {0}.", limitations.MaxConcurrency);
120-
_ = Task.Run(() => Reconnect(messageProcessingCancellationTokenSource.Token), cancellationToken);
121-
return Task.CompletedTask;
118+
119+
await StopReceive(cancellationToken).ConfigureAwait(false);
120+
maxConcurrency = limitations.MaxConcurrency;
121+
await StartReceive(CancellationToken.None).ConfigureAwait(false);
122122
}
123123

124124
void ConnectToBroker()
@@ -148,6 +148,12 @@ void ConnectToBroker()
148148

149149
public async Task StopReceive(CancellationToken cancellationToken = default)
150150
{
151+
if (messagePumpCancellationTokenSource is null)
152+
{
153+
// Receiver hasn't been started or is already stopped
154+
return;
155+
}
156+
151157
messagePumpCancellationTokenSource?.Cancel();
152158

153159
using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
@@ -183,6 +189,12 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
183189

184190
await connectionShutdownCompleted.Task.ConfigureAwait(false);
185191
}
192+
193+
messagePumpCancellationTokenSource?.Dispose();
194+
messagePumpCancellationTokenSource = null;
195+
messageProcessingCancellationTokenSource?.Dispose();
196+
connection.Dispose();
197+
circuitBreaker?.Dispose();
186198
}
187199

188200
#pragma warning disable PS0018 // A task-returning method should have a CancellationToken parameter unless it has a parameter implementing ICancellableContext
@@ -511,24 +523,5 @@ async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEv
511523
Logger.Warn($"Failed to acknowledge poison message because the channel was closed. The message was sent to queue '{queue}' but also returned to the original queue.", ex);
512524
}
513525
}
514-
515-
public void Dispose()
516-
{
517-
if (disposed)
518-
{
519-
return;
520-
}
521-
522-
circuitBreaker?.Dispose();
523-
messagePumpCancellationTokenSource?.Cancel();
524-
messagePumpCancellationTokenSource?.Dispose();
525-
// This makes sure that if the stop token hasn't been canceled the processing source is canceled
526-
// so that any possible reconnect attempt has the possibility to gracefully stop too.
527-
messageProcessingCancellationTokenSource?.Cancel();
528-
messageProcessingCancellationTokenSource?.Dispose();
529-
530-
connection?.Dispose();
531-
disposed = true;
532-
}
533526
}
534527
}

0 commit comments

Comments
 (0)