Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,14 @@ def __init__(self, **data):
# Pass self reference to transport for duplicate checking during retries
self._session = httpx.Client(
transport=PyatlanSyncTransport(
retry=self.retry, client=self, **transport_kwargs
retry=self.retry,
client=self,
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=10,
keepalive_expiry=30.0,
),
**transport_kwargs,
),
headers={
"x-atlan-agent": "sdk",
Expand Down Expand Up @@ -282,6 +289,41 @@ def _build_transport_proxy_config(self, data: Dict[str, Any]) -> Dict[str, Any]:
transport_kwargs["verify"] = ssl_cert_file
return transport_kwargs

def reset_http_session(self) -> None:
"""Close and rebuild the HTTP session to recover from a degraded connection pool."""
try:
self._session.close()
except Exception:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider: The bare except Exception: pass silently swallows all errors during session close. While this is intentional (to ensure we always proceed to create a new session), logging the exception at DEBUG level would help with troubleshooting:

Suggested change
except Exception:
except Exception as e:
LOGGER.debug("Failed to close old HTTP session: %s", e)

This preserves the recovery-at-all-costs behavior while leaving a trace for debugging.

pass
transport_kwargs = {}
if self.proxy is not None:
transport_kwargs["proxy"] = self.proxy
if self.verify is not None:
transport_kwargs["verify"] = self.verify
self._session = httpx.Client(
transport=PyatlanSyncTransport(
retry=self.retry,
client=self,
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=10,
keepalive_expiry=30.0,
),
**transport_kwargs,
),
headers={
"x-atlan-agent": "sdk",
"x-atlan-agent-id": "python",
"x-atlan-client-origin": "product_sdk",
"x-atlan-python-version": get_python_version(),
"x-atlan-client-type": "sync",
"User-Agent": f"Atlan-PythonSDK/{VERSION}",
},
event_hooks={"response": [log_response]},
)
self._401_has_retried.set(False)
LOGGER.warning("HTTP session reset: new connection pool created")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: LOGGER.warning() for a normal recovery operation may be too noisy in production. Consider using LOGGER.info() since this is expected recovery behavior, not an unexpected condition.


