-
-
Notifications
You must be signed in to change notification settings - Fork 969
Added support for SQS fanout via AWS SNS #2372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
# Conflicts: # t/unit/transport/SQS/test_SQS.py # t/unit/transport/SQS/test_SQS_SNS.py
for more information, see https://pre-commit.ci
…sts running on older versions of pytest.
|
Hey @auvipy, Thanks for checking in on this PR. I have addressed the issues raised by the CI pipeline. Please let me know if you need any further changes. Best, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds SNS fanout support to the AWS SQS transport, enabling fanout exchanges to publish messages to multiple SQS queues via AWS SNS topics. The implementation introduces SNS topic management, queue subscriptions, and stale subscription cleanup while maintaining backward compatibility.
Key changes:
- Added SNS fanout functionality through a new
SNSclass with topic and subscription management - Refactored STS token handling into reusable methods shared between SQS and SNS services
- Introduced predefined exchanges configuration to complement existing predefined queues
Reviewed Changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| kombu/transport/SQS/init.py | Core integration of SNS fanout with SQS transport, including fanout exchange detection, queue subscription, and refactored STS token handling |
| kombu/transport/SQS/SNS.py | New module implementing SNS topic creation, message publishing, and subscription management |
| kombu/transport/SQS/exceptions.py | Moved exception classes to dedicated module for better organization |
| t/unit/transport/SQS/conftest.py | Centralized test fixtures and example data for SQS and SNS tests |
| t/unit/transport/SQS/test_SQS_SNS.py | Comprehensive test suite for SNS fanout functionality |
| t/unit/transport/SQS/test_SQS.py | Updated tests with fanout integration and refactored for shared fixtures |
| docs/reference/kombu.transport.SQS.rst | Documentation for new SNS and exceptions modules |
| README.rst | Updated transport comparison to reflect SNS-based fanout implementation |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| assert ( | ||
| sns_fanout._create_boto_client.call_args_list | ||
| == [ | ||
| call( | ||
| region="some-aws-region", | ||
| access_key_id="MyAccessKeyID", | ||
| secret_access_key="MySecretAccessKey", | ||
| ) | ||
| ] | ||
| != [ | ||
| call( | ||
| region="some-aws-region", access_key_id=None, secret_access_key=None | ||
| ) | ||
| ] | ||
| ) |
Copilot
AI
Oct 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertion logic is incorrect. The expression (X == Y) != Z always evaluates based on the inequality comparison, making the equality check meaningless. This should likely be just assert sns_fanout._create_boto_client.call_args_list == [call(...)] without the chained inequality.
| assert ( | |
| sns_fanout._create_boto_client.call_args_list | |
| == [ | |
| call( | |
| region="some-aws-region", | |
| access_key_id="MyAccessKeyID", | |
| secret_access_key="MySecretAccessKey", | |
| ) | |
| ] | |
| != [ | |
| call( | |
| region="some-aws-region", access_key_id=None, secret_access_key=None | |
| ) | |
| ] | |
| ) | |
| assert sns_fanout._create_boto_client.call_args_list == [ | |
| call( | |
| region="some-aws-region", | |
| access_key_id="MyAccessKeyID", | |
| secret_access_key="MySecretAccessKey", | |
| ) | |
| ] | |
| assert sns_fanout._create_boto_client.call_args_list != [ | |
| call( | |
| region="some-aws-region", access_key_id=None, secret_access_key=None | |
| ) | |
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please cross check this suggestion
| client_kwargs['endpoint_url'] = self.endpoint_url | ||
| client_config = self.transport_options.get('client-config') or {} | ||
| config = Config(**client_config) | ||
| return session.client('sqs', config=config, **client_kwargs) |
Copilot
AI
Oct 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The service parameter is hardcoded to 'sqs' instead of using the service parameter from the method signature. This breaks SNS client creation when called with service="sns". Change to return session.client(service, config=config, **client_kwargs).
| return session.client('sqs', config=config, **client_kwargs) | |
| return session.client(service, config=config, **client_kwargs) |
| 'MessageSystemAttributeNames': ( | ||
| sorted(message_system_attrs) if message_system_attrs else [APPROXIMATE_RECEIVE_COUNT] | ||
| ) | ||
| "MessageAttributeNames": sorted(message_attrs) if message_attrs else [], |
Copilot
AI
Oct 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes the return value from None to [] when message_attrs is empty, which may break existing code expecting None. The original code returned None in this case (line 1137 in the old version).
| "MessageAttributeNames": sorted(message_attrs) if message_attrs else [], | |
| "MessageAttributeNames": sorted(message_attrs) if message_attrs else None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and this as well
Co-authored-by: Copilot <[email protected]>
RECREATED for #2339
This PR enhances the existing SNS support by adding fanout support using AWS SNS. Some existing SNS channel methods have been refactored to allow for easier re-use between SNS & SQS services.
As the Channel class was becoming somewhat sprawling, SNS support was added through composition in a subclass and is only initialised upon use of a fanout function.
To avoid breaking changes, 'supports_fanout' has been left as False by default, but can be enabled via transport_options.