Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ omit =
*/python?.?/*
*/site-packages/*
*/pypy/*
*kombu/async/http/curl.py
*kombu/asynchronous/http/curl.py
*kombu/async/http/urllib3_client.py
*kombu/five.py
*kombu/transport/mongodb.py
*kombu/transport/filesystem.py
Expand Down
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Ollie Walsh <[email protected]>
Pascal Hartig <[email protected]>
Patrick Schneider <[email protected]>
Paul McLanahan <[email protected]>
Paul Rysiavets <[email protected]>
Petar Radosevic <[email protected]>
Peter Hoffmann <[email protected]>
Pierre Riteau <[email protected]>
Expand Down
1 change: 1 addition & 0 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Kombu Asynchronous
kombu.asynchronous.http
kombu.asynchronous.http.base
kombu.asynchronous.http.curl
kombu.asynchronous.http.urllib3_client
kombu.asynchronous.aws
kombu.asynchronous.aws.connection
kombu.asynchronous.aws.sqs
Expand Down
11 changes: 11 additions & 0 deletions docs/reference/kombu.asynchronous.http.urllib3_client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
============================================================
Async urllib3 HTTP Client - ``kombu.asynchronous.http.urllib3_client``
============================================================

.. contents::
:local:
.. currentmodule:: kombu.asynchronous.http.urllib3_client

.. automodule:: kombu.asynchronous.http.urllib3_client
:members:
:undoc-members:
19 changes: 9 additions & 10 deletions kombu/asynchronous/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from kombu.asynchronous import get_event_loop
from kombu.asynchronous.http.base import Headers, Request, Response
from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response
from kombu.asynchronous.hub import Hub

if TYPE_CHECKING:
from kombu.asynchronous.http.curl import CurlClient

__all__ = ('Client', 'Headers', 'Response', 'Request')
__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client')


def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
"""Create new HTTP client."""
from .curl import CurlClient
return CurlClient(hub, **kwargs)
if CurlClient.Curl is not None:
return CurlClient(hub, **kwargs)

from .urllib3_client import Urllib3Client
return Urllib3Client(hub, **kwargs)


def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
"""Get or create HTTP client bound to the current event loop."""
hub = hub or get_event_loop()
try:
Expand Down
1 change: 1 addition & 0 deletions kombu/asynchronous/http/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl):
def ioctl(cmd):
if cmd == _pycurl.IOCMD_RESTARTREAD:
reqbuffer.seek(0)

setopt(_pycurl.IOCTLFUNCTION, ioctl)
Comment on lines 264 to 268
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

ioctl callback must return 0 – missing return value breaks libcurl contract.

pycurl expects the IOCTLFUNCTION callback to return an integer status (0 = OK).
With no explicit return, Python yields None, which libcurl treats as a non-zero error, aborting the transfer on some platforms/curl versions.

                 def ioctl(cmd):
                     if cmd == _pycurl.IOCMD_RESTARTREAD:
                         reqbuffer.seek(0)
+                    return 0
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def ioctl(cmd):
if cmd == _pycurl.IOCMD_RESTARTREAD:
reqbuffer.seek(0)
setopt(_pycurl.IOCTLFUNCTION, ioctl)
def ioctl(cmd):
if cmd == _pycurl.IOCMD_RESTARTREAD:
reqbuffer.seek(0)
return 0
setopt(_pycurl.IOCTLFUNCTION, ioctl)
🤖 Prompt for AI Agents
In kombu/asynchronous/http/curl.py around lines 264 to 268, the ioctl callback
function lacks a return statement, causing it to return None by default. This
breaks the libcurl contract which expects an integer status code, with 0
indicating success. Fix this by adding an explicit return 0 at the end of the
ioctl function to signal successful handling of the command.

setopt(_pycurl.POSTFIELDSIZE, len(body))
else:
Expand Down
213 changes: 213 additions & 0 deletions kombu/asynchronous/http/urllib3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""HTTP Client using urllib3."""

from __future__ import annotations

import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

try:
import urllib3
except ImportError: # pragma: no cover
urllib3 = None
else:
from urllib3.util import Url, make_headers

from kombu.asynchronous.hub import Hub, get_event_loop
from kombu.exceptions import HttpError

from .base import BaseClient

__all__ = ('Urllib3Client',)

DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; urllib3)'
EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])


class Urllib3Client(BaseClient):
"""Urllib3 HTTP Client (using urllib3 with thread pool)."""

def __init__(self, hub: Hub | None = None, max_clients: int = 10):
if urllib3 is None:
raise ImportError('The urllib3 client requires the urllib3 library.')
hub = hub or get_event_loop()
super().__init__(hub)
self.max_clients = max_clients

# Thread pool for concurrent requests
self._executor = ThreadPoolExecutor(max_workers=max_clients)
self._pending = deque()
self._active_requests = {} # Track active requests
self._request_lock = threading.RLock() # Thread safety

self._timeout_check_tref = self.hub.call_repeatedly(
1.0, self._timeout_check,
)

def close(self):
"""Close the client and all connection pools."""
self._timeout_check_tref.cancel()
self._executor.shutdown(wait=False)

def add_request(self, request):
"""Add a request to the pending queue."""
with self._request_lock:
self._pending.append(request)
self._process_queue()
return request

def _get_pool(self, request):
"""Get or create a connection pool for the request."""
# Prepare connection kwargs
conn_kwargs = {}

# Network Interface
if request.network_interface:
conn_kwargs['source_address'] = (request.network_interface, 0)

# SSL Verification
conn_kwargs['cert_reqs'] = 'CERT_REQUIRED' if request.validate_cert else 'CERT_NONE'

# CA Certificates
if request.ca_certs is not None:
conn_kwargs['ca_certs'] = request.ca_certs
elif request.validate_cert is True:
try:
import certifi
conn_kwargs['ca_certs'] = certifi.where()
except ImportError:
pass

# Client Certificates
if request.client_cert is not None:
conn_kwargs['cert_file'] = request.client_cert
if request.client_key is not None:
conn_kwargs['key_file'] = request.client_key

# Handle proxy configuration
if request.proxy_host:
conn_kwargs['_proxy'] = Url(
scheme=None,
host=request.proxy_host,
port=request.proxy_port,
).url

if request.proxy_username:
conn_kwargs['_proxy_headers'] = make_headers(
proxy_basic_auth=f"{request.proxy_username}:{request.proxy_password or ''}"
)

pool = urllib3.connection_from_url(request.url, **conn_kwargs)
return pool

def _timeout_check(self):
"""Check for timeouts and process pending requests."""
self._process_queue()

def _process_queue(self):
"""Process the request queue in a thread-safe manner."""
with self._request_lock:
# Only process if we have pending requests and available capacity
if not self._pending or len(self._active_requests) >= self.max_clients:
return

# Process as many pending requests as we have capacity for
while self._pending and len(self._active_requests) < self.max_clients:
request = self._pending.popleft()
request_id = id(request)
self._active_requests[request_id] = request
# Submit the request to the thread pool
future = self._executor.submit(self._execute_request, request)
future.add_done_callback(
lambda f, req_id=request_id: self._request_complete(req_id)
)

def _request_complete(self, request_id):
"""Mark a request as complete and process the next pending request."""
with self._request_lock:
if request_id in self._active_requests:
del self._active_requests[request_id]

# Process more requests if available
self._process_queue()

def _execute_request(self, request):
"""Execute a single request using urllib3."""
# Prepare headers
headers = dict(request.headers)
headers.update(
make_headers(
user_agent=request.user_agent or DEFAULT_USER_AGENT,
accept_encoding=request.use_gzip,
)
)

# Authentication
if request.auth_username is not None:
auth_header = make_headers(
basic_auth=f"{request.auth_username}:{request.auth_password or ''}"
)
headers.update(auth_header)

# Process request body
body = None
if request.method in ('POST', 'PUT') and request.body:
body = request.body if isinstance(request.body, bytes) else request.body.encode('utf-8')

# Make the request using urllib3
try:
pool = self._get_pool(request)

# Execute the request
response = pool.request(
method=request.method,
url=request.url,
headers=headers,
body=body,
preload_content=True, # We want to preload content for compatibility
redirect=request.follow_redirects,
retries=False, # Handle redirects manually to match pycurl behavior
)

# Process response
buffer = BytesIO(response.data)
response_obj = self.Response(
request=request,
code=response.status,
headers=response.headers,
buffer=buffer,
effective_url=response.geturl() if hasattr(response, 'geturl') else request.url,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove the non-existent geturl() method call.

The urllib3 HTTPResponse object doesn't have a geturl() method. This will always use the fallback request.url.

Simplify to always use the request URL:

-                effective_url=response.geturl() if hasattr(response, 'geturl') else request.url,
+                effective_url=request.url,

If you need to track redirects, you could use the response's URL from the pool:

-                effective_url=response.geturl() if hasattr(response, 'geturl') else request.url,
+                effective_url=response.url if hasattr(response, 'url') else request.url,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
effective_url=response.geturl() if hasattr(response, 'geturl') else request.url,
- effective_url=response.geturl() if hasattr(response, 'geturl') else request.url,
+ effective_url=request.url,
🤖 Prompt for AI Agents
In kombu/asynchronous/http/urllib3_client.py at line 180, remove the call to the
non-existent geturl() method on the response object and simplify the code to
always use request.url for effective_url, since urllib3's HTTPResponse does not
have geturl(). If redirect tracking is needed, consider using the URL from the
connection pool response instead.

error=None
)
except urllib3.exceptions.HTTPError as e:
# Handle HTTPError specifically
response_obj = self.Response(
request=request,
code=599,
headers={},
buffer=None,
effective_url=None,
error=HttpError(599, str(e))
)
except Exception as e:
# Handle any other errors
response_obj = self.Response(
request=request,
code=599,
headers={},
buffer=None,
effective_url=None,
error=HttpError(599, str(e))
)

# Notify request completion
request.on_ready(response_obj)

def on_readable(self, fd):
"""Compatibility method for the event loop."""
pass

def on_writable(self, fd):
"""Compatibility method for the event loop."""
pass
1 change: 1 addition & 0 deletions requirements/pkgutils.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ bumpversion==0.6.0
pydocstyle==6.3.0
mypy==1.14.1
typing_extensions==4.12.2; python_version<"3.10"
types-pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
Loading