Skip to content

Commit d2d1cdd

Browse files
authoredApr 7, 2025
Rename Coordinator errors to generic not group (#2585)
1 parent c2fe7c3 commit d2d1cdd

File tree

6 files changed

+43
-48
lines changed

6 files changed

+43
-48
lines changed
 

‎kafka/admin/client.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1460,9 +1460,9 @@ def list_consumer_groups(self, broker_ids=None):
14601460
list: List of tuples of Consumer Groups.
14611461
14621462
Raises:
1463-
GroupCoordinatorNotAvailableError: The coordinator is not
1463+
CoordinatorNotAvailableError: The coordinator is not
14641464
available, so cannot process requests.
1465-
GroupLoadInProgressError: The coordinator is loading and
1465+
CoordinatorLoadInProgressError: The coordinator is loading and
14661466
hence can't process requests.
14671467
"""
14681468
# While we return a list, internally use a set to prevent duplicates

‎kafka/coordinator/base.py

+11-11
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ def _send_join_group_request(self):
478478
group leader
479479
"""
480480
if self.coordinator_unknown():
481-
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
481+
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
482482
return Future().failure(e)
483483

484484
elif not self._client.ready(self.coordinator_id, metadata_priority=False):
@@ -555,7 +555,7 @@ def _handle_join_group_response(self, future, send_time, response):
555555
else:
556556
self._on_join_follower().chain(future)
557557

558-
elif error_type is Errors.GroupLoadInProgressError:
558+
elif error_type is Errors.CoordinatorLoadInProgressError:
559559
log.debug("Attempt to join group %s rejected since coordinator %s"
560560
" is loading the group.", self.group_id, self.coordinator_id)
561561
# backoff and retry
@@ -567,8 +567,8 @@ def _handle_join_group_response(self, future, send_time, response):
567567
log.debug("Attempt to join group %s failed due to unknown member id",
568568
self.group_id)
569569
future.failure(error)
570-
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
571-
Errors.NotCoordinatorForGroupError):
570+
elif error_type in (Errors.CoordinatorNotAvailableError,
571+
Errors.NotCoordinatorError):
572572
# re-discover the coordinator and retry with backoff
573573
self.coordinator_dead(error_type())
574574
log.debug("Attempt to join group %s failed due to obsolete "
@@ -636,7 +636,7 @@ def _on_join_leader(self, response):
636636

637637
def _send_sync_group_request(self, request):
638638
if self.coordinator_unknown():
639-
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
639+
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
640640
return Future().failure(e)
641641

642642
# We assume that coordinator is ready if we're sending SyncGroup
@@ -674,8 +674,8 @@ def _handle_sync_group_response(self, future, send_time, response):
674674
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
675675
self.reset_generation()
676676
future.failure(error)
677-
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
678-
Errors.NotCoordinatorForGroupError):
677+
elif error_type in (Errors.CoordinatorNotAvailableError,
678+
Errors.NotCoordinatorError):
679679
error = error_type()
680680
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
681681
self.coordinator_dead(error)
@@ -732,7 +732,7 @@ def _handle_group_coordinator_response(self, future, response):
732732
self.heartbeat.reset_timeouts()
733733
future.success(self.coordinator_id)
734734

735-
elif error_type is Errors.GroupCoordinatorNotAvailableError:
735+
elif error_type is Errors.CoordinatorNotAvailableError:
736736
log.debug("Group Coordinator Not Available; retry")
737737
future.failure(error_type())
738738
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -842,7 +842,7 @@ def _handle_leave_group_response(self, response):
842842
def _send_heartbeat_request(self):
843843
"""Send a heartbeat request"""
844844
if self.coordinator_unknown():
845-
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
845+
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
846846
return Future().failure(e)
847847

848848
elif not self._client.ready(self.coordinator_id, metadata_priority=False):
@@ -869,8 +869,8 @@ def _handle_heartbeat_response(self, future, send_time, response):
869869
log.debug("Received successful heartbeat response for group %s",
870870
self.group_id)
871871
future.success(None)
872-
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
873-
Errors.NotCoordinatorForGroupError):
872+
elif error_type in (Errors.CoordinatorNotAvailableError,
873+
Errors.NotCoordinatorError):
874874
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
875875
" is either not started or not valid", self.group_id,
876876
self.coordinator())

‎kafka/coordinator/consumer.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ def _send_offset_commit_request(self, offsets):
590590

591591
node_id = self.coordinator()
592592
if node_id is None:
593-
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
593+
return Future().failure(Errors.CoordinatorNotAvailableError)
594594

595595

596596
# create the offset commit request
@@ -719,14 +719,14 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
719719
" %s", self.group_id, tp, error_type.__name__)
720720
future.failure(error_type())
721721
return
722-
elif error_type is Errors.GroupLoadInProgressError:
722+
elif error_type is Errors.CoordinatorLoadInProgressError:
723723
# just retry
724724
log.debug("OffsetCommit for group %s failed: %s",
725725
self.group_id, error_type.__name__)
726726
future.failure(error_type(self.group_id))
727727
return
728-
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
729-
Errors.NotCoordinatorForGroupError,
728+
elif error_type in (Errors.CoordinatorNotAvailableError,
729+
Errors.NotCoordinatorError,
730730
Errors.RequestTimedOutError):
731731
log.debug("OffsetCommit for group %s failed: %s",
732732
self.group_id, error_type.__name__)
@@ -777,7 +777,7 @@ def _send_offset_fetch_request(self, partitions):
777777

778778
node_id = self.coordinator()
779779
if node_id is None:
780-
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
780+
return Future().failure(Errors.CoordinatorNotAvailableError)
781781

