Skip to content

Commit 8f77cc8

Browse files
authored
Update offset commit error handling; use RebalanceInProgressError if applicable (#2623)
1 parent 2f95590 commit 8f77cc8

File tree

3 files changed

+33
-14
lines changed

3 files changed

+33
-14
lines changed

kafka/coordinator/base.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import threading
77
import time
8+
import warnings
89
import weakref
910

1011
from kafka.vendor import six
@@ -797,7 +798,7 @@ def coordinator_dead(self, error):
797798
self.coordinator_id, self.group_id, error)
798799
self.coordinator_id = None
799800

800-
def generation(self):
801+
def generation_if_stable(self):
801802
"""Get the current generation state if the group is stable.
802803
803804
Returns: the current generation or None if the group is unjoined/rebalancing
@@ -807,6 +808,15 @@ def generation(self):
807808
return None
808809
return self._generation
809810

811+
# deprecated
812+
def generation(self):
813+
warnings.warn("Function coordinator.generation() has been renamed to generation_if_stable()",
814+
DeprecationWarning, stacklevel=2)
815+
return self.generation_if_stable()
816+
817+
def rebalance_in_progress(self):
818+
return self.state is MemberState.REBALANCING
819+
810820
def reset_generation(self, member_id=UNKNOWN_MEMBER_ID):
811821
"""Reset the generation and member_id because we have fallen out of the group."""
812822
with self._lock:

kafka/coordinator/consumer.py

+21-5
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,11 @@ def _send_offset_commit_request(self, offsets):
608608
if node_id is None:
609609
return Future().failure(Errors.CoordinatorNotAvailableError)
610610

611+
# Verify node is ready
612+
if not self._client.ready(node_id, metadata_priority=False):
613+
log.debug("Node %s not ready -- failing offset commit request",
614+
node_id)
615+
return Future().failure(Errors.NodeNotReadyError)
611616

612617
# create the offset commit request
613618
offset_data = collections.defaultdict(dict)
@@ -616,7 +621,7 @@ def _send_offset_commit_request(self, offsets):
616621

617622
version = self._client.api_version(OffsetCommitRequest, max_version=6)
618623
if version > 1 and self._subscription.partitions_auto_assigned():
619-
generation = self.generation()
624+
generation = self.generation_if_stable()
620625
else:
621626
generation = Generation.NO_GENERATION
622627

@@ -625,7 +630,18 @@ def _send_offset_commit_request(self, offsets):
625630
# and let the user rejoin the group in poll()
626631
if generation is None:
627632
log.info("Failing OffsetCommit request since the consumer is not part of an active group")
628-
return Future().failure(Errors.CommitFailedError('Group rebalance in progress'))
633+
if self.rebalance_in_progress():
634+
# if the client knows it is already rebalancing, we can use RebalanceInProgressError instead of
635+
# CommitFailedError to indicate this is not a fatal error
636+
return Future().failure(Errors.RebalanceInProgressError(
637+
"Offset commit cannot be completed since the"
638+
" consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance"
639+
" by calling poll() and then retry the operation."))
640+
else:
641+
return Future().failure(Errors.CommitFailedError(
642+
"Offset commit cannot be completed since the"
643+
" consumer is not part of an active group for auto partition assignment; it is likely that the consumer"
644+
" was kicked out of the group."))
629645

630646
if version == 0:
631647
request = OffsetCommitRequest[version](
@@ -756,7 +772,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
756772
# However, we do not need to reset generations and just request re-join, such that
757773
# if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
758774
self.request_rejoin()
759-
future.failure(Errors.CommitFailedError('Group rebalance in progress'))
775+
future.failure(Errors.CommitFailedError(error_type()))
760776
return
761777
elif error_type in (Errors.UnknownMemberIdError,
762778
Errors.IllegalGenerationError):
@@ -765,7 +781,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
765781
log.warning("OffsetCommit for group %s failed: %s",
766782
self.group_id, error)
767783
self.reset_generation()
768-
future.failure(Errors.CommitFailedError())
784+
future.failure(Errors.CommitFailedError(error_type()))
769785
return
770786
else:
771787
log.error("Group %s failed to commit partition %s at offset"
@@ -804,7 +820,7 @@ def _send_offset_fetch_request(self, partitions):
804820
return Future().failure(Errors.CoordinatorNotAvailableError)
805821

806822
# Verify node is ready
807-
if not self._client.ready(node_id):
823+
if not self._client.ready(node_id, metadata_priority=False):
808824
log.debug("Node %s not ready -- failing offset fetch request",
809825
node_id)
810826
return Future().failure(Errors.NodeNotReadyError)

kafka/errors.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,7 @@ class CommitFailedError(KafkaError):
2424
def __init__(self, *args):
2525
if not args:
2626
args = ("Commit cannot be completed since the group has already"
27-
" rebalanced and assigned the partitions to another member."
28-
" This means that the time between subsequent calls to poll()"
29-
" was longer than the configured max_poll_interval_ms, which"
30-
" typically implies that the poll loop is spending too much"
31-
" time message processing. You can address this either by"
32-
" increasing the rebalance timeout with max_poll_interval_ms,"
33-
" or by reducing the maximum size of batches returned in poll()"
34-
" with max_poll_records.",)
27+
" rebalanced and assigned the partitions to another member.",)
3528
super(CommitFailedError, self).__init__(*args)
3629

3730

0 commit comments

Comments
 (0)