diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index ad644aa52..4aa5c89bc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -33,6 +33,16 @@ def __init__(self, generation_id, member_id, protocol): self.member_id = member_id self.protocol = protocol + @property + def is_valid(self): + return self.generation_id != DEFAULT_GENERATION_ID + + def __eq__(self, other): + return (self.generation_id == other.generation_id and + self.member_id == other.member_id and + self.protocol == other.protocol) + + Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None) @@ -461,7 +471,8 @@ def join_group(self, timeout_ms=None): exception = future.exception if isinstance(exception, (Errors.UnknownMemberIdError, Errors.RebalanceInProgressError, - Errors.IllegalGenerationError)): + Errors.IllegalGenerationError, + Errors.MemberIdRequiredError)): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type @@ -491,7 +502,7 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - version = self._client.api_version(JoinGroupRequest, max_version=3) + version = self._client.api_version(JoinGroupRequest, max_version=4) if version == 0: request = JoinGroupRequest[version]( self.group_id, @@ -585,6 +596,11 @@ def _handle_join_group_response(self, future, send_time, response): future.failure(error) elif error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) + elif error_type is Errors.MemberIdRequiredError: + # Broker requires a concrete member id to be allowed to join the group. Update member id + # and send another join group request in next cycle. + self.reset_generation(response.member_id) + future.failure(error_type()) else: # unexpected error, throw the exception error = error_type() @@ -762,10 +778,10 @@ def generation(self): return None return self._generation - def reset_generation(self): - """Reset the generation and memberId because we have fallen out of the group.""" + def reset_generation(self, member_id=UNKNOWN_MEMBER_ID): + """Reset the generation and member_id because we have fallen out of the group.""" with self._lock: - self._generation = Generation.NO_GENERATION + self._generation = Generation(DEFAULT_GENERATION_ID, member_id, None) self.rejoin_needed = True self.state = MemberState.UNJOINED @@ -799,8 +815,10 @@ def _close_heartbeat_thread(self, timeout_ms=None): self._heartbeat_thread = None def __del__(self): - if hasattr(self, '_heartbeat_thread'): + try: self._close_heartbeat_thread() + except (TypeError, AttributeError): + pass def close(self, timeout_ms=None): """Close the coordinator, leave the current group, @@ -816,7 +834,7 @@ def maybe_leave_group(self, timeout_ms=None): with self._client._lock, self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION): + and self._generation.is_valid): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 3b32590ec..74e19c94b 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -52,6 +52,12 @@ class JoinGroupResponse_v3(Response): SCHEMA = JoinGroupResponse_v2.SCHEMA +class JoinGroupResponse_v4(Response): + API_KEY = 11 + API_VERSION = 4 + SCHEMA = JoinGroupResponse_v3.SCHEMA + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -95,14 +101,22 @@ class JoinGroupRequest_v3(Request): API_VERSION = 3 RESPONSE_TYPE = JoinGroupResponse_v3 SCHEMA = JoinGroupRequest_v2.SCHEMA - UNKNOWN_MEMBER_ID = '' + + +class JoinGroupRequest_v4(Request): + API_KEY = 11 + API_VERSION = 4 + RESPONSE_TYPE = JoinGroupResponse_v4 + SCHEMA = JoinGroupRequest_v3.SCHEMA JoinGroupRequest = [ - JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v3 + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, + JoinGroupRequest_v3, JoinGroupRequest_v4, ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v3 + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, + JoinGroupResponse_v3, JoinGroupResponse_v4, ] diff --git a/test/test_coordinator.py b/test/test_coordinator.py index bfd3a2187..251de566a 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -304,7 +304,7 @@ def test_close(mocker, coordinator): coordinator._handle_leave_group_response.assert_called_with('foobar') assert coordinator.generation() is None - assert coordinator._generation is Generation.NO_GENERATION + assert coordinator._generation == Generation.NO_GENERATION assert coordinator.state is MemberState.UNJOINED assert coordinator.rejoin_needed is True