782782
# Verify node is ready
783783
if not self._client.ready(node_id):
@@ -812,10 +812,10 @@ def _handle_offset_fetch_response(self, future, response):
812812
error_type = Errors.for_code(response.error_code)
813813
log.debug("Offset fetch failed: %s", error_type.__name__)
814814
error = error_type()
815-
if error_type is Errors.GroupLoadInProgressError:
815+
if error_type is Errors.CoordinatorLoadInProgressError:
816816
# Retry
817817
future.failure(error)
818-
elif error_type is Errors.NotCoordinatorForGroupError:
818+
elif error_type is Errors.NotCoordinatorError:
819819
# re-discover the coordinator and retry
820820
self.coordinator_dead(error)
821821
future.failure(error)
@@ -841,10 +841,10 @@ def _handle_offset_fetch_response(self, future, response):
841841
error = error_type()
842842
log.debug("Group %s failed to fetch offset for partition"
843843
" %s: %s", self.group_id, tp, error)
844-
if error_type is Errors.GroupLoadInProgressError:
844+
if error_type is Errors.CoordinatorLoadInProgressError:
845845
# just retry
846846
future.failure(error)
847-
elif error_type is Errors.NotCoordinatorForGroupError:
847+
elif error_type is Errors.NotCoordinatorError:
848848
# re-discover the coordinator and retry
849849
self.coordinator_dead(error)
850850
future.failure(error)

‎kafka/errors.py

+12-17
Original file line numberDiff line numberDiff line change
@@ -218,33 +218,28 @@ class NetworkExceptionError(BrokerResponseError):
218218
invalid_metadata = True
219219

220220

221-
class GroupLoadInProgressError(BrokerResponseError):
221+
class CoordinatorLoadInProgressError(BrokerResponseError):
222222
errno = 14
223-
message = 'OFFSETS_LOAD_IN_PROGRESS'
224-
description = ('The broker returns this error code for an offset fetch'
225-
' request if it is still loading offsets (after a leader'
226-
' change for that offsets topic partition), or in response'
227-
' to group membership requests (such as heartbeats) when'
228-
' group metadata is being loaded by the coordinator.')
223+
message = 'COORDINATOR_LOAD_IN_PROGRESS'
224+
description = ('The broker returns this error code for txn or group requests,'
225+
' when the coordinator is loading and hence cant process requests')
229226
retriable = True
230227

231228

232-
class GroupCoordinatorNotAvailableError(BrokerResponseError):
229+
class CoordinatorNotAvailableError(BrokerResponseError):
233230
errno = 15
234-
message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
235-
description = ('The broker returns this error code for group coordinator'
236-
' requests, offset commits, and most group management'
231+
message = 'COORDINATOR_NOT_AVAILABLE'
232+
description = ('The broker returns this error code for consumer and transaction'
237233
' requests if the offsets topic has not yet been created, or'
238-
' if the group coordinator is not active.')
234+
' if the group/txn coordinator is not active.')
239235
retriable = True
240236

241237

242-
class NotCoordinatorForGroupError(BrokerResponseError):
238+
class NotCoordinatorError(BrokerResponseError):
243239
errno = 16
244-
message = 'NOT_COORDINATOR_FOR_CONSUMER'
245-
description = ('The broker returns this error code if it receives an offset'
246-
' fetch or commit request for a group that it is not a'
247-
' coordinator for.')
240+
message = 'NOT_COORDINATOR'
241+
description = ('The broker returns this error code if it is not the correct'
242+
' coordinator for the specified consumer or transaction group')
248243
retriable = True
249244

250245

‎test/test_admin_integration.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from kafka.admin import (
1010
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
1111
from kafka.errors import (
12-
BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
12+
BrokerResponseError, KafkaError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError,
1313
GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)
1414

1515

@@ -150,7 +150,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
150150
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
151151
"""Tests that the describe consumer group call fails if the group coordinator is not available
152152
"""
153-
with pytest.raises(GroupCoordinatorNotAvailableError):
153+
with pytest.raises(CoordinatorNotAvailableError):
154154
kafka_admin_client.describe_consumer_groups(['test'])
155155

156156

‎test/test_coordinator.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
444444
# No coordinator
445445
ret = patched_coord._send_offset_commit_request(offsets)
446446
assert ret.failed()
447-
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
447+
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)
448448

449449

450450
@pytest.mark.parametrize('api_version,req_type', [
@@ -497,11 +497,11 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
497497
(OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
498498
Errors.InvalidCommitOffsetSizeError, False),
499499
(OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
500-
Errors.GroupLoadInProgressError, False),
500+
Errors.CoordinatorLoadInProgressError, False),
501501
(OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
502-
Errors.GroupCoordinatorNotAvailableError, True),
502+
Errors.CoordinatorNotAvailableError, True),
503503
(OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
504-
Errors.NotCoordinatorForGroupError, True),
504+
Errors.NotCoordinatorError, True),
505505
(OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
506506
Errors.RequestTimedOutError, True),
507507
(OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
@@ -557,7 +557,7 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
557557
# No coordinator
558558
ret = patched_coord._send_offset_fetch_request(partitions)
559559
assert ret.failed()
560-
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
560+
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)
561561

562562

563563
@pytest.mark.parametrize('api_version,req_type', [
@@ -606,9 +606,9 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
606606

607607
@pytest.mark.parametrize('response,error,dead', [
608608
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 14), (1, 234, '', 14)])]),
609-
Errors.GroupLoadInProgressError, False),
609+
Errors.CoordinatorLoadInProgressError, False),
610610
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 16), (1, 234, '', 16)])]),
611-
Errors.NotCoordinatorForGroupError, True),
611+
Errors.NotCoordinatorError, True),
612612
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 25), (1, 234, '', 25)])]),
613613
Errors.UnknownMemberIdError, False),
614614
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 22), (1, 234, '', 22)])]),

0 commit comments

Comments
 (0)