diff --git a/docs/reference/kombu.transport.SQS.rst b/docs/reference/kombu.transport.SQS.rst index e5136ebdc..e98e7de57 100644 --- a/docs/reference/kombu.transport.SQS.rst +++ b/docs/reference/kombu.transport.SQS.rst @@ -70,4 +70,33 @@ Message Attributes SQS supports sending message attributes along with the message body. To use this feature, you can pass a 'message_attributes' as keyword argument -to `basic_publish` method. \ No newline at end of file +to `basic_publish` method. + +Large Message Support +------------------------ + +SQS has a maximum message size limit of 256KB. To handle larger messages, +the SQS transport automatically supports the Amazon SQS Extended Client Library, +which uses S3 to store message payloads that exceed the SQS size limit. + +This feature is automatically available when using the SQS transport - no +additional installation or configuration is required as the necessary +dependencies are included with the SQS extras. + +**How it works:** + +- When sending a message larger than 256KB, the transport automatically stores + the message body in S3 +- SQS receives a reference pointer to the S3 object instead of the actual message +- When receiving the message, the transport transparently retrieves the payload + from S3 + +**IAM Permissions:** + +To use this feature, your AWS credentials need appropriate S3 permissions in +addition to standard SQS permissions: + +- ``s3:GetObject`` - for retrieving large messages +- ``s3:PutObject`` - for storing large messages + +The S3 bucket used for storage is managed by the SQS Extended Client Library. \ No newline at end of file diff --git a/kombu/asynchronous/aws/ext.py b/kombu/asynchronous/aws/ext.py index 3d030c4d8..7e5b7c6ea 100644 --- a/kombu/asynchronous/aws/ext.py +++ b/kombu/asynchronous/aws/ext.py @@ -28,6 +28,11 @@ def get_cert_path() -> str: "get_cert_path is unavailable because boto3 or botocore is not installed." ) +try: + import sqs_extended_client +except ImportError: + sqs_extended_client = None + __all__ = ( 'exceptions', 'AWSRequest', 'get_response', 'get_cert_path', ) diff --git a/kombu/asynchronous/aws/sqs/ext.py b/kombu/asynchronous/aws/sqs/ext.py index 72268b5db..8ca782ab3 100644 --- a/kombu/asynchronous/aws/sqs/ext.py +++ b/kombu/asynchronous/aws/sqs/ext.py @@ -7,3 +7,8 @@ import boto3 except ImportError: boto3 = None + +try: + import sqs_extended_client +except ImportError: + sqs_extended_client = None diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index d2dfc1bef..9d9b69bde 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -125,6 +125,26 @@ For a complete list of settings you can adjust using this option see https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html +Large Message Support +--------------------- +SQS has a maximum message size limit of 256KB. To handle larger messages, +this transport automatically supports the Amazon SQS Extended Client Library, +which uses S3 to store message payloads that exceed the SQS size limit. + +This feature is automatically available when using the SQS transport - no +additional installation or configuration is required as the necessary +dependencies are included with the SQS extras. 
+ +When a large message is sent: +- The message body is automatically stored in S3 +- SQS receives a reference pointer to the S3 object +- When the message is received, the transport transparently retrieves + the payload from S3 + +Note: You need appropriate S3 permissions in addition to SQS permissions +for this feature to work. The IAM policy should include s3:GetObject and +s3:PutObject permissions for the S3 bucket used by the extended client. + Features ======== * Type: Virtual @@ -140,6 +160,7 @@ import base64 import binascii +import json import re import socket import string @@ -154,7 +175,7 @@ from vine import ensure_promise, promise, transform from kombu.asynchronous import get_event_loop -from kombu.asynchronous.aws.ext import boto3, exceptions +from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection from kombu.asynchronous.aws.sqs.message import AsyncMessage from kombu.log import get_logger @@ -507,6 +528,24 @@ def _message_to_python(self, message, queue_name, q_url): self._delete_message(queue_name, message) return payload + # Check if this is a large payload stored in S3 + if ( + sqs_extended_client and + isinstance(payload, list) + and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS + ): + # Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload + s3_details = payload[1] + s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"] + + s3_client = self.s3() + response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key) + + # The message body is under a wrapper class called StreamingBody + streaming_body = response["Body"] + body = self._optional_b64_decode(streaming_body.read()) + payload = json.loads(body) + return self._envelope_payload(payload, text, message, q_url) def _messages_to_python(self, messages, queue): @@ -740,6 +779,32 @@ def close(self): # if "can't set attribute" not in str(exc): # raise + def new_s3_client( + self, region, access_key_id, secret_access_key, session_token=None + ): + session = boto3.session.Session( + region_name=region, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=session_token, + ) + is_secure = self.is_secure if self.is_secure is not None else True + client_kwargs = {"use_ssl": is_secure} + + if self.endpoint_url is not None: + client_kwargs["endpoint_url"] = self.endpoint_url + + client = session.client("s3", **client_kwargs) + + return client + + def s3(self): + return self.new_s3_client( + region=self.region, + access_key_id=self.conninfo.userid, + secret_access_key=self.conninfo.password, + ) + def new_sqs_client(self, region, access_key_id, secret_access_key, session_token=None): session = boto3.session.Session( @@ -756,7 +821,13 @@ def new_sqs_client(self, region, access_key_id, client_kwargs['endpoint_url'] = self.endpoint_url client_config = self.transport_options.get('client-config') or {} config = Config(**client_config) - return session.client('sqs', config=config, **client_kwargs) + client = session.client('sqs', config=config, **client_kwargs) + + if self.transport_options.get('large_payload_bucket') and sqs_extended_client: + client.large_payload_support = self.transport_options.get('large_payload_bucket') + client.use_legacy_attribute = False + + return client def sqs(self, queue=None): if queue is not None and self.predefined_queues: diff --git a/requirements/extras/sqs.txt b/requirements/extras/sqs.txt index 
77f75030c..a0626a7d9 100644 --- a/requirements/extras/sqs.txt +++ b/requirements/extras/sqs.txt @@ -1,3 +1,4 @@ boto3>=1.26.143 pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" urllib3>=1.26.16 +amazon-sqs-extended-client>=1.0.1 diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index 044d8c805..177489d1e 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -8,9 +8,7 @@ pymongo>=4.1.1; sys_platform != 'win32' -r extras/msgpack.txt -r extras/azureservicebus.txt -r extras/azurestoragequeues.txt -boto3>=1.26.143; sys_platform != 'win32' -pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" -urllib3>=1.26.16; sys_platform != 'win32' +-r extras/sqs.txt -r extras/consul.txt -r extras/zookeeper.txt -r extras/brotli.txt diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 1e7eddfcd..e010addd5 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -7,10 +7,12 @@ from __future__ import annotations import base64 +import json import os import random import string from datetime import datetime, timedelta +from io import BytesIO from queue import Empty from unittest.mock import Mock, patch @@ -19,6 +21,7 @@ from kombu import Connection, Exchange, Queue, messaging boto3 = pytest.importorskip('boto3') +sqs_extended_client = pytest.importorskip('sqs_extended_client') from botocore.exceptions import ClientError # noqa @@ -509,6 +512,51 @@ def test_envelope_payload(self, initial_payload, raw_text, message, q_url, expec assert result["properties"]["delivery_tag"] == message["ReceiptHandle"] + @patch('boto3.session.Session') + def test_new_s3_client_with_is_secure_false(self, mock_session): + self.channel.is_secure = False + self.channel.endpoint_url = None + + self.channel.new_s3_client( + region='us-west-2', + access_key_id='test_access_key', + secret_access_key='test_secret_key' + ) + + # assert isinstance(client, boto3.client('s3').__class__) + mock_session.assert_called_once_with( + region_name='us-west-2', + aws_access_key_id='test_access_key', + aws_secret_access_key='test_secret_key', + aws_session_token=None + ) + mock_session().client.assert_called_once_with( + 's3', use_ssl=False + ) + + @patch('boto3.session.Session') + def test_new_s3_client_with_custom_endpoint(self, mock_session): + mock_client = Mock() + mock_session.return_value.client.return_value = mock_client + + self.channel.is_secure = True + self.channel.endpoint_url = 'https://custom-endpoint.com' + + result = self.channel.new_s3_client('us-west-2', 'access_key', 'secret_key') + + mock_session.assert_called_once_with( + region_name='us-west-2', + aws_access_key_id='access_key', + aws_secret_access_key='secret_key', + aws_session_token=None + ) + mock_session.return_value.client.assert_called_once_with( + 's3', + use_ssl=True, + endpoint_url='https://custom-endpoint.com' + ) + assert result == mock_client + def test_messages_to_python(self): from kombu.asynchronous.aws.sqs.message import Message @@ -1399,3 +1447,436 @@ def test_message_attribute(self): assert message == output_message.payload # It's not propagated to the properties assert 'message_attributes' not in output_message.properties + + def test_message_to_python_with_sqs_extended_client(self): + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 's3://large-payload-bucket', 's3Key': 'payload.json'} + ] + + # Get the messages now + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + 
s3_client = Mock( + get_object=Mock( + return_value={'Body': BytesIO(json.dumps({"my_key": "Hello, World!"}).encode()), }) + ) + s3_mock.return_value = s3_client + + result = self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, self.queue_name, + 'test', + ) + + assert s3_client.get_object.called + + # Make sure they're payload-style objects + assert 'properties' in result + + # Data from s3 is loaded into the return payload + assert 'my_key' in result + + def test_new_s3_client_creation(self): + """Test S3 client creation with different configurations.""" + # Test basic S3 client creation + client = self.channel.new_s3_client( + region='us-east-1', + access_key_id='test_key', + secret_access_key='test_secret' + ) + assert client is not None + + # Test with session token + client_with_token = self.channel.new_s3_client( + region='us-east-1', + access_key_id='test_key', + secret_access_key='test_secret', + session_token='test_token' + ) + assert client_with_token is not None + + # Test with custom endpoint URL + self.channel.endpoint_url = 'http://localhost:4566' # LocalStack URL + client_with_endpoint = self.channel.new_s3_client( + region='us-east-1', + access_key_id='test_key', + secret_access_key='test_secret' + ) + assert client_with_endpoint is not None + + # Test with is_secure=False + self.channel.is_secure = False + client_insecure = self.channel.new_s3_client( + region='us-east-1', + access_key_id='test_key', + secret_access_key='test_secret' + ) + assert client_insecure is not None + + def test_s3_method(self): + """Test the s3() convenience method.""" + with patch.object(self.channel, 'new_s3_client') as mock_new_s3_client: + mock_client = Mock() + mock_new_s3_client.return_value = mock_client + + result = self.channel.s3() + + # Verify it was called with correct parameters + mock_new_s3_client.assert_called_once_with( + region=self.channel.region, + access_key_id=self.channel.conninfo.userid, + secret_access_key=self.channel.conninfo.password, + ) + assert result == mock_client + + def test_message_to_python_with_sqs_extended_client_error_handling(self): + """Test error handling when S3 operations fail.""" + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'} + ] + + # Test S3 GetObject error + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + from botocore.exceptions import ClientError + s3_client = Mock() + s3_client.get_object.side_effect = ClientError( + {'Error': {'Code': 'NoSuchKey', 'Message': 'The specified key does not exist.'}}, + 'GetObject' + ) + s3_mock.return_value = s3_client + + with pytest.raises(ClientError): + self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test', + ) + + def test_message_to_python_with_corrupted_s3_payload(self): + """Test handling of corrupted S3 payload.""" + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'} + ] + + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + # Return invalid JSON from S3 + s3_client = Mock( + get_object=Mock( + return_value={'Body': BytesIO(b'invalid json data')} + ) + ) + s3_mock.return_value = s3_client + + with pytest.raises(json.JSONDecodeError): + self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test', + ) + + def 
test_message_to_python_with_base64_encoded_s3_payload(self): + """Test handling of base64 encoded S3 payload.""" + import base64 + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 'large-payload-bucket', 's3Key': 'payload.json'} + ] + + payload_data = {"encoded": "data", "test": "value"} + encoded_payload = base64.b64encode(json.dumps(payload_data).encode()) + + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + s3_client = Mock( + get_object=Mock( + return_value={'Body': BytesIO(encoded_payload)} + ) + ) + s3_mock.return_value = s3_client + + # Enable base64 encoding for this test + self.channel.sqs_base64_encoding = True + + result = self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test', + ) + + assert s3_client.get_object.called + assert 'properties' in result + assert result == payload_data + + def test_message_to_python_without_sqs_extended_client(self): + """Test that normal messages work when sqs_extended_client is not available.""" + # Temporarily set sqs_extended_client to None + original_client = sqs_extended_client + import kombu.transport.SQS as sqs_module + sqs_module.sqs_extended_client = None + + try: + normal_message = {"normal": "message", "data": "test"} + + result = self.channel._message_to_python( + {'Body': json.dumps(normal_message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test', + ) + + assert 'properties' in result + assert result['normal'] == 'message' + finally: + # Restore original client + sqs_module.sqs_extended_client = original_client + + def test_message_to_python_with_missing_s3_details(self): + """Test handling of malformed extended client message.""" + # Message with missing S3 details + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {} # Missing s3BucketName and s3Key + ] + + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + s3_client = Mock() + s3_mock.return_value = s3_client + + with pytest.raises(KeyError): + self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test', + ) + + def test_optional_b64_decode(self): + """Test the _optional_b64_decode method.""" + # Test with valid base64 + import base64 + original_data = b"Hello, World!" + encoded_data = base64.b64encode(original_data) + + result = self.channel._optional_b64_decode(encoded_data) + assert result == original_data + + # Test with invalid base64 + invalid_b64 = b"This is not base64!!!" + result = self.channel._optional_b64_decode(invalid_b64) + assert result == invalid_b64 + + # Test with base64-like but not actually base64 + looks_like_b64 = b"SGVsbG8=" # Valid base64 format + result = self.channel._optional_b64_decode(looks_like_b64) + assert result == b"Hello" + + # Test with whitespace around base64 + padded_b64 = b" " + encoded_data + b" " + result = self.channel._optional_b64_decode(padded_b64) + assert result == original_data + + # Test with empty input + result = self.channel._optional_b64_decode(b"") + assert result == b"" + + # Test with non-base64 that passes regex + # Create a string that might pass regex but fail decode + tricky_string = b"AAAA!!!!" 
+ result = self.channel._optional_b64_decode(tricky_string) + assert result == tricky_string + + def test_decode_python_message_body(self): + """Test _decode_python_message_body method.""" + # Test with regular string + message = "test message" + result = self.channel._decode_python_message_body(message) + assert result == message.encode() + + # Test with base64 encoded message when sqs_base64_encoding is True + self.channel.sqs_base64_encoding = True + import base64 + original = b"encoded message" + encoded = base64.b64encode(original).decode() + + result = self.channel._decode_python_message_body(encoded) + assert result == original + + # Test with already bytes + byte_message = b"byte message" + result = self.channel._decode_python_message_body(byte_message) + assert result == byte_message + + def test_message_to_python_noack_queue(self): + """Test message handling for no-ack queues.""" + # Add queue to noack_queues + self.channel._noack_queues.add(self.queue_name) + + message_body = {"test": "data"} + message = { + 'Body': json.dumps(message_body), + 'ReceiptHandle': 'test-handle' + } + + with patch.object(self.channel, '_delete_message') as mock_delete: + result = self.channel._message_to_python( + message, + self.queue_name, + 'test-url' + ) + + # Verify delete was called for no-ack queue + mock_delete.assert_called_once_with(self.queue_name, message) + assert result == message_body + + def test_envelope_payload(self): + """Test the _envelope_payload method.""" + payload = {"test": "data"} + raw_text = json.dumps(payload) + message = { + 'ReceiptHandle': 'test-handle', + 'MessageId': 'test-message-id' + } + q_url = 'https://sqs.region.amazonaws.com/123456/queue' + + result = self.channel._envelope_payload(payload, raw_text, message, q_url) + + # Verify the envelope structure + assert 'properties' in result + assert 'delivery_info' in result['properties'] + assert result['properties']['delivery_tag'] == 'test-handle' + assert result['properties']['delivery_info']['sqs_message'] == message + assert result['properties']['delivery_info']['sqs_queue'] == q_url + + def test_delete_message(self): + """Test the _delete_message method.""" + message = {'ReceiptHandle': 'test-handle'} + + with patch.object(self.channel, 'asynsqs') as mock_asynsqs: + mock_queue_client = Mock() + mock_asynsqs.return_value = mock_queue_client + + with patch.object(self.channel, '_new_queue') as mock_new_queue: + mock_new_queue.return_value = 'test-queue-url' + + self.channel._delete_message(self.queue_name, message) + + mock_asynsqs.assert_called_once_with(queue=self.queue_name) + mock_queue_client.delete_message.assert_called_once_with( + 'test-queue-url', + 'test-handle' + ) + + def test_s3_streaming_body_handling(self): + """Test handling of S3 StreamingBody response.""" + message = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 'test-bucket', 's3Key': 'test-key.json'} + ] + + # Create a mock StreamingBody + class MockStreamingBody: + def __init__(self, data): + self.data = data + + def read(self): + return self.data + + payload_data = {"streaming": "data", "test": True} + + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + s3_client = Mock() + s3_client.get_object.return_value = { + 'Body': MockStreamingBody(json.dumps(payload_data).encode()) + } + s3_mock.return_value = s3_client + + result = self.channel._message_to_python( + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, + self.queue_name, + 'test-url', + ) + + # Verify S3 was called with correct parameters + 
s3_client.get_object.assert_called_once_with( + Bucket='test-bucket', + Key='test-key.json' + ) + + # Verify the payload was correctly extracted + assert 'properties' in result + assert result['streaming'] == 'data' + assert result['test'] is True + + def test_s3_client_with_predefined_queue_credentials(self): + """Test S3 client creation with predefined queue credentials.""" + # Setup predefined queue with specific credentials + predefined_queue = { + 'url': 'https://sqs.us-east-1.amazonaws.com/123456/test-queue', + 'access_key_id': 'predefined_key', + 'secret_access_key': 'predefined_secret', + 'session_token': 'predefined_token' + } + + # Mock channel with predefined queue + with patch.object(self.channel, 'predefined_queues', {self.queue_name: predefined_queue}): + with patch.object(self.channel, 'new_s3_client') as mock_new_s3: + mock_client = Mock() + mock_new_s3.return_value = mock_client + + # Simulate getting S3 client for a predefined queue + # This would happen in a real scenario when processing a message + self.channel.new_s3_client( + region=self.channel.region, + access_key_id=predefined_queue['access_key_id'], + secret_access_key=predefined_queue['secret_access_key'], + session_token=predefined_queue.get('session_token') + ) + + mock_new_s3.assert_called_once_with( + region=self.channel.region, + access_key_id='predefined_key', + secret_access_key='predefined_secret', + session_token='predefined_token' + ) + + def test_message_to_python_integration(self): + """Integration test for full message flow with large payload.""" + # Test the complete flow from SQS message to Python object + large_payload = { + "large": "data" * 100, # Simulate large data + "metadata": {"size": "large", "version": 1} + } + + s3_reference = [ + sqs_extended_client.client.MESSAGE_POINTER_CLASS, + {'s3BucketName': 'integration-bucket', 's3Key': 'large-message.json'} + ] + + sqs_message = { + 'Body': json.dumps(s3_reference), + 'ReceiptHandle': 'integration-handle', + 'MessageId': 'integration-msg-id', + 'Attributes': { + 'ApproximateReceiveCount': '1', + 'SentTimestamp': '1234567890' + } + } + + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: + s3_client = Mock() + s3_client.get_object.return_value = { + 'Body': BytesIO(json.dumps(large_payload).encode()) + } + s3_mock.return_value = s3_client + + # Process the message + result = self.channel._message_to_python( + sqs_message, + self.queue_name, + 'https://queue-url' + ) + + # Verify the complete result + assert result == large_payload + assert 'properties' in result + assert 'delivery_info' in result['properties'] + assert result['properties']['delivery_tag'] == 'integration-handle' + assert result['properties']['delivery_info']['sqs_queue'] == 'https://queue-url'
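A minimal usage sketch of the ``large_payload_bucket`` transport option introduced above, assuming AWS credentials are available to boto3 and that the bucket and queue names (placeholders here) already exist::

    from kombu import Connection, Exchange, Queue

    queue = Queue('big-jobs', Exchange('big-jobs'), routing_key='big-jobs')

    # 'my-large-payload-bucket' is a placeholder; point it at a bucket the
    # credentials in use can read and write (see the IAM note in the docs).
    with Connection(
        'sqs://',
        transport_options={
            'region': 'us-east-1',
            # When this option is set and sqs_extended_client is importable,
            # bodies over the 256KB SQS limit are offloaded to this bucket
            # and SQS only carries a pointer to the S3 object.
            'large_payload_bucket': 'my-large-payload-bucket',
        },
    ) as conn:
        producer = conn.Producer()
        producer.publish(
            {'blob': 'x' * (300 * 1024)},  # ~300KB, above the SQS limit
            exchange=queue.exchange,
            routing_key=queue.routing_key,
            declare=[queue],
        )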
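On the consuming side, the new branch in ``Channel._message_to_python`` recognises a body shaped like the sketch below (a two-element JSON array of the extended client's pointer class and the S3 coordinates) and transparently replaces it with the object fetched from S3; the bucket and key values are placeholders::

    import json

    import sqs_extended_client

    # Mirrors what the extended client writes into the SQS body once a
    # payload has been offloaded to S3.
    pointer_body = json.dumps([
        sqs_extended_client.client.MESSAGE_POINTER_CLASS,
        {'s3BucketName': 'my-large-payload-bucket', 's3Key': 'payload.json'},
    ])

    # The transport then roughly does s3.get_object(Bucket=..., Key=...),
    # reads the StreamingBody, optionally base64-decodes it, and json.loads()
    # the result back into the original payload.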
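The ``s3:GetObject``/``s3:PutObject`` requirement called out in the documentation can be granted with a policy statement along these lines; a sketch only, using a hypothetical bucket ARN and built as a plain dict so it can be serialised into a policy document::

    import json

    large_payload_policy = {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            # Minimum extra S3 access on top of the usual SQS permissions:
            # PutObject for publishing oversized bodies, GetObject for
            # consuming them.
            'Action': ['s3:GetObject', 's3:PutObject'],
            'Resource': 'arn:aws:s3:::my-large-payload-bucket/*',
        }],
    }

    print(json.dumps(large_payload_policy, indent=2))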