Skip to content

Commit 4f8d525

Browse files
Improve the poison message deletion handling to reduce the chance of duplicates (#2765)
* Add test to verify behavior
* Improve the poison message handling
---------
Co-authored-by: danielmarbach <[email protected]>
1 parent 474cd7f commit 4f8d525

File tree

2 files changed

+79
-11
lines changed

2 files changed

+79
-11
lines changed

src/NServiceBus.Transport.SQS.Tests/InputQueuePumpTests.cs

+59
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,65 @@ await SetupInitializedPump(onMessage: (ctx, ct) =>
106106
Assert.That(mockSqsClient.DeleteMessageRequestsSent.Single().receiptHandle, Is.EqualTo(expectedReceiptHandle));
107107
}
108108

109+
// Test case source for the poison message deletion test below: each entry supplies one
// exception instance that the test makes the mocked delete request throw.
static IEnumerable<object[]> PoisonMessageExceptions() =>
110+
[
111+
[new Exception()],
112+
[new ReceiptHandleIsInvalidException("Ooops")]
113+
];
114+
115+
// Verifies the handling of a poison message whose delete fails on the first receive:
// when the same native message is received a second time (with a new receipt handle),
// the onMessage callback is never invoked, only a single request is recorded in
// RequestsSent, and a delete is attempted once per receive attempt.
// The exception parameter is the failure thrown by the mocked delete on the first attempt.
[Theory]
116+
[TestCaseSource(nameof(PoisonMessageExceptions))]
117+
public async Task Poison_messages_failed_to_be_deleted_are_deleted_on_next_receive_without_processing(Exception exception)
118+
{
119+
var nativeMessageId = Guid.NewGuid().ToString();
120+
var messageId = Guid.NewGuid().ToString();
121+
var expectedFirstReceiptHandle = "receipt-handle-1";
122+
var expectedSecondReceiptHandle = "receipt-handle-2";
123+
124+
// Flipped to true only if the pump hands the message to the onMessage callback.
var processed = false;
125+
await SetupInitializedPump(onMessage: (ctx, ct) =>
126+
{
127+
processed = true;
128+
return Task.FromResult(0);
129+
});
130+
131+
// Remember the current delete behavior so it can be restored after the first attempt,
// then make every delete request throw the exception supplied by the test case.
var deleteRequest = mockSqsClient.DeleteMessageRequestResponse;
132+
mockSqsClient.DeleteMessageRequestResponse = request => throw exception;
133+
134+
var message = new Message
135+
{
136+
ReceiptHandle = expectedFirstReceiptHandle,
137+
MessageId = nativeMessageId,
138+
MessageAttributes = new Dictionary<string, MessageAttributeValue>
139+
{
140+
{Headers.MessageId, new MessageAttributeValue {StringValue = messageId}},
141+
},
142+
Body = null //poison message
143+
};
144+
145+
// First receive attempt
146+
await pump.ProcessMessage(message, CancellationToken.None).ConfigureAwait(false);
147+
148+
// Restore the previous behavior
149+
mockSqsClient.DeleteMessageRequestResponse = deleteRequest;
150+
// Second receive attempt of the same message that has already been moved to the error queue
151+
// but not deleted from the input queue. It will be received with a new receipt handle.
152+
message.ReceiptHandle = expectedSecondReceiptHandle;
153+
await pump.ProcessMessage(message, CancellationToken.None).ConfigureAwait(false);
154+
155+
// The counts are checked first; the per-request receipt handles are checked in a
// separate Assert.Multiple block below.
Assert.Multiple(() =>
156+
{
157+
Assert.That(processed, Is.False);
158+
Assert.That(mockSqsClient.RequestsSent, Has.Count.EqualTo(1), "The message should not be attempted to be sent multiple times to the error queue.");
159+
Assert.That(mockSqsClient.DeleteMessageRequestsSent, Has.Count.EqualTo(2));
160+
});
161+
// Each delete request must carry the receipt handle of its own receive attempt, in order.
Assert.Multiple(() =>
162+
{
163+
Assert.That(mockSqsClient.DeleteMessageRequestsSent[0].receiptHandle, Is.EqualTo(expectedFirstReceiptHandle));
164+
Assert.That(mockSqsClient.DeleteMessageRequestsSent[1].receiptHandle, Is.EqualTo(expectedSecondReceiptHandle));
165+
});
166+
}
167+
109168
[Test]
110169
public async Task Expired_messages_are_deleted_without_processing()
111170
{

src/NServiceBus.Transport.SQS/InputQueuePump.cs

+20-11
Original file line number | Diff line number | Diff line change
@@ -590,12 +590,12 @@ await sqsClient
590590
}
591591
}
592592

