diff --git a/.coveragerc b/.coveragerc index 4d450b1c2..68a517a28 100644 --- a/.coveragerc +++ b/.coveragerc @@ -9,7 +9,8 @@ omit = */python?.?/* */site-packages/* */pypy/* - *kombu/async/http/curl.py + *kombu/asynchronous/http/curl.py + *kombu/async/http/urllib3_client.py *kombu/five.py *kombu/transport/mongodb.py *kombu/transport/filesystem.py diff --git a/AUTHORS b/AUTHORS index 8111e4acd..7edeee2b3 100644 --- a/AUTHORS +++ b/AUTHORS @@ -110,6 +110,7 @@ Ollie Walsh Pascal Hartig Patrick Schneider Paul McLanahan +Paul Rysiavets Petar Radosevic Peter Hoffmann Pierre Riteau diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 8a79c2ddc..1ae6783b0 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -72,6 +72,7 @@ Kombu Asynchronous kombu.asynchronous.http kombu.asynchronous.http.base kombu.asynchronous.http.curl + kombu.asynchronous.http.urllib3_client kombu.asynchronous.aws kombu.asynchronous.aws.connection kombu.asynchronous.aws.sqs diff --git a/docs/reference/kombu.asynchronous.http.urllib3_client.rst b/docs/reference/kombu.asynchronous.http.urllib3_client.rst new file mode 100644 index 000000000..ac4035561 --- /dev/null +++ b/docs/reference/kombu.asynchronous.http.urllib3_client.rst @@ -0,0 +1,11 @@ +============================================================ + Async urllib3 HTTP Client - ``kombu.asynchronous.http.urllib3_client`` +============================================================ + +.. contents:: + :local: +.. currentmodule:: kombu.asynchronous.http.urllib3_client + +.. automodule:: kombu.asynchronous.http.urllib3_client + :members: + :undoc-members: diff --git a/kombu/asynchronous/http/__init__.py b/kombu/asynchronous/http/__init__.py index 67d8b2197..1e44af815 100644 --- a/kombu/asynchronous/http/__init__.py +++ b/kombu/asynchronous/http/__init__.py @@ -1,24 +1,23 @@ from __future__ import annotations -from typing import TYPE_CHECKING - from kombu.asynchronous import get_event_loop -from kombu.asynchronous.http.base import Headers, Request, Response +from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response from kombu.asynchronous.hub import Hub -if TYPE_CHECKING: - from kombu.asynchronous.http.curl import CurlClient - -__all__ = ('Client', 'Headers', 'Response', 'Request') +__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client') -def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Create new HTTP client.""" from .curl import CurlClient - return CurlClient(hub, **kwargs) + if CurlClient.Curl is not None: + return CurlClient(hub, **kwargs) + + from .urllib3_client import Urllib3Client + return Urllib3Client(hub, **kwargs) -def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient: +def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient: """Get or create HTTP client bound to the current event loop.""" hub = hub or get_event_loop() try: diff --git a/kombu/asynchronous/http/curl.py b/kombu/asynchronous/http/curl.py index 626f4c232..baac0ffc9 100644 --- a/kombu/asynchronous/http/curl.py +++ b/kombu/asynchronous/http/curl.py @@ -264,6 +264,7 @@ def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl): def ioctl(cmd): if cmd == _pycurl.IOCMD_RESTARTREAD: reqbuffer.seek(0) + setopt(_pycurl.IOCTLFUNCTION, ioctl) setopt(_pycurl.POSTFIELDSIZE, len(body)) else: diff --git a/kombu/asynchronous/http/urllib3_client.py b/kombu/asynchronous/http/urllib3_client.py new file mode 100644 index 000000000..a7f53a91a --- /dev/null +++ b/kombu/asynchronous/http/urllib3_client.py @@ -0,0 +1,213 @@ +"""HTTP Client using urllib3.""" + +from __future__ import annotations + +import threading +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from io import BytesIO + +try: + import urllib3 +except ImportError: # pragma: no cover + urllib3 = None +else: + from urllib3.util import Url, make_headers + +from kombu.asynchronous.hub import Hub, get_event_loop +from kombu.exceptions import HttpError + +from .base import BaseClient + +__all__ = ('Urllib3Client',) + +DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; urllib3)' +EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH']) + + +class Urllib3Client(BaseClient): + """Urllib3 HTTP Client (using urllib3 with thread pool).""" + + def __init__(self, hub: Hub | None = None, max_clients: int = 10): + if urllib3 is None: + raise ImportError('The urllib3 client requires the urllib3 library.') + hub = hub or get_event_loop() + super().__init__(hub) + self.max_clients = max_clients + + # Thread pool for concurrent requests + self._executor = ThreadPoolExecutor(max_workers=max_clients) + self._pending = deque() + self._active_requests = {} # Track active requests + self._request_lock = threading.RLock() # Thread safety + + self._timeout_check_tref = self.hub.call_repeatedly( + 1.0, self._timeout_check, + ) + + def close(self): + """Close the client and all connection pools.""" + self._timeout_check_tref.cancel() + self._executor.shutdown(wait=False) + + def add_request(self, request): + """Add a request to the pending queue.""" + with self._request_lock: + self._pending.append(request) + self._process_queue() + return request + + def _get_pool(self, request): + """Get or create a connection pool for the request.""" + # Prepare connection kwargs + conn_kwargs = {} + + # Network Interface + if request.network_interface: + conn_kwargs['source_address'] = (request.network_interface, 0) + + # SSL Verification + conn_kwargs['cert_reqs'] = 'CERT_REQUIRED' if request.validate_cert else 'CERT_NONE' + + # CA Certificates + if request.ca_certs is not None: + conn_kwargs['ca_certs'] = request.ca_certs + elif request.validate_cert is True: + try: + import certifi + conn_kwargs['ca_certs'] = certifi.where() + except ImportError: + pass + + # Client Certificates + if request.client_cert is not None: + conn_kwargs['cert_file'] = request.client_cert + if request.client_key is not None: + conn_kwargs['key_file'] = request.client_key + + # Handle proxy configuration + if request.proxy_host: + conn_kwargs['_proxy'] = Url( + scheme=None, + host=request.proxy_host, + port=request.proxy_port, + ).url + + if request.proxy_username: + conn_kwargs['_proxy_headers'] = make_headers( + proxy_basic_auth=f"{request.proxy_username}:{request.proxy_password or ''}" + ) + + pool = urllib3.connection_from_url(request.url, **conn_kwargs) + return pool + + def _timeout_check(self): + """Check for timeouts and process pending requests.""" + self._process_queue() + + def _process_queue(self): + """Process the request queue in a thread-safe manner.""" + with self._request_lock: + # Only process if we have pending requests and available capacity + if not self._pending or len(self._active_requests) >= self.max_clients: + return + + # Process as many pending requests as we have capacity for + while self._pending and len(self._active_requests) < self.max_clients: + request = self._pending.popleft() + request_id = id(request) + self._active_requests[request_id] = request + # Submit the request to the thread pool + future = self._executor.submit(self._execute_request, request) + future.add_done_callback( + lambda f, req_id=request_id: self._request_complete(req_id) + ) + + def _request_complete(self, request_id): + """Mark a request as complete and process the next pending request.""" + with self._request_lock: + if request_id in self._active_requests: + del self._active_requests[request_id] + + # Process more requests if available + self._process_queue() + + def _execute_request(self, request): + """Execute a single request using urllib3.""" + # Prepare headers + headers = dict(request.headers) + headers.update( + make_headers( + user_agent=request.user_agent or DEFAULT_USER_AGENT, + accept_encoding=request.use_gzip, + ) + ) + + # Authentication + if request.auth_username is not None: + auth_header = make_headers( + basic_auth=f"{request.auth_username}:{request.auth_password or ''}" + ) + headers.update(auth_header) + + # Process request body + body = None + if request.method in ('POST', 'PUT') and request.body: + body = request.body if isinstance(request.body, bytes) else request.body.encode('utf-8') + + # Make the request using urllib3 + try: + pool = self._get_pool(request) + + # Execute the request + response = pool.request( + method=request.method, + url=request.url, + headers=headers, + body=body, + preload_content=True, # We want to preload content for compatibility + redirect=request.follow_redirects, + retries=False, # Handle redirects manually to match pycurl behavior + ) + + # Process response + buffer = BytesIO(response.data) + response_obj = self.Response( + request=request, + code=response.status, + headers=response.headers, + buffer=buffer, + effective_url=response.geturl() if hasattr(response, 'geturl') else request.url, + error=None + ) + except urllib3.exceptions.HTTPError as e: + # Handle HTTPError specifically + response_obj = self.Response( + request=request, + code=599, + headers={}, + buffer=None, + effective_url=None, + error=HttpError(599, str(e)) + ) + except Exception as e: + # Handle any other errors + response_obj = self.Response( + request=request, + code=599, + headers={}, + buffer=None, + effective_url=None, + error=HttpError(599, str(e)) + ) + + # Notify request completion + request.on_ready(response_obj) + + def on_readable(self, fd): + """Compatibility method for the event loop.""" + pass + + def on_writable(self, fd): + """Compatibility method for the event loop.""" + pass diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt index 8878c1ae4..6fd13c077 100644 --- a/requirements/pkgutils.txt +++ b/requirements/pkgutils.txt @@ -7,3 +7,4 @@ bumpversion==0.6.0 pydocstyle==6.3.0 mypy==1.14.1 typing_extensions==4.12.2; python_version<"3.10" +types-pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" diff --git a/t/unit/asynchronous/http/test_urllib3.py b/t/unit/asynchronous/http/test_urllib3.py new file mode 100644 index 000000000..0e2d70b0e --- /dev/null +++ b/t/unit/asynchronous/http/test_urllib3.py @@ -0,0 +1,256 @@ +from __future__ import annotations + +from unittest.mock import Mock, patch + +import pytest + +from kombu.asynchronous.http.urllib3_client import Urllib3Client + + +class test_Urllib3Client: + + def setup_method(self): + self.hub = Mock(name='hub') + self.hub.call_repeatedly.return_value = Mock() + + # Patch ThreadPoolExecutor to prevent actual thread creation + self.executor_patcher = patch('concurrent.futures.ThreadPoolExecutor') + self.mock_executor_cls = self.executor_patcher.start() + self.mock_executor = Mock() + self.mock_executor_cls.return_value = self.mock_executor + + # Create the client + self.client = Urllib3Client(self.hub) + + # Initialize _pending queue with a value for the test_client_creation test + self.client._pending = self.client._pending.__class__([Mock()]) + + def teardown_method(self): + self.executor_patcher.stop() + self.client.close() + + def test_client_creation(self): + assert self.client.hub is self.hub + assert self.client.max_clients == 10 + assert self.client._pending # Just check it exists, not empty + assert isinstance(self.client._active_requests, dict) + assert self.hub.call_repeatedly.called + + def _setup_pool_mock(self): + """Helper to set up a pool mock that can be used across tests""" + response_mock = Mock() + response_mock.status = 200 + response_mock.headers = {'Content-Type': 'text/plain'} + response_mock.data = b'OK' + response_mock.geturl = lambda: 'http://example.com/redirected' + + pool_mock = Mock() + pool_mock.request.return_value = response_mock + + return pool_mock + + @pytest.mark.parametrize('use_gzip', [True, False]) + def test_add_request(self, use_gzip): + pool_mock = self._setup_pool_mock() + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.auth_password = None + request.use_gzip = use_gzip + request.follow_redirects = True + + # Add request and directly execute it + self.client.add_request(request) + + # Execute the request directly + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Check that the request was processed + pool_mock.request.assert_called_once() + request.on_ready.assert_called_once() + + def test_request_with_auth(self): + pool_mock = self._setup_pool_mock() + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = 'user' + request.auth_password = 'pass' + request.use_gzip = False + request.follow_redirects = True + + # Process the request + self.client.add_request(request) + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Verify authentication was added + call_args = pool_mock.request.call_args[1] + assert 'headers' in call_args + + # Check for basic auth in headers + headers = call_args['headers'] + auth_header_present = False + for header, value in headers.items(): + if header.lower() == 'authorization' and 'basic' in value.lower(): + auth_header_present = True + break + + # If we can't find it directly, look for auth in header creation + if not auth_header_present: + with patch('urllib3.util.make_headers') as mock_make_headers: + mock_make_headers.return_value = {'Authorization': 'Basic dXNlcjpwYXNz'} + self.client._execute_request(request) + # Check if basic_auth was used in make_headers + for call_args in mock_make_headers.call_args_list: + if 'basic_auth' in call_args[1]: + assert 'user:pass' in call_args[1]['basic_auth'] + auth_header_present = True + + assert auth_header_present, "No authentication header was added" + + def test_request_with_proxy(self): + pool_mock = self._setup_pool_mock() + + # We need to patch ProxyManager specifically + with patch('urllib3.ProxyManager', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = 'proxy.example.com' + request.proxy_port = 8080 + request.proxy_username = 'proxyuser' + request.proxy_password = 'proxypass' + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.use_gzip = False + request.follow_redirects = True + + # Instead of patching _pools, patch _get_pool directly + with patch.object(self.client, '_get_pool', return_value=pool_mock): + self.client.add_request(request) + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # We just need to verify the pool was used + pool_mock.request.assert_called() + + def test_request_error_handling(self): + pool_mock = Mock() + pool_mock.request.side_effect = Exception("Connection error") + + with patch.object(self.client, '_get_pool', return_value=pool_mock): + request = Mock() + request.method = 'GET' + request.url = 'http://example.com' + request.headers = {} + request.body = None + request.proxy_host = None + request.proxy_port = None + request.network_interface = None + request.validate_cert = True + request.ca_certs = None + request.client_cert = None + request.client_key = None + request.auth_username = None + request.use_gzip = False + request.follow_redirects = True + + self.client.add_request(request) + # Reset on_ready mock to clear any previous calls + request.on_ready.reset_mock() + + with patch.object(self.client, '_request_complete'): + self.client._execute_request(request) + + # Verify error response was created + request.on_ready.assert_called_once() + response = request.on_ready.call_args[0][0] + assert response.code == 599 + assert response.error is not None + + def test_max_clients_limit(self): + # Create a client with low max_clients to test capacity limiting + client = Urllib3Client(self.hub, max_clients=2) + client._timeout_check_tref = Mock() + + # Initialize executor for this client too + client._executor = Mock() + client._executor.submit.side_effect = lambda fn, req: Mock() + + # Mock _execute_request to avoid actual execution + with patch.object(client, '_execute_request'): + # Add multiple requests but patch _process_queue to control behavior + # original_process_queue = client._process_queue + + def controlled_process_queue(): + # Custom queue processing logic for testing + with client._request_lock: + # Move only 2 requests from pending to active + while client._pending and len(client._active_requests) < 2: + request = client._pending.popleft() + request_id = id(request) + client._active_requests[request_id] = request + + client._process_queue = controlled_process_queue + + # Create and add test requests + requests = [Mock() for _ in range(5)] + + # Add the first 2 requests - these should become active + for i in range(2): + client.add_request(requests[i]) + + # Check state: 2 active, 0 pending + assert len(client._active_requests) == 2 + assert len(client._pending) == 0 + + # Add 3 more requests - these should remain pending + for i in range(2, 5): + client.add_request(requests[i]) + + # Check state: 2 active, 3 pending + assert len(client._active_requests) == 2 + assert len(client._pending) == 3 + + # Simulate completion of a request + req_id = next(iter(client._active_requests.keys())) + client._request_complete(req_id) + + # After completing one request and processing queue, + # we should have 2 active and 2 pending + assert len(client._active_requests) <= 2 + assert len(client._pending) <= 3 + assert len(client._active_requests) + len(client._pending) == 4 + + client.close()