Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,80 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
}
}

@ClusterTest
def testStaticMemberRejoiningWithChangedSubscriptionTriggersRebalance(): Unit = {
// Reproduces KAFKA-12759: a static member that rejoins with an additional subscribed topic must trigger
// a rebalance under the classic protocol, otherwise the newly subscribed topic is never assigned.
// Creates the __consumer_offsets topic because it won't be created automatically in this test
// because it does not use the FindCoordinator API.
createOffsetsTopic()

createTopic(topic = "foo", numPartitions = 3)
createTopic(topic = "bar", numPartitions = 3)

val version = ApiKeys.JOIN_GROUP.latestVersion(isUnstableApiEnabled)

val fooSubscription = ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
).array

val fooBarSubscription = ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(List("foo", "bar").asJava)
).array

// A static member joins the group subscribed to foo.
val joinResponseData = sendJoinRequest(
groupId = "grp",
groupInstanceId = "group-instance-id",
metadata = fooSubscription,
version = version
)
val memberId = joinResponseData.memberId
assertEquals(Errors.NONE.code, joinResponseData.errorCode)
assertEquals(1, joinResponseData.generationId)

// Sync the member to move the group to Stable.
verifySyncGroupWithOldProtocol(
groupId = "grp",
memberId = memberId,
generationId = 1,
assignments = List(new SyncGroupRequestData.SyncGroupRequestAssignment()
.setMemberId(memberId)
.setAssignment(Array[Byte](1))
),
expectedAssignment = Array[Byte](1)
)

TestUtils.waitUntilTrue(() => {
val described = describeGroups(groupIds = List("grp"))
ClassicGroupState.STABLE.toString == described.head.groupState
}, msg = "The group is not in STABLE state.")

// The static member rejoins with an additional subscribed topic. This must trigger a rebalance.
val rejoinResponseData = sendJoinRequest(
groupId = "grp",
groupInstanceId = "group-instance-id",
metadata = fooBarSubscription,
version = version
)

assertEquals(Errors.NONE.code, rejoinResponseData.errorCode)
assertEquals(2, rejoinResponseData.generationId)

leaveGroup(
groupId = "grp",
memberId = rejoinResponseData.memberId,
useNewProtocol = false,
version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
)

deleteGroups(
groupIds = List("grp"),
expectedErrors = List(Errors.NONE),
version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)
)
}

