diff --git a/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt b/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt index 54fd6b2f9..6026939c7 100644 --- a/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt +++ b/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt @@ -83,6 +83,7 @@ namespace NServiceBus public NServiceBus.PolicySettings Policies { get; } public System.Func QueueNameGenerator { get; set; } public string QueueNamePrefix { get; set; } + public long ReserveBytesInMessageSizeCalculation { get; set; } public NServiceBus.S3Settings S3 { get; set; } public Amazon.SimpleNotificationService.IAmazonSimpleNotificationService SnsClient { get; } public Amazon.SQS.IAmazonSQS SqsClient { get; } diff --git a/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs b/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs index 52eb23735..d5e4f9d6d 100644 --- a/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs @@ -32,7 +32,7 @@ public async Task Does_not_send_extra_properties_in_payload_by_default() var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -76,7 +76,7 @@ public async Task Includes_message_id_in_message_attributes(DispatchProperties p var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var expectedId = "1234"; @@ -101,7 +101,7 @@ public async Task Should_dispatch_isolated_unicast_operations() var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -139,7 +139,7 @@ public async Task Should_dispatch_isolated_multicast_operations() var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, null, new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, - 15 * 60); + 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -178,7 +178,7 @@ public async Task Should_deduplicate_if_compatibility_mode_is_enabled_and_subscr var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, mockSnsClient, new QueueCache(null, dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), - null, 15 * 60); + null, 15 * 60, 0); mockSnsClient.ListSubscriptionsByTopicResponse = topic => new ListSubscriptionsByTopicResponse { @@ -224,7 +224,7 @@ public async Task Should_not_deduplicate_if_compatibility_mode_is_enabled_and_no var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, mockSnsClient, new QueueCache(mockSqsClient, dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), - null, 15 * 60); + null, 15 * 60, 0); var messageId = Guid.NewGuid().ToString(); var headers = new Dictionary() @@ -262,7 +262,7 @@ public async Task Should_not_dispatch_multicast_operation_if_topic_doesnt_exist( var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null, dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), - null, 15 * 60); + null, 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -293,7 +293,7 @@ public async Task Should_not_dispatch_multicast_operation_if_event_type_is_objec var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null, dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), - null, 15 * 60); + null, 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -321,7 +321,7 @@ public async Task Should_upload_large_multicast_operations_request_to_s3(bool wr var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null, dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), - new S3Settings("someBucket", keyPrefix, mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage); + new S3Settings("someBucket", keyPrefix, mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage); var longBodyMessageId = Guid.NewGuid().ToString(); /* Crazy long message id will cause the message to go over limits because attributes count as well */ @@ -384,7 +384,7 @@ public void Should_raise_queue_does_not_exists_for_delayed_delivery_for_isolated }; var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var properties = new DispatchProperties { @@ -410,7 +410,7 @@ public async Task Should_batch_non_isolated_unicast_operations() var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -446,10 +446,10 @@ public async Task Should_batch_non_isolated_multicast_operations() var mockSnsClient = new MockSnsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null, - dest => QueueCache.GetSqsQueueName(dest, "")), + dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, - 15 * 60); + 15 * 60, 0); var transportOperations = new TransportOperations( new TransportOperation( @@ -488,7 +488,7 @@ public void Should_raise_queue_does_not_exists_for_delayed_delivery_for_non_isol }; var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var properties = new DispatchProperties { @@ -519,7 +519,7 @@ public async Task Should_dispatch_non_batched_all_unicast_operations_that_failed var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var firstMessageIdThatWillFail = Guid.NewGuid().ToString(); var secondMessageIdThatWillFail = Guid.NewGuid().ToString(); @@ -610,7 +610,7 @@ public async Task Should_dispatch_non_batched_all_multicast_operations_that_fail var mockSnsClient = new MockSnsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), null, mockSnsClient, new QueueCache(null, - dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), new TopicCache(mockSnsClient, new SettingsHolder(), new EventToTopicsMappings(), new EventToEventsMappings(), (type, s) => TopicNameHelper.GetSnsTopicName(type, ""), ""), null, 15 * 60, 0); var firstMessageIdThatWillFail = Guid.NewGuid().ToString(); var secondMessageIdThatWillFail = Guid.NewGuid().ToString(); @@ -704,8 +704,8 @@ public async Task Should_upload_large_non_isolated_operations_to_s3(bool wrapMes var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, - new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage); + dest => QueueCache.GetSqsQueueName(dest, "")), null, + new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage); var transportOperations = new TransportOperations( new TransportOperation( @@ -767,8 +767,8 @@ public async Task Should_upload_large_isolated_operations_request_to_s3(bool wra var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, - new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, wrapOutgoingMessages: wrapMessage); + dest => QueueCache.GetSqsQueueName(dest, "")), null, + new S3Settings("someBucket", "somePrefix", mockS3Client), 15 * 60, 0, wrapOutgoingMessages: wrapMessage); var transportOperations = new TransportOperations( new TransportOperation( @@ -830,7 +830,7 @@ public async Task Should_base64_encode_by_default() var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0); var msgBody = "my message body"; var msgBodyByte = Encoding.UTF8.GetBytes(msgBody); @@ -864,7 +864,7 @@ public async Task Should_not_wrap_if_configured_not_to() var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, wrapOutgoingMessages: false); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0, wrapOutgoingMessages: false); var msgBody = "my message body"; var msgBodyByte = Encoding.UTF8.GetBytes(msgBody); @@ -899,7 +899,7 @@ public async Task Should_handle_empty_body_gracefully(bool wrapMessage) var mockSqsClient = new MockSqsClient(); var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, - dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, wrapOutgoingMessages: wrapMessage); + dest => QueueCache.GetSqsQueueName(dest, "")), null, null, 15 * 60, 0, wrapOutgoingMessages: wrapMessage); var messageid = Guid.NewGuid().ToString(); @@ -926,8 +926,42 @@ public async Task Should_handle_empty_body_gracefully(bool wrapMessage) Assert.That(request.MessageBody, Is.Not.Null.Or.Empty); } - interface IEvent { } + [Test] + public async Task Does_upload_to_S3_when_payload_reserved_bytes_are_set() + { + var mockSqsClient = new MockSqsClient(); + var mockS3Client = new MockS3Client(); + + var dispatcher = new MessageDispatcher(new SettingsHolder(), mockSqsClient, null, new QueueCache(mockSqsClient, + dest => QueueCache.GetSqsQueueName(dest, "")), null, new S3Settings("bucketname", "keyprefix", mockS3Client), 15 * 60, 50); + + var transportOperations = new TransportOperations( + new TransportOperation( + new OutgoingMessage("1234", new Dictionary + { + {TransportHeaders.TimeToBeReceived, ExpectedTtbr.ToString()}, + {Headers.ReplyToAddress, ExpectedReplyToAddress}, + {Headers.MessageId, "093C17C6-D32E-44FE-9134-65C10C1287EB"} + }, Encoding.Default.GetBytes(new string('x', 256 * 1024))), + new UnicastAddressTag("address"), + [], + DispatchConsistency.Isolated)); + + var transportTransaction = new TransportTransaction(); + + await dispatcher.Dispatch(transportOperations, transportTransaction); + Assert.Multiple(() => + { + Assert.That(mockSqsClient.RequestsSent, Is.Not.Empty, "No requests sent"); + Assert.That(mockS3Client.PutObjectRequestsSent, Is.Not.Empty, "No S3 requests sent"); + }); + + var s3Request = mockS3Client.PutObjectRequestsSent.First(); + Assert.That(s3Request.InputStream.Length, Is.GreaterThan(0)); + } + + interface IEvent { } interface IMyEvent : IEvent { } class Event : IMyEvent { } class AnotherEvent : IMyEvent { } diff --git a/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs b/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs index 29d30c999..961bb5727 100644 --- a/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs @@ -61,7 +61,8 @@ public async Task ShouldDisposeSqsAndSnsClients(bool disposeSqs, bool disposeSns false, disposeSqs, disposeSns, - false); + false, + 0); await sut.Shutdown(CancellationToken.None); diff --git a/src/NServiceBus.Transport.SQS.Tests/SnsPreparedMessageTests.cs b/src/NServiceBus.Transport.SQS.Tests/SnsPreparedMessageTests.cs index dbcbc4c43..86e247358 100644 --- a/src/NServiceBus.Transport.SQS.Tests/SnsPreparedMessageTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/SnsPreparedMessageTests.cs @@ -25,6 +25,22 @@ public void CalculateSize_BodyTakenIntoAccount() Assert.That(message.Size, Is.EqualTo(expectedSize)); } + [Test] + public void CalculateSize_BodyAndReservedBytesTakenIntoAccount() + { + var expectedSize = 15; + + var message = new SnsPreparedMessage + { + Body = new string('a', 10), + ReserveBytesInMessageSizeCalculation = 5 + }; + + message.CalculateSize(); + + Assert.That(message.Size, Is.EqualTo(expectedSize)); + } + [Test] public void CalculateSize_TakesAttributesIntoAccount() { diff --git a/src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageTests.cs b/src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageTests.cs index e1e6a65c9..6b0295b94 100644 --- a/src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/SqsPreparedMessageTests.cs @@ -25,6 +25,22 @@ public void CalculateSize_BodyTakenIntoAccount() Assert.That(message.Size, Is.EqualTo(expectedSize)); } + [Test] + public void CalculateSize_BodyAndReservedBytesTakenIntoAccount() + { + var expectedSize = 15; + + var message = new SqsPreparedMessage + { + Body = new string('a', 10), + ReserveBytesInMessageSizeCalculation = 5 + }; + + message.CalculateSize(); + + Assert.That(message.Size, Is.EqualTo(expectedSize)); + } + [Test] public void CalculateSize_TakesAttributesIntoAccount() { diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs index bfbd1f603..ae010073c 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs @@ -48,6 +48,13 @@ internal void SetupSnsClient(IAmazonSimpleNotificationService snsClient, bool ex /// public string QueueNamePrefix { get; set; } + /// + /// Specifies an arbitrary number of bytes that will be added to the calculated payload size + /// which is useful to account for any overhead of message attributes added outside the scope of NServiceBus + /// to address the SQS service message size limitation by uploading the message payload to S3. + /// + public long ReserveBytesInMessageSizeCalculation { get; set; } + /// /// Specifies a lambda function that allows to take control of the queue name generation logic. /// This is useful to overcome any limitations imposed by SQS. @@ -292,7 +299,8 @@ public override async Task Initialize(HostSettings host DoNotWrapOutgoingMessages, !sqsClient.ExternallyManaged, !snsClient.ExternallyManaged, - !SupportsDelayedDelivery + !SupportsDelayedDelivery, + ReserveBytesInMessageSizeCalculation ); if (hostSettings.SetupInfrastructure) diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs index d2473bbd3..855731d13 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs @@ -15,7 +15,7 @@ class SqsTransportInfrastructure : TransportInfrastructure { public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, S3Settings s3Settings, PolicySettings policySettings, int queueDelayTimeSeconds, string topicNamePrefix, bool doNotWrapOutgoingMessages, - bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery) + bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery, long reserveBytesInMessageSizeCalculation) { this.sqsClient = sqsClient; this.snsClient = snsClient; @@ -32,7 +32,7 @@ public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] r .ToDictionary(x => x.Id, x => x); Dispatcher = new MessageDispatcher(hostSettings.CoreSettings, sqsClient, snsClient, queueCache, topicCache, s3Settings, - queueDelayTimeSeconds, !doNotWrapOutgoingMessages); + queueDelayTimeSeconds, reserveBytesInMessageSizeCalculation, !doNotWrapOutgoingMessages); } IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, IAmazonSQS sqsClient, diff --git a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs index 88437c894..29fd2b7c7 100644 --- a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs @@ -26,8 +26,8 @@ public MessageDispatcher(IReadOnlySettings settings, IAmazonSQS sqsClient, TopicCache topicCache, S3Settings s3, int queueDelaySeconds, - bool wrapOutgoingMessages = true - ) + long reserveBytesInMessageSizeCalculation, + bool wrapOutgoingMessages = true) { this.topicCache = topicCache; this.s3 = s3; @@ -36,6 +36,7 @@ public MessageDispatcher(IReadOnlySettings settings, IAmazonSQS sqsClient, this.sqsClient = sqsClient; this.queueCache = queueCache; this.wrapOutgoingMessages = wrapOutgoingMessages; + this.reserveBytesInMessageSizeCalculation = reserveBytesInMessageSizeCalculation; transportMessageSerializerOptions = new JsonSerializerOptions { @@ -358,7 +359,7 @@ await sqsClient.SendMessageAsync(message.ToRequest(), cancellationToken) return null; } - var preparedMessage = new SqsPreparedMessage { MessageId = transportOperation.Message.MessageId }; + var preparedMessage = new SqsPreparedMessage { MessageId = transportOperation.Message.MessageId, ReserveBytesInMessageSizeCalculation = reserveBytesInMessageSizeCalculation }; await ApplyUnicastOperationMapping(transportOperation, preparedMessage, CalculateDelayedDeliverySeconds(transportOperation), GetNativeMessageAttributes(transportOperation, transportTransaction), cancellationToken).ConfigureAwait(false); @@ -393,7 +394,7 @@ async Task PrepareSqsMessageBasedOnBodySize(TransportMessage? transportMessage) async Task PrepareMessage(MulticastTransportOperation transportOperation, CancellationToken cancellationToken) { - var preparedMessage = new SnsPreparedMessage { MessageId = transportOperation.Message.MessageId }; + var preparedMessage = new SnsPreparedMessage { MessageId = transportOperation.Message.MessageId, ReserveBytesInMessageSizeCalculation = reserveBytesInMessageSizeCalculation }; await ApplyMulticastOperationMapping(transportOperation, preparedMessage, cancellationToken).ConfigureAwait(false); @@ -582,6 +583,7 @@ async ValueTask ApplyUnicastOperationMapping(UnicastTransportOperation transport readonly JsonSerializerOptions transportMessageSerializerOptions; readonly IAmazonSQS sqsClient; readonly QueueCache queueCache; + readonly long reserveBytesInMessageSizeCalculation; static readonly ILog Logger = LogManager.GetLogger(typeof(MessageDispatcher)); } diff --git a/src/NServiceBus.Transport.SQS/SnsPreparedMessage.cs b/src/NServiceBus.Transport.SQS/SnsPreparedMessage.cs index 81ab584d8..d7f3575c2 100644 --- a/src/NServiceBus.Transport.SQS/SnsPreparedMessage.cs +++ b/src/NServiceBus.Transport.SQS/SnsPreparedMessage.cs @@ -21,6 +21,7 @@ public string MessageId public string Body { get; set; } public string Destination { get; set; } public long Size { get; private set; } + public long ReserveBytesInMessageSizeCalculation { get; init; } public Dictionary MessageAttributes { get; } = []; @@ -28,6 +29,7 @@ public void CalculateSize() { Size = Body?.Length ?? 0; Size += CalculateAttributesSize(); + Size += ReserveBytesInMessageSizeCalculation; } long CalculateAttributesSize() diff --git a/src/NServiceBus.Transport.SQS/SqsPreparedMessage.cs b/src/NServiceBus.Transport.SQS/SqsPreparedMessage.cs index c4444cfb0..5754c8436 100644 --- a/src/NServiceBus.Transport.SQS/SqsPreparedMessage.cs +++ b/src/NServiceBus.Transport.SQS/SqsPreparedMessage.cs @@ -28,11 +28,13 @@ public string MessageId public string MessageGroupId { get; set; } public string MessageDeduplicationId { get; set; } public Dictionary MessageAttributes { get; } = []; + public long ReserveBytesInMessageSizeCalculation { get; init; } public void CalculateSize() { Size = Body?.Length ?? 0; Size += CalculateAttributesSize(); + Size += ReserveBytesInMessageSizeCalculation; } long CalculateAttributesSize()