From 11e6c1c4891d0e3e08d72cabe57701bd03cbfd38 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 13 May 2025 09:45:10 -0700 Subject: [PATCH 1/2] Catch more exceptions in heartbeat thread; log heartbeat configuration on start; raise KafkaConfigurationError --- kafka/coordinator/base.py | 4 ++-- kafka/coordinator/heartbeat.py | 22 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 448659e62..e8899c3bc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1038,14 +1038,14 @@ def close(self, timeout_ms=None): def run(self): try: - heartbeat_log.debug('Heartbeat thread started') + heartbeat_log.debug('Heartbeat thread started: %s', self.coordinator.heartbeat) while not self.closed: self._run_once() except ReferenceError: heartbeat_log.debug('Heartbeat thread closed due to coordinator gc') - except RuntimeError as e: + except Exception as e: heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s", self.coordinator.group_id, e) self.failed = e diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index 2f5930b63..edc9f4a36 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -1,8 +1,13 @@ from __future__ import absolute_import, division import copy +import logging import time +from kafka.errors import KafkaConfigurationError + +log = logging.getLogger(__name__) + class Heartbeat(object): DEFAULT_CONFIG = { @@ -20,9 +25,13 @@ def __init__(self, **configs): self.config[key] = configs[key] if self.config['group_id'] is not None: - assert (self.config['heartbeat_interval_ms'] - <= self.config['session_timeout_ms']), ( - 'Heartbeat interval must be lower than the session timeout') + if self.config['heartbeat_interval_ms'] >= self.config['session_timeout_ms']: + raise KafkaConfigurationError('Heartbeat interval must be lower than the session timeout (%s v %s)' % ( + self.config['heartbeat_interval_ms'], self.config['session_timeout_ms'])) + if self.config['heartbeat_interval_ms'] > (self.config['session_timeout_ms'] / 3): + log.warning('heartbeat_interval_ms is high relative to session_timeout_ms (%s v %s).' + ' Recommend heartbeat interval less than 1/3rd of session timeout', + self.config['heartbeat_interval_ms'], self.config['session_timeout_ms']) self.last_send = -1 * float('inf') self.last_receive = -1 * float('inf') @@ -66,3 +75,10 @@ def reset_timeouts(self): def poll_timeout_expired(self): return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) + + def __str__(self): + return ("").format(**self.config) From b3a4894419a2708c16643d43bbcfb3fbf81f6278 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 13 May 2025 10:00:22 -0700 Subject: [PATCH 2/2] fix indent; log exception --- kafka/coordinator/base.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e8899c3bc..6f1d1ee31 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -909,28 +909,28 @@ def _handle_heartbeat_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: heartbeat_log.debug("Received successful heartbeat response for group %s", - self.group_id) + self.group_id) future.success(None) elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)" - " is either not started or not valid", self.group_id, + " is either not started or not valid", self.group_id, self.coordinator()) self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: heartbeat_log.warning("Heartbeat failed for group %s because it is" - " rebalancing", self.group_id) + " rebalancing", self.group_id) self.request_rejoin() future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: heartbeat_log.warning("Heartbeat failed for group %s: generation id is not " - " current.", self.group_id) + " current.", self.group_id) self.reset_generation() future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: heartbeat_log.warning("Heartbeat: local member_id was not recognized;" - " this consumer needs to re-join") + " this consumer needs to re-join") self.reset_generation() future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: @@ -1046,8 +1046,8 @@ def run(self): heartbeat_log.debug('Heartbeat thread closed due to coordinator gc') except Exception as e: - heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s", - self.coordinator.group_id, e) + heartbeat_log.exception("Heartbeat thread for group %s failed due to unexpected error: %s", + self.coordinator.group_id, e) self.failed = e finally: