
Conversation

@chirag-wadhwa5
Collaborator

@chirag-wadhwa5 chirag-wadhwa5 commented Nov 6, 2025

This PR is part of
KIP-1226.

This PR computes the share partition lag in GroupCoordinatorService
using deliveryCompleteCount received from readSummary, and partition
end offsets received from adminClient.listOffsets. The computed lag is
returned to the end user in DescribeShareGroupOffsetsResponse.

NOTE: The GroupCoordinator is built with a no-op implementation of
PartitionMetadataClient, which returns -1 as the partition end offset
for any requested topic partition. This will later be replaced with an
actual implementation that uses InterBrokerSendThread to retrieve
partition end offsets via ListOffsets RPC.

Reviewers: Apoorv Mittal, Andrew Schofield

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients group-coordinator labels Nov 6, 2025
@chirag-wadhwa5 chirag-wadhwa5 marked this pull request as draft November 6, 2025 11:18
@chirag-wadhwa5 chirag-wadhwa5 marked this pull request as ready for review November 6, 2025 13:22
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Nov 6, 2025
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR, trying to understand the approach. Some basic doubts.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
Contributor

Why is it in metrics package?

Contributor

The class should be in org.apache.kafka.coordinator.group.

Comment on lines +631 to +636
topicPartitions.asScala
.map { tp =>
tp -> CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
}
.toMap
.asJava
Contributor

This implementation will always return -1, am I reading that right?

Collaborator Author

Thanks for the review. Actually, this is a placeholder. I am working in parallel on a separate PR that will create an implementation class of PartitionMetadataClient using the InterBrokerSendThread to fetch the partition end offsets. Once that PR is completed, I can simply plug in the instance of the new impl class here.
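
For readers following along, the placeholder described above could look roughly like the sketch below. It is only an illustration: the interface shape is taken from the listLatestOffsets signature quoted later in this review, while the TopicPartition record is a hypothetical stand-in for org.apache.kafka.common.TopicPartition and the class name NoOpPartitionMetadataClient is an assumption, not a name from the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) { }

// Interface shape as quoted in this review; the real interface may carry more than this.
interface PartitionMetadataClient {
    Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(Set<TopicPartition> topicPartitions);
}

// Assumed class name: a no-op client that reports -1 as the end offset of every partition.
final class NoOpPartitionMetadataClient implements PartitionMetadataClient {
    @Override
    public Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(Set<TopicPartition> topicPartitions) {
        // Each requested partition maps to an already-completed future holding -1.
        return topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> CompletableFuture.completedFuture(-1L)));
    }
}
```

Because every future is already complete, callers can treat the placeholder exactly like a real client and swap in the InterBrokerSendThread-based implementation later without changing the calling code.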

Contributor

Makes sense, but it should be noted in a comment in BrokerServer.scala and in the PR description. Can you please do that?

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

We can simplify the GroupCoordinatorService code. Also, please write comments in the code as a general practice; otherwise it takes time to understand what the code is meant to do.

return future;
}

private void computeLagAndBuildResponse(
Contributor

Suggested change
private void computeLagAndBuildResponse(
private void computeShareGroupLagAndBuildResponse(

tp,
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
Contributor

If there is an error in partitionData for any partition, then we won't get a startOffset, hence it's safe to put UNINITIALIZED_START_OFFSET here, correct? Can you please write this as a comment? I am asking for the comment because there are two OR-ed conditions earlier.

Collaborator Author

Thanks for the review. I think the comment above explains why we set UNINITIALIZED_START_OFFSET in the case where the persister returns an error. I will extend the comment to include an explanation for the other OR condition though.

.setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
).toList())
));
if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
Contributor

And for groups where startOffset is not yet initialized, the lag will not be calculated. Is that intended?

Collaborator Author

Persister returns startOffset as -1 (uninitialized offset) for share partitions for which consumption hasn't begun yet. Thus, lag computation is not needed in these situations, since the persister does not yet know from where the consumption will begin. So, -1 (uninitialized lag) is returned here
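
To make the rule concrete, here is a small sketch of the computation, using the formula from the diff in this PR (lag = partitionLatestOffset - startOffset + 1 - deliveryCompleteCount). The class name, method name, and local constants are illustrative assumptions; only the formula and the -1 sentinel come from this conversation.

```java
final class ShareGroupLagSketch {
    // Assumed to mirror PartitionFactory.UNINITIALIZED_START_OFFSET, described above as -1.
    static final long UNINITIALIZED_START_OFFSET = -1L;
    // "-1 (uninitialized lag)" as described above; the constant name is hypothetical.
    static final long UNINITIALIZED_LAG = -1L;

    static long computeLag(long latestOffset, long startOffset, long deliveryCompleteCount) {
        // Consumption has not begun yet, so there is no position to measure lag from.
        if (startOffset == UNINITIALIZED_START_OFFSET) {
            return UNINITIALIZED_LAG;
        }
        // Formula from the diff: latest offset minus start offset, plus one, minus completed deliveries.
        return latestOffset - startOffset + 1 - deliveryCompleteCount;
    }

    public static void main(String[] args) {
        System.out.println(computeLag(100L, 10L, 5L)); // 100 - 10 + 1 - 5 = 86
        System.out.println(computeLag(100L, -1L, 0L)); // -1, start offset uninitialized
    }
}
```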

Contributor

Yeah, makes sense. Please add that as a comment.

Comment on lines 1911 to 1936
CompletableFuture<Void> lagComputationFuture = partitionLatestOffsets.get(tp)
.handle((latestOffset, throwable) -> {
if (throwable != null) {
partitionsResponses.put(
tip,
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setErrorCode(Errors.forException(throwable).code())
.setErrorMessage(throwable.getMessage())
);
} else {
// Compute lag: lag = partitionLatestOffset - startOffset + 1 - deliveryCompleteCount
long lag = latestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
partitionsResponses.put(
tip,
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setLag(lag)
);
}
return null;
});

lagComputationFutures.add(lagComputationFuture);
Contributor

There is handling code for individual futures but then also they are added in a list where again the handling exists, why? Can't we just wait for the futures to just complete and then iterate over the original map?

Something like below:

CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
            .whenComplete((result, error) -> {
              ....
              ....
              readSummaryResult.topicsData().forEach(topicData -> {
                topicData.partitions().forEach(partitionData -> {
                      TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
                      TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
                      if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
                          // The call to join() is safe here because of the allOf above i.e. the futures
                          // have already completed.
                          Long lag = partitionLatestOffsets.get(tp).join();
                          ...
            });
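
A self-contained version of the pattern suggested above might look like the following sketch. It uses plain String keys and a hypothetical collect helper rather than the Kafka types, and it maps a failed future to -1 purely for illustration; the PR itself sets an error code on the partition response instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class AllOfThenJoinSketch {
    // Waits for every future, then walks the original map once to build the result.
    static CompletableFuture<Map<String, Long>> collect(Map<String, CompletableFuture<Long>> futures) {
        return CompletableFuture
            .allOf(futures.values().toArray(new CompletableFuture<?>[0]))
            .handle((ignored, error) -> {
                Map<String, Long> results = new LinkedHashMap<>();
                futures.forEach((key, future) -> {
                    // allOf completes only after all inputs have completed, so join() cannot block here.
                    if (future.isCompletedExceptionally()) {
                        results.put(key, -1L); // illustrative sentinel for a failed lookup
                    } else {
                        results.put(key, future.join());
                    }
                });
                return results;
            });
    }
}
```

Using handle rather than whenComplete lets the combined future finish normally even when one input failed, so the per-key outcome is decided in a single pass over the original map.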

Comment on lines 1867 to 1881
ReadShareGroupStateSummaryResult readSummaryResult,
Map<Uuid, String> requestTopicIdToNameMapping,
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
String groupId
) {
Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();

readSummaryResult.topicsData().forEach(topicData -> {
topicData.partitions().forEach(partitionData -> {
TopicIdPartition tp = new TopicIdPartition(
topicData.topicId(),
new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
);
Contributor

The method is overly complicated. Why can't it be a simple one: first get the partitions for which lag is to be computed, and then, in a single parse once all futures of lag calculation are completed, fill the result?

readSummaryResult.topicsData().forEach(topicData ->
            topicData.partitions().forEach(partitionData -> {
                if (partitionData.errorCode() == Errors.NONE.code()) {
                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
                }
        }));

.....
.....

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the changes, we can still simplify the processing in GroupCoordinatorService.

CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
String groupId
) {

Contributor

nit: line break not needed.

});

// Fetch latest offsets for all partitions that need lag computation.
Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
Contributor

Suggested change
Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? Map.of() :

.setGroupId(readSummaryRequestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
.whenComplete((result, error) -> {
Contributor

Why is the error not checked prior to parsing the result?

Collaborator Author

allOf will complete exceptionally and thus throw an error only in case a subset of the list of original futures fails. As per my understanding, the exact error is a CompletionException whose cause is the exception thrown by one of the failing futures. That wouldn't help us at all, because regardless of whether the future returned by allOf() fails or succeeds, I have a try-catch block around the statement where I join the individual original futures, handling both cases.

Contributor

allOf will complete exceptionally and thus throw an error

It will not throw exception, you need to check error is not null. You can write a test and verify as well.
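
The point can be checked with a few lines of standalone Java. This is only a sketch with assumed names (AllOfErrorCheckSketch, observedError); it relies on the documented behaviour that a failure of any input is delivered to whenComplete as the error argument rather than being thrown at the call site.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class AllOfErrorCheckSketch {
    // Returns the failure seen by whenComplete, or null if every future succeeded.
    // Assumes the given futures are already complete, so the callback runs before this method returns.
    static Throwable observedError(CompletableFuture<?>... futures) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture.allOf(futures).whenComplete((result, error) -> {
            // Nothing is thrown here; a failure only shows up as a non-null error argument.
            if (error != null) {
                // The error is typically a CompletionException wrapping the original cause.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                seen.set(cause);
            }
        });
        return seen.get();
    }
}
```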

return future;
}

private DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup BuildDescribeShareGroupOffsetsResponse(
Contributor

Why does the method name start with a capital B?

Collaborator Author

My mistake, will replace this in the next commit

Set<TopicPartition> partitionsToComputeLag = new HashSet<>();

// This map stores the final DescribeShareGroupOffsetsResponsePartition, including the lag, for all the partitions.
Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
Contributor

What purpose does this map serve? I don't see any need of having it.

Collaborator Author

Thanks for the review. I have changed the logic to make it more efficient.

Comment on lines 1882 to 1885
TopicIdPartition tp = new TopicIdPartition(
topicData.topicId(),
new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
);
Contributor

Rather than filling a partial response here and then the rest in another parse, you should fill the responses together for better readability, i.e.

First only get the partitions to compute lag:

readSummaryResult.topicsData().forEach(topicData ->
            topicData.partitions().forEach(partitionData -> {
                // Lag is computed only when the persister returned no error and the start offset is initialized.
                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
                }
            }));

Then fill the response in another parse of readSummaryResult.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I'll continue with my review.

private GroupConfigManager groupConfigManager;
private Persister persister;
private Optional<Plugin<Authorizer>> authorizerPlugin;
private PartitionMetadataClient partitionMetadataClient;
Member

Shouldn't we do a null check in the build() method too?
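
As a rough illustration of the kind of check being asked for, the sketch below uses a hypothetical builder (CoordinatorBuilderSketch) with a plain Object standing in for PartitionMetadataClient. The exception type and message are assumptions and are not taken from the PR.

```java
final class CoordinatorBuilderSketch {
    // Stands in for the PartitionMetadataClient field on the real builder.
    private Object partitionMetadataClient;

    CoordinatorBuilderSketch withPartitionMetadataClient(Object client) {
        this.partitionMetadataClient = client;
        return this;
    }

    Object build() {
        // Fail fast at build time instead of hitting a NullPointerException on first use.
        if (partitionMetadataClient == null) {
            throw new IllegalArgumentException("PartitionMetadataClient must be set.");
        }
        return partitionMetadataClient;
    }
}
```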

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the changes, it's coming well. Some more comments.

Comment on lines 1948 to 1956
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> responseTopics = new ArrayList<>();
for (Map.Entry<Uuid, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>> entry : topicToPartitionResults.entrySet()) {
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic =
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicId(entry.getKey())
.setTopicName(requestTopicIdToNameMapping.get(entry.getKey()))
.setPartitions(entry.getValue());
responseTopics.add(topic);
}
Contributor

Why do you need another parse to fill this and can't be done in previous iteration of readSummaryResult?

* @param topicPartitions A set of topic partitions.
* @return A map of topic partitions to the completableFuture of their latest offsets
*/
Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
Contributor

Shouldn't we have Map over TopicIdPartition i.e.

Suggested change
Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
Map<TopicIdPartition, CompletableFuture<Long>> listLatestOffsets(

Collaborator Author

The topic ID is actually not required at all. So the listLatestOffsets method is responsible for 2 important things ->

  1. Find the destination node for a particular topic partition, which is the leader broker for that partition. This information is retrieved using metadataCache, and the specific method for it requires only the topic name, not the id.
  2. The ListOffsetsRequest is built and sent to the previously calculated destination node. The requestData object also requires only the name, not the ID.

The PR for an implementation of PartitionMetadataClient is already created. #20852

Maybe that can provide a better picture for this.

Member

In the future, I think we'll want to use topic ID, but the underlying RPC doesn't support topic ID yet. I'm happy for this to be based on topic name for now.

* @return A map of topic partitions to the completableFuture of their latest offsets
*/
Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
Set<TopicPartition> topicPartitions
Contributor

Suggested change
Set<TopicPartition> topicPartitions
Set<TopicIdPartition> topicIdPartitions

Collaborator Author

Same as above

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR, LGTM!

Member

@AndrewJSchofield AndrewJSchofield left a comment

Looks good to me.

@AndrewJSchofield AndrewJSchofield merged commit 1146f97 into apache:trunk Nov 11, 2025
25 checks passed
// For the partitions where lag computation is not needed, a partitionResponse is built directly.
// The lag is set to -1 (uninitialized lag) in these cases. If the persister returned an error for a
// partition, the startOffset is set to -1 (uninitialized offset) and the leaderEpoch is set to 0
// (default epoch). This is consistent with OffsetFetch for situations in which there is no offset
Member

Out of curiosity, why is zero being used instead of -1? I assume zero is a valid epoch, right?
