From 36324581ef2409f542e6be5a967ff67f913500a2 Mon Sep 17 00:00:00 2001 From: Dan O'Reilly Date: Mon, 15 Jun 2026 13:59:53 -0700 Subject: [PATCH] KAFKA-12759: Rebalance when a static member's subscription changes (classic protocol) Under the classic rebalance protocol with static membership, a static member that rejoins during the Stable state with the same selected protocol but a different set of subscribed topics did not trigger a rebalance. The newly subscribed topics (for example those discovered by a topics.regex member) were therefore never assigned to the group until the whole group was bounced for longer than session.timeout.ms. This is a consistency gap rather than a new capability. Every other classic-group join path that can change a member's subscription already rebalances: dynamic and known-member rejoins go through classicGroupJoinExistingMember, which rebalances whenever !member.matches(request.protocols()) (a byte equality of the embedded protocol metadata, so any subscription change trips it), and the classic-to-consumer upgrade bridge already bumps the group epoch via hasMemberSubscriptionChanged. The static-member replace path (updateStaticMemberThenRebalanceOrCompleteJoin) was the sole exception: to avoid needless rebalances when a static instance simply reconnects with a new member id, it short-circuited on the selected protocol name alone and never performed the metadata comparison the other paths rely on. This patch closes the gap by also comparing the rejoining member's own previous and new subscribed topics. The subscription is parsed with the version-safe V0 prefix already used by ClassicGroup#computeSubscribedTopics (the coordinator has parsed the embedded classic subscription this way for years, e.g. for offset expiration), so no new parsing capability is introduced. Only the subscribed topics are compared; owned partitions and user data do not require a new assignment and must not trigger a rebalance. When the consumer protocol cannot be parsed the coordinator falls back to its protocol-based behaviour, mirroring computeSubscribedTopics. The previous subscription is captured before the member's protocols are replaced. Tests: GroupMetadataManagerTest covers a subscription that grows and one that shrinks (rebalance), an owned-partitions-only change and an unchanged subscription across protocol versions (no rebalance), and a multi-member static group asserting the decision is based on the rejoining member's own subscription rather than the group-wide union. JoinGroupRequestTest adds an end-to-end test reproducing the reported scenario against a broker: a static member that rejoins with an additional topic triggers a rebalance. --- .../kafka/server/JoinGroupRequestTest.scala | 74 +++++ .../group/GroupMetadataManager.java | 84 +++++- .../group/GroupMetadataManagerTest.java | 285 ++++++++++++++++++ 3 files changed, 436 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala index 0f2ab3669c90d..7136b9d965aa1 100644 --- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala @@ -294,6 +294,80 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas } } + @ClusterTest + def testStaticMemberRejoiningWithChangedSubscriptionTriggersRebalance(): Unit = { + // Reproduces KAFKA-12759: a static member that rejoins with an additional subscribed topic must trigger + // a rebalance under the classic protocol, otherwise the newly subscribed topic is never assigned. + // Creates the __consumer_offsets topic because it won't be created automatically in this test + // because it does not use the FindCoordinator API. + createOffsetsTopic() + + createTopic(topic = "foo", numPartitions = 3) + createTopic(topic = "bar", numPartitions = 3) + + val version = ApiKeys.JOIN_GROUP.latestVersion(isUnstableApiEnabled) + + val fooSubscription = ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) + ).array + + val fooBarSubscription = ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(List("foo", "bar").asJava) + ).array + + // A static member joins the group subscribed to foo. + val joinResponseData = sendJoinRequest( + groupId = "grp", + groupInstanceId = "group-instance-id", + metadata = fooSubscription, + version = version + ) + val memberId = joinResponseData.memberId + assertEquals(Errors.NONE.code, joinResponseData.errorCode) + assertEquals(1, joinResponseData.generationId) + + // Sync the member to move the group to Stable. + verifySyncGroupWithOldProtocol( + groupId = "grp", + memberId = memberId, + generationId = 1, + assignments = List(new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId(memberId) + .setAssignment(Array[Byte](1)) + ), + expectedAssignment = Array[Byte](1) + ) + + TestUtils.waitUntilTrue(() => { + val described = describeGroups(groupIds = List("grp")) + ClassicGroupState.STABLE.toString == described.head.groupState + }, msg = "The group is not in STABLE state.") + + // The static member rejoins with an additional subscribed topic. This must trigger a rebalance. + val rejoinResponseData = sendJoinRequest( + groupId = "grp", + groupInstanceId = "group-instance-id", + metadata = fooBarSubscription, + version = version + ) + + assertEquals(Errors.NONE.code, rejoinResponseData.errorCode) + assertEquals(2, rejoinResponseData.generationId) + + leaveGroup( + groupId = "grp", + memberId = rejoinResponseData.memberId, + useNewProtocol = false, + version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled) + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + } + private def testFencedStaticGroup( leaderMemberId: String, followerMemberId: String, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1f09b2908d91f..21ea4a3a913eb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -7819,6 +7819,9 @@ private CoordinatorResult updateStaticMemberThenRebalan int oldRebalanceTimeoutMs = newMember.rebalanceTimeoutMs(); int oldSessionTimeoutMs = newMember.sessionTimeoutMs(); JoinGroupRequestProtocolCollection oldProtocols = newMember.supportedProtocols(); + // Capture the member's subscribed topics before group.updateMember replaces its protocols below, so + // that they can be compared against the rejoining member's subscription further down. + Optional> oldSubscribedTopics = parseSubscribedTopics(group, oldProtocols, group.protocolName().orElse(null)); group.updateMember( newMember, @@ -7829,14 +7832,21 @@ private CoordinatorResult updateStaticMemberThenRebalan ); if (group.isInState(STABLE)) { - // Check if group's selected protocol of next generation will change, if not, simply store group to persist - // the updated static member, if yes, rebalance should be triggered to keep the group's assignment - // and selected protocol consistent + // Check if the group's selected protocol of the next generation will change, or if the member's + // set of subscribed topics changed. If neither changed, simply store the group to persist the + // updated static member. Otherwise, a rebalance should be triggered to keep the group's assignment, + // selected protocol and subscription consistent. String groupInstanceId = request.groupInstanceId(); String selectedProtocolForNextGeneration = group.selectProtocol(); - if (group.protocolName().orElse("").equals(selectedProtocolForNextGeneration)) { + boolean protocolUnchanged = group.protocolName().orElse("").equals(selectedProtocolForNextGeneration); + // A static member may rejoin with the same selected protocol but a different subscription (for + // example a topics.regex member that discovered new topics). When that happens the coordinator + // must rebalance, otherwise the newly subscribed topics would never be assigned to the group. + Optional> newSubscribedTopics = parseSubscribedTopics(group, newMember.supportedProtocols(), selectedProtocolForNextGeneration); + boolean subscriptionUnchanged = oldSubscribedTopics.equals(newSubscribedTopics); + if (protocolUnchanged && subscriptionUnchanged) { log.info("Static member which joins during Stable stage and doesn't affect " + - "the selected protocol will not trigger a rebalance."); + "the selected protocol or the subscribed topics will not trigger a rebalance."); Map groupAssignment = group.groupAssignment(); CompletableFuture appendFuture = new CompletableFuture<>(); @@ -7891,11 +7901,19 @@ private CoordinatorResult updateStaticMemberThenRebalan return new CoordinatorResult<>(records, appendFuture, false); } else { + String change; + if (!protocolUnchanged && !subscriptionUnchanged) { + change = "protocol and subscribed topics"; + } else if (!protocolUnchanged) { + change = "protocol"; + } else { + change = "subscribed topics"; + } return maybePrepareRebalanceOrCompleteJoin( group, - "Group's selectedProtocol will change because static member " + + "Group's assignment will change because static member " + newMember.memberId() + " with instance id " + groupInstanceId + - " joined with change of protocol; client reason: " + JoinGroupRequest.joinReason(request) + " joined with change of " + change + "; client reason: " + JoinGroupRequest.joinReason(request) ); } } else if (group.isInState(COMPLETING_REBALANCE)) { @@ -7916,6 +7934,58 @@ private CoordinatorResult updateStaticMemberThenRebalan return maybeCompleteJoinPhase(group); } + /** + * Parses the set of topics a classic member is subscribed to under the given protocol. This is used to + * detect a subscription change when a static member rejoins during the Stable stage with the same + * selected protocol but a different subscription (for example a topics.regex member that discovered new + * topics). Without it the coordinator would return the cached assignment and the newly subscribed topics + * would never be assigned to the group. + * + *

