Skip to content

Minor Heartbeat updates: catch more exceptions / log configuration / raise KafkaConfigurationError #2618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,28 +909,28 @@ def _handle_heartbeat_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
heartbeat_log.debug("Received successful heartbeat response for group %s",
self.group_id)
self.group_id)
future.success(None)
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
" is either not started or not valid", self.group_id,
self.coordinator())
self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
heartbeat_log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
" rebalancing", self.group_id)
self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
heartbeat_log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
" current.", self.group_id)
self.reset_generation()
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
heartbeat_log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
" this consumer needs to re-join")
self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
Expand Down Expand Up @@ -1038,16 +1038,16 @@ def close(self, timeout_ms=None):

def run(self):
try:
heartbeat_log.debug('Heartbeat thread started')
heartbeat_log.debug('Heartbeat thread started: %s', self.coordinator.heartbeat)
while not self.closed:
self._run_once()

except ReferenceError:
heartbeat_log.debug('Heartbeat thread closed due to coordinator gc')

except RuntimeError as e:
heartbeat_log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
self.coordinator.group_id, e)
except Exception as e:
heartbeat_log.exception("Heartbeat thread for group %s failed due to unexpected error: %s",
self.coordinator.group_id, e)
self.failed = e

finally:
Expand Down
22 changes: 19 additions & 3 deletions kafka/coordinator/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from __future__ import absolute_import, division

import copy
import logging
import time

from kafka.errors import KafkaConfigurationError

log = logging.getLogger(__name__)


class Heartbeat(object):
DEFAULT_CONFIG = {
Expand All @@ -20,9 +25,13 @@ def __init__(self, **configs):
self.config[key] = configs[key]

if self.config['group_id'] is not None:
assert (self.config['heartbeat_interval_ms']
<= self.config['session_timeout_ms']), (
'Heartbeat interval must be lower than the session timeout')
if self.config['heartbeat_interval_ms'] >= self.config['session_timeout_ms']:
raise KafkaConfigurationError('Heartbeat interval must be lower than the session timeout (%s v %s)' % (
self.config['heartbeat_interval_ms'], self.config['session_timeout_ms']))
if self.config['heartbeat_interval_ms'] > (self.config['session_timeout_ms'] / 3):
log.warning('heartbeat_interval_ms is high relative to session_timeout_ms (%s v %s).'
' Recommend heartbeat interval less than 1/3rd of session timeout',
self.config['heartbeat_interval_ms'], self.config['session_timeout_ms'])

self.last_send = -1 * float('inf')
self.last_receive = -1 * float('inf')
Expand Down Expand Up @@ -66,3 +75,10 @@ def reset_timeouts(self):

def poll_timeout_expired(self):
    """Report whether the poll interval has been exceeded.

    Compares the wall-clock time elapsed since ``self.last_poll`` with the
    configured ``max_poll_interval_ms`` (converted from milliseconds to
    seconds).

    Returns:
        bool: True when strictly more than ``max_poll_interval_ms`` has
        elapsed since ``self.last_poll``; False otherwise.
    """
    # Elapsed time is measured in seconds because time.time() returns seconds.
    seconds_since_poll = time.time() - self.last_poll
    # The config value is in milliseconds; scale it to seconds to compare.
    allowed_seconds = self.config['max_poll_interval_ms'] / 1000
    return seconds_since_poll > allowed_seconds

def __str__(self):
return ("<Heartbeat group_id={group_id}"
" heartbeat_interval_ms={heartbeat_interval_ms}"
" session_timeout_ms={session_timeout_ms}"
" max_poll_interval_ms={max_poll_interval_ms}"
" retry_backoff_ms={retry_backoff_ms}>").format(**self.config)
Loading