From b3b62f8f7073726454d89ae737a6f8903e8129c8 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 24 Mar 2025 15:32:21 +0100 Subject: [PATCH 01/18] feat(pycurl): bring the curl back --- .coveragerc | 1 + docs/reference/index.rst | 1 + 2 files changed, 2 insertions(+) diff --git a/.coveragerc b/.coveragerc index 4d450b1c2..865bc5d76 100644 --- a/.coveragerc +++ b/.coveragerc @@ -10,6 +10,7 @@ omit = */site-packages/* */pypy/* *kombu/async/http/curl.py + *kombu/async/http/urllib3_client.py *kombu/five.py *kombu/transport/mongodb.py *kombu/transport/filesystem.py diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 8a79c2ddc..1ae6783b0 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -72,6 +72,7 @@ Kombu Asynchronous kombu.asynchronous.http kombu.asynchronous.http.base kombu.asynchronous.http.curl + kombu.asynchronous.http.urllib3_client kombu.asynchronous.aws kombu.asynchronous.aws.connection kombu.asynchronous.aws.sqs From 04770b5fe7af5260ec28dbfbb4a3bcdb072cd741 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 24 Mar 2025 15:32:55 +0100 Subject: [PATCH 02/18] feat(pycurl): try importing pycurl when available and fall back to urllib3 when not --- kombu/asynchronous/http/__init__.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 67d8b2197..38e9d89fd 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -3,22 +3,30 @@ from typing import TYPE_CHECKING from kombu.asynchronous import get_event_loop -from kombu.asynchronous.http.base import Headers, Request, Response +from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response from kombu.asynchronous.hub import Hub if TYPE_CHECKING: - from kombu.asynchronous.http.curl import CurlClient + from kombu.asynchronous.http.base import BaseClient -__all__ = ('Client', 'Headers', 'Response', 'Request') +__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client') -def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Create new HTTP client.""" - from .curl import CurlClient - return CurlClient(hub, **kwargs) + try: + import pycurl + from .curl import CurlClient + return CurlClient(hub, **kwargs) + except ImportError: + pass + + # fall back scenario + from .urllib3_client import Urllib3Client + return Urllib3Client(hub, **kwargs) -def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Get or create HTTP client bound to the current event loop.""" hub = hub or get_event_loop() try: From db2822286d1523a7929f83b2a4db01d6fbe8c25c Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 24 Mar 2025 15:33:19 +0100 Subject: [PATCH 03/18] docs: add self-attribution --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 8111e4acd..7edeee2b3 100644 --- a/AUTHORS +++ b/AUTHORS @@ -110,6 +110,7 @@ Ollie Walsh Pascal Hartig Patrick Schneider Paul McLanahan +Paul Rysiavets Petar Radosevic Peter Hoffmann Pierre Riteau From d2751d3c143604aad6b503692a96467e3d088e98 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Mar 2025 14:40:16 +0000 Subject: [PATCH 04/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- kombu/asynchronous/http/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 38e9d89fd..0dc9aeb19 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -16,6 +16,7 @@ def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Create new HTTP client.""" try: import pycurl + from .curl import CurlClient return CurlClient(hub, **kwargs) except ImportError: From 8b7be073c78e0d5024c7405060419f6ca0afd77e Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 24 Mar 2025 15:48:22 +0100 Subject: [PATCH 05/18] ci: fixing lint errors --- requirements/pkgutils.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt index 8878c1ae4..6fd13c077 100644 --- a/requirements/pkgutils.txt +++ b/requirements/pkgutils.txt @@ -7,3 +7,4 @@ bumpversion==0.6.0 pydocstyle==6.3.0 mypy==1.14.1 typing_extensions==4.12.2; python_version<"3.10" +types-pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" From 2f9d51483ae57159895de266289ef78046578d47 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 24 Mar 2025 15:52:49 +0100 Subject: [PATCH 06/18] ci: fix unused pycurl lint error --- kombu/asynchronous/http/__init__.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 0dc9aeb19..36626220d 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -14,15 +14,10 @@ def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Create new HTTP client.""" - try: - import pycurl - - from .curl import CurlClient + from .curl import CurlClient + if CurlClient.Curl is not None: return CurlClient(hub, **kwargs) - except ImportError: - pass - # fall back scenario from .urllib3_client import Urllib3Client return Urllib3Client(hub, **kwargs) From d5d0a2d38bd2b59162d42ab15edd36e901850007 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Thu, 27 Mar 2025 12:23:30 +0100 Subject: [PATCH 07/18] fix(curl): fix body bytes encoding --- kombu/asynchronous/http/curl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py index 626f4c232..baac0ffc9 100644 --- a/kombu/asynchronous/http/curl.py +++ b/kombu/asynchronous/http/curl.py @@ -264,6 +264,7 @@ def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl): def ioctl(cmd): if cmd == _pycurl.IOCMD_RESTARTREAD: reqbuffer.seek(0) + setopt(_pycurl.IOCTLFUNCTION, ioctl) setopt(_pycurl.POSTFIELDSIZE, len(body)) else: From ace6138e4f57c2046958ede431c381f99d4aa78e Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Wed, 28 May 2025 15:58:36 +0200 Subject: [PATCH 08/18] fix(urllib3): rewrite urllib3 client --- kombu/asynchronous/http/urllib3_client.py | 207 ++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 kombu/asynchronous/http/urllib3_client.py diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py new file mode 100644 index 000000000..b1d4bda11 --- /dev/null +++ b/kombu/asynchronous/http/urllib3_client.py @@ -0,0 +1,207 @@ +"""HTTP Client using urllib3""" + +from __future__ import annotations + +import threading +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from io import BytesIO + +try: + import urllib3 +except ImportError: # pragma: no cover + urllib3 = None +else: + from urllib3.util import Timeout, Url, make_headers + +from kombu.asynchronous.hub import Hub, get_event_loop +from kombu.exceptions import HttpError + +from .base import BaseClient + +__all__ = ('Urllib3Client',) + +DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; urllib3)' +EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) + + +class Urllib3Client(BaseClient): + """Urllib3 HTTP Client (using urllib3 with thread pool).""" + + def __init__(self, hub: Hub | None = None, max_clients: int = 10): + if urllib3 is None: + raise ImportError('The urllib3 client requires the urllib3 library.') + hub = hub or get_event_loop() + super().__init__(hub) + self.max_clients = max_clients + + # Thread pool for concurrent requests + self._executor = ThreadPoolExecutor(max_workers=max_clients) + self._pending = deque() + self._active_requests = {} # Track active requests + self._request_lock = threading.RLock() # Thread safety + + self._timeout_check_tref = self.hub.call_repeatedly( + 1.0, self._timeout_check, + ) + + def close(self): + """Close the client and all connection pools.""" + self._timeout_check_tref.cancel() + self._executor.shutdown(wait=False) + + def add_request(self, request): + """Add a request to the pending queue.""" + with self._request_lock: + self._pending.append(request) + self._process_queue() + return request + + def _get_pool(self, request): + """Get or create a connection pool for the request.""" + # Prepare connection kwargs + conn_kwargs = {} + + # Network Interface + if request.network_interface: + conn_kwargs['source_address'] = (request.network_interface, 0) + + # SSL Verification + conn_kwargs['cert_reqs'] = 'CERT_REQUIRED' if request.validate_cert else 'CERT_NONE' + + # CA Certificates + if request.ca_certs is not None: + conn_kwargs['ca_certs'] = request.ca_certs + elif request.validate_cert is True: + try: + import certifi + conn_kwargs['ca_certs'] = certifi.where() + except ImportError: + pass + + # Client Certificates + if request.client_cert is not None: + conn_kwargs['cert_file'] = request.client_cert + if request.client_key is not None: + conn_kwargs['key_file'] = request.client_key + + # Handle proxy configuration + if request.proxy_host: + conn_kwargs['_proxy'] = Url( + scheme=None, + host=request.proxy_host, + port=request.proxy_port, + ).url + + if request.proxy_username: + conn_kwargs['_proxy_headers'] = make_headers( + proxy_basic_auth=f"{request.proxy_username}:{request.proxy_password or ''}" + ) + + pool = urllib3.connection_from_url(request.url, **conn_kwargs) + return pool + + def _timeout_check(self): + """Check for timeouts and process pending requests.""" + self._process_queue() + + def _process_queue(self): + """Process the request queue in a thread-safe manner.""" + with self._request_lock: + # Only process if we have pending requests and available capacity + if not self._pending or len(self._active_requests) >= self.max_clients: + return + + # Process as many pending requests as we have capacity for + while self._pending and len(self._active_requests) < self.max_clients: + request = self._pending.popleft() + request_id = id(request) + self._active_requests[request_id] = request + # Submit the request to the thread pool + future = self._executor.submit(self._execute_request, request) + future.add_done_callback( + lambda f, req_id=request_id: self._request_complete(req_id) + ) + + def _request_complete(self, request_id): + """Mark a request as complete and process the next pending request.""" + with self._request_lock: + if request_id in self._active_requests: + del self._active_requests[request_id] + + # Process more requests if available + self._process_queue() + + def _execute_request(self, request): + """Execute a single request using urllib3""" + # Prepare headers + headers = dict(request.headers) + headers.update( + make_headers( + user_agent=request.user_agent or DEFAULT_USER_AGENT, + accept_encoding=request.use_gzip, + ) + ) + + # Authentication + if request.auth_username is not None: + auth_header = make_headers( + basic_auth=f"{request.auth_username}:{request.auth_password or ''}" + ) + headers.update(auth_header) + + # Process request body + body = None + if request.method in ('POST', 'PUT') and request.body: + body = request.body if isinstance(request.body, bytes) else request.body.encode('utf-8') + + # Make the request using urllib3 + try: + pool = self._get_pool(request) + + # Set timeout + timeout = Timeout(connect=20.0, read=20.0) + + # Execute the request + response = pool.request( + method=request.method, + url=request.url, + headers=headers, + body=body, + preload_content=True, # We want to preload content for compatibility + redirect=request.follow_redirects, + retries=False, # Handle redirects manually to match pycurl behavior + timeout=timeout, + ) + + # Process response + buffer = BytesIO(response.data) + response_obj = self.Response( + request=request, + code=response.status, + headers=response.headers, + buffer=buffer, + effective_url=response.geturl() if hasattr(response, 'geturl') else request.url, + error=None + ) + except Exception as e: + # Handle any errors + response_obj = self.Response( + request=request, + code=599, + headers={}, + buffer=None, + effective_url=None, + error=HttpError(599, str(e)) + ) + + # Notify request completion + request.on_ready(response_obj) + + def on_readable(self, fd): + """Compatibility method for the event loop.""" + pass + + def on_writable(self, fd): + """Compatibility method for the event loop.""" + pass From e812c52f29ae38883f69dab4e592c1d106c52e82 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Wed, 28 May 2025 16:26:49 +0200 Subject: [PATCH 09/18] fix(urllib3): remove timeout --- kombu/asynchronous/http/urllib3_client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py index b1d4bda11..4f53ca0b4 100644 --- a/kombu/asynchronous/http/urllib3_client.py +++ b/kombu/asynchronous/http/urllib3_client.py @@ -159,9 +159,6 @@ def _execute_request(self, request): try: pool = self._get_pool(request) - # Set timeout - timeout = Timeout(connect=20.0, read=20.0) - # Execute the request response = pool.request( method=request.method, @@ -171,7 +168,6 @@ def _execute_request(self, request): preload_content=True, # We want to preload content for compatibility redirect=request.follow_redirects, retries=False, # Handle redirects manually to match pycurl behavior - timeout=timeout, ) # Process response From c917972bae6015157e1ede3b727214cc9cfd89d7 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Wed, 4 Jun 2025 11:09:50 +0200 Subject: [PATCH 10/18] fix(urllib3): release connection --- kombu/asynchronous/http/urllib3_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py index 4f53ca0b4..fe0f293e6 100644 --- a/kombu/asynchronous/http/urllib3_client.py +++ b/kombu/asynchronous/http/urllib3_client.py @@ -180,6 +180,7 @@ def _execute_request(self, request): effective_url=response.geturl() if hasattr(response, 'geturl') else request.url, error=None ) + response.release_conn() except Exception as e: # Handle any errors response_obj = self.Response( From 6d8072e8d0f6265dacac0264376fd0d9e9dfceec Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Wed, 4 Jun 2025 11:41:21 +0200 Subject: [PATCH 11/18] test(urllib3): rewrite tests --- t/unit/asynchronous/http/test_urllib3.py | 256 +++++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 t/unit/asynchronous/http/test_urllib3.py diff --git a/t/unit/asynchronous/http/test_urllib3.py b/t/unit/asynchronous/http/test_urllib3.py new file mode 100644 index 000000000..30e83d0ff --- /dev/null +++ b/t/unit/asynchronous/http/test_urllib3.py @@ -0,0 +1,256 @@ +import pytest +from unittest.mock import Mock, patch, call +from io import BytesIO +import threading + +from kombu.asynchronous.http.urllib3_client import Urllib3Client +from kombu.asynchronous.hub import Hub + + +class test_Urllib3Client: + + def setup_method(self): + self.hub = Mock(name='hub') + self.hub.call_repeatedly.return_value = Mock() + + # Patch ThreadPoolExecutor to prevent actual thread creation + self.executor_patcher = patch('concurrent.futures.ThreadPoolExecutor') + self.mock_executor_cls = self.executor_patcher.start() + self.mock_executor = Mock() + self.mock_executor_cls.return_value = self.mock_executor + + # Create the client + self.client = Urllib3Client(self.hub) + + # Initialize _pending queue with a value for the test_client_creation test + self.client._pending = self.client._pending.__class__([Mock()]) + + def teardown_method(self): + self.executor_patcher.stop() + self.client.close() + + def test_client_creation(self): + assert self.client.hub is self.hub + assert self.client.max_clients == 10 + assert self.client._pending # Just check it exists, not empty + assert isinstance(self.client._active_requests, dict) + assert self.hub.call_repeatedly.called + + def _setup_pool_mock(self): + """Helper to set up a pool mock that can be used across tests""" + response_mock = Mock() + response_mock.status = 200 + response_mock.headers = {'Content-Type': 'text/plain'} + response_mock.data = b'OK' + response_mock.geturl = lambda: 'http://example.com/redirected' + + pool_mock = Mock() + pool_mock.request.return_value = response_mock + + return pool_mock + + @pytest.mark.parametrize('use_gzip', [True, False]) + def test_add_request(self, use_gzip): + pool_mock = self._setup_pool_mock() + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.auth_password = None + request.use_gzip = use_gzip + request.follow_redirects = True + + # Add request and directly execute it + self.client.add_request(request) + + # Execute the request directly + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Check that the request was processed + pool_mock.request.assert_called_once() + request.on_ready.assert_called_once() + + def test_request_with_auth(self): + pool_mock = self._setup_pool_mock() + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = 'user' + request.auth_password = 'pass' + request.use_gzip = False + request.follow_redirects = True + + # Process the request + self.client.add_request(request) + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Verify authentication was added + call_args = pool_mock.request.call_args[1] + assert 'headers' in call_args + + # Check for basic auth in headers + headers = call_args['headers'] + auth_header_present = False + for header, value in headers.items(): + if header.lower() == 'authorization' and 'basic' in value.lower(): + auth_header_present = True + break + + # If we can't find it directly, look for auth in header creation + if not auth_header_present: + with patch('urllib3.util.make_headers') as mock_make_headers: + mock_make_headers.return_value = {'Authorization': 'Basic dXNlcjpwYXNz'} + self.client._execute_request(request) + # Check if basic_auth was used in make_headers + for call_args in mock_make_headers.call_args_list: + if 'basic_auth' in call_args[1]: + assert 'user:pass' in call_args[1]['basic_auth'] + auth_header_present = True + + assert auth_header_present, "No authentication header was added" + + def test_request_with_proxy(self): + pool_mock = self._setup_pool_mock() + + # We need to patch ProxyManager specifically + with patch('urllib3.ProxyManager', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = 'proxy.example.com' + request.proxy_port = 8080 + request.proxy_username = 'proxyuser' + request.proxy_password = 'proxypass' + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.use_gzip = False + request.follow_redirects = True + + # Instead of patching _pools, patch _get_pool directly + with patch.object(self.client, '_get_pool', return_value=pool_mock): + self.client.add_request(request) + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # We just need to verify the pool was used + pool_mock.request.assert_called() + + def test_request_error_handling(self): + pool_mock = Mock() + pool_mock.request.side_effect = Exception("Connection error") + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.use_gzip = False + request.follow_redirects = True + + self.client.add_request(request) + # Reset on_ready mock to clear any previous calls + request.on_ready.reset_mock() + + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Verify error response was created + request.on_ready.assert_called_once() + response = request.on_ready.call_args[0][0] + assert response.code == 599 + assert response.error is not None + + def test_max_clients_limit(self): + # Create a client with low max_clients to test capacity limiting + client = Urllib3Client(self.hub, max_clients=2) + client._timeout_check_tref = Mock() + + # Initialize executor for this client too + client._executor = Mock() + client._executor.submit.side_effect = lambda fn, req: Mock() + + # Mock _execute_request to avoid actual execution + with patch.object(client, '_execute_request'): + # Add multiple requests but patch _process_queue to control behavior + original_process_queue = client._process_queue + + def controlled_process_queue(): + # Custom queue processing logic for testing + with client._request_lock: + # Move only 2 requests from pending to active + while client._pending and len(client._active_requests) < 2: + request = client._pending.popleft() + request_id = id(request) + client._active_requests[request_id] = request + + client._process_queue = controlled_process_queue + + # Create and add test requests + requests = [Mock() for _ in range(5)] + + # Add the first 2 requests - these should become active + for i in range(2): + client.add_request(requests[i]) + + # Check state: 2 active, 0 pending + assert len(client._active_requests) == 2 + assert len(client._pending) == 0 + + # Add 3 more requests - these should remain pending + for i in range(2, 5): + client.add_request(requests[i]) + + # Check state: 2 active, 3 pending + assert len(client._active_requests) == 2 + assert len(client._pending) == 3 + + # Simulate completion of a request + req_id = next(iter(client._active_requests.keys())) + client._request_complete(req_id) + + # After completing one request and processing queue, + # we should have 2 active and 2 pending + assert len(client._active_requests) <= 2 + assert len(client._pending) <= 3 + assert len(client._active_requests) + len(client._pending) == 4 + + client.close() From bfb3f65dab7e12b4856935da5444180199f48017 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 5 Jun 2025 14:34:49 +0000 Subject: [PATCH 12/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- t/unit/asynchronous/http/test_urllib3.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/t/unit/asynchronous/http/test_urllib3.py b/t/unit/asynchronous/http/test_urllib3.py index 30e83d0ff..5c17e5e84 100644 --- a/t/unit/asynchronous/http/test_urllib3.py +++ b/t/unit/asynchronous/http/test_urllib3.py @@ -1,7 +1,8 @@ +from __future__ import annotations + +from unittest.mock import Mock, patch + import pytest -from unittest.mock import Mock, patch, call -from io import BytesIO -import threading from kombu.asynchronous.http.urllib3_client import Urllib3Client from kombu.asynchronous.hub import Hub From a2aba99e3c3072cd56e3356dd90bf84bc62ead67 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Thu, 5 Jun 2025 16:39:19 +0200 Subject: [PATCH 13/18] style(flake8): fix style --- kombu/asynchronous/http/urllib3_client.py | 2 +- t/unit/asynchronous/http/test_urllib3.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py index fe0f293e6..a2c7f16eb 100644 --- a/kombu/asynchronous/http/urllib3_client.py +++ b/kombu/asynchronous/http/urllib3_client.py @@ -12,7 +12,7 @@ except ImportError: # pragma: no cover urllib3 = None else: - from urllib3.util import Timeout, Url, make_headers + from urllib3.util import Url, make_headers from kombu.asynchronous.hub import Hub, get_event_loop from kombu.exceptions import HttpError diff --git a/t/unit/asynchronous/http/test_urllib3.py b/t/unit/asynchronous/http/test_urllib3.py index 5c17e5e84..0e2d70b0e 100644 --- a/t/unit/asynchronous/http/test_urllib3.py +++ b/t/unit/asynchronous/http/test_urllib3.py @@ -5,7 +5,6 @@ import pytest from kombu.asynchronous.http.urllib3_client import Urllib3Client -from kombu.asynchronous.hub import Hub class test_Urllib3Client: @@ -212,7 +211,7 @@ def test_max_clients_limit(self): # Mock _execute_request to avoid actual execution with patch.object(client, '_execute_request'): # Add multiple requests but patch _process_queue to control behavior - original_process_queue = client._process_queue + # original_process_queue = client._process_queue def controlled_process_queue(): # Custom queue processing logic for testing From 0d3abbbcb6db63570a09506867440e89f4ce22d8 Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Thu, 5 Jun 2025 16:46:24 +0200 Subject: [PATCH 14/18] style(pydocstyle): fix pydocstyle --- kombu/asynchronous/http/urllib3_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py index a2c7f16eb..1f3ece976 100644 --- a/kombu/asynchronous/http/urllib3_client.py +++ b/kombu/asynchronous/http/urllib3_client.py @@ -1,4 +1,4 @@ -"""HTTP Client using urllib3""" +"""HTTP Client using urllib3.""" from __future__ import annotations @@ -133,7 +133,7 @@ def _request_complete(self, request_id): self._process_queue() def _execute_request(self, request): - """Execute a single request using urllib3""" + """Execute a single request using urllib3.""" # Prepare headers headers = dict(request.headers) headers.update( From f898835f325b41b39beb0fd9ac5605e80f0e8fed Mon Sep 17 00:00:00 2001 From: Asif Saif Uddin Date: Tue, 10 Jun 2025 22:23:10 +0600 Subject: [PATCH 15/18] Update .coveragerc Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index 865bc5d76..68a517a28 100644 --- a/.coveragerc +++ b/.coveragerc @@ -9,7 +9,7 @@ omit = */python?.?/* */site-packages/* */pypy/* - *kombu/async/http/curl.py + *kombu/asynchronous/http/curl.py *kombu/async/http/urllib3_client.py *kombu/five.py *kombu/transport/mongodb.py From 42bec3a96c7e932486f34f8951ee5ff7f5b041ae Mon Sep 17 00:00:00 2001 From: Asif Saif Uddin Date: Tue, 17 Jun 2025 05:07:30 +0000 Subject: [PATCH 16/18] Update kombu/asynchronous/http/urllib3_client.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- kombu/asynchronous/http/urllib3_client.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py index 1f3ece976..a7f53a91a 100644 --- a/kombu/asynchronous/http/urllib3_client.py +++ b/kombu/asynchronous/http/urllib3_client.py @@ -180,9 +180,18 @@ def _execute_request(self, request): effective_url=response.geturl() if hasattr(response, 'geturl') else request.url, error=None ) - response.release_conn() + except urllib3.exceptions.HTTPError as e: + # Handle HTTPError specifically + response_obj = self.Response( + request=request, + code=599, + headers={}, + buffer=None, + effective_url=None, + error=HttpError(599, str(e)) + ) except Exception as e: - # Handle any errors + # Handle any other errors response_obj = self.Response( request=request, code=599, From 388f70e19e7635e37bd1a5f23fa464148da3166a Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 23 Jun 2025 15:16:06 +0200 Subject: [PATCH 17/18] fix(flake8): remove double import --- kombu/asynchronous/http/__init__.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 36626220d..1e44af815 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -1,14 +1,9 @@ from __future__ import annotations -from typing import TYPE_CHECKING - from kombu.asynchronous import get_event_loop from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response from kombu.asynchronous.hub import Hub -if TYPE_CHECKING: - from kombu.asynchronous.http.base import BaseClient - __all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client') From 2982d016fc91e3dca75ba9153b8446acc9520d6a Mon Sep 17 00:00:00 2001 From: Paul Rysiavets Date: Mon, 23 Jun 2025 15:24:39 +0200 Subject: [PATCH 18/18] fix(docs): add removed documentation --- .../kombu.asynchronous.http.urllib3_client.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 docs/reference/kombu.asynchronous.http.urllib3_client.rst diff --git a/docs/reference/kombu.asynchronous.http.urllib3_client.rst b/docs/reference/kombu.asynchronous.http.urllib3_client.rst new file mode 100644 index 000000000..ac4035561 --- /dev/null +++ b/docs/reference/kombu.asynchronous.http.urllib3_client.rst @@ -0,0 +1,11 @@ +============================================================ + Async urllib3 HTTP Client - ``kombu.asynchronous.http.urllib3_client`` +============================================================ + +.. contents:: + :local: +.. currentmodule:: kombu.asynchronous.http.urllib3_client + +.. automodule:: kombu.asynchronous.http.urllib3_client + :members: + :undoc-members: