|
7 | 7 | from __future__ import annotations |
8 | 8 |
|
9 | 9 | import base64 |
| 10 | +import json |
10 | 11 | import os |
11 | 12 | import random |
12 | 13 | import string |
13 | 14 | from datetime import datetime, timedelta |
| 15 | +from io import BytesIO |
14 | 16 | from queue import Empty |
15 | 17 | from unittest.mock import Mock, patch |
16 | 18 |
|
17 | 19 | import pytest |
| 20 | +import sqs_extended_client |
18 | 21 |
|
19 | 22 | from kombu import Connection, Exchange, Queue, messaging |
20 | 23 |
|
@@ -1007,3 +1010,30 @@ def test_message_attribute(self): |
1007 | 1010 | assert message == output_message.payload |
1008 | 1011 | # It's not propagated to the properties |
1009 | 1012 | assert 'message_attributes' not in output_message.properties |
| 1013 | + |
| 1014 | + def test_message_to_python_with_sqs_extended_client(self): |
| 1015 | + message = [ |
| 1016 | + sqs_extended_client.client.MESSAGE_POINTER_CLASS, |
| 1017 | + {'s3BucketName': 's3://large-payload-bucket', 's3Key': 'payload.json'} |
| 1018 | + ] |
| 1019 | + |
| 1020 | + # Get the messages now |
| 1021 | + with patch('kombu.transport.SQS.Channel.s3') as s3_mock: |
| 1022 | + s3_client = Mock( |
| 1023 | + get_object=Mock( |
| 1024 | + return_value={'Body': BytesIO(json.dumps({"my_key": "Hello, World!"}).encode()), }) |
| 1025 | + ) |
| 1026 | + s3_mock.return_value = s3_client |
| 1027 | + |
| 1028 | + result = self.channel._message_to_python( |
| 1029 | + {'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, self.queue_name, |
| 1030 | + 'test', |
| 1031 | + ) |
| 1032 | + |
| 1033 | + assert s3_client.get_object.called |
| 1034 | + |
| 1035 | + # Make sure they're payload-style objects |
| 1036 | + assert 'properties' in result |
| 1037 | + |
| 1038 | + # Data from s3 is loaded into the return payload |
| 1039 | + assert 'my_key' in result |
0 commit comments