@property
def admin(self) -> AdminClient:
if self._admin_client is None:
Expand Down Expand Up @@ -554,7 +596,10 @@ def _call_api_internal(
try:
params["headers"]["X-Atlan-Request-Id"] = request_id_var.get()
timeout = httpx.Timeout(
None, connect=self.connect_timeout, read=self.read_timeout
None,
connect=self.connect_timeout,
read=self.read_timeout,
pool=30.0,
)
if binary_data:
response = self._session.request(
Expand Down Expand Up @@ -2014,7 +2059,14 @@ def max_retries( # type: ignore[misc]
transport_kwargs["verify"] = self.verify

new_transport = PyatlanSyncTransport(
retry=max_retries, client=self, **transport_kwargs
retry=max_retries,
client=self,
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=10,
keepalive_expiry=30.0,
),
**transport_kwargs,
)
self._session._transport = new_transport

Expand Down
10 changes: 10 additions & 0 deletions pyatlan/client/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ def _retry_operation(
logger.debug(
"_retry_operation retrying response=%s retry=%s", response, retry
)
# Close the response stream before sleeping so httpcore releases
# the connection back to the pool. Headers are already buffered
# in memory, so retry.sleep() can still read Retry-After.
if isinstance(response, httpx.Response):
response.close()
retry = retry.increment()
retry.sleep(response)

Expand Down Expand Up @@ -226,6 +231,11 @@ async def _retry_operation_async(
response,
retry,
)
# Close the response stream before sleeping so httpcore releases
# the connection back to the pool. Headers are already buffered
# in memory, so retry.asleep() can still read Retry-After.
if isinstance(response, httpx.Response):
await response.aclose()
retry = retry.increment()
await retry.asleep(response)

Expand Down
246 changes: 246 additions & 0 deletions tests/integration/test_connection_pool_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Atlan Pte. Ltd.
"""
Integration tests for GOVFOUN-408: httpcore connection pool deadlock fix.

These tests run against a live Atlan tenant and verify:
1. Pool limits and timeout config survive the full client init and real HTTP
connections — not just object construction.
2. Concurrent requests complete without hanging under the configured pool size.
3. httpx.PoolTimeout propagates through the full SDK stack instead of being
swallowed or causing an indefinite hang.
4. max_retries context manager correctly installs and restores limits on a
client that is actively connected.
"""

import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from unittest.mock import patch

import httpx
import pytest
from httpx_retries import Retry

from pyatlan.client.atlan import AtlanClient
from pyatlan.client.transport import PyatlanSyncTransport


def _get_httpcore_pool(client: AtlanClient):
return client._session._transport._transport._pool


# ---------------------------------------------------------------------------
# Pool limits — config survives real HTTP connections
# ---------------------------------------------------------------------------


def test_pool_limits_on_live_client(client: AtlanClient):
"""
Pool limits are configured correctly on an AtlanClient built from the
integration environment (real base_url, real API key, real retry config).
The values are set at construction time and must survive unchanged.
"""
pool = _get_httpcore_pool(client)
assert pool._max_connections == 50
assert pool._keepalive_expiry == 30.0
assert pool._max_keepalive_connections == 10


# ---------------------------------------------------------------------------
# Concurrent requests — happy path
# ---------------------------------------------------------------------------


def test_concurrent_requests_complete_without_deadlock(client: AtlanClient):
"""
N concurrent requests (N << max_connections=50) must all complete.

Under the old config (pool=None, max_connections=100), if connections
accumulated CLOSE_WAIT sockets and filled all slots, threads waiting for
a connection would block on threading.Event.wait(timeout=None) forever.

With pool=30.0 and max_connections=50, threads either succeed or raise
PoolTimeout within 30s — they never hang indefinitely.
"""
n_threads = 5
timeout_seconds = 60

def make_request():
return client.user.get_current()

with ThreadPoolExecutor(max_workers=n_threads) as executor:
futures = [executor.submit(make_request) for _ in range(n_threads)]
done, not_done = wait(futures, timeout=timeout_seconds)

# Only check for deadlock — individual futures may raise (auth errors, etc.)
assert len(not_done) == 0, (
f"{len(not_done)} of {n_threads} requests still pending after "
f"{timeout_seconds}s — possible connection pool deadlock"
)


# ---------------------------------------------------------------------------
# PoolTimeout propagation
# ---------------------------------------------------------------------------


def test_pool_timeout_propagates_through_sdk_stack(client: AtlanClient):
"""
httpx.PoolTimeout injected at the transport layer must propagate up
through the SDK without being swallowed or stalling.

Before the fix, pool=None meant threads blocked indefinitely on
threading.Event.wait(timeout=None). With pool=30.0 the SDK raises an
exception quickly instead.
"""
original_transport = client._session._transport

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fragile test: Accessing client._session._transport._transport._pool relies on internal httpx/httpcore implementation details. If httpx changes its internal structure, this test will break.

Consider adding a comment documenting this coupling, or wrapping it in a try/except with pytest.skip() if the internal structure changes:

def _get_httpcore_pool(client: AtlanClient):
    """Access the underlying httpcore pool. Relies on httpx internals; may need updating if httpx changes."""
    try:
        return client._session._transport._transport._pool
    except AttributeError:
        pytest.skip("httpx internal structure changed; update test helper")


test_transport = PyatlanSyncTransport(
retry=Retry(total=0),
client=client,
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=10,
keepalive_expiry=30.0,
),
)

def always_pool_timeout(request: httpx.Request) -> httpx.Response:
raise httpx.PoolTimeout("simulated pool exhaustion", request=request)

test_transport._transport.handle_request = always_pool_timeout # type: ignore[method-assign]
client._session._transport = test_transport

try:
start = time.monotonic()
with pytest.raises(Exception):
client.user.get_current()
elapsed = time.monotonic() - start

assert elapsed < 5.0, (
f"PoolTimeout took {elapsed:.1f}s to propagate — "
"it may be getting swallowed or retried unexpectedly"
)
finally:
client._session._transport = original_transport


# ---------------------------------------------------------------------------
# max_retries context manager
# ---------------------------------------------------------------------------


def test_max_retries_transport_limits_on_live_client(client: AtlanClient):
"""
max_retries context manager must install a transport with the same pool
limits on the real integration client (env-configured base_url, retry policy).
"""
captured: dict = {}
original_init = PyatlanSyncTransport.__init__

def capturing_init(self_t, retry=None, client=None, **kwargs): # noqa: ARG001
if "limits" in kwargs:
captured["limits"] = kwargs["limits"]
original_init(self_t, retry=retry, client=client, **kwargs)

with patch.object(PyatlanSyncTransport, "__init__", capturing_init):
with client.max_retries():
pass

limits = captured.get("limits")
assert limits is not None, "max_retries did not pass limits to PyatlanSyncTransport"
assert limits.max_connections == 50
assert limits.keepalive_expiry == 30.0
assert limits.max_keepalive_connections == 10


def test_max_retries_restores_original_transport(client: AtlanClient):
"""
max_retries must restore the original transport after the context exits,
even on the real integration client (env-configured base_url, retry policy).
"""
original_transport = client._session._transport

with client.max_retries():
pass

assert client._session._transport is original_transport


# ---------------------------------------------------------------------------
# Pool slot released during 429 retry sleep
# ---------------------------------------------------------------------------


def test_concurrent_429_retries_no_pool_timeout(client: AtlanClient):
"""
Connection slots must be freed before retry.sleep() when receiving 429.

GOVFOUN-408 root cause: PyatlanSyncTransport._retry_operation called
retry.sleep(response) while the response stream was still open, holding
the httpcore connection slot for the entire sleep duration. With N threads
all sleeping simultaneously, the pool was exhausted and subsequent requests
raised PoolTimeout.

Fix: response.close() before retry.sleep(). This test verifies that N
concurrent 429 retries complete without PoolTimeout when max_connections=N.
"""
n_threads = 5
call_lock = threading.Lock()
thread_calls: dict = {}

def counting_handle(request: httpx.Request) -> httpx.Response:
tid = threading.get_ident()
with call_lock:
count = thread_calls.get(tid, 0)
thread_calls[tid] = count + 1
if count == 0:
# First call per thread: 429 with zero Retry-After
return httpx.Response(429, headers={"Retry-After": "0"}, content=b"")
return httpx.Response(200, content=b"{}")

test_transport = PyatlanSyncTransport(
retry=Retry(
total=3,
backoff_factor=0,
status_forcelist=[429],
allowed_methods=["GET", "POST"],
respect_retry_after_header=True,
),
limits=httpx.Limits(
max_connections=n_threads,
max_keepalive_connections=2,
keepalive_expiry=5.0,
),
)
test_transport._transport.handle_request = counting_handle # type: ignore[method-assign]

original_transport = client._session._transport
client._session._transport = test_transport

try:
pool_timeout_errors: list = []

def make_request():
try:
client.user.get_current()
except httpx.PoolTimeout as exc:
pool_timeout_errors.append(str(exc))
except Exception:
pass # Auth/parse errors from mock 200 body are expected

with ThreadPoolExecutor(max_workers=n_threads) as executor:
futures = [executor.submit(make_request) for _ in range(n_threads)]
done, not_done = wait(futures, timeout=30.0)

assert len(not_done) == 0, (
f"{len(not_done)} of {n_threads} threads hung after 30s — "
"possible connection pool deadlock"
)
assert not pool_timeout_errors, (
"PoolTimeout raised — connection slot held open during retry.sleep(): "
f"{pool_timeout_errors}"
)
finally:
client._session._transport = original_transport
Loading
Loading