Update offset commit error handling; use RebalanceInProgressError if applicable #2623

Merged 3 commits on May 15, 2025
kafka/coordinator/base.py: 11 additions & 1 deletion
@@ -5,6 +5,7 @@
import logging
import threading
import time
+import warnings
import weakref

from kafka.vendor import six
@@ -792,7 +793,7 @@ def coordinator_dead(self, error):
self.coordinator_id, self.group_id, error)
self.coordinator_id = None

-    def generation(self):
+    def generation_if_stable(self):
"""Get the current generation state if the group is stable.

Returns: the current generation or None if the group is unjoined/rebalancing
@@ -802,6 +803,15 @@ def generation(self):
return None
return self._generation

+    # deprecated
+    def generation(self):
+        warnings.warn("Function coordinator.generation() has been renamed to generation_if_stable()",
+                      DeprecationWarning, stacklevel=2)
+        return self.generation_if_stable()
+
+    def rebalance_in_progress(self):
+        return self.state is MemberState.REBALANCING
+
def reset_generation(self, member_id=UNKNOWN_MEMBER_ID):
"""Reset the generation and member_id because we have fallen out of the group."""
with self._lock:
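The base.py change above renames generation() to generation_if_stable(), keeps the old name as a deprecated alias, and adds a rebalance_in_progress() helper. A minimal usage sketch follows; the describe_commit_eligibility() and legacy_generation() helpers and their coordinator argument are illustrative assumptions, not part of this PR.

import warnings


def describe_commit_eligibility(coordinator):
    # Hypothetical helper: `coordinator` is assumed to be an already-constructed
    # BaseCoordinator subclass instance (e.g. the consumer coordinator).
    generation = coordinator.generation_if_stable()
    if generation is not None:
        return "stable (generation_id=%s)" % generation.generation_id
    if coordinator.rebalance_in_progress():
        return "rebalancing; a commit should fail with RebalanceInProgressError"
    return "not in an active group; a commit should fail with CommitFailedError"


def legacy_generation(coordinator):
    # The old accessor still resolves, but it now emits a DeprecationWarning.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        gen = coordinator.generation()
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    return gen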
kafka/coordinator/consumer.py: 21 additions & 5 deletions
@@ -608,6 +608,11 @@ def _send_offset_commit_request(self, offsets):
if node_id is None:
return Future().failure(Errors.CoordinatorNotAvailableError)

+        # Verify node is ready
+        if not self._client.ready(node_id, metadata_priority=False):
+            log.debug("Node %s not ready -- failing offset commit request",
+                      node_id)
+            return Future().failure(Errors.NodeNotReadyError)

# create the offset commit request
offset_data = collections.defaultdict(dict)
@@ -616,7 +621,7 @@

version = self._client.api_version(OffsetCommitRequest, max_version=6)
if version > 1 and self._subscription.partitions_auto_assigned():
-            generation = self.generation()
+            generation = self.generation_if_stable()
else:
generation = Generation.NO_GENERATION

@@ -625,7 +630,18 @@
# and let the user rejoin the group in poll()
if generation is None:
log.info("Failing OffsetCommit request since the consumer is not part of an active group")
-            return Future().failure(Errors.CommitFailedError('Group rebalance in progress'))
+            if self.rebalance_in_progress():
+                # if the client knows it is already rebalancing, we can use RebalanceInProgressError instead of
+                # CommitFailedError to indicate this is not a fatal error
+                return Future().failure(Errors.RebalanceInProgressError(
+                    "Offset commit cannot be completed since the"
+                    " consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance"
+                    " by calling poll() and then retry the operation."))
+            else:
+                return Future().failure(Errors.CommitFailedError(
+                    "Offset commit cannot be completed since the"
+                    " consumer is not part of an active group for auto partition assignment; it is likely that the consumer"
+                    " was kicked out of the group."))

if version == 0:
request = OffsetCommitRequest[version](
@@ -756,7 +772,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
# However, we do not need to reset generations and just request re-join, such that
# if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
self.request_rejoin()
-                    future.failure(Errors.CommitFailedError('Group rebalance in progress'))
+                    future.failure(Errors.CommitFailedError(error_type()))
return
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError):
@@ -765,7 +781,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
log.warning("OffsetCommit for group %s failed: %s",
self.group_id, error)
self.reset_generation()
-                    future.failure(Errors.CommitFailedError())
+                    future.failure(Errors.CommitFailedError(error_type()))
return
else:
log.error("Group %s failed to commit partition %s at offset"
@@ -804,7 +820,7 @@ def _send_offset_fetch_request(self, partitions):
return Future().failure(Errors.CoordinatorNotAvailableError)

# Verify node is ready
-        if not self._client.ready(node_id):
+        if not self._client.ready(node_id, metadata_priority=False):
log.debug("Node %s not ready -- failing offset fetch request",
node_id)
return Future().failure(Errors.NodeNotReadyError)
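The consumer.py change above splits the old catch-all CommitFailedError into two cases when the group is not stable. A sketch of how the two failure modes could be distinguished on the commit future is below; it only mirrors the internal API touched by this diff, and the coordinator and offsets objects are assumed to exist already.

from kafka.errors import CommitFailedError, RebalanceInProgressError

# `coordinator` is assumed to be a ConsumerCoordinator and `offsets` a
# {TopicPartition: OffsetAndMetadata} dict; both are illustrative assumptions.
future = coordinator._send_offset_commit_request(offsets)
coordinator._client.poll(future=future)

if future.failed():
    if isinstance(future.exception, RebalanceInProgressError):
        # Retriable: the group is rebalancing. Complete the rebalance
        # (e.g. by calling poll()) and then retry the commit.
        pass
    elif isinstance(future.exception, CommitFailedError):
        # Fatal for this generation: the consumer is no longer in an active
        # group (likely kicked out), so the commit is abandoned.
        pass

For public callers of the commit APIs the intended distinction is the same: RebalanceInProgressError signals a retriable condition, while CommitFailedError continues to mean the member has fallen out of the group.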
kafka/errors.py: 1 addition & 8 deletions
@@ -24,14 +24,7 @@ class CommitFailedError(KafkaError):
def __init__(self, *args):
if not args:
args = ("Commit cannot be completed since the group has already"
" rebalanced and assigned the partitions to another member."
" This means that the time between subsequent calls to poll()"
" was longer than the configured max_poll_interval_ms, which"
" typically implies that the poll loop is spending too much"
" time message processing. You can address this either by"
" increasing the rebalance timeout with max_poll_interval_ms,"
" or by reducing the maximum size of batches returned in poll()"
" with max_poll_records.",)
" rebalanced and assigned the partitions to another member.",)
super(CommitFailedError, self).__init__(*args)


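With the shortened default above, the long advice about max_poll_interval_ms is no longer baked into every CommitFailedError; the coordinator now passes the underlying error in explicitly. A small illustration, assuming kafka-python's usual KafkaError string formatting (the exact rendering is indicative only):

import kafka.errors as Errors

# No arguments: the shortened default message is used.
print(Errors.CommitFailedError())

# The coordinator now passes the underlying broker error, so the cause
# (e.g. RebalanceInProgressError) appears in the exception text.
print(Errors.CommitFailedError(Errors.RebalanceInProgressError()))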