private def testFencedStaticGroup(
leaderMemberId: String,
followerMemberId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7819,6 +7819,9 @@ private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalan
int oldRebalanceTimeoutMs = newMember.rebalanceTimeoutMs();
int oldSessionTimeoutMs = newMember.sessionTimeoutMs();
JoinGroupRequestProtocolCollection oldProtocols = newMember.supportedProtocols();
// Capture the member's subscribed topics before group.updateMember replaces its protocols below, so
// that they can be compared against the rejoining member's subscription further down.
Optional<Set<String>> oldSubscribedTopics = parseSubscribedTopics(group, oldProtocols, group.protocolName().orElse(null));

group.updateMember(
newMember,
Expand All @@ -7829,14 +7832,21 @@ private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalan
);

if (group.isInState(STABLE)) {
// Check if group's selected protocol of next generation will change, if not, simply store group to persist
// the updated static member, if yes, rebalance should be triggered to keep the group's assignment
// and selected protocol consistent
// Check if the group's selected protocol of the next generation will change, or if the member's
// set of subscribed topics changed. If neither changed, simply store the group to persist the
// updated static member. Otherwise, a rebalance should be triggered to keep the group's assignment,
// selected protocol and subscription consistent.
String groupInstanceId = request.groupInstanceId();
String selectedProtocolForNextGeneration = group.selectProtocol();
if (group.protocolName().orElse("").equals(selectedProtocolForNextGeneration)) {
boolean protocolUnchanged = group.protocolName().orElse("").equals(selectedProtocolForNextGeneration);
// A static member may rejoin with the same selected protocol but a different subscription (for
// example a topics.regex member that discovered new topics). When that happens the coordinator
// must rebalance, otherwise the newly subscribed topics would never be assigned to the group.
Optional<Set<String>> newSubscribedTopics = parseSubscribedTopics(group, newMember.supportedProtocols(), selectedProtocolForNextGeneration);
boolean subscriptionUnchanged = oldSubscribedTopics.equals(newSubscribedTopics);
if (protocolUnchanged && subscriptionUnchanged) {
log.info("Static member which joins during Stable stage and doesn't affect " +
"the selected protocol will not trigger a rebalance.");
"the selected protocol or the subscribed topics will not trigger a rebalance.");

Map<String, byte[]> groupAssignment = group.groupAssignment();
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -7891,11 +7901,19 @@ private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalan

return new CoordinatorResult<>(records, appendFuture, false);
} else {
String change;
if (!protocolUnchanged && !subscriptionUnchanged) {
change = "protocol and subscribed topics";
} else if (!protocolUnchanged) {
change = "protocol";
} else {
change = "subscribed topics";
}
return maybePrepareRebalanceOrCompleteJoin(
group,
"Group's selectedProtocol will change because static member " +
"Group's assignment will change because static member " +
newMember.memberId() + " with instance id " + groupInstanceId +
" joined with change of protocol; client reason: " + JoinGroupRequest.joinReason(request)
" joined with change of " + change + "; client reason: " + JoinGroupRequest.joinReason(request)
);
}
} else if (group.isInState(COMPLETING_REBALANCE)) {
Expand All @@ -7916,6 +7934,58 @@ private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalan
return maybeCompleteJoinPhase(group);
}

/**
* Parses the set of topics a classic member is subscribed to under the given protocol. This is used to
* detect a subscription change when a static member rejoins during the Stable stage with the same
* selected protocol but a different subscription (for example a topics.regex member that discovered new
* topics). Without it the coordinator would return the cached assignment and the newly subscribed topics
* would never be assigned to the group.
*
* <p>Only the subscribed topics are extracted. Other parts of the embedded subscription, such as the
* owned partitions or the user data, do not require a new assignment and must not trigger a rebalance.
*
* <p>Empty is returned when the group is not using the consumer protocol, when no protocol is selected,
* or when the embedded consumer protocol cannot be parsed. In those cases the coordinator falls back to
* its protocol-based behaviour: two members whose subscriptions are both empty compare equal and do not
* trigger a rebalance. This mirrors {@link ClassicGroup#computeSubscribedTopics()}, which also leaves the
* coordinator unaware of the subscribed topics when the consumer protocol cannot be parsed.
*
* @param group The group.
* @param protocols The member's supported protocols.
* @param protocolName The protocol whose subscription is extracted, or null if none is selected.
* @return The set of subscribed topics, or empty if it cannot be determined.
*/
private Optional<Set<String>> parseSubscribedTopics(
ClassicGroup group,
JoinGroupRequestProtocolCollection protocols,
String protocolName
) {
if (protocolName == null || !group.usesConsumerGroupProtocol()) {
return Optional.empty();
}

JoinGroupRequestProtocol protocol = protocols.find(protocolName);
if (protocol == null) {
return Optional.empty();
}

try {
// The consumer protocol is parsed with V0 which is the based prefix of all versions.
// This way the consumer group manager does not depend on any specific existing or
// future versions of the consumer protocol. VO must prefix all new versions.
ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
ConsumerProtocol.deserializeVersion(buffer);
return Optional.of(new HashSet<>(
ConsumerProtocol.deserializeConsumerProtocolSubscription(buffer, (short) 0).topics()
));
} catch (SchemaException e) {
log.warn("Failed to parse Consumer Protocol {}:{} of group {}. Consumer group coordinator is not aware of the subscribed topics.",
ConsumerProtocol.PROTOCOL_TYPE, protocolName, group.groupId(), e);
}

return Optional.empty();
}

/**
* Handle a SyncGroupRequest.
*
Expand Down
Loading
Loading