More coordinator / heartbeat logging #2621

Merged May 15, 2025 (3 commits)

Changes from all commits

65 changes: 45 additions & 20 deletions kafka/coordinator/base.py
@@ -43,6 +43,9 @@ def __eq__(self, other):
                 self.member_id == other.member_id and
                 self.protocol == other.protocol)
 
+    def __str__(self):
+        return "<Generation %s (member_id: %s, protocol: %s)>" % (self.generation_id, self.member_id, self.protocol)
+
 
 Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)
 
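The new `__str__` means any log line that interpolates a `Generation` (such as the "Successfully joined group" message added below) renders the generation id, member id, and protocol in one readable token. A minimal sketch of the resulting format, using a hypothetical stand-in class with the same three fields:

```python
# Hypothetical stand-in mirroring the fields of kafka.coordinator.base.Generation,
# shown only to illustrate the new log rendering.
class Generation(object):
    def __init__(self, generation_id, member_id, protocol):
        self.generation_id = generation_id
        self.member_id = member_id
        self.protocol = protocol

    def __str__(self):
        return "<Generation %s (member_id: %s, protocol: %s)>" % (
            self.generation_id, self.member_id, self.protocol)

print(Generation(5, 'consumer-1-example', 'range'))
# <Generation 5 (member_id: consumer-1-example, protocol: range)>
```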
@@ -398,17 +401,16 @@ def _handle_join_success(self, member_assignment_bytes):
         # will be invoked even if the consumer is woken up before
         # finishing the rebalance
         with self._lock:
-            log.info("Successfully joined group %s with generation %s",
-                     self.group_id, self._generation.generation_id)
             self.state = MemberState.STABLE
             if self._heartbeat_thread:
                 self._heartbeat_thread.enable()
 
-    def _handle_join_failure(self, _):
+    def _handle_join_failure(self, exception):
         # we handle failures below after the request finishes.
         # if the join completes after having been woken up,
         # the exception is ignored and we will rejoin
         with self._lock:
+            log.info("Failed to join group %s: %s", self.group_id, exception)
             self.state = MemberState.UNJOINED
 
     def ensure_active_group(self, timeout_ms=None):
@@ -566,10 +568,9 @@ def _failed_request(self, node_id, request, future, error):
         future.failure(error)
 
     def _handle_join_group_response(self, future, send_time, response):
+        log.debug("Received JoinGroup response: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful JoinGroup response for group %s: %s",
-                      self.group_id, response)
             if self._sensors:
                 self._sensors.join_latency.record((time.time() - send_time) * 1000)
             with self._lock:
@@ -583,6 +584,7 @@ def _handle_join_group_response(self, future, send_time, response):
                                                   response.member_id,
                                                   response.group_protocol)
 
+                log.info("Successfully joined group %s %s", self.group_id, self._generation)
                 if response.leader_id == response.member_id:
                     log.info("Elected group leader -- performing partition"
                              " assignments using %s", self._generation.protocol)
@@ -591,24 +593,24 @@ def _handle_join_group_response(self, future, send_time, response):
                 self._on_join_follower().chain(future)
 
         elif error_type is Errors.CoordinatorLoadInProgressError:
-            log.debug("Attempt to join group %s rejected since coordinator %s"
-                      " is loading the group.", self.group_id, self.coordinator_id)
+            log.info("Attempt to join group %s rejected since coordinator %s"
+                     " is loading the group.", self.group_id, self.coordinator_id)
             # backoff and retry
             future.failure(error_type(response))
         elif error_type is Errors.UnknownMemberIdError:
             # reset the member id and retry immediately
             error = error_type(self._generation.member_id)
             self.reset_generation()
-            log.debug("Attempt to join group %s failed due to unknown member id",
-                      self.group_id)
+            log.info("Attempt to join group %s failed due to unknown member id",
+                     self.group_id)
             future.failure(error)
         elif error_type in (Errors.CoordinatorNotAvailableError,
                             Errors.NotCoordinatorError):
             # re-discover the coordinator and retry with backoff
             self.coordinator_dead(error_type())
-            log.debug("Attempt to join group %s failed due to obsolete "
-                      "coordinator information: %s", self.group_id,
-                      error_type.__name__)
+            log.info("Attempt to join group %s failed due to obsolete "
+                     "coordinator information: %s", self.group_id,
+                     error_type.__name__)
             future.failure(error_type())
         elif error_type in (Errors.InconsistentGroupProtocolError,
                             Errors.InvalidSessionTimeoutError,
@@ -619,12 +621,21 @@ def _handle_join_group_response(self, future, send_time, response):
                       self.group_id, error)
             future.failure(error)
         elif error_type is Errors.GroupAuthorizationFailedError:
+            log.error("Attempt to join group %s failed due to group authorization error",
+                      self.group_id)
             future.failure(error_type(self.group_id))
         elif error_type is Errors.MemberIdRequiredError:
             # Broker requires a concrete member id to be allowed to join the group. Update member id
             # and send another join group request in next cycle.
+            log.info("Received member id %s for group %s; will retry join-group",
+                     response.member_id, self.group_id)
             self.reset_generation(response.member_id)
             future.failure(error_type())
+        elif error_type is Errors.RebalanceInProgressError:
+            log.info("Attempt to join group %s failed due to RebalanceInProgressError,"
+                     " which could indicate a replication timeout on the broker. Will retry.",
+                     self.group_id)
+            future.failure(error_type())
         else:
             # unexpected error, throw the exception
             error = error_type()
@@ -693,6 +704,7 @@ def _send_sync_group_request(self, request):
         return future
 
     def _handle_sync_group_response(self, future, send_time, response):
+        log.debug("Received SyncGroup response: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             if self._sensors:
@@ -739,13 +751,13 @@ def _send_group_coordinator_request(self):
             e = Errors.NodeNotReadyError(node_id)
             return Future().failure(e)
 
-        log.debug("Sending group coordinator request for group %s to broker %s",
-                  self.group_id, node_id)
         version = self._client.api_version(FindCoordinatorRequest, max_version=2)
         if version == 0:
             request = FindCoordinatorRequest[version](self.group_id)
         else:
             request = FindCoordinatorRequest[version](self.group_id, 0)
+        log.debug("Sending group coordinator request for group %s to broker %s: %s",
+                  self.group_id, node_id, request)
         future = Future()
         _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_group_coordinator_response, future)
@@ -865,6 +877,7 @@ def maybe_leave_group(self, timeout_ms=None):
             log.info('Leaving consumer group (%s).', self.group_id)
             version = self._client.api_version(LeaveGroupRequest, max_version=2)
             request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+            log.debug('Sending LeaveGroupRequest to %s: %s', self.coordinator_id, request)
             future = self._client.send(self.coordinator_id, request)
             future.add_callback(self._handle_leave_group_response)
             future.add_errback(log.error, "LeaveGroup request failed: %s")
@@ -873,10 +886,11 @@ def maybe_leave_group(self, timeout_ms=None):
         self.reset_generation()
 
     def _handle_leave_group_response(self, response):
+        log.debug("Received LeaveGroupResponse: %s", response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("LeaveGroup request for group %s returned successfully",
-                      self.group_id)
+            log.info("LeaveGroup request for group %s returned successfully",
+                     self.group_id)
         else:
             log.error("LeaveGroup request for group %s failed with error: %s",
                       self.group_id, error_type())
Expand All @@ -896,7 +910,7 @@ def _send_heartbeat_request(self):
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future, time.time())
@@ -907,10 +921,10 @@ def _send_heartbeat_request(self):
     def _handle_heartbeat_response(self, future, send_time, response):
         if self._sensors:
             self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
+        heartbeat_log.debug("Received heartbeat response for group %s: %s",
+                            self.group_id, response)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            heartbeat_log.debug("Received successful heartbeat response for group %s",
-                                self.group_id)
             future.success(None)
         elif error_type in (Errors.CoordinatorNotAvailableError,
                             Errors.NotCoordinatorError):
@@ -1099,7 +1113,13 @@ def _run_once(self):
                     # the poll timeout has expired, which means that the
                     # foreground thread has stalled in between calls to
                     # poll(), so we explicitly leave the group.
-                    heartbeat_log.warning('Heartbeat poll expired, leaving group')
+                    heartbeat_log.warning(
+                        "Consumer poll timeout has expired. This means the time between subsequent calls to poll()"
+                        " was longer than the configured max_poll_interval_ms, which typically implies that"
+                        " the poll loop is spending too much time processing messages. You can address this"
+                        " either by increasing max_poll_interval_ms or by reducing the maximum size of batches"
+                        " returned in poll() with max_poll_records."
+                    )
                     self.coordinator.maybe_leave_group()
 
                 elif not self.coordinator.heartbeat.should_heartbeat():
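The expanded warning names the two consumer settings that govern this condition. For reference, a hedged sketch of tuning them on kafka-python's `KafkaConsumer` (topic, group id, and bootstrap server are placeholders):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                      # placeholder topic
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    max_poll_interval_ms=600000,     # allow up to 10 minutes between poll() calls
    max_poll_records=100,            # smaller batches keep each poll loop short
)

for message in consumer:
    # placeholder for per-record work; if this body is slow, either raise
    # max_poll_interval_ms or lower max_poll_records to avoid the warning above
    print(message.topic, message.partition, message.offset)
```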
@@ -1110,10 +1130,12 @@ def _run_once(self):
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
                 else:
+                    heartbeat_log.debug('Sending heartbeat for group %s %s', self.coordinator.group_id, self.coordinator._generation)
                     self.coordinator.heartbeat.sent_heartbeat()
                     future = self.coordinator._send_heartbeat_request()
                     future.add_callback(self._handle_heartbeat_success)
                     future.add_errback(self._handle_heartbeat_failure)
+
             finally:
                 self.coordinator._lock.release()
             try:
@@ -1124,6 +1146,7 @@ def _run_once(self):
 
     def _handle_heartbeat_success(self, result):
         with self.coordinator._lock:
+            heartbeat_log.debug('Heartbeat success')
             self.coordinator.heartbeat.received_heartbeat()
 
     def _handle_heartbeat_failure(self, exception):
@@ -1134,8 +1157,10 @@ def _handle_heartbeat_failure(self, exception):
             # member in the group for as long as the duration of the
             # rebalance timeout. If we stop sending heartbeats, however,
             # then the session timeout may expire before we can rejoin.
+            heartbeat_log.debug('Treating RebalanceInProgressError as successful heartbeat')
             self.coordinator.heartbeat.received_heartbeat()
         else:
+            heartbeat_log.debug('Heartbeat failure: %s', exception)
             self.coordinator.heartbeat.fail_heartbeat()
             # wake up the thread if it's sleeping to reschedule the heartbeat
             self.coordinator._lock.notify()
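Most of the added detail is logged at DEBUG and is invisible under a default logging setup. A minimal sketch for surfacing it, assuming only that `log` and `heartbeat_log` resolve to loggers under the `kafka.coordinator` namespace:

```python
import logging

# Turn on DEBUG for the coordinator/heartbeat loggers without making the
# entire kafka client chatty.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
)
logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)
```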