Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a transport-level configuration option for message payload byte reservation #2768

Merged
merged 6 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ namespace NServiceBus
public NServiceBus.PolicySettings Policies { get; }
public System.Func<string, string, string> QueueNameGenerator { get; set; }
public string QueueNamePrefix { get; set; }
public long ReserveBytesInMessageSizeCalculation { get; set; }
public NServiceBus.S3Settings S3 { get; set; }
public Amazon.SimpleNotificationService.IAmazonSimpleNotificationService SnsClient { get; }
public Amazon.SQS.IAmazonSQS SqsClient { get; }
Expand Down
82 changes: 58 additions & 24 deletions src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task Does_not_send_extra_properties_in_payload_by_default()
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -76,7 +76,7 @@ public async Task Includes_message_id_in_message_attributes(DispatchProperties p
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var expectedId = "1234";

Expand All @@ -101,7 +101,7 @@ public async Task Should_dispatch_isolated_unicast_operations()
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -139,7 +139,7 @@ public async Task Should_dispatch_isolated_multicast_operations()
var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, null,
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(),
new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null,
15 * 60);
15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -178,7 +178,7 @@ public async Task Should_deduplicate_if_compatibility_mode_is_enabled_and_subscr
var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""),
null, 15 * 60);
null, 15 * 60, 0);

mockSnsClient.ListSubscriptionsByTopicResponse = topic => new ListSubscriptionsByTopicResponse
{
Expand Down Expand Up @@ -224,7 +224,7 @@ public async Task Should_not_deduplicate_if_compatibility_mode_is_enabled_and_no
var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, mockSnsClient, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""),
null, 15 * 60);
null, 15 * 60, 0);

var messageId = Guid.NewGuid().ToString();
var headers = new Dictionary<string, string>()
Expand Down Expand Up @@ -262,7 +262,7 @@ public async Task Should_not_dispatch_multicast_operation_if_topic_doesnt_exist(
var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""),
null, 15 * 60);
null, 15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -293,7 +293,7 @@ public async Task Should_not_dispatch_multicast_operation_if_event_type_is_objec
var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""),
null, 15 * 60);
null, 15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -321,7 +321,7 @@ public async Task Should_upload_large_multicast_operations_request_to_s3(bool wr
var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""),
new S3Settings("someBucket", keyPrefix, mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage);
new S3Settings("someBucket", keyPrefix, mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage);

var longBodyMessageId = Guid.NewGuid().ToString();
/* Crazy long message id will cause the message to go over limits because attributes count as well */
Expand Down Expand Up @@ -384,7 +384,7 @@ public void Should_raise_queue_does_not_exists_for_delayed_delivery_for_isolated
};

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var properties = new DispatchProperties
{
Expand All @@ -410,7 +410,7 @@ public async Task Should_batch_non_isolated_unicast_operations()
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -446,10 +446,10 @@ public async Task Should_batch_non_isolated_multicast_operations()
var mockSnsClient = new MockSnsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")),
dest => QueueCache.GetSqsQueueName(dest, "")),
new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(),
new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null,
15 * 60);
15 * 60, 0);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -488,7 +488,7 @@ public void Should_raise_queue_does_not_exists_for_delayed_delivery_for_non_isol
};

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var properties = new DispatchProperties
{
Expand Down Expand Up @@ -519,7 +519,7 @@ public async Task Should_dispatch_non_batched_all_unicast_operations_that_failed
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var firstMessageIdThatWillFail = Guid.NewGuid().ToString();
var secondMessageIdThatWillFail = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -610,7 +610,7 @@ public async Task Should_dispatch_non_batched_all_multicast_operations_that_fail
var mockSnsClient = new MockSnsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null,
dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, 15 * 60, 0);

var firstMessageIdThatWillFail = Guid.NewGuid().ToString();
var secondMessageIdThatWillFail = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -704,8 +704,8 @@ public async Task Should_upload_large_non_isolated_operations_to_s3(bool wrapMes
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null,
new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage);
dest => QueueCache.GetSqsQueueName(dest, "")), null,
new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -767,8 +767,8 @@ public async Task Should_upload_large_isolated_operations_request_to_s3(bool wra
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null,
new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage);
dest => QueueCache.GetSqsQueueName(dest, "")), null,
new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage);

var transportOperations = new TransportOperations(
new TransportOperation(
Expand Down Expand Up @@ -830,7 +830,7 @@ public async Task Should_base64_encode_by_default()
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0);

var msgBody = "my message body";
var msgBodyByte = Encoding.UTF8.GetBytes(msgBody);
Expand Down Expand Up @@ -864,7 +864,7 @@ public async Task Should_not_wrap_if_configured_not_to()
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, wrapOutgoingMessages: false);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0, wrapOutgoingMessages: false);

var msgBody = "my message body";
var msgBodyByte = Encoding.UTF8.GetBytes(msgBody);
Expand Down Expand Up @@ -899,7 +899,7 @@ public async Task Should_handle_empty_body_gracefully(bool wrapMessage)
var mockSqsClient = new MockSqsClient();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, wrapOutgoingMessages: wrapMessage);
dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0, wrapOutgoingMessages: wrapMessage);

var messageid = Guid.NewGuid().ToString();

