[13.x] Send bulk SQS jobs via SendMessageBatch#60645
Open
kieranbrown wants to merge 4 commits into
Open
Conversation
Dispatch jobs queued via bulk() (e.g. Bus::batch) using the SQS SendMessageBatch API instead of one SendMessage call per job. Entries are chunked to respect the SendMessageBatch limits of 10 messages and 1 MiB cumulative payload, then each chunk is sent sequentially and dispatch stops at the first failure — mirroring push(): jobs already sent stay queued, later chunks are not attempted, and the error surfaces to the caller. Stopping on failure also preserves FIFO ordering for free. Per-job afterCommit, unique/debounce locks, delays, overflow storage, and the JobQueueing / JobQueued events all behave identically to push(). The rollback-callback registration shared with push() is extracted into Queue::registerRollbackCallbacksForDeferredJob() and reused by SyncQueue. Request-level failures propagate as the SqsException the SDK throws. SendMessageBatch can also return HTTP 200 while rejecting individual entries (which the SDK does not raise for); per the AWS docs those must be checked, so a rejected entry is surfaced as an equivalent SqsException carrying the reported error code, message, and full result. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Thanks for submitting a PR! Note that draft PRs are not reviewed. If you would like a review, please mark your pull request as ready for review in the GitHub user interface. Pull requests that are abandoned in draft may be closed due to inactivity. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR makes the SQS queue's
bulk()method dispatch jobs using theSendMessageBatchAPI instead of issuing oneSendMessagecall per job.Why
bulk()is how batched work reaches the queue (e.g.Bus::batch([...])->dispatch()). Today the SQS driver loops and sends each job individually, so a 500-job batch makes 500 API calls.SendMessageBatchsends up to 10 messages per call, cutting that by ~10× — fewer round trips, lower cost, higher throughput.Behavior
SendMessageBatchlimits — 10 messages and 1 MiB cumulative payload (both documented limits are 1 MiB).push(): jobs already sent stay queued, later chunks aren't attempted, and the error surfaces to the caller. Stopping on failure also preserves FIFO ordering for free — no later message can arrive ahead of one that never sent.afterCommit,ShouldBeUnique/debounce lock release on rollback, delays, overflow storage, and theJobQueueing/JobQueuedevents all behave identically topush(). Because batching bypasses the per-jobenqueueUsing()path, the rollback-callback registration is extracted intoQueue::registerRollbackCallbacksForDeferredJob()and reused by bothSyncQueueand this method so the behavior stays in one place.Error handling
SqsExceptionthe SDK already throws — unchanged frompush().SendMessageBatchcan return HTTP 200 while rejecting individual entries, which the SDK does not raise for. Per the AWS docs — "Because the batch request can result in a combination of successful and unsuccessful actions, you should check for batch errors even when the call returns an HTTP status code of 200" — those are surfaced as an equivalentSqsExceptioncarrying the reported error code, message, and full result, so rejected jobs are never silently dropped.Notes
afterCommitis honored here (unlikeDatabaseQueue::bulk(), which currently ignores it) precisely because batching replaces the per-jobenqueueUsing()path that would otherwise apply it — so the manual partitioning restores parity rather than adding new behavior.Tests cover chunking (count + payload size), FIFO ordering and message-group/dedup handling, delay, overflow, event parity,
afterCommitdeferral, unique-lock rollback registration, and both failure modes.