Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,11 @@ private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final
heartbeatRequestState.reset();
break;

case UNRELEASED_INSTANCE_ID:
logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;

case UNSUPPORTED_VERSION:
logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, UNSUPPORTED_VERSION_ERROR_MESSAGE);
handleFatalFailure(error.exception(UNSUPPORTED_VERSION_ERROR_MESSAGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,7 @@ public void testTopicAuthorizationFailedErrorResponse() {
names = {
"INVALID_REQUEST",
"GROUP_MAX_SIZE_REACHED",
"UNRELEASED_INSTANCE_ID",
"UNSUPPORTED_VERSION",
"STREAMS_INVALID_TOPOLOGY",
"STREAMS_INVALID_TOPOLOGY_EPOCH",
Expand Down Expand Up @@ -1457,6 +1458,7 @@ private static Stream<Arguments> provideOtherErrors() {
Errors.GROUP_MAX_SIZE_REACHED,
Errors.FENCED_MEMBER_EPOCH,
Errors.UNKNOWN_MEMBER_ID,
Errors.UNRELEASED_INSTANCE_ID,
Errors.UNSUPPORTED_VERSION,
Errors.STREAMS_INVALID_TOPOLOGY,
Errors.STREAMS_INVALID_TOPOLOGY_EPOCH,
Expand Down Expand Up @@ -1493,6 +1495,30 @@ public void testPollOnCloseWhenIsLeaving() {
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch());
}

@ParameterizedTest
@EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"})
public void testPollOnCloseWhenStaticMemberIsLeaving(final CloseOptions.GroupMembershipOperation operation) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
when(membershipManager.isLeavingGroup()).thenReturn(true);
when(membershipManager.leaveGroupOperation()).thenReturn(operation);
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_STATIC_MEMBER_EPOCH);

NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds());

assertEquals(1, result.unsentRequests.size());
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();

assertEquals(GROUP_ID, streamsRequest.data().groupId());
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
assertEquals(LEAVE_GROUP_STATIC_MEMBER_EPOCH, streamsRequest.data().memberEpoch());
assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
}

@Test
public void testMaximumTimeToWaitPollTimerExpired() {
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
Expand All @@ -60,6 +62,7 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
Expand All @@ -84,6 +87,7 @@
public class StreamsMembershipManagerTest {

private static final String GROUP_ID = "test-group";
private static final String INSTANCE_ID = "instance-1";
private static final int MEMBER_EPOCH = 1;

private static final String SUBTOPOLOGY_ID_0 = "subtopology-0";
Expand Down Expand Up @@ -1452,7 +1456,7 @@ public void testOnHeartbeatRequestSkippedWhenInLeaving() {
public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() {
final StreamsMembershipManager staticMember = new StreamsMembershipManager(
GROUP_ID,
Optional.of("instance-1"),
Optional.of(INSTANCE_ID),
streamsRebalanceData, subscriptionState, backgroundEventHandler,
new LogContext("test"), time, new Metrics(time)
);
Expand All @@ -1463,7 +1467,7 @@ public void testLeaveGroupEpochIsStaticMemberEpochForStaticMember() {
public void testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroupOperation() {
final StreamsMembershipManager staticMember = new StreamsMembershipManager(
GROUP_ID,
Optional.of("instance-1"),
Optional.of(INSTANCE_ID),
streamsRebalanceData, subscriptionState, backgroundEventHandler,
new LogContext("test"), time, new Metrics(time)
);
Expand All @@ -1476,7 +1480,7 @@ public void testLeaveGroupEpochIsDynamicMemberEpochForStaticMemberWithLeaveGroup
public void testLeaveGroupEpochIsStaticMemberEpochForStaticMemberWithRemainInGroup() {
final StreamsMembershipManager staticMember = new StreamsMembershipManager(
GROUP_ID,
Optional.of("instance-1"),
Optional.of(INSTANCE_ID),
streamsRebalanceData, subscriptionState, backgroundEventHandler,
new LogContext("test"), time, new Metrics(time)
);
Expand All @@ -1490,7 +1494,7 @@ public void testIsLeavingGroupReturnsTrueForStaticMemberWithRemainInGroupOperati
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");
final StreamsMembershipManager staticMember = new StreamsMembershipManager(
GROUP_ID,
Optional.of("instance-1"),
Optional.of(INSTANCE_ID),
streamsRebalanceData, subscriptionState, backgroundEventHandler,
new LogContext("test"), time, new Metrics(time)
);
Expand Down Expand Up @@ -1542,6 +1546,50 @@ public void testOnHeartbeatSuccessWhenInLeaving() {
verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
}

@ParameterizedTest
@MethodSource("staticMemberLeaveOnCloseOperations")
public void testStaticMemberUsesExpectedLeaveEpochOnClose(
final CloseOptions.GroupMembershipOperation operation,
final int expectedEpoch
) {
final Metrics localMetrics = new Metrics(time);
StreamsMembershipManager membershipManagerWithStaticMember = new StreamsMembershipManager(
GROUP_ID,
Optional.of(INSTANCE_ID),
streamsRebalanceData,
subscriptionState,
backgroundEventHandler,
new LogContext("test"),
time,
localMetrics
);
membershipManagerWithStaticMember.registerStateListener(memberStateListener);
joining(membershipManagerWithStaticMember);

CompletableFuture<Void> onGroupLeft = membershipManagerWithStaticMember.leaveGroupOnClose(operation);

assertEquals(MemberState.LEAVING, membershipManagerWithStaticMember.state());
assertEquals(expectedEpoch, membershipManagerWithStaticMember.memberEpoch());
assertFalse(onGroupLeft.isDone());
}

private static Stream<Arguments> staticMemberLeaveOnCloseOperations() {
return Stream.of(
Arguments.of(
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP,
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH
),
Arguments.of(
CloseOptions.GroupMembershipOperation.DEFAULT,
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH
),
Arguments.of(
CloseOptions.GroupMembershipOperation.LEAVE_GROUP,
StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH
)
);
}

@Test
public void testOnHeartbeatSuccessWhenInUnsubscribeLeaveNotInProgress() {
membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(
Expand Down Expand Up @@ -2688,6 +2736,12 @@ private void joining() {
verifyInStateJoining(membershipManager);
}

private void joining(StreamsMembershipManager givenMembershipManager) {
givenMembershipManager.onSubscriptionUpdated();
givenMembershipManager.onConsumerPoll();
verifyInStateJoining(givenMembershipManager);
}

private void reconcile(final StreamsGroupHeartbeatResponse response) {
membershipManager.onHeartbeatSuccess(response);
membershipManager.poll(time.milliseconds());
Expand Down
2 changes: 1 addition & 1 deletion docs/streams/developer-guide/config-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter

### group.protocol

> The group protocol used by the Kafka Streams client used for coordination. It determines how the client will communicate with the Kafka brokers and other clients in the same group. The default value is `"classic"`, which is the classic consumer group protocol. Can be set to `"streams"` (requires broker-side enablement) to enable the new Kafka Streams group protocol.
> The group protocol used by the Kafka Streams client used for coordination. It determines how the client will communicate with the Kafka brokers and other clients in the same group. The default value is `"classic"`, which is the classic consumer group protocol. Can be set to `"streams"` (requires broker-side enablement) to enable the new Kafka Streams group protocol. When set to `"streams"`, `group.instance.id` can be used for static membership.

### rack.aware.assignment.non_overlap_cost

Expand Down
4 changes: 2 additions & 2 deletions docs/streams/developer-guide/streams-rebalance-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ The following features are available in the current release:

* **Offline Migration**: After shutting down all members and waiting for their `session.timeout.ms` to expire (or forcing an explicit group leave), a classic group can be converted to a streams group and a streams group can be converted to a classic group. The only broker-side group data that will be preserved are the committed offsets. Internal topics (changelog and repartition topics) will continue to exist as regular Kafka topics.

* **Static Membership**: Streams applications can configure `group.instance.id` when using `group.protocol=streams`. Kafka Streams derives unique group instance IDs for its stream threads internally.

# What's Not Supported in This Version

The following features are not yet available and should be avoided when using the new protocol:

* **Static Membership**: Setting a client `instance.id` will be rejected.

* **Topology Updates**: If a topology is changed significantly (e.g., by adding new source topics or changing the number of subtopologies), a new streams group must be created.

* **High Availability Assignor**: Only the sticky assignor is supported. This implies that "warmup tasks" and rack aware assignment are not supported yet.
Expand Down
4 changes: 3 additions & 1 deletion docs/streams/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Kafka Streams now persists state store changelog offsets inside each state store

As part of KIP-1035, the per-store changelog offset is written into RocksDB on each commit and is made durable on disk only when RocksDB flushes its memtable to an SST file — either organically once the memtable fills `write_buffer_size` (16 MB by default), or on a clean store close. Earlier releases force-flushed RocksDB on every commit. A consequence is that for a **low-traffic store** whose memtable rarely fills, the on-disk offset can lag the store's actual position until the next clean shutdown. For the durability model and guidance on tuning flush frequency for low-traffic stores, see [Memory Management: RocksDB](/{{version}}/documentation/streams/developer-guide/memory-mgmt.html#rocksdb-offset-durability). If the process then exits uncleanly (for example SIGKILL/OOM-kill, or a KafkaStreams#close that does not complete within the shutdown grace period) and changelog retention or compaction has since advanced the changelog's log-start offset past that stale offset, the restore consumer seeks out of range on restart — logged as OffsetOutOfRangeException/TaskCorruptedException — and the task is automatically re-initialized from the changelog (no data loss, but a full re-restore).

Kafka Streams now supports static membership with the Streams Rebalance Protocol. Applications using `group.protocol=streams` may configure `group.instance.id`; Kafka Streams derives unique group instance IDs for its stream threads internally.

### Header-aware state stores for the Processor API (KIP-1271) {#kip-1271-headers-aware-stores}

Kafka Streams adds **header-aware** state stores. Opt in with the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories. For example:
Expand Down Expand Up @@ -183,7 +185,7 @@ This Early Access release covers a subset of the functionality detailed in [KIP-

**What's Not Included in Early Access**

* **Static Membership:** Setting a client `instance.id` will be rejected.
* **Static Membership:** Setting `group.instance.id` was rejected in the 4.1 Early Access release. Static membership is supported with the Streams Rebalance Protocol starting in 4.3.0.
* **Topology Updates:** If a topology is changed significantly (e.g., by adding new source topics or changing the number of sub-topologies), a new streams group must be created.
* **High Availability Assignor:** Only the sticky assignor is supported.
* **Regular Expressions:** Pattern-based topic subscription is not supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -49,6 +51,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.File;
import java.io.IOException;
Expand All @@ -72,6 +76,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {

protected static final String INPUT_TOPIC = "inputTopic";
protected static final String OUTPUT_TOPIC = "outputTopic";
protected static final String GROUP_INSTANCE_ID = "someGroupInstance";

protected Properties streamsConfig;
protected static KafkaStreams streams;
Expand Down Expand Up @@ -111,7 +116,7 @@ public void before(final TestInfo testName) throws Exception {

streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, GROUP_INSTANCE_ID);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath());
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Expand Down Expand Up @@ -246,6 +251,48 @@ public void testCloseOptionsRemainInGroupStreamsProtocol() throws Exception {
"Group should still have a member after REMAIN_IN_GROUP close under Streams protocol");
}

@ParameterizedTest
@EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"})
public void testStaticMemberCloseUsesStaticLeaveEpochStreamsProtocol(
final CloseOptions.GroupMembershipOperation operation
) throws Exception {
final int numStreamThreads = 2;
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());

streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close(CloseOptions.groupMembershipOperation(operation)
.withTimeout(Duration.ofSeconds(30)));

waitForStaticStreamsGroupMembersEpoch(
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
GROUP_INSTANCE_ID,
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
numStreamThreads
);
}

@Test
public void testStaticMemberLeaveGroupStreamsProtocol() throws Exception {
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());

streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
.withTimeout(Duration.ofSeconds(30)));

waitForEmptyStreamGroup(
adminClient,
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
0
);
}

protected Topology setupTopologyWithoutIntermediateUserTopic() {
final StreamsBuilder builder = new StreamsBuilder();

Expand All @@ -272,4 +319,27 @@ private void add10InputElements() {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
}
}

private void waitForStaticStreamsGroupMembersEpoch(
final String applicationId,
final String baseInstanceId,
final int expectedEpoch,
final int expectedMemberCount
) throws Exception {
TestUtils.waitForCondition(() -> {
final StreamsGroupDescription groupDescription =
adminClient.describeStreamsGroups(Collections.singletonList(applicationId))
.describedGroups()
.get(applicationId)
.get();

return groupDescription.members().size() == expectedMemberCount &&
groupDescription.members().stream().allMatch(member ->
member.instanceId()
.filter(instanceId -> instanceId.startsWith(baseInstanceId + "-"))
.isPresent() &&
member.memberEpoch() == expectedEpoch);
}, "Static streams group members did not reach expected member epoch " + expectedEpoch);
}

}
Loading
Loading