diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala index 0f2ab3669c90d..7136b9d965aa1 100644 --- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala @@ -294,6 +294,80 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas } } + @ClusterTest + def testStaticMemberRejoiningWithChangedSubscriptionTriggersRebalance(): Unit = { + // Reproduces KAFKA-12759: a static member that rejoins with an additional subscribed topic must trigger + // a rebalance under the classic protocol, otherwise the newly subscribed topic is never assigned. + // Creates the __consumer_offsets topic because it won't be created automatically in this test + // because it does not use the FindCoordinator API. + createOffsetsTopic() + + createTopic(topic = "foo", numPartitions = 3) + createTopic(topic = "bar", numPartitions = 3) + + val version = ApiKeys.JOIN_GROUP.latestVersion(isUnstableApiEnabled) + + val fooSubscription = ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) + ).array + + val fooBarSubscription = ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription(List("foo", "bar").asJava) + ).array + + // A static member joins the group subscribed to foo. + val joinResponseData = sendJoinRequest( + groupId = "grp", + groupInstanceId = "group-instance-id", + metadata = fooSubscription, + version = version + ) + val memberId = joinResponseData.memberId + assertEquals(Errors.NONE.code, joinResponseData.errorCode) + assertEquals(1, joinResponseData.generationId) + + // Sync the member to move the group to Stable. + verifySyncGroupWithOldProtocol( + groupId = "grp", + memberId = memberId, + generationId = 1, + assignments = List(new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId(memberId) + .setAssignment(Array[Byte](1)) + ), + expectedAssignment = Array[Byte](1) + ) + + TestUtils.waitUntilTrue(() => { + val described = describeGroups(groupIds = List("grp")) + ClassicGroupState.STABLE.toString == described.head.groupState + }, msg = "The group is not in STABLE state.") + + // The static member rejoins with an additional subscribed topic. This must trigger a rebalance. + val rejoinResponseData = sendJoinRequest( + groupId = "grp", + groupInstanceId = "group-instance-id", + metadata = fooBarSubscription, + version = version + ) + + assertEquals(Errors.NONE.code, rejoinResponseData.errorCode) + assertEquals(2, rejoinResponseData.generationId) + + leaveGroup( + groupId = "grp", + memberId = rejoinResponseData.memberId, + useNewProtocol = false, + version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled) + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + } + private def testFencedStaticGroup( leaderMemberId: String, followerMemberId: String, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1f09b2908d91f..21ea4a3a913eb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -7819,6 +7819,9 @@ private CoordinatorResult updateStaticMemberThenRebalan int oldRebalanceTimeoutMs = newMember.rebalanceTimeoutMs(); int oldSessionTimeoutMs = newMember.sessionTimeoutMs(); JoinGroupRequestProtocolCollection oldProtocols = newMember.supportedProtocols(); + // Capture the member's subscribed topics before group.updateMember replaces its protocols below, so + // that they can be compared against the rejoining member's subscription further down. + Optional> oldSubscribedTopics = parseSubscribedTopics(group, oldProtocols, group.protocolName().orElse(null)); group.updateMember( newMember, @@ -7829,14 +7832,21 @@ private CoordinatorResult updateStaticMemberThenRebalan ); if (group.isInState(STABLE)) { - // Check if group's selected protocol of next generation will change, if not, simply store group to persist - // the updated static member, if yes, rebalance should be triggered to keep the group's assignment - // and selected protocol consistent + // Check if the group's selected protocol of the next generation will change, or if the member's + // set of subscribed topics changed. If neither changed, simply store the group to persist the + // updated static member. Otherwise, a rebalance should be triggered to keep the group's assignment, + // selected protocol and subscription consistent. String groupInstanceId = request.groupInstanceId(); String selectedProtocolForNextGeneration = group.selectProtocol(); - if (group.protocolName().orElse("").equals(selectedProtocolForNextGeneration)) { + boolean protocolUnchanged = group.protocolName().orElse("").equals(selectedProtocolForNextGeneration); + // A static member may rejoin with the same selected protocol but a different subscription (for + // example a topics.regex member that discovered new topics). When that happens the coordinator + // must rebalance, otherwise the newly subscribed topics would never be assigned to the group. + Optional> newSubscribedTopics = parseSubscribedTopics(group, newMember.supportedProtocols(), selectedProtocolForNextGeneration); + boolean subscriptionUnchanged = oldSubscribedTopics.equals(newSubscribedTopics); + if (protocolUnchanged && subscriptionUnchanged) { log.info("Static member which joins during Stable stage and doesn't affect " + - "the selected protocol will not trigger a rebalance."); + "the selected protocol or the subscribed topics will not trigger a rebalance."); Map groupAssignment = group.groupAssignment(); CompletableFuture appendFuture = new CompletableFuture<>(); @@ -7891,11 +7901,19 @@ private CoordinatorResult updateStaticMemberThenRebalan return new CoordinatorResult<>(records, appendFuture, false); } else { + String change; + if (!protocolUnchanged && !subscriptionUnchanged) { + change = "protocol and subscribed topics"; + } else if (!protocolUnchanged) { + change = "protocol"; + } else { + change = "subscribed topics"; + } return maybePrepareRebalanceOrCompleteJoin( group, - "Group's selectedProtocol will change because static member " + + "Group's assignment will change because static member " + newMember.memberId() + " with instance id " + groupInstanceId + - " joined with change of protocol; client reason: " + JoinGroupRequest.joinReason(request) + " joined with change of " + change + "; client reason: " + JoinGroupRequest.joinReason(request) ); } } else if (group.isInState(COMPLETING_REBALANCE)) { @@ -7916,6 +7934,58 @@ private CoordinatorResult updateStaticMemberThenRebalan return maybeCompleteJoinPhase(group); } + /** + * Parses the set of topics a classic member is subscribed to under the given protocol. This is used to + * detect a subscription change when a static member rejoins during the Stable stage with the same + * selected protocol but a different subscription (for example a topics.regex member that discovered new + * topics). Without it the coordinator would return the cached assignment and the newly subscribed topics + * would never be assigned to the group. + * + *

Only the subscribed topics are extracted. Other parts of the embedded subscription, such as the + * owned partitions or the user data, do not require a new assignment and must not trigger a rebalance. + * + *

Empty is returned when the group is not using the consumer protocol, when no protocol is selected, + * or when the embedded consumer protocol cannot be parsed. In those cases the coordinator falls back to + * its protocol-based behaviour: two members whose subscriptions are both empty compare equal and do not + * trigger a rebalance. This mirrors {@link ClassicGroup#computeSubscribedTopics()}, which also leaves the + * coordinator unaware of the subscribed topics when the consumer protocol cannot be parsed. + * + * @param group The group. + * @param protocols The member's supported protocols. + * @param protocolName The protocol whose subscription is extracted, or null if none is selected. + * @return The set of subscribed topics, or empty if it cannot be determined. + */ + private Optional> parseSubscribedTopics( + ClassicGroup group, + JoinGroupRequestProtocolCollection protocols, + String protocolName + ) { + if (protocolName == null || !group.usesConsumerGroupProtocol()) { + return Optional.empty(); + } + + JoinGroupRequestProtocol protocol = protocols.find(protocolName); + if (protocol == null) { + return Optional.empty(); + } + + try { + // The consumer protocol is parsed with V0 which is the based prefix of all versions. + // This way the consumer group manager does not depend on any specific existing or + // future versions of the consumer protocol. VO must prefix all new versions. + ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); + ConsumerProtocol.deserializeVersion(buffer); + return Optional.of(new HashSet<>( + ConsumerProtocol.deserializeConsumerProtocolSubscription(buffer, (short) 0).topics() + )); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol {}:{} of group {}. Consumer group coordinator is not aware of the subscribed topics.", + ConsumerProtocol.PROTOCOL_TYPE, protocolName, group.groupId(), e); + } + + return Optional.empty(); + } + /** * Handle a SyncGroupRequest. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index f6e4033d9aa2e..282afbe2fd7ea 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -7091,6 +7091,291 @@ public void testReplaceStaticMemberInStableStateWithUpdatedProtocolTriggersRebal assertTrue(group.isInState(COMPLETING_REBALANCE)); } + @Test + public void testReplaceStaticMemberInStableStateWithUpdatedSubscriptionTriggersRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same selected protocol but a larger + // set of subscribed topics (for example a topics.regex member that discovered a new topic) must + // trigger a rebalance so that the newly subscribed topic is assigned. See KAFKA-12759. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of())) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and an additional subscribed topic. The selected + // protocol does not change but the subscription does, which triggers a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of())) + ); + + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithShrunkSubscriptionTriggersRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same selected protocol but a smaller + // set of subscribed topics must also trigger a rebalance, so that the partitions of the topics it no + // longer subscribes to are revoked and reassigned. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of())) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and a smaller subscription. The selected protocol does + // not change but the subscription does, which triggers a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of())) + ); + + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(1, group.numMembers()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithUpdatedOwnedPartitionsDoesNotTriggerRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same subscribed topics but different + // owned partitions must not trigger a rebalance. Only the subscribed topics drive the assignment. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of(new TopicPartition("foo", 0)))) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + String oldMemberId = response.memberId(); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID, the same subscribed topics but different owned + // partitions. This must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo"), + List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)))), + true, + true + ); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + joinResult.records + ); + assertFalse(joinResult.joinFuture.isDone()); + + // Write was successful. + joinResult.appendFuture.complete(null); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertNotEquals(oldMemberId, group.staticMemberId("group-instance-id")); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(STABLE)); + } + + @Test + public void testReplaceStaticMemberInStableStateWithUnchangedSubscriptionDoesNotTriggerRebalance() throws Exception { + // A static member that rejoins during the Stable stage with the same subscribed topics, even when + // serialized with a different consumer protocol version, must not trigger a rebalance. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of(), + ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION)) + .build(); + + JoinGroupResponseData response = context.joinClassicGroupAndCompleteJoin(request, true, true); + assertEquals(Errors.NONE.code(), response.errorCode()); + String oldMemberId = response.memberId(); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + // Simulate successful sync group phase. + group.transitionTo(STABLE); + + // Static member rejoins with UNKNOWN_MEMBER_ID and the same subscribed topics serialized with the + // highest supported version. This must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol( + List.of("foo", "bar"), + List.of(), + ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION)), + true, + true + ); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + joinResult.records + ); + assertFalse(joinResult.joinFuture.isDone()); + + // Write was successful. + joinResult.appendFuture.complete(null); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertNotEquals(oldMemberId, group.staticMemberId("group-instance-id")); + assertEquals(1, group.numMembers()); + assertEquals(1, group.generationId()); + assertTrue(group.isInState(STABLE)); + } + + @Test + public void testReplaceStaticMemberInStableStateOnlyComparesTheRejoiningMemberSubscription() throws Exception { + // In a multi-member static group, the rejoin decision must be based on the rejoining member's own + // subscription change, not on the group-wide union of subscribed topics. This test sets up two static + // members subscribed to different topics and verifies that: + // 1) a member rejoining with an unchanged subscription does not trigger a rebalance, even though its + // subscription differs from the group-wide union; and + // 2) a member rejoining with an expanded subscription does trigger a rebalance. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + JoinGroupRequestProtocolCollection fooProtocols = + GroupMetadataManagerTestContext.toConsumerProtocol(List.of("foo"), List.of()); + JoinGroupRequestProtocolCollection barProtocols = + GroupMetadataManagerTestContext.toConsumerProtocol(List.of("bar"), List.of()); + + // Build a stable group with two static members subscribed to different topics. The group-wide + // subscription is therefore {foo, bar}. This is set up directly rather than via + // staticMembersJoinAndRebalance because that helper subscribes all members to the same topics. + group.setProtocolName(Optional.of("range")); + group.add(new ClassicGroupMember( + "member-1", + Optional.of("instance-1"), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + fooProtocols, + new byte[]{1} + )); + group.add(new ClassicGroupMember( + "member-2", + Optional.of("instance-2"), + "client-id", + "client-host", + 10000, + 5000, + "consumer", + barProtocols, + new byte[]{2} + )); + group.setLeaderId(Optional.of("member-1")); + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + group.setSubscribedTopics(group.computeSubscribedTopics()); + assertEquals(Optional.of(Set.of("foo", "bar")), group.subscribedTopics()); + int initialGenerationId = group.generationId(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("instance-1") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(fooProtocols) + .build(); + + // Member 1 rejoins with its unchanged {foo} subscription. Even though {foo} differs from the group-wide + // union {foo, bar}, this must not trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult unchangedJoinResult = context.sendClassicGroupJoin(request); + + assertEquals( + List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())), + unchangedJoinResult.records + ); + assertFalse(unchangedJoinResult.joinFuture.isDone()); + unchangedJoinResult.appendFuture.complete(null); + assertTrue(unchangedJoinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), unchangedJoinResult.joinFuture.get().errorCode()); + assertEquals(2, group.numMembers()); + assertEquals(initialGenerationId, group.generationId()); + assertTrue(group.isInState(STABLE)); + + // Member 1 rejoins with an expanded {foo, bar} subscription. This must trigger a rebalance. + GroupMetadataManagerTestContext.JoinResult changedJoinResult = context.sendClassicGroupJoin( + request.setProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(List.of("foo", "bar"), List.of())) + ); + + // The group transitions to PreparingRebalance, which confirms the subscription change triggered a + // rebalance. The generation is only bumped once the rebalance completes. + assertTrue(changedJoinResult.records.isEmpty()); + assertEquals(2, group.numMembers()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + } + @Test public void testReplaceStaticMemberInStableStateErrors() throws Exception { // If the append future fails, confirm that the member is not updated.