Skip to content

Commit fb04626

Browse files
authored
KIP-394: handle MEMBER_ID_REQUIRED error w/ second join group request (#2598)
1 parent b1dae2e commit fb04626

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

kafka/coordinator/base.py

+25-7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ def __init__(self, generation_id, member_id, protocol):
3333
self.member_id = member_id
3434
self.protocol = protocol
3535

36+
@property
37+
def is_valid(self):
38+
return self.generation_id != DEFAULT_GENERATION_ID
39+
40+
def __eq__(self, other):
41+
return (self.generation_id == other.generation_id and
42+
self.member_id == other.member_id and
43+
self.protocol == other.protocol)
44+
45+
3646
Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)
3747

3848

@@ -461,7 +471,8 @@ def join_group(self, timeout_ms=None):
461471
exception = future.exception
462472
if isinstance(exception, (Errors.UnknownMemberIdError,
463473
Errors.RebalanceInProgressError,
464-
Errors.IllegalGenerationError)):
474+
Errors.IllegalGenerationError,
475+
Errors.MemberIdRequiredError)):
465476
continue
466477
elif not future.retriable():
467478
raise exception # pylint: disable-msg=raising-bad-type
@@ -491,7 +502,7 @@ def _send_join_group_request(self):
491502
(protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
492503
for protocol, metadata in self.group_protocols()
493504
]
494-
version = self._client.api_version(JoinGroupRequest, max_version=3)
505+
version = self._client.api_version(JoinGroupRequest, max_version=4)
495506
if version == 0:
496507
request = JoinGroupRequest[version](
497508
self.group_id,
@@ -585,6 +596,11 @@ def _handle_join_group_response(self, future, send_time, response):
585596
future.failure(error)
586597
elif error_type is Errors.GroupAuthorizationFailedError:
587598
future.failure(error_type(self.group_id))
599+
elif error_type is Errors.MemberIdRequiredError:
600+
# Broker requires a concrete member id to be allowed to join the group. Update member id
601+
# and send another join group request in next cycle.
602+
self.reset_generation(response.member_id)
603+
future.failure(error_type())
588604
else:
589605
# unexpected error, throw the exception
590606
error = error_type()
@@ -762,10 +778,10 @@ def generation(self):
762778
return None
763779
return self._generation
764780

765-
def reset_generation(self):
766-
"""Reset the generation and memberId because we have fallen out of the group."""
781+
def reset_generation(self, member_id=UNKNOWN_MEMBER_ID):
782+
"""Reset the generation and member_id because we have fallen out of the group."""
767783
with self._lock:
768-
self._generation = Generation.NO_GENERATION
784+
self._generation = Generation(DEFAULT_GENERATION_ID, member_id, None)
769785
self.rejoin_needed = True
770786
self.state = MemberState.UNJOINED
771787

@@ -799,8 +815,10 @@ def _close_heartbeat_thread(self, timeout_ms=None):
799815
self._heartbeat_thread = None
800816

801817
def __del__(self):
802-
if hasattr(self, '_heartbeat_thread'):
818+
try:
803819
self._close_heartbeat_thread()
820+
except (TypeError, AttributeError):
821+
pass
804822

805823
def close(self, timeout_ms=None):
806824
"""Close the coordinator, leave the current group,
@@ -816,7 +834,7 @@ def maybe_leave_group(self, timeout_ms=None):
816834
with self._client._lock, self._lock:
817835
if (not self.coordinator_unknown()
818836
and self.state is not MemberState.UNJOINED
819-
and self._generation is not Generation.NO_GENERATION):
837+
and self._generation.is_valid):
820838

821839
# this is a minimal effort attempt to leave the group. we do not
822840
# attempt any resending if the request fails or times out.

kafka/protocol/group.py

+17-3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ class JoinGroupResponse_v3(Response):
5252
SCHEMA = JoinGroupResponse_v2.SCHEMA
5353

5454

55+
class JoinGroupResponse_v4(Response):
56+
API_KEY = 11
57+
API_VERSION = 4
58+
SCHEMA = JoinGroupResponse_v3.SCHEMA
59+
60+
5561
class JoinGroupRequest_v0(Request):
5662
API_KEY = 11
5763
API_VERSION = 0
@@ -95,14 +101,22 @@ class JoinGroupRequest_v3(Request):
95101
API_VERSION = 3
96102
RESPONSE_TYPE = JoinGroupResponse_v3
97103
SCHEMA = JoinGroupRequest_v2.SCHEMA
98-
UNKNOWN_MEMBER_ID = ''
104+
105+
106+
class JoinGroupRequest_v4(Request):
107+
API_KEY = 11
108+
API_VERSION = 4
109+
RESPONSE_TYPE = JoinGroupResponse_v4
110+
SCHEMA = JoinGroupRequest_v3.SCHEMA
99111

100112

101113
JoinGroupRequest = [
102-
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v3
114+
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2,
115+
JoinGroupRequest_v3, JoinGroupRequest_v4,
103116
]
104117
JoinGroupResponse = [
105-
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v3
118+
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2,
119+
JoinGroupResponse_v3, JoinGroupResponse_v4,
106120
]
107121

108122

test/test_coordinator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ def test_close(mocker, coordinator):
304304
coordinator._handle_leave_group_response.assert_called_with('foobar')
305305

306306
assert coordinator.generation() is None
307-
assert coordinator._generation is Generation.NO_GENERATION
307+
assert coordinator._generation == Generation.NO_GENERATION
308308
assert coordinator.state is MemberState.UNJOINED
309309
assert coordinator.rejoin_needed is True
310310

0 commit comments

Comments
 (0)