Skip to content

Commit 842f398

Browse files
authored
Always try ApiVersionsRequest v0, even on broker disconnect (#2603)
1 parent c35c161 commit 842f398

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

kafka/conn.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ def __init__(self, host, port, afi, **configs):
301301
if self.config['ssl_context'] is not None:
302302
self._ssl_context = self.config['ssl_context']
303303
self._api_versions_future = None
304+
self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms']
304305
self._sasl_auth_future = None
305306
self.last_attempt = 0
306307
self._gai = []
@@ -557,7 +558,8 @@ def _try_api_versions_check(self):
557558
else:
558559
request = ApiVersionsRequest[version]()
559560
future = Future()
560-
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
561+
self._api_versions_check_timeout /= 2
562+
response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout)
561563
response.add_callback(self._handle_api_versions_response, future)
562564
response.add_errback(self._handle_api_versions_failure, future)
563565
self._api_versions_future = future
@@ -566,7 +568,8 @@ def _try_api_versions_check(self):
566568
elif self._check_version_idx < len(self.VERSION_CHECKS):
567569
version, request = self.VERSION_CHECKS[self._check_version_idx]
568570
future = Future()
569-
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
571+
self._api_versions_check_timeout /= 2
572+
response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout)
570573
response.add_callback(self._handle_check_version_response, future, version)
571574
response.add_errback(self._handle_check_version_failure, future)
572575
self._api_versions_future = future
@@ -618,7 +621,13 @@ def _handle_api_versions_response(self, future, response):
618621

619622
def _handle_api_versions_failure(self, future, ex):
620623
future.failure(ex)
621-
self._check_version_idx = 0
624+
# Modern brokers should not disconnect on unrecognized api-versions request,
625+
# but in case they do we always want to try v0 as a fallback
626+
# otherwise switch to check_version probe.
627+
if self._api_versions_idx > 0:
628+
self._api_versions_idx = 0
629+
else:
630+
self._check_version_idx = 0
622631
# after failure connection is closed, so state should already be DISCONNECTED
623632

624633
def _handle_check_version_response(self, future, version, _response):

0 commit comments

Comments
 (0)