diff --git a/pyatlan/client/atlan.py b/pyatlan/client/atlan.py index ae63ac21e..2acf22225 100644 --- a/pyatlan/client/atlan.py +++ b/pyatlan/client/atlan.py @@ -214,7 +214,14 @@ def __init__(self, **data): # Pass self reference to transport for duplicate checking during retries self._session = httpx.Client( transport=PyatlanSyncTransport( - retry=self.retry, client=self, **transport_kwargs + retry=self.retry, + client=self, + limits=httpx.Limits( + max_connections=50, + max_keepalive_connections=10, + keepalive_expiry=30.0, + ), + **transport_kwargs, ), headers={ "x-atlan-agent": "sdk", @@ -282,6 +289,41 @@ def _build_transport_proxy_config(self, data: Dict[str, Any]) -> Dict[str, Any]: transport_kwargs["verify"] = ssl_cert_file return transport_kwargs + def reset_http_session(self) -> None: + """Close and rebuild the HTTP session to recover from a degraded connection pool.""" + try: + self._session.close() + except Exception: + pass + transport_kwargs = {} + if self.proxy is not None: + transport_kwargs["proxy"] = self.proxy + if self.verify is not None: + transport_kwargs["verify"] = self.verify + self._session = httpx.Client( + transport=PyatlanSyncTransport( + retry=self.retry, + client=self, + limits=httpx.Limits( + max_connections=50, + max_keepalive_connections=10, + keepalive_expiry=30.0, + ), + **transport_kwargs, + ), + headers={ + "x-atlan-agent": "sdk", + "x-atlan-agent-id": "python", + "x-atlan-client-origin": "product_sdk", + "x-atlan-python-version": get_python_version(), + "x-atlan-client-type": "sync", + "User-Agent": f"Atlan-PythonSDK/{VERSION}", + }, + event_hooks={"response": [log_response]}, + ) + self._401_has_retried.set(False) + LOGGER.warning("HTTP session reset: new connection pool created") + @property def admin(self) -> AdminClient: if self._admin_client is None: @@ -554,7 +596,10 @@ def _call_api_internal( try: params["headers"]["X-Atlan-Request-Id"] = request_id_var.get() timeout = httpx.Timeout( - None, connect=self.connect_timeout, read=self.read_timeout + None, + connect=self.connect_timeout, + read=self.read_timeout, + pool=30.0, ) if binary_data: response = self._session.request( @@ -2014,7 +2059,14 @@ def max_retries( # type: ignore[misc] transport_kwargs["verify"] = self.verify new_transport = PyatlanSyncTransport( - retry=max_retries, client=self, **transport_kwargs + retry=max_retries, + client=self, + limits=httpx.Limits( + max_connections=50, + max_keepalive_connections=10, + keepalive_expiry=30.0, + ), + **transport_kwargs, ) self._session._transport = new_transport diff --git a/pyatlan/client/transport.py b/pyatlan/client/transport.py index 8a8e264e5..b8fa6378b 100644 --- a/pyatlan/client/transport.py +++ b/pyatlan/client/transport.py @@ -106,6 +106,11 @@ def _retry_operation( logger.debug( "_retry_operation retrying response=%s retry=%s", response, retry ) + # Close the response stream before sleeping so httpcore releases + # the connection back to the pool. Headers are already buffered + # in memory, so retry.sleep() can still read Retry-After. + if isinstance(response, httpx.Response): + response.close() retry = retry.increment() retry.sleep(response) @@ -226,6 +231,11 @@ async def _retry_operation_async( response, retry, ) + # Close the response stream before sleeping so httpcore releases + # the connection back to the pool. Headers are already buffered + # in memory, so retry.asleep() can still read Retry-After. + if isinstance(response, httpx.Response): + await response.aclose() retry = retry.increment() await retry.asleep(response) diff --git a/tests/integration/test_connection_pool_config.py b/tests/integration/test_connection_pool_config.py new file mode 100644 index 000000000..ab15dddfc --- /dev/null +++ b/tests/integration/test_connection_pool_config.py @@ -0,0 +1,246 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +""" +Integration tests for GOVFOUN-408: httpcore connection pool deadlock fix. + +These tests run against a live Atlan tenant and verify: +1. Pool limits and timeout config survive the full client init and real HTTP + connections — not just object construction. +2. Concurrent requests complete without hanging under the configured pool size. +3. httpx.PoolTimeout propagates through the full SDK stack instead of being + swallowed or causing an indefinite hang. +4. max_retries context manager correctly installs and restores limits on a + client that is actively connected. +""" + +import threading +import time +from concurrent.futures import ThreadPoolExecutor, wait +from unittest.mock import patch + +import httpx +import pytest +from httpx_retries import Retry + +from pyatlan.client.atlan import AtlanClient +from pyatlan.client.transport import PyatlanSyncTransport + + +def _get_httpcore_pool(client: AtlanClient): + return client._session._transport._transport._pool + + +# --------------------------------------------------------------------------- +# Pool limits — config survives real HTTP connections +# --------------------------------------------------------------------------- + + +def test_pool_limits_on_live_client(client: AtlanClient): + """ + Pool limits are configured correctly on an AtlanClient built from the + integration environment (real base_url, real API key, real retry config). + The values are set at construction time and must survive unchanged. + """ + pool = _get_httpcore_pool(client) + assert pool._max_connections == 50 + assert pool._keepalive_expiry == 30.0 + assert pool._max_keepalive_connections == 10 + + +# --------------------------------------------------------------------------- +# Concurrent requests — happy path +# --------------------------------------------------------------------------- + + +def test_concurrent_requests_complete_without_deadlock(client: AtlanClient): + """ + N concurrent requests (N << max_connections=50) must all complete. + + Under the old config (pool=None, max_connections=100), if connections + accumulated CLOSE_WAIT sockets and filled all slots, threads waiting for + a connection would block on threading.Event.wait(timeout=None) forever. + + With pool=30.0 and max_connections=50, threads either succeed or raise + PoolTimeout within 30s — they never hang indefinitely. + """ + n_threads = 5 + timeout_seconds = 60 + + def make_request(): + return client.user.get_current() + + with ThreadPoolExecutor(max_workers=n_threads) as executor: + futures = [executor.submit(make_request) for _ in range(n_threads)] + done, not_done = wait(futures, timeout=timeout_seconds) + + # Only check for deadlock — individual futures may raise (auth errors, etc.) + assert len(not_done) == 0, ( + f"{len(not_done)} of {n_threads} requests still pending after " + f"{timeout_seconds}s — possible connection pool deadlock" + ) + + +# --------------------------------------------------------------------------- +# PoolTimeout propagation +# --------------------------------------------------------------------------- + + +def test_pool_timeout_propagates_through_sdk_stack(client: AtlanClient): + """ + httpx.PoolTimeout injected at the transport layer must propagate up + through the SDK without being swallowed or stalling. + + Before the fix, pool=None meant threads blocked indefinitely on + threading.Event.wait(timeout=None). With pool=30.0 the SDK raises an + exception quickly instead. + """ + original_transport = client._session._transport + + test_transport = PyatlanSyncTransport( + retry=Retry(total=0), + client=client, + limits=httpx.Limits( + max_connections=50, + max_keepalive_connections=10, + keepalive_expiry=30.0, + ), + ) + + def always_pool_timeout(request: httpx.Request) -> httpx.Response: + raise httpx.PoolTimeout("simulated pool exhaustion", request=request) + + test_transport._transport.handle_request = always_pool_timeout # type: ignore[method-assign] + client._session._transport = test_transport + + try: + start = time.monotonic() + with pytest.raises(Exception): + client.user.get_current() + elapsed = time.monotonic() - start + + assert elapsed < 5.0, ( + f"PoolTimeout took {elapsed:.1f}s to propagate — " + "it may be getting swallowed or retried unexpectedly" + ) + finally: + client._session._transport = original_transport + + +# --------------------------------------------------------------------------- +# max_retries context manager +# --------------------------------------------------------------------------- + + +def test_max_retries_transport_limits_on_live_client(client: AtlanClient): + """ + max_retries context manager must install a transport with the same pool + limits on the real integration client (env-configured base_url, retry policy). + """ + captured: dict = {} + original_init = PyatlanSyncTransport.__init__ + + def capturing_init(self_t, retry=None, client=None, **kwargs): # noqa: ARG001 + if "limits" in kwargs: + captured["limits"] = kwargs["limits"] + original_init(self_t, retry=retry, client=client, **kwargs) + + with patch.object(PyatlanSyncTransport, "__init__", capturing_init): + with client.max_retries(): + pass + + limits = captured.get("limits") + assert limits is not None, "max_retries did not pass limits to PyatlanSyncTransport" + assert limits.max_connections == 50 + assert limits.keepalive_expiry == 30.0 + assert limits.max_keepalive_connections == 10 + + +def test_max_retries_restores_original_transport(client: AtlanClient): + """ + max_retries must restore the original transport after the context exits, + even on the real integration client (env-configured base_url, retry policy). + """ + original_transport = client._session._transport + + with client.max_retries(): + pass + + assert client._session._transport is original_transport + + +# --------------------------------------------------------------------------- +# Pool slot released during 429 retry sleep +# --------------------------------------------------------------------------- + + +def test_concurrent_429_retries_no_pool_timeout(client: AtlanClient): + """ + Connection slots must be freed before retry.sleep() when receiving 429. + + GOVFOUN-408 root cause: PyatlanSyncTransport._retry_operation called + retry.sleep(response) while the response stream was still open, holding + the httpcore connection slot for the entire sleep duration. With N threads + all sleeping simultaneously, the pool was exhausted and subsequent requests + raised PoolTimeout. + + Fix: response.close() before retry.sleep(). This test verifies that N + concurrent 429 retries complete without PoolTimeout when max_connections=N. + """ + n_threads = 5 + call_lock = threading.Lock() + thread_calls: dict = {} + + def counting_handle(request: httpx.Request) -> httpx.Response: + tid = threading.get_ident() + with call_lock: + count = thread_calls.get(tid, 0) + thread_calls[tid] = count + 1 + if count == 0: + # First call per thread: 429 with zero Retry-After + return httpx.Response(429, headers={"Retry-After": "0"}, content=b"") + return httpx.Response(200, content=b"{}") + + test_transport = PyatlanSyncTransport( + retry=Retry( + total=3, + backoff_factor=0, + status_forcelist=[429], + allowed_methods=["GET", "POST"], + respect_retry_after_header=True, + ), + limits=httpx.Limits( + max_connections=n_threads, + max_keepalive_connections=2, + keepalive_expiry=5.0, + ), + ) + test_transport._transport.handle_request = counting_handle # type: ignore[method-assign] + + original_transport = client._session._transport + client._session._transport = test_transport + + try: + pool_timeout_errors: list = [] + + def make_request(): + try: + client.user.get_current() + except httpx.PoolTimeout as exc: + pool_timeout_errors.append(str(exc)) + except Exception: + pass # Auth/parse errors from mock 200 body are expected + + with ThreadPoolExecutor(max_workers=n_threads) as executor: + futures = [executor.submit(make_request) for _ in range(n_threads)] + done, not_done = wait(futures, timeout=30.0) + + assert len(not_done) == 0, ( + f"{len(not_done)} of {n_threads} threads hung after 30s — " + "possible connection pool deadlock" + ) + assert not pool_timeout_errors, ( + "PoolTimeout raised — connection slot held open during retry.sleep(): " + f"{pool_timeout_errors}" + ) + finally: + client._session._transport = original_transport diff --git a/tests/unit/test_connection_pool_config.py b/tests/unit/test_connection_pool_config.py new file mode 100644 index 000000000..607c3c8de --- /dev/null +++ b/tests/unit/test_connection_pool_config.py @@ -0,0 +1,245 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +""" +Regression tests for GOVFOUN-408: httpcore connection pool deadlock fix. + +Root cause: csa-metadata-completeness hung indefinitely on masterc-prd because +two missing configs caused all 100 httpcore connection slots to fill with +CLOSE_WAIT zombies, then 109 worker threads blocked forever in +wait_for_connection(timeout=None). + +These tests verify the two SDK-side fixes: +1. httpx.Timeout has pool=30.0 — threads raise PoolTimeout after 30s instead + of waiting forever. +2. httpx.Limits has keepalive_expiry=30.0 — client retires idle connections + before nginx's keepalive_timeout=75s FIN, preventing CLOSE_WAIT accumulation. +""" + +import contextvars +from unittest.mock import Mock, patch + +import httpx +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.client.transport import PyatlanSyncTransport +from pyatlan.model.assets import AtlasGlossary + + +@pytest.fixture(autouse=True) +def set_env(monkeypatch): + monkeypatch.setenv("ATLAN_BASE_URL", "https://test.atlan.com") + monkeypatch.setenv("ATLAN_API_KEY", "test-api-key") + + +@pytest.fixture() +def client(): + return AtlanClient() + + +def _error_response(): + r = Mock() + r.status_code = 500 + r.text = "internal server error" + return r + + +def _trigger_api_call(client: AtlanClient) -> None: + """Drive any outbound request through _call_api_internal.""" + try: + client.asset.save(AtlasGlossary.creator(name="t")) + except Exception: + pass + + +def _get_httpcore_pool(client: AtlanClient): + transport = client._session._transport + assert isinstance(transport, PyatlanSyncTransport) + return transport._transport._pool + + +# --------------------------------------------------------------------------- +# Pool timeout +# --------------------------------------------------------------------------- + + +@patch.object(AtlanClient, "_session") +def test_pool_timeout_is_30_seconds(mock_session, client): + """pool=30.0 must be set — the live deadlock showed pool=None caused infinite blocking.""" + mock_session.request.return_value = _error_response() + _trigger_api_call(client) + assert mock_session.request.called, "no HTTP request was made" + timeout = mock_session.request.call_args.kwargs["timeout"] + assert timeout.pool == 30.0 + + +@patch.object(AtlanClient, "_session") +def test_pool_timeout_is_not_none(mock_session, client): + """pool timeout must never be None — threading.Event.wait(timeout=None) blocks forever.""" + mock_session.request.return_value = _error_response() + _trigger_api_call(client) + assert mock_session.request.called + timeout = mock_session.request.call_args.kwargs["timeout"] + assert timeout.pool is not None + + +@patch.object(AtlanClient, "_session") +def test_connect_timeout_unchanged(mock_session, client): + """connect timeout must still equal client.connect_timeout (default 30s).""" + mock_session.request.return_value = _error_response() + _trigger_api_call(client) + assert mock_session.request.called + timeout = mock_session.request.call_args.kwargs["timeout"] + assert timeout.connect == client.connect_timeout + + +@patch.object(AtlanClient, "_session") +def test_read_timeout_unchanged(mock_session, client): + """read timeout must still equal client.read_timeout (default 900s).""" + mock_session.request.return_value = _error_response() + _trigger_api_call(client) + assert mock_session.request.called + timeout = mock_session.request.call_args.kwargs["timeout"] + assert timeout.read == client.read_timeout + + +@patch.object(AtlanClient, "_session") +def test_pool_timeout_propagates_not_hangs(mock_session, client): + """httpx.PoolTimeout must propagate — if it were swallowed the workflow would still hang.""" + mock_session.request.side_effect = httpx.PoolTimeout( + "connection pool exhausted", request=None + ) + with pytest.raises(Exception): + client.asset.save(AtlasGlossary.creator(name="t")) + + +# --------------------------------------------------------------------------- +# Transport limits +# --------------------------------------------------------------------------- + + +def test_transport_max_connections_is_50(client): + """max_connections=50 reduces blast radius when CLOSE_WAIT sockets accumulate.""" + assert _get_httpcore_pool(client)._max_connections == 50 + + +def test_transport_keepalive_expiry_is_30_seconds(client): + """keepalive_expiry=30.0 — client closes idle connections before nginx's 75s FIN.""" + assert _get_httpcore_pool(client)._keepalive_expiry == 30.0 + + +def test_transport_max_keepalive_connections_is_10(client): + """max_keepalive_connections=10 bounds idle connections held in the pool.""" + assert _get_httpcore_pool(client)._max_keepalive_connections == 10 + + +# --------------------------------------------------------------------------- +# max_retries context manager +# --------------------------------------------------------------------------- + + +def _capture_transport_limits(client: AtlanClient) -> httpx.Limits: + """Enter max_retries, capture the limits used to construct the new transport.""" + captured: dict = {} + original_init = PyatlanSyncTransport.__init__ + + def capturing_init(self_t, retry=None, client=None, **kwargs): # noqa: ARG001 + if "limits" in kwargs: + captured["limits"] = kwargs["limits"] + original_init(self_t, retry=retry, client=client, **kwargs) + + with patch.object(PyatlanSyncTransport, "__init__", capturing_init): + with client.max_retries(): + pass + + assert "limits" in captured, "max_retries did not pass limits to PyatlanSyncTransport" + return captured["limits"] + + +def test_max_retries_transport_max_connections_is_50(client): + """max_retries context manager must use the same max_connections=50.""" + assert _capture_transport_limits(client).max_connections == 50 + + +def test_max_retries_transport_keepalive_expiry_is_30_seconds(client): + """max_retries context manager must use keepalive_expiry=30.0.""" + assert _capture_transport_limits(client).keepalive_expiry == 30.0 + + +def test_max_retries_transport_max_keepalive_connections_is_10(client): + """max_retries context manager must use max_keepalive_connections=10.""" + assert _capture_transport_limits(client).max_keepalive_connections == 10 + + +def test_max_retries_replaces_transport_inside_context(client): + """max_retries installs a fresh transport for the duration of the context.""" + original = client._session._transport + with client.max_retries(): + assert client._session._transport is not original + + +def test_max_retries_restores_original_transport_on_exit(client): + """max_retries restores the original transport when the context exits.""" + original = client._session._transport + with client.max_retries(): + pass + assert client._session._transport is original + + +# --------------------------------------------------------------------------- +# reset_http_session +# --------------------------------------------------------------------------- + + +class TestResetHttpSession: + def test_creates_new_session(self, client): + """reset_http_session() replaces _session with a brand-new httpx.Client.""" + old_session = client._session + client.reset_http_session() + assert client._session is not old_session + + def test_new_session_has_correct_limits(self, client): + """New session must use the same pool limits as the initial session.""" + client.reset_http_session() + assert _get_httpcore_pool(client)._max_connections == 50 + assert _get_httpcore_pool(client)._keepalive_expiry == 30.0 + assert _get_httpcore_pool(client)._max_keepalive_connections == 10 + + def test_closes_old_session(self, client): + """reset_http_session() calls close() on the old session before replacing it.""" + mock_close = Mock() + client._session.close = mock_close + client.reset_http_session() + mock_close.assert_called_once() + + def test_resets_401_retry_flag(self, client): + """_401_has_retried ContextVar must be reset to False after session rebuild.""" + def _run(): + client._401_has_retried.set(True) + client.reset_http_session() + return client._401_has_retried.get() + + result = contextvars.copy_context().run(_run) + assert result is False + + def test_preserves_proxy_kwargs(self, client): + """proxy and verify attrs are forwarded to the new PyatlanSyncTransport.""" + client.proxy = "http://proxy.example.com:8080" + client.verify = "/path/to/cert.pem" + + captured: dict = {} + original_init = PyatlanSyncTransport.__init__ + + def capturing_init(self_t, retry=None, client=None, **kwargs): # noqa: ARG001 + captured.update(kwargs) + original_init(self_t, retry=retry, client=client, **kwargs) + + # Patch httpx.HTTPTransport so it doesn't try to load the fake cert file + # from disk — we only need to verify the kwargs are forwarded correctly. + with patch.object(PyatlanSyncTransport, "__init__", capturing_init), patch( + "httpx.HTTPTransport", Mock(return_value=Mock()) + ): + client.reset_http_session() + + assert captured.get("proxy") == "http://proxy.example.com:8080" + assert captured.get("verify") == "/path/to/cert.pem" diff --git a/tests/unit/test_transport.py b/tests/unit/test_transport.py index d797ff24e..681b3421e 100644 --- a/tests/unit/test_transport.py +++ b/tests/unit/test_transport.py @@ -2,7 +2,7 @@ # Copyright 2025 Atlan Pte. Ltd. """Unit tests for pyatlan.client.transport and pyatlan.client.common.transport.""" -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest @@ -513,6 +513,73 @@ def test_automation_rerun_prevented_on_first_attempt(self): assert resp.status_code == 200 assert resp.json()["guidAssignments"][TEMP_GUID] == EXISTING_GUID + def test_response_closed_before_retry_sleep(self): + """response.close() must precede retry.sleep() — GOVFOUN-408 root cause fix. + + Open response holds a httpcore connection slot during sleep, causing + PoolTimeout cascade when all slots are occupied by sleeping retry threads. + """ + transport = PyatlanSyncTransport( + retry=Retry( + total=3, + backoff_factor=0, + allowed_methods=["POST"], + status_forcelist=[429], + ), + trust_env=False, + ) + call_order: list = [] + + resp_429 = httpx.Response(429, headers={"Retry-After": "0"}, content=b"") + original_close = resp_429.close + + def patched_close(): + call_order.append("close") + original_close() + + resp_429.close = patched_close + responses = iter([resp_429, httpx.Response(200)]) + transport._transport.handle_request = lambda req: next(responses) + req = httpx.Request("POST", "https://example.com") + + with patch.object( + Retry, "sleep", side_effect=lambda *a, **kw: call_order.append("sleep") + ): + transport.handle_request(req) + + assert "close" in call_order, "response.close() was never called" + assert "sleep" in call_order, "retry.sleep() was never called" + assert call_order.index("close") < call_order.index("sleep"), ( + "response.close() must precede retry.sleep() " + "to release the httpcore connection slot before the sleep" + ) + + def test_no_close_on_exception_retry(self): + """isinstance guard prevents calling .close() on an httpx.HTTPError. + + When a network exception causes a retry, the loop variable holds an + httpx.HTTPError (no .close()). Without the isinstance check this would + raise AttributeError and hide the original exception. + """ + transport = self._make_transport() + call_count = 0 + + def side_effect(req): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.ReadTimeout("timeout", request=req) + return httpx.Response(200) + + transport._transport.handle_request = side_effect + req = httpx.Request("POST", "https://example.com") + + with patch.object(Retry, "sleep"): + resp = transport.handle_request(req) + + assert resp.status_code == 200 + assert call_count == 2 + # --------------------------------------------------------------------------- # PyatlanAsyncTransport retry + duplicate prevention @@ -590,3 +657,57 @@ async def test_automation_rerun_prevented_on_first_attempt(self): inner_mock.assert_not_called() assert resp.status_code == 200 assert resp.json()["guidAssignments"][TEMP_GUID] == EXISTING_GUID + + @pytest.mark.asyncio + async def test_response_closed_before_retry_sleep(self): + """await response.aclose() must precede await retry.asleep() — async mirror of sync fix. + + Same GOVFOUN-408 root cause: open response holds the httpcore connection + slot during the retry sleep, causing PoolTimeout in async contexts. + """ + from pyatlan.client.transport import PyatlanAsyncTransport + + transport = PyatlanAsyncTransport( + retry=Retry( + total=3, + backoff_factor=0, + allowed_methods=["POST"], + status_forcelist=[429], + ), + trust_env=False, + ) + call_order: list = [] + + resp_429 = httpx.Response(429, headers={"Retry-After": "0"}, content=b"") + original_aclose = resp_429.aclose + + async def patched_aclose(): + call_order.append("aclose") + await original_aclose() + + resp_429.aclose = patched_aclose + + call_count = 0 + responses = [resp_429, httpx.Response(200)] + + async def handle_request(req): + nonlocal call_count + result = responses[call_count] + call_count += 1 + return result + + transport._transport.handle_async_request = handle_request + req = httpx.Request("POST", "https://example.com") + + async def recording_asleep(*args, **kwargs): + call_order.append("asleep") + + with patch.object(Retry, "asleep", recording_asleep): + await transport.handle_async_request(req) + + assert "aclose" in call_order, "response.aclose() was never called" + assert "asleep" in call_order, "retry.asleep() was never called" + assert call_order.index("aclose") < call_order.index("asleep"), ( + "response.aclose() must precede retry.asleep() " + "to release the httpcore connection slot before the sleep" + )