
KAFKA-20169: Add ducktape test code for Streams Rebalance Protocol. #22561

Open
chickenchickenlove wants to merge 4 commits into apache:trunk from chickenchickenlove:KAFKA-20169-DUCK


chickenchickenlove commented Jun 13, 2026

Contributor

Summary

This PR is based on the KAFKA-20169-CLIENT-CORE branch (#22559). It adds
a new ducktape system test to verify that static membership works
correctly with the Streams rebalance protocol.

The new test covers the following behavior:

  • A bounced static Streams member reuses its persisted processId after
    restart.
  • Surviving static members are not forced into reconciliation when
    another static member temporarily bounces and rejoins.
  • The test runs with the Streams group protocol and repeatedly bounces
    each static member to validate stable behavior across restarts.
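
The first two checks above can be pictured as a scan over the log lines written after a bounce. The sketch below is a minimal illustration, not the PR's assertion code: the log format (`processId: <uuid>`), the reconciliation marker string, and both helper names are assumptions made up for this example.

```python
import re

# Hypothetical log format; the real Streams log line may look different.
PROCESS_ID_PATTERN = re.compile(r"processId: ([0-9a-f-]{36})")
# Hypothetical marker for a member entering reconciliation.
RECONCILIATION_MARKER = "Reconciling assignment"


def process_ids_after(log_lines, checkpoint):
    """Return every processId logged after the first `checkpoint` lines."""
    ids = []
    for line in log_lines[checkpoint:]:
        match = PROCESS_ID_PATTERN.search(line)
        if match:
            ids.append(match.group(1))
    return ids


def survivor_was_unaffected(log_lines, checkpoint):
    """True if no reconciliation marker appears after the checkpoint."""
    return not any(RECONCILIATION_MARKER in line for line in log_lines[checkpoint:])
```

A bounced member passes the reuse check when every id returned by `process_ids_after` equals the id recorded before the bounce; a survivor passes when `survivor_was_unaffected` is true for its own log.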

Changes

  • Added StaticMemberPersistentProcessIdTestClient, a Streams test
    client with a persistent state store so the Streams processId is
    written to disk and reused after restart.
  • Added a corresponding StaticMemberPersistentProcessIdTestService for
    ducktape.
  • Added a new ducktape test:
    StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation
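
For orientation, the client configuration that this combination implies might be rendered roughly as follows. This is a sketch under assumptions: `render_streams_properties`, the application id, and the default bootstrap address are hypothetical, and only the config keys themselves (`application.id`, `bootstrap.servers`, `group.protocol`, `group.instance.id`, `state.dir`) are existing Kafka config names. It also assumes the state directory must survive the restart for the persisted processId to be found again.

```python
def render_streams_properties(instance_id, state_dir, bootstrap_servers="localhost:9092"):
    """Render a .properties-style string for one static Streams member."""
    props = {
        "application.id": "static-membership-demo",  # hypothetical value
        "bootstrap.servers": bootstrap_servers,       # hypothetical default
        "group.protocol": "streams",                  # Streams rebalance protocol
        "group.instance.id": instance_id,             # makes the member static
        "state.dir": state_dir,                       # must persist across restarts
    }
    # Sort keys so the rendered file is stable between runs.
    return "".join(f"{key}={value}\n" for key, value in sorted(props.items()))
```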

Test Results

[INFO:2026-06-12 20:00:12,560]: Discovered 1 tests to run
[INFO:2026-06-12 20:00:12,561]: starting test run with session id
2026-06-12--005...
[INFO:2026-06-12 20:00:12,561]: running 1 tests...
[INFO:2026-06-12 20:00:12,561]: Triggering test 1 of 1...
[INFO:2026-06-12 20:00:12,566]: RunnerClient: Loading test {'directory':
'/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name':
'streams_static_membership_streams_protocol_test.py', 'cls_name':
'StreamsStaticMembershipStreamsProtocolTest', 'method_name':
'test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation',
'injected_args': {'metadata_quorum': 'ISOLATED_KRAFT'}}
[INFO:2026-06-12 20:00:12,568]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
on run 1/1
[INFO:2026-06-12 20:00:12,569]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
Setting up...
[INFO:2026-06-12 20:00:12,569]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
Running...
[INFO:2026-06-12 20:01:42,279]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
Tearing down...
[INFO:2026-06-12 20:02:04,736]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
PASS
[INFO:2026-06-12 20:02:04,738]: RunnerClient:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT:
Data: None
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.14.0
session_id:       2026-06-12--005
run time:         1 minute 52.289 seconds
tests run:        1
passed:           1
flaky:            0
failed:           0
ignored:          0
================================================================================
test_id:
kafkatest.tests.streams.streams_static_membership_streams_protocol_test.StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   1 minute 52.168 seconds

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

lucasbru requested review from Copilot and lucasbru and removed request for Copilot (June 16, 2026 07:34)
lucasbru self-assigned this (Jun 16, 2026)
lucasbru requested a review from Copilot (June 16, 2026 07:51)

Copilot AI left a comment

Contributor


Pull request overview

Adds end-to-end ducktape coverage for Kafka Streams static membership when using the Streams rebalance protocol (group.protocol=streams), including a new Streams test client/service that persists the Streams processId across restarts and assertions that surviving members avoid reconciliation during a temporary bounce.

Changes:

  • Introduces a new ducktape system test that repeatedly bounces static Streams members and validates persisted processId reuse + no survivor reconciliation.
  • Adds a new Streams test client/service pair to persist state (and therefore the Streams processId) on disk across restarts.
  • Updates Streams/client logic and unit tests to allow static membership with the streams protocol and to handle UNRELEASED_INSTANCE_ID as a fatal heartbeat error.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py | New ducktape test validating static membership stability under the Streams group protocol across rolling bounces. |
| tests/kafkatest/services/streams.py | Adds a ducktape service wrapper for the new persistent-process-id Streams test client. |
| streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java | New Streams system-test client that uses a persistent state store to ensure processId is reused after restart. |
| streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | Updates config tests to assert group.instance.id is allowed when group.protocol=streams. |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removes the config-time rejection of static membership when the streams protocol is enabled. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Adds unit coverage for static-member close semantics and expected leave epochs. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Extends tests for UNRELEASED_INSTANCE_ID handling and validates poll-on-close request fields for static members. |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Treats UNRELEASED_INSTANCE_ID as a fatal heartbeat error. |


Comment on lines +83 to +99
    checkpoints = {
        processor: {
            "log": self._line_count(processor, processor.LOG_FILE),
            "stdout": self._line_count(processor, processor.STDOUT_FILE),
        }
        for processor in processors
    }

    verify_stopped(bounced, self.stopped_message)
    verify_running(bounced, self.running_message)

    self.assert_same_process_id_reused(bounced, checkpoints[bounced]["log"], baseline_process_ids[bounced])

    for survivor in processors:
        if survivor is bounced:
            continue
        self.assert_survivor_was_unaffected(survivor, checkpoints[survivor]["log"])
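
The checkpoint idea in the excerpt above (record each file's line count before the bounce, then inspect only what was appended afterwards) can be sketched in isolation. This is an illustration under assumptions: it reads local files, whereas the test presumably counts lines on remote ducktape nodes, and `line_count` and `lines_since` are hypothetical helper names rather than the PR's `_line_count`.

```python
from pathlib import Path


def line_count(path):
    """Number of lines currently in the file (0 if it does not exist yet)."""
    p = Path(path)
    if not p.exists():
        return 0
    with p.open() as f:
        return sum(1 for _ in f)


def lines_since(path, checkpoint):
    """Lines appended to the file after the checkpoint was taken."""
    with Path(path).open() as f:
        # Skip the first `checkpoint` lines; keep everything written later.
        return [line.rstrip("\n") for index, line in enumerate(f) if index >= checkpoint]
```

Taking the checkpoint before the bounce keeps startup messages from the initial launch out of the later assertions, so only output produced during and after the bounce is examined.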

Comment on lines +41 to +48
    if (args.length < 1) {
        System.err.println(TEST_NAME + " requires one argument (properties-file) but none provided: ");
    }

    System.out.println("StreamsTest instance started");

    final String propFileName = args[0];
    final Properties streamsProperties = Utils.loadProps(propFileName);
@@ -0,0 +1,192 @@
# Licensed to the Apache Software Foundation (ASF) under one or more

Member


This is largely a copy of the existing static membership test infra (StreamsStaticMembershipTest, StaticMemberTestService, StaticMemberTestClient). The only real new bits are the persistent store so the processId survives restart and group.protocol=streams. Could we instead adapt the existing test/service rather than forking new copies - e.g. parameterize StaticMemberTestService with group_protocol and add the persistent store to the existing client? The new assertion (survivor non-reconciliation / processId reuse) can be a new test method on the existing class.
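
One possible shape for the parameterization suggested above, as a rough sketch only: `StaticMemberServiceSketch` is a hypothetical stand-in, not the real `StaticMemberTestService`, and the `--persistent-store` flag is invented for illustration. It assumes `classic` and `streams` are the accepted values of `group.protocol` for Streams.

```python
class StaticMemberServiceSketch:
    """Hypothetical stand-in for a ducktape Streams service; not the real class."""

    SUPPORTED_PROTOCOLS = ("classic", "streams")  # assumed valid values

    def __init__(self, instance_id, group_protocol="classic", persistent_store=False):
        if group_protocol not in self.SUPPORTED_PROTOCOLS:
            raise ValueError(f"unsupported group protocol: {group_protocol}")
        self.instance_id = instance_id
        self.group_protocol = group_protocol
        self.persistent_store = persistent_store

    def properties(self):
        """Client properties; both keys are existing Kafka config names."""
        return {
            "group.instance.id": self.instance_id,
            "group.protocol": self.group_protocol,
        }

    def client_flags(self):
        """Hypothetical flag telling the test client to add a persistent store."""
        return ["--persistent-store"] if self.persistent_store else []
```

With defaults that match the existing behavior, current tests would keep constructing the service unchanged, and only the new test method would pass `group_protocol="streams"` and `persistent_store=True`.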

Contributor Author


@lucasbru
Thanks for the review!
You are right. I will fix this.

@chickenchickenlove

Contributor Author

@lucasbru
Thanks for the review.
I've addressed your comments. The test results are below.

====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.14.0
session_id:       2026-06-21--008
run time:         3 minutes 38.640 seconds
tests run:        2
passed:           2
flaky:            0
failed:           0
ignored:          0
====================================================================================================
test_id:    kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   1 minute 37.775 seconds
----------------------------------------------------------------------------------------------------
test_id:    kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT
status:     PASS
run time:   2 minutes 0.611 seconds
----------------------------------------------------------------------------------------------------

When you get a chance, please take another look. 🙇‍♂️
