diff --git a/kafka/conn.py b/kafka/conn.py index 31e1f8be9..8dd65c1c0 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -301,6 +301,7 @@ def __init__(self, host, port, afi, **configs): if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] self._api_versions_future = None + self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms'] self._sasl_auth_future = None self.last_attempt = 0 self._gai = [] @@ -557,7 +558,8 @@ def _try_api_versions_check(self): else: request = ApiVersionsRequest[version]() future = Future() - response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8)) + self._api_versions_check_timeout /= 2 + response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout) response.add_callback(self._handle_api_versions_response, future) response.add_errback(self._handle_api_versions_failure, future) self._api_versions_future = future @@ -566,7 +568,8 @@ def _try_api_versions_check(self): elif self._check_version_idx < len(self.VERSION_CHECKS): version, request = self.VERSION_CHECKS[self._check_version_idx] future = Future() - response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8)) + self._api_versions_check_timeout /= 2 + response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout) response.add_callback(self._handle_check_version_response, future, version) response.add_errback(self._handle_check_version_failure, future) self._api_versions_future = future @@ -618,7 +621,13 @@ def _handle_api_versions_response(self, future, response): def _handle_api_versions_failure(self, future, ex): future.failure(ex) - self._check_version_idx = 0 + # Modern brokers should not disconnect on unrecognized api-versions request, + # but in case they do we always want to try v0 as a fallback + # otherwise switch to check_version probe. + if self._api_versions_idx > 0: + self._api_versions_idx = 0 + else: + self._check_version_idx = 0 # after failure connection is closed, so state should already be DISCONNECTED def _handle_check_version_response(self, future, version, _response):