KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Client Side.#22559
Conversation
There was a problem hiding this comment.
Pull request overview
This PR completes client-side support for static membership (group.instance.id) when Kafka Streams uses the Streams rebalance protocol (group.protocol=streams, per KIP-1071). It removes prior config validation that blocked static membership under the Streams protocol, adds explicit fatal handling for UNRELEASED_INSTANCE_ID in the Streams heartbeat manager, and extends unit tests to cover the expected close/leave-epoch behavior for static members.
Changes:
- Allow
group.instance.idwhengroup.protocol=streams(including unprefixed,consumer., andmain.consumer.configs). - Treat
UNRELEASED_INSTANCE_IDas a known fatal error inStreamsGroupHeartbeatRequestManager. - Add/extend unit tests for static-member close semantics and poll-on-close request contents.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | Replaces the previous “should throw” assertion with a parameterized test validating static membership is accepted under Streams protocol and propagated to main consumer configs. |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removes the Streams-protocol compatibility check that rejected group.instance.id, enabling static membership configuration. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Adds unit tests ensuring static members use the correct leave epoch on close for DEFAULT/REMAIN_IN_GROUP vs LEAVE_GROUP. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Extends fatal-error coverage to include UNRELEASED_INSTANCE_ID and verifies poll-on-close includes static leave epoch + instance id for static members. |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Adds explicit fatal handling for UNRELEASED_INSTANCE_ID in error response processing. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
lucasbru
left a comment
There was a problem hiding this comment.
Thanks for the PR @chickenchickenlove !
| @Test | ||
| public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() { | ||
| CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP; | ||
| MemberState expectedState = MemberState.LEAVING; | ||
| int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; | ||
| verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); | ||
| } | ||
|
|
||
| @Test | ||
| public void testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() { | ||
| CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.DEFAULT; | ||
| MemberState expectedState = MemberState.LEAVING; | ||
| int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; | ||
| verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); | ||
| } | ||
|
|
||
| @Test | ||
| public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() { | ||
| CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.LEAVE_GROUP; | ||
| MemberState expectedState = MemberState.LEAVING; | ||
| int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; | ||
| verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); | ||
| } |
There was a problem hiding this comment.
These three only differ by the operation and expected epoch, could they be a single @ParameterizedTest like testPollOnCloseWhenStaticMemberIsLeaving above? Also the names are inconsistent for the same constant: UsesStaticLeaveEpoch vs UsesLeaveGroupStaticMemberEpoch.
| final Metrics localMetrics = new Metrics(time); | ||
| StreamsMembershipManager membershipManagerWithStaticMember = new StreamsMembershipManager( | ||
| GROUP_ID, | ||
| Optional.of("instance-1"), |
There was a problem hiding this comment.
Minor: this hardcodes "instance-1" while StreamsGroupHeartbeatRequestManagerTest uses an INSTANCE_ID constant. Worth a constant here too for consistency.
| @@ -1600,12 +1600,6 @@ protected StreamsConfig(final Map<?, ?> props, | |||
|
|
|||
| private void verifyStreamsProtocolCompatibility(final boolean doLog) { | |||
| if (doLog && isStreamsProtocolEnabled()) { | |||
There was a problem hiding this comment.
Now that group.instance.id is accepted with the streams protocol, are there docs that still state static membership isn't supported here? Worth checking the Streams upgrade/config docs so they don't contradict this.
There was a problem hiding this comment.
You're right.
There is indeed an issue there as well. I'll address that part too and include it in this PR.
|
|
||
| @ParameterizedTest | ||
| @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"}) | ||
| public void testPollOnCloseWhenStaticMemberIsLeaving(final CloseOptions.GroupMembershipOperation operation) { |
There was a problem hiding this comment.
Coverage here is unit-level only. Is there an integration/system test exercising static-member close under group.protocol=streams, or is that expected to come separately?
There was a problem hiding this comment.
I was planning to cover this with a Ducktape test.
I'll take a look at the existing integration tests and check whether this can be covered by adding a new test case.
…eams rebalance protocol at Client Side.
5d2ace3 to
44d3eb3
Compare
|
@lucasbru |
Summary
This PR adds the remaining client-side support for static membership
when Kafka Streams uses the streams rebalance protocol
(
group.protocol=streams) introduced by KIP-1071.The change allows
group.instance.idto be used with the streamsprotocol, sends the proper static-member leave epoch when a static
Streams member closes with
DEFAULTorREMAIN_IN_GROUP, and treatsUNRELEASED_INSTANCE_IDas a known fatal heartbeat error.Some close-related changes that were part of my previous full PR
#21603 are intentionally not
included here because they are already covered by
#21579, which introduced
CloseOptions.DEFAULTfor Kafka Streams.Changes
group.protocol=streams.UNRELEASED_INSTANCE_IDexplicitly inStreamsGroupHeartbeatRequestManager.DEFAULTandREMAIN_IN_GROUPuseLEAVE_GROUP_STATIC_MEMBER_EPOCH.LEAVE_GROUPusesLEAVE_GROUP_MEMBER_EPOCH.pollOnClosesends the static leave epoch and instance id.group.instance.idis allowedfor unprefixed,
consumer., andmain.consumer.configurations.Reviewers: Lucas Brutschy lbrutschy@confluent.io