Only the subscribed topics are extracted. Other parts of the embedded subscription, such as the + * owned partitions or the user data, do not require a new assignment and must not trigger a rebalance. + * + *

Empty is returned when the group is not using the consumer protocol, when no protocol is selected, + * or when the embedded consumer protocol cannot be parsed. In those cases the coordinator falls back to + * its protocol-based behaviour: two members whose subscriptions are both empty compare equal and do not + * trigger a rebalance. This mirrors {@link ClassicGroup#computeSubscribedTopics()}, which also leaves the + * coordinator unaware of the subscribed topics when the consumer protocol cannot be parsed. + * + * @param group The group. + * @param protocols The member's supported protocols. + * @param protocolName The protocol whose subscription is extracted, or null if none is selected. + * @return The set of subscribed topics, or empty if it cannot be determined. + */ + private Optional> parseSubscribedTopics( + ClassicGroup group, + JoinGroupRequestProtocolCollection protocols, + String protocolName + ) { + if (protocolName == null || !group.usesConsumerGroupProtocol()) { + return Optional.empty(); + } + + JoinGroupRequestProtocol protocol = protocols.find(protocolName); + if (protocol == null) { + return Optional.empty(); + } + + try { + // The consumer protocol is parsed with V0 which is the based prefix of all versions. + // This way the consumer group manager does not depend on any specific existing or + // future versions of the consumer protocol. VO must prefix all new versions. + ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); + ConsumerProtocol.deserializeVersion(buffer); + return Optional.of(new HashSet<>( + ConsumerProtocol.deserializeConsumerProtocolSubscription(buffer, (short) 0).topics() + )); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol {}:{} of group {}. Consumer group coordinator is not aware of the subscribed topics.", + ConsumerProtocol.PROTOCOL_TYPE, protocolName, group.groupId(), e); + } + + return Optional.empty(); + } + /** * Handle a SyncGroupRequest. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index f6e4033d9aa2e..282afbe2fd7ea 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -7091,6 +7091,291 @@ public void testReplaceStaticMemberInStableStateWithUpdatedProtocolTriggersRebal assertTrue(group.isInState(COMPLETING_REBALANCE)); } + @Test + public void testReplaceStaticMemberInStableStateWithUpdatedSubscriptionTriggersRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same selected protocol but a larger + // set of subscribed topics (for example a topics.regex member that discovered a new topic) must + // trigger a rebalance so that the newly subscribed topic is assigned. See KAFKA-12759. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of())) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and an additional subscribed topic. The selected + // protocol does not change but the subscription does, which triggers a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of())) + ); + + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithShrunkSubscriptionTriggersRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same selected protocol but a smaller + // set of subscribed topics must also trigger a rebalance, so that the partitions of the topics it no + // longer subscribes to are revoked and reassigned. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of())) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and a smaller subscription. The selected protocol does + // not change but the subscription does, which triggers a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of())) + ); + + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithUpdatedOwnedPartitionsDoesNotTriggerRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same subscribed topics but different + // owned partitions must not trigger a rebalance. Only the subscribed topics drive the assignment. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of(new TopicPartition("foo", 0)))) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + String oldMemberId = response.memberId(); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID, the same subscribed topics but different owned + // partitions. This must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)))), + true, + true + ); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + joinResult.records + ); + assertFalse(joinResult.joinFuture.isDone()); + + // Write was successful. + joinResult.appendFuture.complete(null); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertNotEquals(oldMemberId, group.staticMemberId("group-instance-id")); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(STABLE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithUnchangedSubscriptionDoesNotTriggerRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same subscribed topics, even when + // serialized with a different consumer protocol version, must not trigger a rebalance. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of(), + ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION)) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + String oldMemberId = response.memberId(); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and the same subscribed topics serialized with the + // highest supported version. This must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of(), + ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION)), + true, + true + ); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + joinResult.records + ); + assertFalse(joinResult.joinFuture.isDone()); + + // Write was successful. + joinResult.appendFuture.complete(null); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertNotEquals(oldMemberId, group.staticMemberId("group-instance-id")); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(STABLE)); + } + + @Test + public void testReplaceStaticMemberInStableStateOnlyComparesTheRejoiningMemberSubscription() throws Exception { + // In a multi-member static group, the rejoin decision must be based on the rejoining member's own + // subscription change, not on the group-wide union of subscribed topics. This test sets up two static + // members subscribed to different topics and verifies that: + // 1) a member rejoining with an unchanged subscription does not trigger a rebalance, even though its + // subscription differs from the group-wide union; and + // 2) a member rejoining with an expanded subscription does trigger a rebalance. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestProtocolCollection fooProtocols = + GroupMetadataManagerTestContext.toConsumerProtocol(List.of("foo"), List.of()); + JoinGroupRequestProtocolCollection barProtocols = + GroupMetadataManagerTestContext.toConsumerProtocol(List.of("bar"), List.of()); + + // Build a stable group with two static members subscribed to different topics. The group-wide + // subscription is therefore {foo, bar}. This is set up directly rather than via + // staticMembersJoinAndRebalance because that helper subscribes all members to the same topics. + group.setProtocolName(Optional.of("range")); + group.add(new ClassicGroupMember( + "member-1", + Optional.of("instance-1"), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + fooProtocols, + new byte[]{1} + )); + group.add(new ClassicGroupMember( + "member-2", + Optional.of("instance-2"), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + barProtocols, + new byte[]{2} + )); + group.setLeaderId(Optional.of("member-1")); + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + group.setSubscribedTopics(group.computeSubscribedTopics()); + assertEquals(Optional.of(Set.of("foo", "bar")), group.subscribedTopics()); + int initialGenerationId = group.generationId(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("instance-1") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(fooProtocols) + .build(); + + // Member 1 rejoins with its unchanged {foo} subscription. Even though {foo} differs from the group-wide + // union {foo, bar}, this must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult unchangedJoinResult = context.sendClassicGroupJoin(request); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + unchangedJoinResult.records + ); + assertFalse(unchangedJoinResult.joinFuture.isDone()); + unchangedJoinResult.appendFuture.complete(null); + assertTrue(unchangedJoinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), unchangedJoinResult.joinFuture.get().errorCode()); + assertEquals(2, group.numMembers()); + assertEquals(initialGenerationId, group.generationId()); + assertTrue(group.isInState(STABLE)); + + // Member 1 rejoins with an expanded {foo, bar} subscription. This must trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult changedJoinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(List.of("foo", "bar"), List.of())) + ); + + // The group transitions to PreparingRebalance, which confirms the subscription change triggered a + // rebalance. The generation is only bumped once the rebalance completes. + assertTrue(changedJoinResult.records.isEmpty()); + assertEquals(2, group.numMembers()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + } + @Test public void testReplaceStaticMemberInStableStateErrors() throws Exception { // If the append future fails, confirm that the member is not updated.