593-
async Task MovePoisonMessageToErrorQueue(Message message, CancellationToken messageProcessingCancellationToken)
593+
async Task MovePoisonMessageToErrorQueue(Message message, CancellationToken cancellationToken)
594594
{
595595
string errorQueueUrl = null;
596596
try
597597
{
598-
errorQueueUrl = await queueCache.GetQueueUrl(errorQueueAddress, messageProcessingCancellationToken)
598+
errorQueueUrl = await queueCache.GetQueueUrl(errorQueueAddress, cancellationToken)
599599
.ConfigureAwait(false);
600600
// Ok to use LINQ here since this is not really a hot path
601601
var messageAttributeValues = message.MessageAttributes
@@ -606,12 +606,12 @@ await sqsClient.SendMessageAsync(new SendMessageRequest
606606
QueueUrl = errorQueueUrl,
607607
MessageBody = message.Body,
608608
MessageAttributes = messageAttributeValues
609-
}, messageProcessingCancellationToken).ConfigureAwait(false);
609+
}, cancellationToken).ConfigureAwait(false);
610610
// The Attributes on message are read-only attributes provided by SQS
611611
// and can't be re-sent. Unfortunately all the SQS metadata
612612
// such as SentTimestamp is reset with this send.
613613
}
614-
catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken))
614+
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
615615
{
616616
Logger.Error($"Error moving poison message to error queue at url {errorQueueUrl}. Moving back to input queue.", ex);
617617
try
@@ -621,11 +621,11 @@ await sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
621621
QueueUrl = inputQueueUrl,
622622
ReceiptHandle = message.ReceiptHandle,
623623
VisibilityTimeout = 0
624-
}, messageProcessingCancellationToken).ConfigureAwait(false);
624+
}, cancellationToken).ConfigureAwait(false);
625625
}
626-
catch (Exception changeMessageVisibilityEx) when (!changeMessageVisibilityEx.IsCausedBy(messageProcessingCancellationToken))
626+
catch (Exception changeMessageVisibilityEx) when (!changeMessageVisibilityEx.IsCausedBy(cancellationToken))
627627
{
628-
Logger.Warn($"Error returning poison message back to input queue at url {inputQueueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx);
628+
Logger.Warn($"Error returning poison message with native ID '{message.MessageId}' back to input queue {inputQueueUrl}. Poison message will become available at the input queue again after the visibility timeout expires.", changeMessageVisibilityEx);
629629
}
630630

631631
return;
@@ -637,14 +637,23 @@ await sqsClient.DeleteMessageAsync(new DeleteMessageRequest
637637
{
638638
QueueUrl = inputQueueUrl,
639639
ReceiptHandle = message.ReceiptHandle
640-
}, messageProcessingCancellationToken).ConfigureAwait(false);
640+
}, CancellationToken.None) // We don't want the delete to be cancellable to avoid unnecessary duplicates of poison messages
641+
.ConfigureAwait(false);
641642
}
642-
catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken))
643+
catch (ReceiptHandleIsInvalidException ex)
643644
{
644-
Logger.Warn($"Error removing poison message from input queue {inputQueueUrl}. This may cause duplicate poison messages in the error queue for this endpoint.", ex);
645+
Logger.Error($"Error removing poison message with native ID '{message.MessageId}' from input queue {inputQueueUrl} because the visibility timeout expired. Poison message will become available at the input queue again and attempted to be removed on a best-effort basis. This may still cause duplicate poison messages in the error queue for this endpoint", ex);
646+
647+
messagesToBeDeleted.AddOrUpdate(message.MessageId, true);
648+
}
649+
catch (Exception ex)
650+
{
651+
Logger.Warn($"Error removing poison message with native ID '{message.MessageId}' from input queue {inputQueueUrl}. Poison message will become available at the input queue again and attempted to be removed on a best-effort basis. This may still cause duplicate poison messages in the error queue for this endpoint", ex);
652+
653+
messagesToBeDeleted.AddOrUpdate(message.MessageId, true);
645654
}
646655

647-
// If there is a message body in S3, simply leave it there
656+
// If there is a message body in S3, simply leave it there to be aged out by the S3 lifecycle policy when the TTL expires
648657
}
649658

650659
List<Task> pumpTasks;

0 commit comments

Comments
 (0)