From aa4601a1f46936680640ca2e92db478f2fc11d2c Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 13 Jun 2026 11:29:02 +0900 Subject: [PATCH 1/4] KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Client Side. --- .../StreamsGroupHeartbeatRequestManager.java | 5 ++ ...reamsGroupHeartbeatRequestManagerTest.java | 26 +++++++++ .../StreamsMembershipManagerTest.java | 56 +++++++++++++++++++ .../apache/kafka/streams/StreamsConfig.java | 6 -- .../kafka/streams/StreamsConfigTest.java | 33 ++++++----- 5 files changed, 103 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 283625cf2fc22..724eb640abf83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -679,6 +679,11 @@ private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final heartbeatRequestState.reset(); break; + case UNRELEASED_INSTANCE_ID: + logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, errorMessage); + handleFatalFailure(error.exception(errorMessage)); + break; + case UNSUPPORTED_VERSION: logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, UNSUPPORTED_VERSION_ERROR_MESSAGE); handleFatalFailure(error.exception(UNSUPPORTED_VERSION_ERROR_MESSAGE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 3ede3565c11ba..a7304b197f5d2 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -1329,6 +1329,7 @@ public void testTopicAuthorizationFailedErrorResponse() { names = { "INVALID_REQUEST", "GROUP_MAX_SIZE_REACHED", + "UNRELEASED_INSTANCE_ID", "UNSUPPORTED_VERSION", "STREAMS_INVALID_TOPOLOGY", "STREAMS_INVALID_TOPOLOGY_EPOCH", @@ -1457,6 +1458,7 @@ private static Stream provideOtherErrors() { Errors.GROUP_MAX_SIZE_REACHED, Errors.FENCED_MEMBER_EPOCH, Errors.UNKNOWN_MEMBER_ID, + Errors.UNRELEASED_INSTANCE_ID, Errors.UNSUPPORTED_VERSION, Errors.STREAMS_INVALID_TOPOLOGY, Errors.STREAMS_INVALID_TOPOLOGY_EPOCH, @@ -1493,6 +1495,30 @@ public void testPollOnCloseWhenIsLeaving() { assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); } + @ParameterizedTest + @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"}) + public void testPollOnCloseWhenStaticMemberIsLeaving(final CloseOptions.GroupMembershipOperation operation) { + final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); + when(membershipManager.isLeavingGroup()).thenReturn(true); + when(membershipManager.leaveGroupOperation()).thenReturn(operation); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_STATIC_MEMBER_EPOCH); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0); + StreamsGroupHeartbeatRequest streamsRequest = + 
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); + + assertEquals(GROUP_ID, streamsRequest.data().groupId()); + assertEquals(MEMBER_ID, streamsRequest.data().memberId()); + assertEquals(LEAVE_GROUP_STATIC_MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); + } + @Test public void testMaximumTimeToWaitPollTimerExpired() { try ( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java index 5761eefd09e2d..66ce9ae4a027d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java @@ -1542,6 +1542,56 @@ public void testOnHeartbeatSuccessWhenInLeaving() { verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId()); } + @Test + public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() { + CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP; + MemberState expectedState = MemberState.LEAVING; + int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; + verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); + } + + @Test + public void testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() { + CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.DEFAULT; + MemberState expectedState = MemberState.LEAVING; + int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; + verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); + } + + @Test + public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() { + 
CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.LEAVE_GROUP; + MemberState expectedState = MemberState.LEAVING; + int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); + } + + private void verifyStaticMemberLeaveOnClose( + CloseOptions.GroupMembershipOperation membershipOperation, + MemberState expectedMemberState, + int expectedMemberEpoch + ) { + final Metrics localMetrics = new Metrics(time); + StreamsMembershipManager membershipManagerWithStaticMember = new StreamsMembershipManager( + GROUP_ID, + Optional.of("instance-1"), + streamsRebalanceData, + subscriptionState, + backgroundEventHandler, + new LogContext("test"), + time, + localMetrics + ); + membershipManagerWithStaticMember.registerStateListener(memberStateListener); + joining(membershipManagerWithStaticMember); + + CompletableFuture onGroupLeft = membershipManagerWithStaticMember.leaveGroupOnClose(membershipOperation); + + assertEquals(expectedMemberState, membershipManagerWithStaticMember.state()); + assertEquals(expectedMemberEpoch, membershipManagerWithStaticMember.memberEpoch()); + assertFalse(onGroupLeft.isDone()); + } + @Test public void testOnHeartbeatSuccessWhenInUnsubscribeLeaveNotInProgress() { membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks( @@ -2688,6 +2738,12 @@ private void joining() { verifyInStateJoining(membershipManager); } + private void joining(StreamsMembershipManager givenMembershipManager) { + givenMembershipManager.onSubscriptionUpdated(); + givenMembershipManager.onConsumerPoll(); + verifyInStateJoining(givenMembershipManager); + } + private void reconcile(final StreamsGroupHeartbeatResponse response) { membershipManager.onHeartbeatSuccess(response); membershipManager.poll(time.milliseconds()); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d9e1569fe36b7..19eb2080c7e50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1615,12 +1615,6 @@ protected StreamsConfig(final Map props, private void verifyStreamsProtocolCompatibility(final boolean doLog) { if (doLog && isStreamsProtocolEnabled()) { - final Map mainConsumerConfigs = getMainConsumerConfigs("dummy", "dummy", -1); - final String instanceId = (String) mainConsumerConfigs.get(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG); - if (instanceId != null && !instanceId.isEmpty()) { - throw new ConfigException("Streams rebalance protocol does not support static membership. " - + "Please set group.protocol=classic or remove group.instance.id from the configuration."); - } if (getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG) != 0) { log.warn("Warmup replicas are not supported yet with the streams protocol and will be ignored. 
" + "If you want to use warmup replicas, please set group.protocol=classic."); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index c8f358d9fe6e6..40b5364f991b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -243,6 +243,22 @@ public void testGetGroupInstanceIdConfigs() { assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)); } + @ParameterizedTest + @ValueSource(strings = {"", StreamsConfig.CONSUMER_PREFIX, StreamsConfig.MAIN_CONSUMER_PREFIX}) + public void shouldAllowStaticMembershipWhenStreamsProtocolUsed(final String prefix) { + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(prefix + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1"); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map mainConsumerConfigs = + streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + + assertThat( + mainConsumerConfigs.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), + equalTo("static-member-1-" + threadIdx) + ); + } + @Test public void consumerConfigMustContainStreamPartitionAssignorConfig() { props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 42); @@ -1864,23 +1880,6 @@ public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() { } } - @ParameterizedTest - @ValueSource(strings = {"", StreamsConfig.CONSUMER_PREFIX, StreamsConfig.MAIN_CONSUMER_PREFIX}) - public void shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership(final String prefix) { - final Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); - props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); - props.put(prefix + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
"static-member-1"); - - final ConfigException exception = assertThrows( - ConfigException.class, - () -> new StreamsConfig(props) - ); - assertTrue(exception.getMessage().contains("Streams rebalance protocol does not support static membership. " + - "Please set group.protocol=classic or remove group.instance.id from the configuration.")); - } - @Test public void shouldSetDefaultDeadLetterQueue() { final StreamsConfig config = new StreamsConfig(props); From 44d3eb355617c09146453ffd59f4706a7107e318 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Mon, 22 Jun 2026 08:08:02 +0900 Subject: [PATCH 2/4] Addressing review. --- .../StreamsMembershipManagerTest.java | 70 +++++++++--------- .../streams/developer-guide/config-streams.md | 2 +- .../streams-rebalance-protocol.md | 4 +- docs/streams/upgrade-guide.md | 4 +- ...fkaStreamsCloseOptionsIntegrationTest.java | 72 ++++++++++++++++++- 5 files changed, 111 insertions(+), 41 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java index 66ce9ae4a027d..82bc64d7a3f30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -60,6 +62,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; @@ -84,6 +87,7 @@ public class StreamsMembershipManagerTest { private static final String GROUP_ID = "test-group"; + private static final String INSTANCE_ID = "instance-1"; private static final int MEMBER_EPOCH = 1; private static final String SUBTOPOLOGY_ID_0 = "subtopology-0"; @@ -1452,7 +1456,7 @@ public void testOnHeartbeatRequestSkippedWhenInLeaving() { public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() { final StreamsMembershipManager staticMember = new StreamsMembershipManager( GROUP_ID, - Optional.of("instance-1"), + Optional.of(INSTANCE_ID), streamsRebalanceData, subscriptionState, backgroundEventHandler, new LogContext("test"), time, new Metrics(time) ); @@ -1463,7 +1467,7 @@ public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() { public void testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroupOperation() { final StreamsMembershipManager staticMember = new StreamsMembershipManager( GROUP_ID, - Optional.of("instance-1"), + Optional.of(INSTANCE_ID), streamsRebalanceData, subscriptionState, backgroundEventHandler, new LogContext("test"), time, new Metrics(time) ); @@ -1476,7 +1480,7 @@ public void testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroup public void testLeaveGroupEpochIsStaticMemberEpochForStaticMemberWithRemainInGroup() { final StreamsMembershipManager staticMember = new StreamsMembershipManager( GROUP_ID, - Optional.of("instance-1"), + Optional.of(INSTANCE_ID), streamsRebalanceData, subscriptionState, backgroundEventHandler, new LogContext("test"), time, new Metrics(time) ); @@ -1490,7 +1494,7 @@ public void testIsLeavingGroupReturnsTrueForStaticMemberWithRemainInGroupOperati setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic"); final StreamsMembershipManager 
staticMember = new StreamsMembershipManager( GROUP_ID, - Optional.of("instance-1"), + Optional.of(INSTANCE_ID), streamsRebalanceData, subscriptionState, backgroundEventHandler, new LogContext("test"), time, new Metrics(time) ); @@ -1542,39 +1546,16 @@ public void testOnHeartbeatSuccessWhenInLeaving() { verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId()); } - @Test - public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() { - CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP; - MemberState expectedState = MemberState.LEAVING; - int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; - verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); - } - - @Test - public void testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() { - CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.DEFAULT; - MemberState expectedState = MemberState.LEAVING; - int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; - verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); - } - - @Test - public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() { - CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.LEAVE_GROUP; - MemberState expectedState = MemberState.LEAVING; - int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; - verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch); - } - - private void verifyStaticMemberLeaveOnClose( - CloseOptions.GroupMembershipOperation membershipOperation, - MemberState expectedMemberState, - int expectedMemberEpoch + @ParameterizedTest + @MethodSource("staticMemberLeaveOnCloseOperations") + public void testStaticMemberUsesExpectedLeaveEpochOnClose( + final CloseOptions.GroupMembershipOperation operation, + final int 
expectedEpoch ) { final Metrics localMetrics = new Metrics(time); StreamsMembershipManager membershipManagerWithStaticMember = new StreamsMembershipManager( GROUP_ID, - Optional.of("instance-1"), + Optional.of(INSTANCE_ID), streamsRebalanceData, subscriptionState, backgroundEventHandler, @@ -1585,13 +1566,30 @@ private void verifyStaticMemberLeaveOnClose( membershipManagerWithStaticMember.registerStateListener(memberStateListener); joining(membershipManagerWithStaticMember); - CompletableFuture onGroupLeft = membershipManagerWithStaticMember.leaveGroupOnClose(membershipOperation); + CompletableFuture onGroupLeft = membershipManagerWithStaticMember.leaveGroupOnClose(operation); - assertEquals(expectedMemberState, membershipManagerWithStaticMember.state()); - assertEquals(expectedMemberEpoch, membershipManagerWithStaticMember.memberEpoch()); + assertEquals(MemberState.LEAVING, membershipManagerWithStaticMember.state()); + assertEquals(expectedEpoch, membershipManagerWithStaticMember.memberEpoch()); assertFalse(onGroupLeft.isDone()); } + private static Stream staticMemberLeaveOnCloseOperations() { + return Stream.of( + Arguments.of( + CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP, + StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH + ), + Arguments.of( + CloseOptions.GroupMembershipOperation.DEFAULT, + StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH + ), + Arguments.of( + CloseOptions.GroupMembershipOperation.LEAVE_GROUP, + StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH + ) + ); + } + @Test public void testOnHeartbeatSuccessWhenInUnsubscribeLeaveNotInProgress() { membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks( diff --git a/docs/streams/developer-guide/config-streams.md b/docs/streams/developer-guide/config-streams.md index 961308fa3b5c0..51bacbc20663a 100644 --- a/docs/streams/developer-guide/config-streams.md +++ b/docs/streams/developer-guide/config-streams.md @@ -1398,7 +1398,7 @@ Serde for the inner 
class of a windowed record. Must implement the `Serde` inter ### group.protocol -> The group protocol used by the Kafka Streams client used for coordination. It determines how the client will communicate with the Kafka brokers and other clients in the same group. The default value is `"classic"`, which is the classic consumer group protocol. Can be set to `"streams"` (requires broker-side enablement) to enable the new Kafka Streams group protocol. +> The group protocol used by the Kafka Streams client used for coordination. It determines how the client will communicate with the Kafka brokers and other clients in the same group. The default value is `"classic"`, which is the classic consumer group protocol. Can be set to `"streams"` (requires broker-side enablement) to enable the new Kafka Streams group protocol. When set to `"streams"`, `group.instance.id` can be used for static membership. ### rack.aware.assignment.non_overlap_cost diff --git a/docs/streams/developer-guide/streams-rebalance-protocol.md b/docs/streams/developer-guide/streams-rebalance-protocol.md index 80dcb3b5e720d..eb49f43dc5ded 100644 --- a/docs/streams/developer-guide/streams-rebalance-protocol.md +++ b/docs/streams/developer-guide/streams-rebalance-protocol.md @@ -49,12 +49,12 @@ The following features are available in the current release: * **Offline Migration**: After shutting down all members and waiting for their `session.timeout.ms` to expire (or forcing an explicit group leave), a classic group can be converted to a streams group and a streams group can be converted to a classic group. The only broker-side group data that will be preserved are the committed offsets. Internal topics (changelog and repartition topics) will continue to exist as regular Kafka topics. +* **Static Membership**: Streams applications can configure `group.instance.id` when using `group.protocol=streams`. Kafka Streams derives unique group instance IDs for its stream threads internally. 
+ # What's Not Supported in This Version The following features are not yet available and should be avoided when using the new protocol: -* **Static Membership**: Setting a client `instance.id` will be rejected. - * **Topology Updates**: If a topology is changed significantly (e.g., by adding new source topics or changing the number of subtopologies), a new streams group must be created. * **High Availability Assignor**: Only the sticky assignor is supported. This implies that "warmup tasks" and rack aware assignment are not supported yet. diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index 387bef80920b6..76839ccc0037c 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -83,6 +83,8 @@ Kafka Streams now persists state store changelog offsets inside each state store As part of KIP-1035, the per-store changelog offset is written into RocksDB on each commit and is made durable on disk only when RocksDB flushes its memtable to an SST file — either organically once the memtable fills `write_buffer_size` (16 MB by default), or on a clean store close. Earlier releases force-flushed RocksDB on every commit. A consequence is that for a **low-traffic store** whose memtable rarely fills, the on-disk offset can lag the store's actual position until the next clean shutdown. For the durability model and guidance on tuning flush frequency for low-traffic stores, see [Memory Management: RocksDB](/{{version}}/documentation/streams/developer-guide/memory-mgmt.html#rocksdb-offset-durability). 
If the process then exits uncleanly (for example SIGKILL/OOM-kill, or a KafkaStreams#close that does not complete within the shutdown grace period) and changelog retention or compaction has since advanced the changelog's log-start offset past that stale offset, the restore consumer seeks out of range on restart — logged as OffsetOutOfRangeException/TaskCorruptedException — and the task is automatically re-initialized from the changelog (no data loss, but a full re-restore). +Kafka Streams now supports static membership with the Streams Rebalance Protocol. Applications using `group.protocol=streams` may configure `group.instance.id`; Kafka Streams derives unique group instance IDs for its stream threads internally. + ### Header-aware state stores for the Processor API (KIP-1271) {#kip-1271-headers-aware-stores} Kafka Streams adds **header-aware** state stores. Opt in with the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories. For example: @@ -183,7 +185,7 @@ This Early Access release covers a subset of the functionality detailed in [KIP- **What's Not Included in Early Access** - * **Static Membership:** Setting a client `instance.id` will be rejected. + * **Static Membership:** Setting `group.instance.id` was rejected in the 4.1 Early Access release. Static membership is supported with the Streams Rebalance Protocol starting in 4.3.0. * **Topology Updates:** If a topology is changed significantly (e.g., by adding new source topics or changing the number of sub-topologies), a new streams group must be created. * **High Availability Assignor:** Only the sticky assignor is supported. * **Regular Expressions:** Pattern-based topic subscription is not supported. 
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index 26aa359583c6b..07a04bd32f8e3 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -18,8 +18,10 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.StreamsGroupDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -49,6 +51,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.File; import java.io.IOException; @@ -72,6 +76,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest { protected static final String INPUT_TOPIC = "inputTopic"; protected static final String OUTPUT_TOPIC = "outputTopic"; + protected static final String GROUP_INSTANCE_ID = "someGroupInstance"; protected Properties streamsConfig; protected static KafkaStreams streams; @@ -111,7 +116,7 @@ public void before(final TestInfo testName) throws Exception { streamsConfig = new Properties(); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); - streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance"); + 
streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, GROUP_INSTANCE_ID); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath()); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); @@ -246,6 +251,48 @@ public void testCloseOptionsRemainInGroupStreamsProtocol() throws Exception { "Group should still have a member after REMAIN_IN_GROUP close under Streams protocol"); } + @ParameterizedTest + @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"}) + public void testStaticMemberCloseUsesStaticLeaveEpochStreamsProtocol( + final CloseOptions.GroupMembershipOperation operation + ) throws Exception { + final int numStreamThreads = 2; + streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); + + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + + streams.close(CloseOptions.groupMembershipOperation(operation) + .withTimeout(Duration.ofSeconds(30))); + + waitForStaticStreamsGroupMembersEpoch( + streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), + GROUP_INSTANCE_ID, + StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH, + numStreamThreads + ); + } + + @Test + public void testStaticMemberLeaveGroupStreamsProtocol() throws Exception { + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); + + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + + streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP) + .withTimeout(Duration.ofSeconds(30))); + + waitForEmptyStreamGroup( + adminClient, + streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), + 0 + ); + } + protected Topology setupTopologyWithoutIntermediateUserTopic() { final StreamsBuilder builder = new StreamsBuilder(); @@ -272,4 +319,27 @@ private void add10InputElements() { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds()); } } + + private void waitForStaticStreamsGroupMembersEpoch( + final String applicationId, + final String baseInstanceId, + final int expectedEpoch, + final int expectedMemberCount + ) throws Exception { + TestUtils.waitForCondition(() -> { + final StreamsGroupDescription groupDescription = + adminClient.describeStreamsGroups(Collections.singletonList(applicationId)) + .describedGroups() + .get(applicationId) + .get(); + + return groupDescription.members().size() == expectedMemberCount && + groupDescription.members().stream().allMatch(member -> + member.instanceId() + .filter(instanceId -> instanceId.startsWith(baseInstanceId + "-")) + .isPresent() && + member.memberEpoch() == expectedEpoch); + }, "Static streams group members did not reach expected member epoch " + expectedEpoch); + } + } From d15471d8bb13301f09a04612bc52acae289563fc Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 13 Jun 2026 12:02:33 +0900 Subject: [PATCH 3/4] KAFKA-20169: Add ducktape test codes. 
--- ...icMemberPersistentProcessIdTestClient.java | 93 +++++++++ tests/kafkatest/services/streams.py | 27 +++ ...static_membership_streams_protocol_test.py | 192 ++++++++++++++++++ 3 files changed, 312 insertions(+) create mode 100644 streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java create mode 100644 tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java new file mode 100644 index 0000000000000..dcf89dcda5d44 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.internals.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Objects;
+import java.util.Properties;
+
+public class StaticMemberPersistentProcessIdTestClient {
+
+    private static final String TEST_NAME = "StaticMemberPersistentProcessIdTestClient";
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) { // fail fast: args[0] is dereferenced unconditionally below
+            throw new IllegalArgumentException(TEST_NAME + " requires one argument (properties-file) but none provided");
+        }
+
+        System.out.println("StreamsTest instance started");
+
+        final String propFileName = args[0];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final String groupInstanceId = // mandatory: this client is only meaningful as a static member
+            Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "group.instance.id must be set in the properties file");
+
+        System.out.println(TEST_NAME + " instance started with group.instance.id " + groupInstanceId);
+        System.out.println("props=" + streamsProperties);
+        System.out.flush();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic"), "input.topic must be set in the properties file"); // removed so it is not passed on via config.putAll
+
+        final KStream dataStream = builder.stream(inputTopic);
+        dataStream.peek((k, v) -> System.out.printf("PROCESSED key=%s value=%s%n", k, v));
+
+        // Keep one persistent store so the Streams processId survives restart.
+ final KeyValueBytesStoreSupplier persistentStoreSupplier = + Stores.persistentKeyValueStore("process-id-store"); + dataStream.groupByKey().count(Materialized.as(persistentStoreSupplier)); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, TEST_NAME); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.setStateListener((newState, oldState) -> { + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + System.out.println("REBALANCING -> RUNNING"); + System.out.flush(); + } + }); + + streams.start(); + + Exit.addShutdownHook("streams-shutdown-hook", () -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("Static membership persistent-process-id test closed"); + System.out.flush(); + }); + } +} \ No newline at end of file diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 49ba32ad8e100..6bbcf8da8963e 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -655,6 +655,33 @@ def prop_file(self): return cfg.render() +class StaticMemberPersistentProcessIdTestService(StreamsTestBaseService): + def __init__(self, test_context, kafka, group_instance_id, num_threads, group_protocol): + super(StaticMemberPersistentProcessIdTestService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.tests.StaticMemberPersistentProcessIdTestClient", + "") + self.INPUT_TOPIC = None + self.GROUP_INSTANCE_ID = group_instance_id + self.GROUP_PROTOCOL = group_protocol + self.NUM_THREADS = num_threads + + def prop_file(self): + properties = 
{streams_property.STATE_DIR: self.state_dir, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), + streams_property.NUM_THREADS: self.NUM_THREADS, + streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL, + consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID, + consumer_property.SESSION_TIMEOUT_MS: 60000, # set longer session timeout for static member test + 'input.topic': self.INPUT_TOPIC, + "acceptable.recovery.lag": "9223372036854775807" # enable a one-shot assignment + } + + + cfg = KafkaConfig(**properties) + return cfg.render() + + class CooperativeRebalanceUpgradeService(StreamsTestBaseService): def __init__(self, test_context, kafka): super(CooperativeRebalanceUpgradeService, self).__init__(test_context, diff --git a/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py b/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py new file mode 100644 index 0000000000000..5368260be85af --- /dev/null +++ b/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py @@ -0,0 +1,192 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0. + +import re + +from ducktape.mark import matrix +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test + +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.streams import StaticMemberPersistentProcessIdTestService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.tests.streams.utils import verify_running, verify_stopped, stop_processors + + +class StreamsStaticMembershipStreamsProtocolTest(Test): + """ + Streams group protocol specific static membership test. 
+ This test verifies the following behavior under the streams group protocol: + + 1. The bounced static member reuses the same persisted processId. + 2. Surviving members do not reconcile because of another member's temporary bounce. + """ + + input_topic = "inputTopic" + running_message = "REBALANCING -> RUNNING" + stopped_message = "Static membership persistent-process-id test closed" + processed_message = "PROCESSED" + + num_threads = 3 + num_bounces = 3 + group_protocol = "streams" + + initial_process_id_pattern = re.compile(r"No process id found on disk, got fresh process id ([0-9a-fA-F-]+)") + random_process_id_pattern = re.compile(r"Created new process id: ([0-9a-fA-F-]+)") + reused_process_id_pattern = re.compile(r"Reading UUID from process file: ([0-9a-fA-F-]+)") + + def __init__(self, test_context): + super(StreamsStaticMembershipStreamsProtocolTest, self).__init__(test_context) + self.topics = { + self.input_topic: {"partitions": 18}, + } + + self.kafka = KafkaService(self.test_context, num_nodes=3, + zk=None, topics=self.topics, controller_num_nodes_override=1) + + self.producer = VerifiableProducer(self.test_context, + 1, + self.kafka, + self.input_topic, + throughput=1000, + acks=1) + + @cluster(num_nodes=8) + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation(self, metadata_quorum): + self.kafka.start() + + processor1 = StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-A", self.num_threads, self.group_protocol) + processor2 = StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-B", self.num_threads, self.group_protocol) + processor3 = StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-C", self.num_threads, self.group_protocol) + + processors = [processor1, processor2, processor3] + + self.producer.start() + + for processor in processors: + processor.CLEAN_NODE_ENABLED = False + 
processor.INPUT_TOPIC = self.input_topic + verify_running(processor, self.running_message) + + self.verify_processing(processors) + + baseline_process_ids = { + processor: self.assert_initial_process_id_persisted(processor) + for processor in processors + } + + for _ in range(self.num_bounces): + for bounced in processors: + checkpoints = { + processor: { + "log": self._line_count(processor, processor.LOG_FILE), + "stdout": self._line_count(processor, processor.STDOUT_FILE), + } + for processor in processors + } + + verify_stopped(bounced, self.stopped_message) + verify_running(bounced, self.running_message) + + self.assert_same_process_id_reused(bounced, checkpoints[bounced]["log"], baseline_process_ids[bounced]) + + for survivor in processors: + if survivor is bounced: + continue + self.assert_survivor_was_unaffected(survivor, checkpoints[survivor]["log"]) + + self.verify_processing(processors) + + stop_processors(processors, self.stopped_message) + + self.producer.stop() + self.kafka.stop(timeout_sec=120) + + def verify_processing(self, processors): + for processor in processors: + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + monitor.wait_until(self.processed_message, + timeout_sec=60,err_msg="Never saw processing of %s on %s" % (self.processed_message, str(processor.node.account)) + ) + + def thread_instance_ids(self, processor): + return ["%s-%d" % (processor.GROUP_INSTANCE_ID, thread_id) + for thread_id in range(1, self.num_threads + 1)] + + def _line_count(self, processor, path): + output = list( + processor.node.account.ssh_capture("awk 'END {print NR}' %s" % path, allow_fail=True) + ) + if not output: + return 0 + return int(output[0].strip() or 0) + + def _read_lines_since(self, processor, path, line_number): + first_line = max(1, line_number + 1) + return list( + processor.node.account.ssh_capture("sed -n '%d,$p' %s" % (first_line, path), allow_fail=True) + ) + + def assert_initial_process_id_persisted(self, processor): + 
log = "".join(self._read_lines_since(processor, processor.LOG_FILE, 0)) + + fresh_matches = self.initial_process_id_pattern.findall(log) + random_matches = self.random_process_id_pattern.findall(log) + + assert fresh_matches, ( + "Did not see initial persisted process id creation for %s" + % processor.GROUP_INSTANCE_ID + ) + assert not random_matches, ( + "Unexpected non-persistent process id creation for %s: %s" + % (processor.GROUP_INSTANCE_ID, random_matches) + ) + + return fresh_matches[-1] + + def assert_same_process_id_reused(self, processor, log_checkpoint, expected_process_id): + log = "".join( + self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) + ) + + reused_matches = self.reused_process_id_pattern.findall(log) + + assert expected_process_id in reused_matches, ( + "Did not see reused process id %s for %s. saw=%s" + % (expected_process_id, processor.GROUP_INSTANCE_ID, reused_matches) + ) + assert "Created new process id:" not in log, ( + "Unexpected random process id creation after restart for %s" + % processor.GROUP_INSTANCE_ID + ) + assert "No process id found on disk, got fresh process id" not in log, ( + "Unexpected fresh process id creation after restart for %s" + % processor.GROUP_INSTANCE_ID + ) + + + def assert_survivor_was_unaffected(self, processor, log_checkpoint): + log = "".join( + self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) + ) + + forbidden_patterns = [ + r"transitioned from STABLE to RECONCILING", + r"Target assignment updated from", + r"Assigned tasks with local epoch", + ] + + for thread_instance_id in self.thread_instance_ids(processor): + for pattern in forbidden_patterns: + full_pattern = r"instanceId=%s.*%s" % ( + re.escape(thread_instance_id), + pattern + ) + assert not re.search(full_pattern, log), ( + "Surviving static member %s unexpectedly logged forbidden pattern '%s' " + "during another member's bounce" + % (thread_instance_id, pattern) + ) \ No newline at end of file From 
0d1a0d910b54c0c562ac0d8ab2f68a98a1084ebb Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Mon, 22 Jun 2026 10:09:58 +0900 Subject: [PATCH 4/4] Addressing review. --- ...icMemberPersistentProcessIdTestClient.java | 93 --------- .../streams/tests/StaticMemberTestClient.java | 13 ++ tests/kafkatest/services/streams.py | 30 +-- ...static_membership_streams_protocol_test.py | 192 ------------------ .../streams/streams_static_membership_test.py | 163 ++++++++++++++- 5 files changed, 172 insertions(+), 319 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java delete mode 100644 tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java deleted file mode 100644 index dcf89dcda5d44..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.tests; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.internals.Exit; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; - -import java.util.Objects; -import java.util.Properties; - -public class StaticMemberPersistentProcessIdTestClient { - - private static final String TEST_NAME = "StaticMemberPersistentProcessIdTestClient"; - - @SuppressWarnings("unchecked") - public static void main(final String[] args) throws Exception { - if (args.length < 1) { - System.err.println(TEST_NAME + " requires one argument (properties-file) but none provided: "); - } - - System.out.println("StreamsTest instance started"); - - final String propFileName = args[0]; - final Properties streamsProperties = Utils.loadProps(propFileName); - - final String groupInstanceId = - Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)); - - System.out.println(TEST_NAME + " instance started with group.instance.id " + groupInstanceId); - System.out.println("props=" + streamsProperties); - System.out.flush(); - - final StreamsBuilder builder = new StreamsBuilder(); - final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic")); - - final KStream dataStream = builder.stream(inputTopic); - dataStream.peek((k, v) -> System.out.printf("PROCESSED key=%s value=%s%n", k, v)); - - // Keep one persistent store so the Streams processId survives restart. 
- final KeyValueBytesStoreSupplier persistentStoreSupplier = - Stores.persistentKeyValueStore("process-id-store"); - dataStream.groupByKey().count(Materialized.as(persistentStoreSupplier)); - - final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, TEST_NAME); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - config.putAll(streamsProperties); - - final KafkaStreams streams = new KafkaStreams(builder.build(), config); - streams.setStateListener((newState, oldState) -> { - if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { - System.out.println("REBALANCING -> RUNNING"); - System.out.flush(); - } - }); - - streams.start(); - - Exit.addShutdownHook("streams-shutdown-hook", () -> { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(); - System.out.println("Static membership persistent-process-id test closed"); - System.out.flush(); - }); - } -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java index 0ebfc0f1cfb4c..ff88afddd26ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -24,6 +24,9 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import java.util.Objects; import java.util.Properties; @@ -31,6 +34,7 @@ 
public class StaticMemberTestClient { private static final String TEST_NAME = "StaticMemberTestClient"; + private static final String PERSISTENT_PROCESS_ID_STORE_ENABLED = "persistent.process.id.store.enabled"; @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { @@ -56,6 +60,15 @@ public static void main(final String[] args) throws Exception { final KStream dataStream = builder.stream(inputTopic); dataStream.peek((k, v) -> System.out.printf("PROCESSED key=%s value=%s%n", k, v)); + final boolean persistentProcessIdStoreEnabled = Boolean.parseBoolean( + (String) streamsProperties.remove(PERSISTENT_PROCESS_ID_STORE_ENABLED) + ); + if (persistentProcessIdStoreEnabled) { + final KeyValueBytesStoreSupplier persistentStoreSupplier = + Stores.persistentKeyValueStore("process-id-store"); + dataStream.groupByKey().count(Materialized.as(persistentStoreSupplier)); + } + final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, TEST_NAME); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 6bbcf8da8963e..ddb1b2f0cb072 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -632,39 +632,17 @@ def prop_file(self): class StaticMemberTestService(StreamsTestBaseService): - def __init__(self, test_context, kafka, group_instance_id, num_threads): + def __init__(self, test_context, kafka, group_instance_id, num_threads, group_protocol="classic", + persistent_process_id_store_enabled=False): super(StaticMemberTestService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StaticMemberTestClient", "") self.INPUT_TOPIC = None self.GROUP_INSTANCE_ID = group_instance_id - self.NUM_THREADS = num_threads - def prop_file(self): - properties = {streams_property.STATE_DIR: self.state_dir, - streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), - 
streams_property.NUM_THREADS: self.NUM_THREADS, - consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID, - consumer_property.SESSION_TIMEOUT_MS: 60000, # set longer session timeout for static member test - 'input.topic': self.INPUT_TOPIC, - "acceptable.recovery.lag": "9223372036854775807" # enable a one-shot assignment - } - - - cfg = KafkaConfig(**properties) - return cfg.render() - - -class StaticMemberPersistentProcessIdTestService(StreamsTestBaseService): - def __init__(self, test_context, kafka, group_instance_id, num_threads, group_protocol): - super(StaticMemberPersistentProcessIdTestService, self).__init__(test_context, - kafka, - "org.apache.kafka.streams.tests.StaticMemberPersistentProcessIdTestClient", - "") - self.INPUT_TOPIC = None - self.GROUP_INSTANCE_ID = group_instance_id self.GROUP_PROTOCOL = group_protocol self.NUM_THREADS = num_threads + self.PERSISTENT_PROCESS_ID_STORE_ENABLED = persistent_process_id_store_enabled def prop_file(self): properties = {streams_property.STATE_DIR: self.state_dir, @@ -677,6 +655,8 @@ def prop_file(self): "acceptable.recovery.lag": "9223372036854775807" # enable a one-shot assignment } + if self.PERSISTENT_PROCESS_ID_STORE_ENABLED: + properties["persistent.process.id.store.enabled"] = "true" cfg = KafkaConfig(**properties) return cfg.render() diff --git a/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py b/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py deleted file mode 100644 index 5368260be85af..0000000000000 --- a/tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py +++ /dev/null @@ -1,192 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0. 
- -import re - -from ducktape.mark import matrix -from ducktape.mark.resource import cluster -from ducktape.tests.test import Test - -from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.streams import StaticMemberPersistentProcessIdTestService -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.tests.streams.utils import verify_running, verify_stopped, stop_processors - - -class StreamsStaticMembershipStreamsProtocolTest(Test): - """ - Streams group protocol specific static membership test. - This test verifies the following behavior under the streams group protocol: - - 1. The bounced static member reuses the same persisted processId. - 2. Surviving members do not reconcile because of another member's temporary bounce. - """ - - input_topic = "inputTopic" - running_message = "REBALANCING -> RUNNING" - stopped_message = "Static membership persistent-process-id test closed" - processed_message = "PROCESSED" - - num_threads = 3 - num_bounces = 3 - group_protocol = "streams" - - initial_process_id_pattern = re.compile(r"No process id found on disk, got fresh process id ([0-9a-fA-F-]+)") - random_process_id_pattern = re.compile(r"Created new process id: ([0-9a-fA-F-]+)") - reused_process_id_pattern = re.compile(r"Reading UUID from process file: ([0-9a-fA-F-]+)") - - def __init__(self, test_context): - super(StreamsStaticMembershipStreamsProtocolTest, self).__init__(test_context) - self.topics = { - self.input_topic: {"partitions": 18}, - } - - self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=None, topics=self.topics, controller_num_nodes_override=1) - - self.producer = VerifiableProducer(self.test_context, - 1, - self.kafka, - self.input_topic, - throughput=1000, - acks=1) - - @cluster(num_nodes=8) - @matrix(metadata_quorum=[quorum.isolated_kraft]) - def test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation(self, metadata_quorum): - self.kafka.start() - - processor1 = 
StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-A", self.num_threads, self.group_protocol) - processor2 = StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-B", self.num_threads, self.group_protocol) - processor3 = StaticMemberPersistentProcessIdTestService(self.test_context, self.kafka, "consumer-C", self.num_threads, self.group_protocol) - - processors = [processor1, processor2, processor3] - - self.producer.start() - - for processor in processors: - processor.CLEAN_NODE_ENABLED = False - processor.INPUT_TOPIC = self.input_topic - verify_running(processor, self.running_message) - - self.verify_processing(processors) - - baseline_process_ids = { - processor: self.assert_initial_process_id_persisted(processor) - for processor in processors - } - - for _ in range(self.num_bounces): - for bounced in processors: - checkpoints = { - processor: { - "log": self._line_count(processor, processor.LOG_FILE), - "stdout": self._line_count(processor, processor.STDOUT_FILE), - } - for processor in processors - } - - verify_stopped(bounced, self.stopped_message) - verify_running(bounced, self.running_message) - - self.assert_same_process_id_reused(bounced, checkpoints[bounced]["log"], baseline_process_ids[bounced]) - - for survivor in processors: - if survivor is bounced: - continue - self.assert_survivor_was_unaffected(survivor, checkpoints[survivor]["log"]) - - self.verify_processing(processors) - - stop_processors(processors, self.stopped_message) - - self.producer.stop() - self.kafka.stop(timeout_sec=120) - - def verify_processing(self, processors): - for processor in processors: - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: - monitor.wait_until(self.processed_message, - timeout_sec=60,err_msg="Never saw processing of %s on %s" % (self.processed_message, str(processor.node.account)) - ) - - def thread_instance_ids(self, processor): - return ["%s-%d" % 
(processor.GROUP_INSTANCE_ID, thread_id) - for thread_id in range(1, self.num_threads + 1)] - - def _line_count(self, processor, path): - output = list( - processor.node.account.ssh_capture("awk 'END {print NR}' %s" % path, allow_fail=True) - ) - if not output: - return 0 - return int(output[0].strip() or 0) - - def _read_lines_since(self, processor, path, line_number): - first_line = max(1, line_number + 1) - return list( - processor.node.account.ssh_capture("sed -n '%d,$p' %s" % (first_line, path), allow_fail=True) - ) - - def assert_initial_process_id_persisted(self, processor): - log = "".join(self._read_lines_since(processor, processor.LOG_FILE, 0)) - - fresh_matches = self.initial_process_id_pattern.findall(log) - random_matches = self.random_process_id_pattern.findall(log) - - assert fresh_matches, ( - "Did not see initial persisted process id creation for %s" - % processor.GROUP_INSTANCE_ID - ) - assert not random_matches, ( - "Unexpected non-persistent process id creation for %s: %s" - % (processor.GROUP_INSTANCE_ID, random_matches) - ) - - return fresh_matches[-1] - - def assert_same_process_id_reused(self, processor, log_checkpoint, expected_process_id): - log = "".join( - self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) - ) - - reused_matches = self.reused_process_id_pattern.findall(log) - - assert expected_process_id in reused_matches, ( - "Did not see reused process id %s for %s. 
saw=%s" - % (expected_process_id, processor.GROUP_INSTANCE_ID, reused_matches) - ) - assert "Created new process id:" not in log, ( - "Unexpected random process id creation after restart for %s" - % processor.GROUP_INSTANCE_ID - ) - assert "No process id found on disk, got fresh process id" not in log, ( - "Unexpected fresh process id creation after restart for %s" - % processor.GROUP_INSTANCE_ID - ) - - - def assert_survivor_was_unaffected(self, processor, log_checkpoint): - log = "".join( - self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) - ) - - forbidden_patterns = [ - r"transitioned from STABLE to RECONCILING", - r"Target assignment updated from", - r"Assigned tasks with local epoch", - ] - - for thread_instance_id in self.thread_instance_ids(processor): - for pattern in forbidden_patterns: - full_pattern = r"instanceId=%s.*%s" % ( - re.escape(thread_instance_id), - pattern - ) - assert not re.search(full_pattern, log), ( - "Surviving static member %s unexpectedly logged forbidden pattern '%s' " - "during another member's bounce" - % (thread_instance_id, pattern) - ) \ No newline at end of file diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py index 96091e9fe8b13..d27016c249be6 100644 --- a/tests/kafkatest/tests/streams/streams_static_membership_test.py +++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import re + from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.tests.test import Test @@ -31,6 +33,13 @@ class StreamsStaticMembershipTest(Test): pattern = 'PROCESSED' running_message = 'REBALANCING -> RUNNING' stopped_message = 'Static membership test closed' + num_threads = 3 + num_bounces = 3 + streams_group_protocol = "streams" + + initial_process_id_pattern = re.compile(r"No process id found on disk, got fresh process id ([0-9a-fA-F-]+)") + random_process_id_pattern = re.compile(r"Created new process id: ([0-9a-fA-F-]+)") + reused_process_id_pattern = re.compile(r"Reading UUID from process file: ([0-9a-fA-F-]+)") def __init__(self, test_context): super(StreamsStaticMembershipTest, self).__init__(test_context) @@ -53,12 +62,7 @@ def __init__(self, test_context): def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum): self.kafka.start() - numThreads = 3 - processor1 = StaticMemberTestService(self.test_context, self.kafka, "consumer-A", numThreads) - processor2 = StaticMemberTestService(self.test_context, self.kafka, "consumer-B", numThreads) - processor3 = StaticMemberTestService(self.test_context, self.kafka, "consumer-C", numThreads) - - processors = [processor1, processor2, processor3] + processors = self.create_processors(self.num_threads) self.producer.start() @@ -70,8 +74,7 @@ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self self.verify_processing(processors) # do several rolling bounces - num_bounces = 3 - for i in range(0, num_bounces): + for i in range(0, self.num_bounces): for processor in processors: verify_stopped(processor, self.stopped_message) verify_running(processor, self.running_message) @@ -79,7 +82,7 @@ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self stable_generation = -1 for processor in processors: generations = extract_generation_from_logs(processor) - num_bounce_generations = num_bounces * 
numThreads + num_bounce_generations = self.num_bounces * self.num_threads assert num_bounce_generations <= len(generations), \ "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations)) @@ -97,6 +100,70 @@ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self self.producer.stop() self.kafka.stop(timeout_sec=120) + @cluster(num_nodes=8) + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation(self, metadata_quorum): + self.kafka.start() + + processors = self.create_processors( + self.num_threads, + group_protocol=self.streams_group_protocol, + persistent_process_id_store_enabled=True + ) + + self.producer.start() + + initial_log_checkpoints = {} + for processor in processors: + processor.CLEAN_NODE_ENABLED = False + self.set_topics(processor) + initial_log_checkpoints[processor] = self._line_count(processor, processor.LOG_FILE) + verify_running(processor, self.running_message) + + self.verify_processing(processors) + + baseline_process_ids = { + processor: self.assert_initial_process_id_persisted(processor, initial_log_checkpoints[processor]) + for processor in processors + } + + for _ in range(self.num_bounces): + for bounced in processors: + checkpoints = { + processor: self._line_count(processor, processor.LOG_FILE) + for processor in processors + } + + verify_stopped(bounced, self.stopped_message) + verify_running(bounced, self.running_message) + + self.assert_same_process_id_reused( + bounced, + checkpoints[bounced], + baseline_process_ids[bounced] + ) + + for survivor in processors: + if survivor is not bounced: + self.assert_survivor_was_unaffected(survivor, checkpoints[survivor]) + + self.verify_processing(processors) + + stop_processors(processors, self.stopped_message) + + self.producer.stop() + self.kafka.stop(timeout_sec=120) + + def create_processors(self, num_threads, group_protocol="classic", 
persistent_process_id_store_enabled=False): + return [ + StaticMemberTestService(self.test_context, self.kafka, "consumer-A", num_threads, group_protocol, + persistent_process_id_store_enabled), + StaticMemberTestService(self.test_context, self.kafka, "consumer-B", num_threads, group_protocol, + persistent_process_id_store_enabled), + StaticMemberTestService(self.test_context, self.kafka, "consumer-C", num_threads, group_protocol, + persistent_process_id_store_enabled) + ] + def verify_processing(self, processors): for processor in processors: with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: @@ -106,3 +173,81 @@ def verify_processing(self, processors): def set_topics(self, processor): processor.INPUT_TOPIC = self.input_topic + + def thread_instance_ids(self, processor): + return ["%s-%d" % (processor.GROUP_INSTANCE_ID, thread_id) + for thread_id in range(1, self.num_threads + 1)] + + def _line_count(self, processor, path): + output = list( + processor.node.account.ssh_capture("awk 'END {print NR}' %s" % path, allow_fail=True) + ) + if not output: + return 0 + return int(output[0].strip() or 0) + + def _read_lines_since(self, processor, path, line_number): + first_line = max(1, line_number + 1) + return list( + processor.node.account.ssh_capture("sed -n '%d,$p' %s" % (first_line, path), allow_fail=True) + ) + + def assert_initial_process_id_persisted(self, processor, log_checkpoint): + log = "".join(self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint)) + + fresh_matches = self.initial_process_id_pattern.findall(log) + random_matches = self.random_process_id_pattern.findall(log) + + assert fresh_matches, ( + "Did not see initial persisted process id creation for %s" + % processor.GROUP_INSTANCE_ID + ) + assert not random_matches, ( + "Unexpected non-persistent process id creation for %s: %s" + % (processor.GROUP_INSTANCE_ID, random_matches) + ) + + return fresh_matches[-1] + + def assert_same_process_id_reused(self, 
processor, log_checkpoint, expected_process_id): + log = "".join( + self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) + ) + + reused_matches = self.reused_process_id_pattern.findall(log) + + assert expected_process_id in reused_matches, ( + "Did not see reused process id %s for %s. saw=%s" + % (expected_process_id, processor.GROUP_INSTANCE_ID, reused_matches) + ) + assert "Created new process id:" not in log, ( + "Unexpected random process id creation after restart for %s" + % processor.GROUP_INSTANCE_ID + ) + assert "No process id found on disk, got fresh process id" not in log, ( + "Unexpected fresh process id creation after restart for %s" + % processor.GROUP_INSTANCE_ID + ) + + def assert_survivor_was_unaffected(self, processor, log_checkpoint): + log = "".join( + self._read_lines_since(processor, processor.LOG_FILE, log_checkpoint) + ) + + forbidden_patterns = [ + r"transitioned from STABLE to RECONCILING", + r"Target assignment updated from", + r"Assigned tasks with local epoch", + ] + + for thread_instance_id in self.thread_instance_ids(processor): + for pattern in forbidden_patterns: + full_pattern = r"instanceId=%s.*%s" % ( + re.escape(thread_instance_id), + pattern + ) + assert not re.search(full_pattern, log), ( + "Surviving static member %s unexpectedly logged forbidden pattern '%s' " + "during another member's bounce" + % (thread_instance_id, pattern) + )