KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Server Side.#21565
Conversation
|
Hi, @lucasbru ! |
squah-confluent
left a comment
There was a problem hiding this comment.
Thanks for the patch!
| /** | ||
| * @return Returns whether the given instance ID is currently associated with an existing static member. | ||
| */ | ||
| public boolean staticMemberExist(String instanceId) { | ||
| String memberId = staticMembers.get(instanceId); | ||
| if (memberId == null) | ||
| return false; | ||
|
|
||
| StreamsGroupMember member = members.get(memberId); | ||
| if (member == null) | ||
| return false; | ||
| return true; | ||
| } | ||
|
|
There was a problem hiding this comment.
Could we follow the same pattern as ConsumerGroup.hasStaticMember?
There was a problem hiding this comment.
@squah-confluent
It makes sense to me!
This method was added to prevent an unintended GroupMaxSizeReachedException from being thrown when a static member joins within throwsIfStreamsGroupsIsFull.
Unlike ConsumerGroup, StreamsGroup manages members and staticMembers separately. However, given the purpose above, it does not seem necessary to verify whether there is an actual member currently bound to the static member.
Even if we assume a case where a static member exists but no corresponding member exists, the impact on GroupMaxSizeReachedException remains unchanged.
| if (instanceId == null) { | ||
| targetAssignmentEpoch = group.assignmentEpoch(); | ||
| targetAssignment = group.targetAssignment(updatedMember.memberId()); | ||
| } else { | ||
| targetAssignmentEpoch = group.assignmentEpoch(); | ||
| StreamsGroupMember maybeOldStaticMember = group.staticMember(instanceId); | ||
| String maybeOldStaticMemberId = maybeOldStaticMember == null ? | ||
| updatedMember.memberId() : | ||
| maybeOldStaticMember.memberId(); | ||
| targetAssignment = group.targetAssignment(maybeOldStaticMemberId); | ||
| } |
There was a problem hiding this comment.
In consumer groups, this is written as
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
Could we follow the same pattern?
| * | ||
| * @return A CoordinatorResult with a single record signifying that the static member is leaving. | ||
| */ | ||
| private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupStaticMemberGroupLeave( |
There was a problem hiding this comment.
@lucasbru Do we need to reset the assignment epochs of tasks here?
There was a problem hiding this comment.
Sorry for making you confused. 🙇♂️
JFYI, This is why I implemented it that way. (AFAIK, KIP-1071 does not define the behavior for this case.)
My understanding is that the coordinator tracks pendingRevocationTasks and uses them to determine whether a task is still owned or not. pendingRevocationTasks represent tasks that the member is expected to revoke and report as revoked in the next heartbeat response.
However, when a static member terminates with epoch -2, we can generally assume the Streams application has shut down. In that case, even if we never receive a follow-up heartbeat response, it is reasonable to treat the revocation as completed.
Therefore, by writing pendingRevocationTasks as EMPTY, another member that is waiting to acquire those tasks can be assigned them immediately, without having to wait for the static member that left with -2 to come back.
What do you think?
There was a problem hiding this comment.
@squah-confluent meant resetting the assignment epochs on assignedTasks (not just clearing tasksPendingRevocation). KIP-1251 landed for consumer groups while this PR was open, and consumerGroupStaticMemberGroupLeave now does both — see GroupMetadataManager.java:4262-4269:
ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member)
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setPartitionsPendingRevocation(Map.of())
.resetAssignedPartitionsEpochsToZero()
.build();The comment there is: "Assignment epochs are reset to 0 so when the static member rejoins, partitions are considered assigned from epoch 0 to the new member ID." Same applies to Streams — when the -2 member rejoins under a new memberId, the active task epochs in assignedTasks should be 0 so they're treated as freshly assigned to the new memberId. Could you add an equivalent resetAssignedTasksEpochsToZero() on StreamsGroupMember.Builder and call it here?
There was a problem hiding this comment.
Ah, understood. Thanks for your comments!
Yes, I’ll add the equivalent resetAssignedTasksEpochsToZero() in the server-side PR and call it here.
| static boolean hasEpochRelevantMemberConfigChanged( | ||
| StreamsGroupMember oldMember, | ||
| StreamsGroupMember newMember | ||
| ) { | ||
| // The group epoch is bumped: (KIP-1071) | ||
| // - When a member updates its topology metadata, rack ID, client tags or process ID. | ||
| return !Objects.equals(oldMember.topologyEpoch(), newMember.topologyEpoch()) | ||
| || !Objects.equals(oldMember.rackId(), newMember.rackId()) |
There was a problem hiding this comment.
I’d also like to discuss this.
In KIP-1071, it explicitly states that the group epoch should be bumped only in those specific cases.
The group epoch is bumped:
- When a member joins or leaves the group.
- When a member is fenced or removed from the group by the group coordinator.
- When the partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.
- When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
- When a member updates its topology metadata, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
- When an assignment configuration for the group is updated.
However, the current code seems to evaluate the group-epoch bump condition more strictly than what KIP-1071 describes.
On the other hand, KIP-1071 also includes the following sentence:
Updates information of the member if needed. The group epoch is incremented if there is any change.
So there appears to be a conflict between the explicitly listed conditions for bumping the group epoch and the statement that the group epoch is incremented whenever anything changes. I think we should resolve this inconsistency. Once it’s clarified, we may need to adjust this method and the call sites that depend on it.
What do you think?
If I’m misunderstanding anything, please let me know! 🙇♂️
There was a problem hiding this comment.
I think the difference is mainly in configs that are always fixed in Kafka Streams clients
- rebalance.timeout.ms
- user.endpoint
- client ID
- client Host
So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance.
But I think it would be fine to then always call this method, even for dynamic members.
There was a problem hiding this comment.
Understood!
Would it make sense to split this into a separate PR so we can keep the scope focused on static membership?
Which would you prefer?
There was a problem hiding this comment.
Let's keep it scoped to this PR. The method is needed here for the static-rejoin path, and converting the dynamic path to use the same KIP-1071 list is a small follow-up cleanup we can do separately — no need to bundle it.
There was a problem hiding this comment.
Thanks for the clarification. Understood!
I created KAFKA-20562 for this clean up task.
|
I did not review this PR, but I just had a question / comment about this. If this PR does address this already, great, but based on the comments (which I skimmed over) I believe the PR might not do this. For static group membership, when a member re-joins, there is a couple of corner case for which it it not enough to just return the previous assignment (or just bump the epoch), but we need to do more:
About Lucas' comment: For static member, when they are stopped and restarted, we cannot guarantee that the configs do not change. So we need to consider this case IMHO -- as a matter of fact, I know explicitly for Of course, bumping the epoch will be required in all these cases (not trying to say otherwise), but what I am saying is that bumping the epoch does not imply, that we need to re-compute the assignment for all cases, nor that it is sufficient to only bump the epoch for all cases. Btw: these things are very subtle, and we will need to update the documentation accordingly to explain how this all works -- wondering if it might even make sense to update the KIP with it? It's more than an implementation detail IMHO. Curious to hear what you think about this? |
|
@mjsax What you are saying was exactly my point, when I said "So it did not make a difference before. But you are right that with static members, this does make a difference since we can restart the client without a rebalance." And that's also how it was specified in KIP-1071. In the current code, we simplified the logic here since for dynamic members those properties won't change in practice, but we went the safe route to bump the group epoch and reassign if they change (which I guess can only happen in a non-standard implemtation of KS). KIP-1071 clearly specifies which properties need to bump the group epoch, in the first paragraph @chickenchickenlove mentioned. The second one seems like an oversimplification. But yeah, it's good to go through the list of properties again and make sure the KIP as updated correctly (since we have made bunch of changes). So KIP-1071 species
So I think the list looks okay. Here are other things that can change without a group epoch bump:
So I would for this particular check, the list in KIP-1071 good. But yeah, we need to take care to handle |
|
@mjsax @lucasbru For example, cases where assignments need to be recalculated seem to be handled (topology, processId, clientTags, rackId). However, I haven’t verified yet whether changes to application.server in a static member are propagated to other members. I will validate this through test code. I have one question: in this PR, should we consider scenarios where static and dynamic members coexist? This may be related to online migration, but I’m not sure whether it should be handled within the scope of this PR or addressed as part of the online migration work separately. If there are any other aspects you think I should consider, please let me know. |
Static and dynamic members can definitely coexist. But this is not really related to online migration. |
Thanks for your comments! |
|
@chickenchickenlove are you still working on thsi? |
|
@lucasbru thanks for your comment! Yes! Also, I think it will be ready for review once I add some test cases covering a scenario where static and dynamic members coexist. I have not written that scenario yet, but I should be able to add it soon. If there is anything else you would like me to check before the review, |
|
When resolving the merge conflict, we must remember to update |
|
Thanks @squah-confluent ! |
|
We should remember to update tests/kafkatest/tests/streams/streams_static_membership_test.py to include the new protocol. |
…eams rebalance protocol at Server Side.
5ba30e6 to
f291d80
Compare
|
While rebasing onto trunk, I found that the diff had become too large for me to confidently carry the rebase through correctly. So I decided not to continue with the rebase onto trunk, and instead proceeded as follows.
In the process, the previous commit history was removed. I’m sorry that this also removed the parts you had already taken the time to review. I believe this PR will be ready for review once I complete the following remaining items, and I will ping you again once they are done.
|
|
I added client-side implementation here. (originally, it was implemented in #21603)
Given this, we may need to wait until KIP-1284 is incorporated. |
|
@chickenchickenlove yes, we are close to accepting KIP-1284, hopefully it's going to be implemented soon. I'll push some other committer to look at it as well. As for streams_static_membership_test, have a look at streams_smoke_test. It runs for both the old and the new protocol. |
|
@lucasbru |
|
@chickenchickenlove Sorry for being so slow on this one. I am a bit overloaded at the moment. How are you dealing with endpointInterformationEpoch when a static member rejoins? Before we can merge this, we'll need to split it into several PRs, as this is too large to review. Could you suggest a way to break up the code? Also, I see that |
|
@lucasbru
Good point. I totally missed that path 🥲... I’ll address this in the server-side PR by invalidating the endpoint cache for both the replaced member id and the new member id, and by excluding the replaced member when building
How about splitting it like this?
I would keep the branches stacked locally like this: So PR 1 would be opened first against trunk. I would prepare the follow-up branches locally, then open PR 2 and PR 3 against trunk after PR 1 is merged, and PR 4 after PR 3 is merged.
I see! I will move the new Streams static membership tests into dedicated test classes, likely What do you think? |
|
@lucasbru |
|
Thanks, and sorry for the slow turnaround. Endpoint handling looks mostly right — invalidating the cache for both the new and replaced member IDs and excluding the replaced ID from The split into The 4-PR breakdown you proposed is fine. Let's go ahead and open the server-core PR (PR 1) against trunk so we can focus review there; once it merges we rebase the rest. Stacking server-tests as PR 2 is fine, though if it's mostly more coverage for the same code I'd be open to folding it into PR 1 if it doesn't bloat too much — your call based on the final size. No need to wait for #21579 for the server-side PRs. The broker only needs to accept -2 from static members, which is independent of how the client decides to send it. The client-side PR (PR 3) is the one that will likely need to coordinate with #21579 / KIP-1284. |
|
@lucasbru Also, sorry for pinging you repeatedly. Since I usually have more focused time to work on this over the weekend, I wanted to confirm the direction before then. 🙇♂️ |
|
@chickenchickenlove You only opened one follow-up PR for this one? Are you going to open more? |
|
@lucasbru
There are no further PRs planned at the moment. |
Okay. Just letting you know that I am going to be on leave starting 1st of July, so it seems to me we have to get this code in soon or revert the server-side changes. |
|
@lucasbru |
Introduction
This PR enables "static membership for Streams group heartbeat" and aligns group-epoch behavior with the KIP-1071 intent for Streams rebalancing.
Notification
Changes
memberIdfor sameinstanceId-2(temporary leave): keep static identity and write current-assignment epoch-2-1(actual leave): fence/remove member and bump group epochinstanceIdalready maps to an existing static member, skip max-size rejection for replacement flowScope
Related PR
Reviewers: Lucas Brutschy lbrutschy@confluent.io