KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol (1/N)#22245
Conversation
|
This PR is part of #21565 and implements only server-side protocol and test cases. |
5d0997c to
441be69
Compare
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
@lucasbru Hi! |
| .build(); | ||
|
|
||
| // Generate the records to replace the member. We don't care about the regular expression | ||
| // here because it is taken care of later after the static membership replacement. |
There was a problem hiding this comment.
nit: this comment is carried over from the consumer helper - streams has no regex subscriptions
| // here because it is taken care of later after the static membership replacement. | ||
| replaceStreamsMember(records, group, existingStaticMemberOrNull, newMember); | ||
|
|
||
| log.info("[GroupId {}][MemberId {}] Static member with instance id {} re-joins the stream group " + |
| * when a member is first created. | ||
| * | ||
| * @param groupId The group id. | ||
| * @param instanceId The instance id. |
There was a problem hiding this comment.
can we expand this? null vs non-null instanceId changes the return semantics here (always true on metadata change vs only on epoch-relevant change per KIP-1071), which is non-obvious from the doc
| * @param memberId The Member ID. | ||
| * @return The member corresponding to the given member ID or null if it does not exist | ||
| */ | ||
| public StreamsGroupMember dynamicMember(String memberId) { |
There was a problem hiding this comment.
this returns any member from the map, not just dynamic ones - the name and javadoc are a bit misleading. Today only called when instanceId == null so it works, but worth renaming or fixing the doc
There was a problem hiding this comment.
I removed this method because it is no longer needed.
| */ | ||
| public TasksTuple targetAssignment(String memberId) { | ||
| return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY); | ||
| public TasksTuple targetAssignment(String memberId, Optional<String> instanceId) { |
There was a problem hiding this comment.
javadoc wasn't updated for the new signature - missing @param entries and the instanceId fallback through staticMembers isn't described
| } | ||
| } | ||
|
|
||
| if (userEndpointChanged || (hasAssignedTasksChanged(member, updatedMember) && updatedMember.userEndpoint().isPresent())) { |
There was a problem hiding this comment.
we already compute hasAssignedTasksChanged as part of newlyJoinOrAssignmentChanged at line 2237 - could we hoist it to a local and reuse? full TasksTupleWithEpochs.equals runs twice on every steady-state heartbeat otherwise
| records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord( | ||
| groupId, | ||
| newMember.memberId(), | ||
| group.targetAssignment(oldMember.memberId(), oldMember.instanceId()) |
There was a problem hiding this comment.
same as in fromLastTargetAssignment - oldMember.memberId() is already the key, instanceId just does another staticMember lookup. Optional.empty() works here
There was a problem hiding this comment.
@lucasbru Good point!
That makes sense for replaceStreamsMember. In that method, oldMember.memberId() is already the exact target-assignment key, so passing Optional.empty() is clearer and avoids the extra static-member lookup.
However, I’m less sure that the same change is safe in fromLastTargetAssignment, though.
In the static-member replacement path, the member passed there can be the replacement member with the new member id, while the in-memory target assignment may still be keyed by the old member id because the replacement records have not been replayed yet.
In that case, the instance-id lookup is what lets us recover the previous assignment in the no-recompute path. Does that match your understanding, or am I missing another invariant here?
if (!canComputeNextTargetAssignment) {
...
return UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}What do you think?
| // 1. The member is joining. | ||
| // 2. The member's assignment has been updated. | ||
| if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) { | ||
| boolean newlyJoinOrAssignmentChanged = memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember); |
There was a problem hiding this comment.
could we just reuse isJoining here? same condition, and the literal 0 reads a bit jarring since it was swapped for JOIN_GROUP_MEMBER_EPOCH a few lines up
| /** | ||
| * @return True if the member is in the Unrevoked state. | ||
| */ | ||
| public boolean isUnrevokedState() { |
There was a problem hiding this comment.
this is only used in one place (streamsGroupStaticMemberGroupLeave). Consumer doesn't have an equivalent accessor - it just inlines state == MemberState.UNREVOKED_PARTITIONS at all four of its call sites. Could we drop the method and inline the check?
| * | ||
| * @throws GroupMaxSizeReachedException if the maximum capacity has been reached. | ||
| */ | ||
| private void throwIfStreamsGroupIsFull( |
There was a problem hiding this comment.
consumer's throwIfConsumerGroupIsFull takes memberId to detect rejoin (group.hasMember(memberId)), this one takes instanceId. Could we align - either pass both or follow the same shape?
There was a problem hiding this comment.
I think throwIfConsumerGroupIsFull(...) has potential bug. I filled the ticket KAFKA-20601.
Also, I’ll update throwIfStreamsGroupIsFull() to use both the member ID and the instance ID!
| * | ||
| * @return The resolved streams group member. | ||
| */ | ||
| private StreamsGroupMember getOrMaybeCreateStaticStreamsGroupMember( |
There was a problem hiding this comment.
nit: continuation indentation here (and in replaceStreamsMember, streamsGroupStaticMemberGroupLeave, and the new throwIfXxx overloads) is 8 spaces, but getOrMaybeCreateDynamicStreamsGroupMember above and the consumer counterparts use 4. Worth normalizing
| * | ||
| * @throws UnreleasedInstanceIdException if the instance id received in the request is still in use by an existing static member. | ||
| */ | ||
| private void throwIfInstanceIdIsUnreleased(StreamsGroupMember member, String groupId, String receivedMemberId, String receivedInstanceId) { |
There was a problem hiding this comment.
LEAVE_GROUP_STATIC_MEMBER_EPOCH is already statically imported (from ConsumerGroupHeartbeatRequest, same value) - the consumer overload right above uses the static import. Could we do the same here for consistency?
| )); | ||
| } | ||
|
|
||
| private boolean hasUserEndpointChanged(StreamsGroupMember maybeOldMember, StreamsGroupMember updatedMember) { |
There was a problem hiding this comment.
could this be static? hasEpochRelevantMemberConfigChanged next to it is - neither uses instance state
| * | ||
| * @return A CoordinatorResult with a single record signifying that the static member is leaving. | ||
| */ | ||
| private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupStaticMemberGroupLeave( |
There was a problem hiding this comment.
consumerGroupStaticMemberGroupLeave doesn't do this state cleanup (UNREVOKED -> STABLE). The streams behavior is arguably more correct, but worth a code comment explaining why we need it here - otherwise a future "align with consumer" pass might strip it
There was a problem hiding this comment.
@squah-confluent Should consumer groups clean up the state for departing static members?
There was a problem hiding this comment.
Yes, I discussed something similar with @dajac at some point and if I remember correctly we wanted to treat unrevoked partitions as revoked on that path.
There was a problem hiding this comment.
If so, may I open a new ticket for ConsumerGroup?
There was a problem hiding this comment.
Yes, if there isn't one already, please go ahead and open a ticket!
There was a problem hiding this comment.
@squah-confluent
Thanks for your feedback!
It seems that there is no ticket for this.
I filled it in KAFKA-20641!
There was a problem hiding this comment.
@lucasbru @squah-confluent
Based on the comments here, I think this should be modified (#22473 (comment))
I made an commit for this (b275404)
If this looks good to you, I'll apply the same approach here.
What do you think?
There was a problem hiding this comment.
I see two options -- Either we wait until we have agreed on a solution for consumer groups in the other PR. Or, we remove the "state fix up" from this PR, and address both groups at the same time in the other PR.
There was a problem hiding this comment.
I made an commit for option 2 and created a ticket https://issues.apache.org/jira/browse/KAFKA-20680
| StreamsGroupMember member; | ||
| StreamsGroupMember maybeOldMember; | ||
| if (instanceId == null) { | ||
| maybeOldMember = group.dynamicMember(memberId); |
There was a problem hiding this comment.
I think maybeOldMember is redundant in the dynamic branch. It is consumed in three places below: hasReplacedStaticMember (always false for dynamic - same reference or null), hasUserEndpointChanged (gives the same result as passing member, since for existing heartbeats they are the same reference and for first joins member is a default with no endpoint), and buildEndpointToPartitions (only relevant for the replaced static case). Could we compute the replaced static member only inside the static branch and drop this assignment + the dynamicMember accessor?
There was a problem hiding this comment.
And in that shape hasReplacedStaticMember falls out too - it just becomes replacedStaticMember != null at the two call sites below. Drops one more boolean from the heartbeat method.
| boolean newlyJoinOrAssignmentChanged = memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember); | ||
| boolean hasReplacedStaticMember = maybeOldMember != null && !maybeOldMember.memberId().equals(updatedMember.memberId()); | ||
| boolean userEndpointChanged = hasUserEndpointChanged(maybeOldMember, updatedMember); | ||
| if (newlyJoinOrAssignmentChanged) { | ||
| response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs())); | ||
| response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks())); | ||
| response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks())); | ||
| } | ||
|
|
||
| if (newlyJoinOrAssignmentChanged || hasReplacedStaticMember || userEndpointChanged) { | ||
| group.invalidateCachedEndpointToPartitions(updatedMember.memberId()); | ||
| if (updatedMember.userEndpoint().isPresent()) { | ||
| // If no user endpoint is defined, there is no change in the endpoint information. | ||
| // Otherwise, bump the endpoint information epoch | ||
| group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1); | ||
| if (hasReplacedStaticMember) { | ||
| group.invalidateCachedEndpointToPartitions(maybeOldMember.memberId()); | ||
| } | ||
| } | ||
|
|
||
| if (userEndpointChanged || (hasAssignedTasksChanged(member, updatedMember) && updatedMember.userEndpoint().isPresent())) { | ||
| group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1); | ||
| } |
There was a problem hiding this comment.
this whole block is a bit hard to follow with three intermediate booleans whose names read like jargon. Folding in the related suggestions (isJoining reuse, cache hasAssignedTasksChanged, drop hasReplacedStaticMember/maybeOldMember in favor of a nullable replacedStaticMember), it could be:
boolean assignedTasksChanged = hasAssignedTasksChanged(member, updatedMember);
boolean endpointChanged = hasUserEndpointChanged(member, updatedMember);
// Echo the assignment back when joining or the assignment changed.
if (isJoining || assignedTasksChanged) {
response.setActiveTasks(...);
response.setStandbyTasks(...);
response.setWarmupTasks(...);
}
// Drop stale per-member endpoint mappings.
if (isJoining || assignedTasksChanged || endpointChanged || replacedStaticMember != null) {
group.invalidateCachedEndpointToPartitions(updatedMember.memberId());
if (replacedStaticMember != null) {
group.invalidateCachedEndpointToPartitions(replacedStaticMember.memberId());
}
}
// Bump the group's endpoint epoch so peers refetch endpoint-to-partition mappings.
if (endpointChanged || (assignedTasksChanged && updatedMember.userEndpoint().isPresent())) {
group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
}block-leading comments do the work the named booleans were doing, two cached expressions instead of three booleans, trivial conditions inlined.
|
I made a pass on the production code |
lucasbru
left a comment
There was a problem hiding this comment.
Left one comment on the update
| } | ||
|
|
||
| @Test | ||
| public void testStaticMemberRejoinWithUpdatedProcessIdBumpsStreamsGroupEpoch1() throws UnknownHostException { |
There was a problem hiding this comment.
the name says "WithUpdatedProcessIdBumpsStreamsGroupEpoch" but the test verifies the opposite - rejoin with the same processId (only clientId/host change) writes the metadata record but does NOT bump the group epoch. The 1 suffix also looks like a copy-paste collision fix. Could we rename, e.g. testStaticMemberRejoinWithSameProcessIdDoesNotBumpStreamsGroupEpoch?
There was a problem hiding this comment.
You are right. I missed it, My bad...!
Fixed it!
lucasbru
left a comment
There was a problem hiding this comment.
LGTM, thanks. I will check with @squah-confluent whether he wants to have another look
|
Yes, please rebase this. Could also be worth opening and closing the PR if the issue persists |
414ff61 to
d6c8502
Compare
|
@squah-confluent |
|
@lucasbru I'm happy to make the necessary updates, but before I spend time getting familiar with the KIP-1331 implementation, I'd like to confirm the expected scope for this PR. Should I keep the changes limited to the minimal conflict resolution, or is it expected that this PR also incorporates the newer KIP-1331-related changes? Alternatively, would it make more sense to wait until the KIP-1331 work is completed before updating this PR? I'd appreciate any guidance on the preferred approach. |
|
@chickenchickenlove Yeah we have a bit of a race condition between the two. But to be fair, the change that hit trunk now is the main thing that is going to conflict with this PR. Also the two changes should be pretty orthogonal, so conflict resolution should be fine. |
…eams rebalance protocol - server side
d6c8502 to
a10b596
Compare
Introduction
This PR enables "static membership for Streams group heartbeat" and
aligns group-epoch behavior with the KIP-1071 intent for Streams
rebalancing.
Notification
of the following member attributes changes: topology epoch, rack ID,
client tags, or process ID. However, the previous implementation
determined whether to bump the group epoch by comparing the entire
member record. In this PR, the static membership path is updated so that
the group epoch is bumped only when at least one of topology epoch, rack
ID, client tags, or process ID differs.
with the same instance ID, we treat this as not constituting a member
join/leave for the purpose of group-epoch bumping. Accordingly, the
implementation does not bump the group epoch in this case, since it does
not satisfy the “member join/leave” condition described in KIP-1071.
Changes
heartbeat.
memberIdfor sameinstanceId-2(temporary leave): keep static identity and writecurrent-assignment epoch
-2-1(actual leave): fence/remove member and bump group epochinstanceIdalready maps to an existing static member, skipmax-size rejection for replacement flow
id
id, client tags, process id)
member id.
currently valid.
Scope
Reviewers: Lucas Brutschy lbrutschy@confluent.io, Sean Quah
squah@confluent.io