From 1e6f52c73b1ce0d12c8a3592838a2ccb96d64fc9 Mon Sep 17 00:00:00 2001
From: Amit Shah
Date: Tue, 17 Sep 2024 12:03:51 +0100
Subject: [PATCH 1/8] Integrate large payload support for SQS

This adds support for handling large payloads in SQS. The
'sqs_extended_client' is imported and used to fetch the payload file from
S3 when necessary.

Because Kombu fetches new messages from the queue asynchronously, bypassing
the standard boto3 APIs, we have to fetch the S3 file manually rather than
rely on the sqs_extended_client to perform that action.

Relates to: celery/kombu#279
---
 kombu/asynchronous/aws/ext.py     |  2 ++
 kombu/asynchronous/aws/sqs/ext.py |  2 ++
 kombu/transport/SQS.py            | 56 +++++++++++++++++++++++++++++--
 requirements/extras/sqs.txt       |  1 +
 4 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/kombu/asynchronous/aws/ext.py b/kombu/asynchronous/aws/ext.py
index d82e48f31..648da8ad7 100644
--- a/kombu/asynchronous/aws/ext.py
+++ b/kombu/asynchronous/aws/ext.py
@@ -4,12 +4,14 @@
 try:
     import boto3
+    import sqs_extended_client
     from botocore import exceptions
     from botocore.awsrequest import AWSRequest
     from botocore.httpsession import get_cert_path
     from botocore.response import get_response
 except ImportError:
     boto3 = None
+    sqs_extended_client = None
 
 
 class _void:
     pass

diff --git a/kombu/asynchronous/aws/sqs/ext.py b/kombu/asynchronous/aws/sqs/ext.py
index 72268b5db..96d4f5904 100644
--- a/kombu/asynchronous/aws/sqs/ext.py
+++ b/kombu/asynchronous/aws/sqs/ext.py
@@ -5,5 +5,7 @@
 
 try:
     import boto3
+    import sqs_extended_client
 except ImportError:
     boto3 = None
+    sqs_extended_client = None

diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 7d3fa0cc9..5e32a84ee 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -133,6 +133,7 @@
 from __future__ import annotations
 
 import base64
+import json
 import socket
 import string
 import uuid
@@ -144,7 +145,7 @@
 from vine import ensure_promise, promise, transform
 
 from kombu.asynchronous import get_event_loop
-from kombu.asynchronous.aws.ext import boto3, exceptions
+from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client
 from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection
 from kombu.asynchronous.aws.sqs.message import AsyncMessage
 from kombu.log import get_logger
@@ -498,6 +499,25 @@ def _message_to_python(self, message, queue_name, q_url):
                 message['ReceiptHandle'],
             )
         else:
+
+            if (
+                sqs_extended_client
+                and isinstance(payload, list)
+                and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
+            ):
+                # Sent via the sqs_extended_client, so fetch the real payload from S3
+                s3_details = payload[1]
+                s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"]
+
+                s3_client = self.s3()
+                response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key)
+
+                # The message body is under a wrapper class called StreamingBody
+                streaming_body = response["Body"]
+                payload = json.loads(
+                    self._optional_b64_decode(streaming_body.read())
+                )
+
             try:
                 properties = payload['properties']
                 delivery_info = payload['properties']['delivery_info']
@@ -713,6 +733,32 @@ def close(self):
         #         if "can't set attribute" not in str(exc):
         #             raise
 
+    def new_s3_client(
+        self, region, access_key_id, secret_access_key, session_token=None
+    ):
+        session = boto3.session.Session(
+            region_name=region,
+            aws_access_key_id=access_key_id,
+            aws_secret_access_key=secret_access_key,
+            aws_session_token=session_token,
+        )
+        is_secure = self.is_secure if self.is_secure is not None else True
+        client_kwargs = {"use_ssl": is_secure}
+
+        if self.endpoint_url is not None:
+            client_kwargs["endpoint_url"] = self.endpoint_url
+
+        client = session.client("s3", **client_kwargs)
+
+        return client
+
+    def s3(self):
+        return self.new_s3_client(
+            region=self.region,
+            access_key_id=self.conninfo.userid,
+            secret_access_key=self.conninfo.password,
+        )
+
     def new_sqs_client(self, region, access_key_id,
                        secret_access_key, session_token=None):
         session = boto3.session.Session(
@@ -729,7 +775,13 @@ def new_sqs_client(self, region, access_key_id,
             client_kwargs['endpoint_url'] = self.endpoint_url
         client_config = self.transport_options.get('client-config') or {}
         config = Config(**client_config)
-        return session.client('sqs', config=config, **client_kwargs)
+        client = session.client('sqs', config=config, **client_kwargs)
+
+        if self.transport_options.get('large_payload_bucket') and sqs_extended_client:
+            client.large_payload_support = self.transport_options.get('large_payload_bucket')
+            client.use_legacy_attribute = False
+
+        return client
 
     def sqs(self, queue=None):
         if queue is not None and self.predefined_queues:

diff --git a/requirements/extras/sqs.txt b/requirements/extras/sqs.txt
index 22a5270ed..f233379d3 100644
--- a/requirements/extras/sqs.txt
+++ b/requirements/extras/sqs.txt
@@ -1,2 +1,3 @@
 boto3>=1.26.143
 urllib3>=1.26.16
+amazon-sqs-extended-client>=1.0.1

From 794f09d62f7bb4b6bf9e694a355ce69d007d02a4 Mon Sep 17 00:00:00 2001
From: Amit Shah
Date: Wed, 13 Nov 2024 11:27:54 +0000
Subject: [PATCH 2/8] Fix tests when sqs_extended_client is not installed

The try/except block was triggered when sqs_extended_client isn't
installed, which resulted in boto3 being overwritten with None.
---
 kombu/asynchronous/aws/ext.py     | 7 +++++--
 kombu/asynchronous/aws/sqs/ext.py | 5 ++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/kombu/asynchronous/aws/ext.py b/kombu/asynchronous/aws/ext.py
index 648da8ad7..541b3f307 100644
--- a/kombu/asynchronous/aws/ext.py
+++ b/kombu/asynchronous/aws/ext.py
@@ -4,14 +4,12 @@
 try:
     import boto3
-    import sqs_extended_client
     from botocore import exceptions
     from botocore.awsrequest import AWSRequest
     from botocore.httpsession import get_cert_path
     from botocore.response import get_response
 except ImportError:
     boto3 = None
-    sqs_extended_client = None
 
 
 class _void:
     pass
@@ -24,6 +22,11 @@ class BotoCoreError(Exception):
     get_response = _void()
     get_cert_path = _void()
 
+try:
+    import sqs_extended_client
+except ImportError:
+    sqs_extended_client = None
+
 
 __all__ = (
     'exceptions', 'AWSRequest', 'get_response', 'get_cert_path',

diff --git a/kombu/asynchronous/aws/sqs/ext.py b/kombu/asynchronous/aws/sqs/ext.py
index 96d4f5904..8ca782ab3 100644
--- a/kombu/asynchronous/aws/sqs/ext.py
+++ b/kombu/asynchronous/aws/sqs/ext.py
@@ -5,7 +5,10 @@
 
 try:
     import boto3
-    import sqs_extended_client
 except ImportError:
     boto3 = None
+
+try:
+    import sqs_extended_client
+except ImportError:
     sqs_extended_client = None

From 228168be361588c70b4de5bdd1a1db9b9200779e Mon Sep 17 00:00:00 2001
From: Amit Shah
Date: Tue, 3 Dec 2024 16:19:53 +0000
Subject: [PATCH 3/8] Add basic test for extracting results from S3
---
 kombu/transport/SQS.py       |  5 ++---
 t/unit/transport/test_SQS.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index 5e32a84ee..4ae3ce726 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -514,9 +514,8 @@ def _message_to_python(self, message, queue_name, q_url):
                 # The message body is under a wrapper class called StreamingBody
                 streaming_body = response["Body"]
-                payload = json.loads(
-                    self._optional_b64_decode(streaming_body.read())
-                )
+                body = self._optional_b64_decode(streaming_body.read())
+                payload = json.loads(body)
 
             try:
                 properties = payload['properties']

diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index b6a1d6ae2..0219d3519 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -7,14 +7,17 @@
 from __future__ import annotations
 
 import base64
+import json
 import os
 import random
 import string
 from datetime import datetime, timedelta
+from io import BytesIO
 from queue import Empty
 from unittest.mock import Mock, patch
 
 import pytest
+import sqs_extended_client
 
 from kombu import Connection, Exchange, Queue, messaging
 
@@ -1035,3 +1038,30 @@ def test_message_attribute(self):
         assert message == output_message.payload
         # It's not propagated to the properties
         assert 'message_attributes' not in output_message.properties
+
+    def test_message_to_python_with_sqs_extended_client(self):
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'}
+        ]
+
+        # Get the messages now
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            s3_client = Mock(
+                get_object=Mock(
+                    return_value={'Body': BytesIO(json.dumps({"my_key": "Hello, World!"}).encode()), })
+            )
+            s3_mock.return_value = s3_client
+
+            result = self.channel._message_to_python(
+                {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, self.queue_name,
+                'test',
+            )
+
+            assert s3_client.get_object.called
+
+            # Make sure they're payload-style objects
+            assert 'properties' in result
+
+            # Data from s3 is loaded into the return payload
+            assert 'my_key' in result

From 066e19205d961b5e425f2eedabbc0ab90650a1bf Mon Sep 17 00:00:00 2001
From: Amit Shah
Date: Tue, 3 Dec 2024 16:26:59 +0000
Subject: [PATCH 4/8] Fix using sqs_extended_client in tests
---
 t/unit/transport/test_SQS.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 0219d3519..4fe454a4a 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -17,11 +17,11 @@
 from unittest.mock import Mock, patch
 
 import pytest
-import sqs_extended_client
 
 from kombu import Connection, Exchange, Queue, messaging
 
 boto3 = pytest.importorskip('boto3')
+sqs_extended_client = pytest.importorskip('sqs_extended_client')
 
 from botocore.exceptions import ClientError  # noqa

From 2ffdb7afd75710d00f6684b44083831b7a899eca Mon Sep 17 00:00:00 2001
From: Amit Shah
Date: Fri, 24 Jan 2025 17:01:17 +0000
Subject: [PATCH 5/8] Add tests for S3 client creation in SQS Channel

Introduce two tests to verify S3 client creation behavior: one for insecure
connections and another for custom endpoint usage. This ensures proper
configuration of boto3 client initialization in these scenarios.
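
For reference, the client construction being verified reduces to the
following boto3 calls (a sketch; the region and credential values shown are
the placeholders used by the tests):

    session = boto3.session.Session(
        region_name='us-west-2',
        aws_access_key_id='test_access_key',
        aws_secret_access_key='test_secret_key',
        aws_session_token=None,
    )
    client_kwargs = {'use_ssl': False}  # mirrors Channel.is_secure
    # 'endpoint_url' is only passed when one is configured on the channel
    client = session.client('s3', **client_kwargs)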
---
 t/unit/transport/test_SQS.py | 45 ++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 4fe454a4a..715ae7f5b 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -367,6 +367,51 @@ def test_optional_b64_decode(self):
         assert self.channel._optional_b64_decode(raw) == raw
         assert self.channel._optional_b64_decode(b"test123") == b"test123"
 
+    @patch('boto3.session.Session')
+    def test_new_s3_client_with_is_secure_false(self, mock_session):
+        self.channel.is_secure = False
+        self.channel.endpoint_url = None
+
+        self.channel.new_s3_client(
+            region='us-west-2',
+            access_key_id='test_access_key',
+            secret_access_key='test_secret_key'
+        )
+
+        mock_session.assert_called_once_with(
+            region_name='us-west-2',
+            aws_access_key_id='test_access_key',
+            aws_secret_access_key='test_secret_key',
+            aws_session_token=None
+        )
+        mock_session().client.assert_called_once_with(
+            's3', use_ssl=False
+        )
+
+    @patch('boto3.session.Session')
+    def test_new_s3_client_with_custom_endpoint(self, mock_session):
+        mock_client = Mock()
+        mock_session.return_value.client.return_value = mock_client
+
+        self.channel.is_secure = True
+        self.channel.endpoint_url = 'https://custom-endpoint.com'
+
+        result = self.channel.new_s3_client('us-west-2', 'access_key', 'secret_key')
+
+        mock_session.assert_called_once_with(
+            region_name='us-west-2',
+            aws_access_key_id='access_key',
+            aws_secret_access_key='secret_key',
+            aws_session_token=None
+        )
+        mock_session.return_value.client.assert_called_once_with(
+            's3',
+            use_ssl=True,
+            endpoint_url='https://custom-endpoint.com'
+        )
+        assert result == mock_client
+
     def test_messages_to_python(self):
         from kombu.asynchronous.aws.sqs.message import Message

From 2c54f384921bd12b967a85e4d92c9e85abc7782a Mon Sep 17 00:00:00 2001
From: Aaron Kosel
Date: Sun, 20 Jul 2025 16:13:30 -0400
Subject: [PATCH 6/8] Add documentation for Large Message Support
---
 docs/reference/kombu.transport.SQS.rst | 31 +++++++++++++++++++++++++-
 kombu/transport/SQS.py                 | 20 +++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/docs/reference/kombu.transport.SQS.rst b/docs/reference/kombu.transport.SQS.rst
index e5136ebdc..e98e7de57 100644
--- a/docs/reference/kombu.transport.SQS.rst
+++ b/docs/reference/kombu.transport.SQS.rst
@@ -70,4 +70,33 @@ Message Attributes
 
 SQS supports sending message attributes along with the message body.
 To use this feature, you can pass a 'message_attributes' as keyword argument
-to `basic_publish` method.
\ No newline at end of file
+to `basic_publish` method.
+
+Large Message Support
+---------------------
+
+SQS has a maximum message size limit of 256KB. To handle larger messages,
+the SQS transport supports the Amazon SQS Extended Client Library, which
+uses S3 to store message payloads that exceed the SQS size limit.
+
+The necessary dependencies are included with the SQS extras, so no
+additional installation is required. To have outgoing messages that exceed
+the SQS limit offloaded to S3, set the ``large_payload_bucket`` transport
+option to the bucket that should hold the payloads; incoming large messages
+are resolved transparently.
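+
+For example, a minimal sketch of enabling large payloads on a connection
+(the bucket name is a placeholder)::
+
+    from kombu import Connection
+
+    connection = Connection(
+        'sqs://',
+        transport_options={'large_payload_bucket': 'my-large-payload-bucket'},
+    )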
+
+**How it works:**
+
+- When sending a message larger than 256KB, the transport automatically stores
+  the message body in S3
+- SQS receives a reference pointer to the S3 object instead of the actual message
+- When receiving the message, the transport transparently retrieves the payload
+  from S3
+
+**IAM Permissions:**
+
+To use this feature, your AWS credentials need appropriate S3 permissions in
+addition to standard SQS permissions:
+
+- ``s3:GetObject`` - for retrieving large messages
+- ``s3:PutObject`` - for storing large messages
+
+The S3 bucket used for storage is the one named by the
+``large_payload_bucket`` transport option; the SQS Extended Client Library
+reads and writes the payload objects within it.
\ No newline at end of file

diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index ea17abf15..9d9b69bde 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -125,6 +125,26 @@
 For a complete list of settings you can adjust using this option see
 https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
 
+Large Message Support
+---------------------
+SQS has a maximum message size limit of 256KB. To handle larger messages,
+this transport supports the Amazon SQS Extended Client Library, which uses
+S3 to store message payloads that exceed the SQS size limit.
+
+The necessary dependencies are included with the SQS extras, so no
+additional installation is required. Sending large payloads is enabled by
+setting the ``large_payload_bucket`` transport option; large incoming
+messages are resolved transparently.
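+
+Internally, an oversized message body is replaced on the queue by a small
+JSON pointer of the form (bucket and key shown are illustrative; the first
+element is the library's ``MESSAGE_POINTER_CLASS`` constant)::
+
+    [<sqs_extended_client.client.MESSAGE_POINTER_CLASS>,
+     {"s3BucketName": "my-large-payload-bucket", "s3Key": "<object key>"}]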
+
+When a large message is sent:
+
+- The message body is automatically stored in S3
+- SQS receives a reference pointer to the S3 object
+- When the message is received, the transport transparently retrieves
+  the payload from S3
+
+Note: You need appropriate S3 permissions in addition to SQS permissions
+for this feature to work. The IAM policy should include s3:GetObject and
+s3:PutObject permissions for the S3 bucket used by the extended client.
+
 Features
 ========
 * Type: Virtual

From 736e41973fa5382ac3f3966ea6ca73e96e62b544 Mon Sep 17 00:00:00 2001
From: Aaron Kosel
Date: Sun, 20 Jul 2025 16:22:17 -0400
Subject: [PATCH 7/8] Add more unit tests
---
 t/unit/transport/test_SQS.py | 406 +++++++++++++++++++++++++++++++++++
 1 file changed, 406 insertions(+)

diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py
index 794e03b40..e010addd5 100644
--- a/t/unit/transport/test_SQS.py
+++ b/t/unit/transport/test_SQS.py
@@ -1474,3 +1474,409 @@
 
         # Data from s3 is loaded into the return payload
         assert 'my_key' in result
+
+    def test_new_s3_client_creation(self):
+        """Test S3 client creation with different configurations."""
+        # Test basic S3 client creation
+        client = self.channel.new_s3_client(
+            region='us-east-1',
+            access_key_id='test_key',
+            secret_access_key='test_secret'
+        )
+        assert client is not None
+
+        # Test with session token
+        client_with_token = self.channel.new_s3_client(
+            region='us-east-1',
+            access_key_id='test_key',
+            secret_access_key='test_secret',
+            session_token='test_token'
+        )
+        assert client_with_token is not None
+
+        # Test with custom endpoint URL
+        self.channel.endpoint_url = 'http://localhost:4566'  # LocalStack URL
+        client_with_endpoint = self.channel.new_s3_client(
+            region='us-east-1',
+            access_key_id='test_key',
+            secret_access_key='test_secret'
+        )
+        assert client_with_endpoint is not None
+
+        # Test with is_secure=False
+        self.channel.is_secure = False
+        client_insecure = self.channel.new_s3_client(
+            region='us-east-1',
+            access_key_id='test_key',
+            secret_access_key='test_secret'
+        )
+        assert client_insecure is not None
+
+    def test_s3_method(self):
+        """Test the s3() convenience method."""
+        with patch.object(self.channel, 'new_s3_client') as mock_new_s3_client:
+            mock_client = Mock()
+            mock_new_s3_client.return_value = mock_client
+
+            result = self.channel.s3()
+
+            # Verify it was called with correct parameters
+            mock_new_s3_client.assert_called_once_with(
+                region=self.channel.region,
+                access_key_id=self.channel.conninfo.userid,
+                secret_access_key=self.channel.conninfo.password,
+            )
+            assert result == mock_client
+
+    def test_message_to_python_with_sqs_extended_client_error_handling(self):
+        """Test error handling when S3 operations fail."""
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'}
+        ]
+
+        # Test S3 GetObject error
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            from botocore.exceptions import ClientError
+            s3_client = Mock()
+            s3_client.get_object.side_effect = ClientError(
+                {'Error': {'Code': 'NoSuchKey', 'Message': 'The specified key does not exist.'}},
+                'GetObject'
+            )
+            s3_mock.return_value = s3_client
+
+            with pytest.raises(ClientError):
+                self.channel._message_to_python(
+                    {'Body': json.dumps(message), 'ReceiptHandle': 'handle'},
+                    self.queue_name,
+                    'test',
+                )
+
+    def test_message_to_python_with_corrupted_s3_payload(self):
+        """Test handling of corrupted S3 payload."""
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'}
+        ]
+
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            # Return invalid JSON from S3
+            s3_client = Mock(
+                get_object=Mock(
+                    return_value={'Body': BytesIO(b'invalid json data')}
+                )
+            )
+            s3_mock.return_value = s3_client
+
+            with pytest.raises(json.JSONDecodeError):
+                self.channel._message_to_python(
+                    {'Body': json.dumps(message), 'ReceiptHandle': 'handle'},
+                    self.queue_name,
+                    'test',
+                )
+
+    def test_message_to_python_with_base64_encoded_s3_payload(self):
+        """Test handling of base64 encoded S3 payload."""
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'}
+        ]
+
+        payload_data = {"encoded": "data", "test": "value"}
+        encoded_payload = base64.b64encode(json.dumps(payload_data).encode())
+
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            s3_client = Mock(
+                get_object=Mock(
+                    return_value={'Body': BytesIO(encoded_payload)}
+                )
+            )
+            s3_mock.return_value = s3_client
+
+            # Enable base64 encoding for this test
+            self.channel.sqs_base64_encoding = True
+
+            result = self.channel._message_to_python(
+                {'Body': json.dumps(message), 'ReceiptHandle': 'handle'},
+                self.queue_name,
+                'test',
+            )
+
+            assert s3_client.get_object.called
+            assert 'properties' in result
+            # The decoded S3 data is merged into the returned payload
+            assert result['encoded'] == 'data'
+            assert result['test'] == 'value'
+
+    def test_message_to_python_without_sqs_extended_client(self):
+        """Test that normal messages work when sqs_extended_client is not available."""
+        # Temporarily set sqs_extended_client to None
+        original_client = sqs_extended_client
+        import kombu.transport.SQS as sqs_module
+        sqs_module.sqs_extended_client = None
+
+        try:
+            normal_message = {"normal": "message", "data": "test"}
+
+            result = self.channel._message_to_python(
+                {'Body': json.dumps(normal_message), 'ReceiptHandle': 'handle'},
+                self.queue_name,
+                'test',
+            )
+
+            assert 'properties' in result
+            assert result['normal'] == 'message'
+        finally:
+            # Restore original client
+            sqs_module.sqs_extended_client = original_client
+
+    def test_message_to_python_with_missing_s3_details(self):
+        """Test handling of malformed extended client message."""
+        # Message with missing S3 details
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {}  # Missing s3BucketName and s3Key
+        ]
+
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            s3_client = Mock()
+            s3_mock.return_value = s3_client
+
+            with pytest.raises(KeyError):
+                self.channel._message_to_python(
+                    {'Body': json.dumps(message), 'ReceiptHandle': 'handle'},
+                    self.queue_name,
+                    'test',
+                )
+
+    def test_optional_b64_decode_additional_cases(self):
+        """Additional cases for the _optional_b64_decode method."""
+        # Test with valid base64
+        original_data = b"Hello, World!"
+        encoded_data = base64.b64encode(original_data)
+
+        result = self.channel._optional_b64_decode(encoded_data)
+        assert result == original_data
+
+        # Test with invalid base64
+        invalid_b64 = b"This is not base64!!!"
+        result = self.channel._optional_b64_decode(invalid_b64)
+        assert result == invalid_b64
+
+        # Test with base64-like but not actually base64
+        looks_like_b64 = b"SGVsbG8="  # Valid base64 format
+        result = self.channel._optional_b64_decode(looks_like_b64)
+        assert result == b"Hello"
+
+        # Surrounding whitespace breaks the round trip, so the input is
+        # returned unchanged
+        padded_b64 = b" " + encoded_data + b" "
+        result = self.channel._optional_b64_decode(padded_b64)
+        assert result == padded_b64
+
+        # Test with empty input
+        result = self.channel._optional_b64_decode(b"")
+        assert result == b""
+
+        # Test with non-base64 that passes regex
+        # Create a string that might pass regex but fail decode
+        tricky_string = b"AAAA!!!!"
+        result = self.channel._optional_b64_decode(tricky_string)
+        assert result == tricky_string
+
+    def test_decode_python_message_body(self):
+        """Test _decode_python_message_body method."""
+        # Test with regular string
+        message = "test message"
+        result = self.channel._decode_python_message_body(message)
+        assert result == message.encode()
+
+        # Test with base64 encoded message when sqs_base64_encoding is True
+        self.channel.sqs_base64_encoding = True
+        original = b"encoded message"
+        encoded = base64.b64encode(original).decode()
+
+        result = self.channel._decode_python_message_body(encoded)
+        assert result == original
+
+        # Test with already bytes
+        byte_message = b"byte message"
+        result = self.channel._decode_python_message_body(byte_message)
+        assert result == byte_message
+
+    def test_message_to_python_noack_queue(self):
+        """Test message handling for no-ack queues."""
+        # Add queue to noack_queues
+        self.channel._noack_queues.add(self.queue_name)
+
+        message_body = {"test": "data"}
+        message = {
+            'Body': json.dumps(message_body),
+            'ReceiptHandle': 'test-handle'
+        }
+
+        with patch.object(self.channel, '_delete_message') as mock_delete:
+            result = self.channel._message_to_python(
+                message,
+                self.queue_name,
+                'test-url'
+            )
+
+            # Verify delete was called for no-ack queue
+            mock_delete.assert_called_once_with(self.queue_name, message)
+            assert result['test'] == 'data'
+
+    def test_envelope_payload(self):
+        """Test the _envelope_payload method."""
+        payload = {"test": "data"}
+        raw_text = json.dumps(payload)
+        message = {
+            'ReceiptHandle': 'test-handle',
+            'MessageId': 'test-message-id'
+        }
+        q_url = 'https://sqs.region.amazonaws.com/123456/queue'
+
+        result = self.channel._envelope_payload(payload, raw_text, message, q_url)
+
+        # Verify the envelope structure
+        assert 'properties' in result
+        assert 'delivery_info' in result['properties']
+        assert result['properties']['delivery_tag'] == 'test-handle'
+        assert result['properties']['delivery_info']['sqs_message'] == message
+        assert result['properties']['delivery_info']['sqs_queue'] == q_url
+
+    def test_delete_message(self):
+        """Test the _delete_message method."""
+        message = {'ReceiptHandle': 'test-handle'}
+
+        with patch.object(self.channel, 'asynsqs') as mock_asynsqs:
+            mock_queue_client = Mock()
+            mock_asynsqs.return_value = mock_queue_client
+
+            with patch.object(self.channel, '_new_queue') as mock_new_queue:
+                mock_new_queue.return_value = 'test-queue-url'
+
+                self.channel._delete_message(self.queue_name, message)
+
+                mock_asynsqs.assert_called_once_with(queue=self.queue_name)
+                mock_queue_client.delete_message.assert_called_once_with(
+                    'test-queue-url',
+                    'test-handle'
+                )
+
+    def test_s3_streaming_body_handling(self):
+        """Test handling of S3 StreamingBody response."""
+        message = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'test-bucket', 's3Key': 'test-key.json'}
+        ]
+
+        # Create a mock StreamingBody
+        class MockStreamingBody:
+            def __init__(self, data):
+                self.data = data
+
+            def read(self):
+                return self.data
+
+        payload_data = {"streaming": "data", "test": True}
+
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            s3_client = Mock()
+            s3_client.get_object.return_value = {
+                'Body': MockStreamingBody(json.dumps(payload_data).encode())
+            }
+            s3_mock.return_value = s3_client
+
+            result = self.channel._message_to_python(
+                {'Body': json.dumps(message), 'ReceiptHandle': 'handle'},
+                self.queue_name,
+                'test-url',
+            )
+
+            # Verify S3 was called with correct parameters
+            s3_client.get_object.assert_called_once_with(
+                Bucket='test-bucket',
+                Key='test-key.json'
+            )
+
+            # Verify the payload was correctly extracted
+            assert 'properties' in result
+            assert result['streaming'] == 'data'
+            assert result['test'] is True
+
+    def test_s3_client_with_predefined_queue_credentials(self):
+        """Test S3 client creation with predefined queue credentials."""
+        # Setup predefined queue with specific credentials
+        predefined_queue = {
+            'url': 'https://sqs.us-east-1.amazonaws.com/123456/test-queue',
+            'access_key_id': 'predefined_key',
+            'secret_access_key': 'predefined_secret',
+            'session_token': 'predefined_token'
+        }
+
+        # Mock channel with predefined queue
+        with patch.object(self.channel, 'predefined_queues', {self.queue_name: predefined_queue}):
+            with patch.object(self.channel, 'new_s3_client') as mock_new_s3:
+                mock_client = Mock()
+                mock_new_s3.return_value = mock_client
+
+                # Simulate getting S3 client for a predefined queue
+                # This would happen in a real scenario when processing a message
+                self.channel.new_s3_client(
+                    region=self.channel.region,
+                    access_key_id=predefined_queue['access_key_id'],
+                    secret_access_key=predefined_queue['secret_access_key'],
+                    session_token=predefined_queue.get('session_token')
+                )
+
+                mock_new_s3.assert_called_once_with(
+                    region=self.channel.region,
+                    access_key_id='predefined_key',
+                    secret_access_key='predefined_secret',
+                    session_token='predefined_token'
+                )
+
+    def test_message_to_python_integration(self):
+        """Integration test for full message flow with large payload."""
+        # Test the complete flow from SQS message to Python object
+        large_payload = {
+            "large": "data" * 100,  # Simulate large data
+            "metadata": {"size": "large", "version": 1}
+        }
+
+        s3_reference = [
+            sqs_extended_client.client.MESSAGE_POINTER_CLASS,
+            {'s3BucketName': 'integration-bucket', 's3Key': 'large-message.json'}
+        ]
+
+        sqs_message = {
+            'Body': json.dumps(s3_reference),
+            'ReceiptHandle': 'integration-handle',
+            'MessageId': 'integration-msg-id',
+            'Attributes': {
+                'ApproximateReceiveCount': '1',
+                'SentTimestamp': '1234567890'
+            }
+        }
+
+        with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
+            s3_client = Mock()
+            s3_client.get_object.return_value = {
+                'Body': BytesIO(json.dumps(large_payload).encode())
+            }
+            s3_mock.return_value = s3_client
+
+            # Process the message
+            result = self.channel._message_to_python(
+                sqs_message,
+                self.queue_name,
+                'https://queue-url'
+            )
+
+            # Verify the complete result
+            assert result['large'] == 'data' * 100
+            assert 'properties' in result
+            assert 'delivery_info' in result['properties']
+            assert result['properties']['delivery_tag'] == 'integration-handle'
+            assert result['properties']['delivery_info']['sqs_queue'] == 'https://queue-url'

From a0acb9e70d8b46148bc81d6055ca230089d98f18 Mon Sep 17 00:00:00 2001
From: Aaron Kosel
Date: Sun, 20 Jul 2025 16:23:52 -0400
Subject: [PATCH 8/8] Add SQS extras to CI
---
 requirements/test-ci.txt | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt
index 044d8c805..177489d1e 100644
--- a/requirements/test-ci.txt
+++ b/requirements/test-ci.txt
@@ -8,9 +8,7 @@ pymongo>=4.1.1; sys_platform != 'win32'
 -r extras/msgpack.txt
 -r extras/azureservicebus.txt
 -r extras/azurestoragequeues.txt
-boto3>=1.26.143; sys_platform != 'win32'
-pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
-urllib3>=1.26.16; sys_platform != 'win32'
+-r extras/sqs.txt
 -r extras/consul.txt
 -r extras/zookeeper.txt
 -r extras/brotli.txt
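
For reviewers, a minimal end-to-end sketch of the feature this series adds
(bucket and queue names are placeholders; assumes the SQS extras are
installed, valid AWS credentials, and an existing S3 bucket):

    from kombu import Connection

    connection = Connection(
        'sqs://',
        transport_options={'large_payload_bucket': 'my-large-payload-bucket'},
    )

    with connection.SimpleQueue('large-message-queue') as queue:
        # Bodies above the 256KB SQS limit are offloaded to S3 by the
        # extended client; smaller bodies go to SQS directly.
        queue.put({'data': 'x' * (300 * 1024)})

    with connection.SimpleQueue('large-message-queue') as queue:
        message = queue.get(timeout=10)
        # The transport resolves the S3 pointer transparently on receipt.
        print(len(message.payload['data']))
        message.ack()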