Expand All @@ -926,8 +926,42 @@ public async Task Should_handle_empty_body_gracefully(bool wrapMessage)
Assert.That(request.MessageBody, Is.Not.Null.Or.Empty);
}

interface IEvent { }
[Test]
public async Task Does_upload_to_S3_when_payload_reserved_bytes_are_set()
{
var mockSqsClient = new MockSqsClient();
var mockS3Client = new MockS3Client();

var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient,
dest => QueueCache.GetSqsQueueName(dest, "")), null, new S3Settings("bucketname", "keyprefix", mockS3Client), 15 * 60, 50);

var transportOperations = new TransportOperations(
new TransportOperation(
new OutgoingMessage("1234", new Dictionary<string, string>
{
{TransportHeaders.TimeToBeReceived, ExpectedTtbr.ToString()},
{Headers.ReplyToAddress, ExpectedReplyToAddress},
{Headers.MessageId, "093C17C6-D32E-44FE-9134-65C10C1287EB"}
}, Encoding.Default.GetBytes(new string('x', 256 * 1024))),
new UnicastAddressTag("address"),
[],
DispatchConsistency.Isolated));

var transportTransaction = new TransportTransaction();

await dispatcher.Dispatch(transportOperations, transportTransaction);

Assert.Multiple(() =>
{
Assert.That(mockSqsClient.RequestsSent, Is.Not.Empty, "No requests sent");
Assert.That(mockS3Client.PutObjectRequestsSent, Is.Not.Empty, "No S3 requests sent");
});

var s3Request = mockS3Client.PutObjectRequestsSent.First();
Assert.That(s3Request.InputStream.Length, Is.GreaterThan(0));
}

interface IEvent { }
interface IMyEvent : IEvent { }
class Event : IMyEvent { }
class AnotherEvent : IMyEvent { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public async Task ShouldDisposeSqsAndSnsClients(bool disposeSqs, bool disposeSns
false,
disposeSqs,
disposeSns,
false);
false,
0);

await sut.Shutdown(CancellationToken.None);

Expand Down
16 changes: 16 additions & 0 deletions src/NServiceBus.Transport.SQS.Tests/SnsPreparedMessageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ public void CalculateSize_BodyTakenIntoAccount()
Assert.That(message.Size, Is.EqualTo(expectedSize));
}

[Test]
public void CalculateSize_BodyAndReservedBytesTakenIntoAccount()
{
var expectedSize = 15;

var message = new SnsPreparedMessage
{
Body = new string('a', 10),
ReserveBytesInMessageSizeCalculation = 5
};

message.CalculateSize();

Assert.That(message.Size, Is.EqualTo(expectedSize));
}

[Test]
public void CalculateSize_TakesAttributesIntoAccount()
{
Expand Down
16 changes: 16 additions & 0 deletions src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ public void CalculateSize_BodyTakenIntoAccount()
Assert.That(message.Size, Is.EqualTo(expectedSize));
}

[Test]
public void CalculateSize_BodyAndReservedBytesTakenIntoAccount()
{
var expectedSize = 15;

var message = new SqsPreparedMessage
{
Body = new string('a', 10),
ReserveBytesInMessageSizeCalculation = 5
};

message.CalculateSize();

Assert.That(message.Size, Is.EqualTo(expectedSize));
}

[Test]
public void CalculateSize_TakesAttributesIntoAccount()
{
Expand Down
10 changes: 9 additions & 1 deletion src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ internal void SetupSnsClient(IAmazonSimpleNotificationService snsClient, bool ex
/// </summary>
public string QueueNamePrefix { get; set; }

/// <summary>
/// Specifies an arbitrary number of bytes that will be added to the calculated payload size
/// which is useful to account for any overhead of message attributes added outside the scope of NServiceBus
/// to address the SQS service message size limitation by uploading the message payload to S3.
/// </summary>
public long ReserveBytesInMessageSizeCalculation { get; set; }

/// <summary>
/// Specifies a lambda function that allows to take control of the queue name generation logic.
/// This is useful to overcome any limitations imposed by SQS.
Expand Down Expand Up @@ -292,7 +299,8 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
DoNotWrapOutgoingMessages,
!sqsClient.ExternallyManaged,
!snsClient.ExternallyManaged,
!SupportsDelayedDelivery
!SupportsDelayedDelivery,
ReserveBytesInMessageSizeCalculation
);

if (hostSettings.SetupInfrastructure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SqsTransportInfrastructure : TransportInfrastructure
{
public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, IAmazonSQS sqsClient,
IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, S3Settings s3Settings, PolicySettings policySettings, int queueDelayTimeSeconds, string topicNamePrefix, bool doNotWrapOutgoingMessages,
bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery)
bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery, long reserveBytesInMessageSizeCalculation)
{
this.sqsClient = sqsClient;
this.snsClient = snsClient;
Expand All @@ -32,7 +32,7 @@ public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] r
.ToDictionary(x => x.Id, x => x);

Dispatcher = new MessageDispatcher(hostSettings.CoreSettings, sqsClient, snsClient, queueCache, topicCache, s3Settings,
queueDelayTimeSeconds, !doNotWrapOutgoingMessages);
queueDelayTimeSeconds, reserveBytesInMessageSizeCalculation, !doNotWrapOutgoingMessages);
}

IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, IAmazonSQS sqsClient,
Expand Down
Loading