-
-
Notifications
You must be signed in to change notification settings - Fork 969
Integrate large payload support for SQS #2337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
akosel
wants to merge
10
commits into
celery:main
Choose a base branch
from
akosel:sqs-large-payload-support
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
1e6f52c
Integrate large payload support for SQS
794f09d
Fix tests when sqs_extended_client is not installed
228168b
Add basic test for extracting results from s3
066e192
Fix using sqs_extended_client in tests
2ffdb7a
Add tests for S3 client creation in SQS Channel
3694096
Merge PR #2116: Add SQS large payload support
akosel 2c54f38
Add documentation for Large Message Support
akosel 736e419
Add more unit tests
akosel a0acb9e
Add sqs extras to ci
akosel 3aa81f1
Merge branch 'celery:main' into sqs-large-payload-support
akosel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -125,6 +125,26 @@ | |||||
For a complete list of settings you can adjust using this option see | ||||||
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html | ||||||
|
||||||
Large Message Support | ||||||
--------------------- | ||||||
SQS has a maximum message size limit of 256KB. To handle larger messages, | ||||||
this transport automatically supports the Amazon SQS Extended Client Library, | ||||||
which uses S3 to store message payloads that exceed the SQS size limit. | ||||||
|
||||||
This feature is automatically available when using the SQS transport - no | ||||||
additional installation or configuration is required as the necessary | ||||||
dependencies are included with the SQS extras. | ||||||
|
||||||
When a large message is sent: | ||||||
- The message body is automatically stored in S3 | ||||||
- SQS receives a reference pointer to the S3 object | ||||||
- When the message is received, the transport transparently retrieves | ||||||
the payload from S3 | ||||||
|
||||||
Note: You need appropriate S3 permissions in addition to SQS permissions | ||||||
for this feature to work. The IAM policy should include s3:GetObject and | ||||||
s3:PutObject permissions for the S3 bucket used by the extended client. | ||||||
|
||||||
Features | ||||||
======== | ||||||
* Type: Virtual | ||||||
|
@@ -140,6 +160,7 @@ | |||||
|
||||||
import base64 | ||||||
import binascii | ||||||
import json | ||||||
import re | ||||||
import socket | ||||||
import string | ||||||
|
@@ -154,7 +175,7 @@ | |||||
from vine import ensure_promise, promise, transform | ||||||
|
||||||
from kombu.asynchronous import get_event_loop | ||||||
from kombu.asynchronous.aws.ext import boto3, exceptions | ||||||
from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client | ||||||
from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection | ||||||
from kombu.asynchronous.aws.sqs.message import AsyncMessage | ||||||
from kombu.log import get_logger | ||||||
|
@@ -507,6 +528,24 @@ def _message_to_python(self, message, queue_name, q_url): | |||||
self._delete_message(queue_name, message) | ||||||
return payload | ||||||
|
||||||
# Check if this is a large payload stored in S3 | ||||||
if ( | ||||||
sqs_extended_client and | ||||||
isinstance(payload, list) | ||||||
and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] Consider extracting the sqs_extended_client.client.MESSAGE_POINTER_CLASS into a constant for better readability and maintainability.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
): | ||||||
# Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload | ||||||
s3_details = payload[1] | ||||||
s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"] | ||||||
|
||||||
s3_client = self.s3() | ||||||
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key) | ||||||
|
||||||
# The message body is under a wrapper class called StreamingBody | ||||||
streaming_body = response["Body"] | ||||||
body = self._optional_b64_decode(streaming_body.read()) | ||||||
payload = json.loads(body) | ||||||
|
||||||
return self._envelope_payload(payload, text, message, q_url) | ||||||
|
||||||
def _messages_to_python(self, messages, queue): | ||||||
|
@@ -740,6 +779,32 @@ def close(self): | |||||
# if "can't set attribute" not in str(exc): | ||||||
# raise | ||||||
|
||||||
def new_s3_client( | ||||||
self, region, access_key_id, secret_access_key, session_token=None | ||||||
): | ||||||
session = boto3.session.Session( | ||||||
region_name=region, | ||||||
aws_access_key_id=access_key_id, | ||||||
aws_secret_access_key=secret_access_key, | ||||||
aws_session_token=session_token, | ||||||
) | ||||||
is_secure = self.is_secure if self.is_secure is not None else True | ||||||
client_kwargs = {"use_ssl": is_secure} | ||||||
|
||||||
if self.endpoint_url is not None: | ||||||
client_kwargs["endpoint_url"] = self.endpoint_url | ||||||
|
||||||
client = session.client("s3", **client_kwargs) | ||||||
|
||||||
return client | ||||||
|
||||||
def s3(self): | ||||||
return self.new_s3_client( | ||||||
region=self.region, | ||||||
access_key_id=self.conninfo.userid, | ||||||
secret_access_key=self.conninfo.password, | ||||||
) | ||||||
|
||||||
def new_sqs_client(self, region, access_key_id, | ||||||
secret_access_key, session_token=None): | ||||||
session = boto3.session.Session( | ||||||
|
@@ -756,7 +821,13 @@ def new_sqs_client(self, region, access_key_id, | |||||
client_kwargs['endpoint_url'] = self.endpoint_url | ||||||
client_config = self.transport_options.get('client-config') or {} | ||||||
config = Config(**client_config) | ||||||
return session.client('sqs', config=config, **client_kwargs) | ||||||
client = session.client('sqs', config=config, **client_kwargs) | ||||||
|
||||||
if self.transport_options.get('large_payload_bucket') and sqs_extended_client: | ||||||
client.large_payload_support = self.transport_options.get('large_payload_bucket') | ||||||
client.use_legacy_attribute = False | ||||||
|
||||||
return client | ||||||
|
||||||
def sqs(self, queue=None): | ||||||
if queue is not None and self.predefined_queues: | ||||||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
boto3>=1.26.143 | ||
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" | ||
urllib3>=1.26.16 | ||
amazon-sqs-extended-client>=1.0.1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition lacks proper error handling for cases where payload[0] access might fail if payload is an empty list. Consider adding bounds checking.
Copilot uses AI. Check for mistakes.