Skip to content

Conversation

@chirag-wadhwa5
Copy link
Collaborator

@chirag-wadhwa5 chirag-wadhwa5 commented Nov 10, 2025

This PR is part of
KIP-1226.

This PR introduces an implementation class
NetworkPartitionMetadataClient for PartitionMetadataClient, that uses a
NetworkClient to send ListOffsetsRequest to the destination node. The
destination node should be the leader broker for the partitions in the
request and is retrieved from MetadataCache.

This new imple class will later be used in GroupCoordinatorService to
find the partition end offsets while computing share partition lag for
DescribeShareGroupOffsets request.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker group-coordinator labels Nov 10, 2025
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Nov 10, 2025
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, some basic questions in the comments. Also can you please add the change for the usage of PartitionMetadataClient from where it will be initialized.

Set<TopicPartition> topicPartitions
) {
if (topicPartitions == null || topicPartitions.isEmpty()) {
return new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return new HashMap<>();
return Map.of();

Comment on lines 71 to 77
this.sendThread = new SendThread(
"NetworkPartitionMetadataClientSendThread",
networkClient,
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), //30 seconds
this.time
);
this.sendThread.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to crerate a new NetworkClient with new connection to every broker or re-use some existing network client?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have the usgae of this class in the same PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again if it needs to be lazily loaded then we might have to delay the start.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. It looks pretty complete with just a few comments.

return requests;
}

private void handleErrorResponse(PendingRequest pendingRequest, ClientResponse clientResponse) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd put this method beneath handleResponse just for ease of reading and maintenance.

}
}

private ListOffsetsPartitionResponse createErrorPartitionResponse(TopicPartition tp, short errorCode) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's put this utility method right at the end of the class.

assertTrue(futures.containsKey(tp));

ListOffsetsPartitionResponse response = futures.get(tp).get();
assertNotNull(response);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have an assertion that the future is completed in all of these tests please.

ListOffsetsPartitionResponse response = futures.get(tp).get();
assertNotNull(response);
assertEquals(PARTITION, response.partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), response.errorCode());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's check that the offset, timestamp and epoch are -1 in the responses for the error cases.

new ListOffsetsPartitionResponse()
.setPartitionIndex(PARTITION)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because -1 are the default values here, I think you could not set the values in this constructor. In this way, you would be testing that error paths which uses this response and do not set values do actually pick up the defaults with no extra effort.

/**
* Client interface for retrieving latest offsets for topic partitions.
*/
public interface PartitionMetadataClient extends AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this interface now as the other PR is merged.

Comment on lines 71 to 77
this.sendThread = new SendThread(
"NetworkPartitionMetadataClientSendThread",
networkClient,
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), //30 seconds
this.time
);
this.sendThread.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have the usgae of this class in the same PR?

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, some comments.

Comment on lines 630 to 636
NetworkUtils.buildNetworkClient(
"NetworkPartitionMetadataClient",
config,
metrics,
Time.SYSTEM,
new LogContext(s"[NetworkPartitionMetadataClient broker=${config.brokerId}]")
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to decide if we want the PartitionMetadataClient to be lazily loaded, if yes then we should pass a provider which can fetch the NetworkClient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good model since it's not going to be used in every cluster.


override def close(): Unit = {}
}
private def createNetworkPartitionMetadataClient(metadataCache: MetadataCache): PartitionMetadataClient = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
private def createNetworkPartitionMetadataClient(metadataCache: MetadataCache): PartitionMetadataClient = {
private def createPartitionMetadataClient(metadataCache: MetadataCache): PartitionMetadataClient = {

Comment on lines 71 to 77
this.sendThread = new SendThread(
"NetworkPartitionMetadataClientSendThread",
networkClient,
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), //30 seconds
this.time
);
this.sendThread.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again if it needs to be lazily loaded then we might have to delay the start.


// Map to store futures for each TopicPartition
Map<TopicPartition, CompletableFuture<OffsetResponse>> futures = new HashMap<>();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove line break


if (leaderNodeOpt.isEmpty() || leaderNodeOpt.get().isEmpty()) {
// No leader available - complete with error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line break

return;
}

log.debug("ListOffsets response received - {}", clientResponse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not this is the first line in the method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writing the log only during success scenarios might help better while debugging future issues, rather than logging it everytime. I have changed the log statement to ListOffsets response received successfully, for better understanding

}

log.debug("ListOffsets response received - {}", clientResponse);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove line break


log.debug("ListOffsets response received - {}", clientResponse);

// Parse the response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment providing any meaningful insight?

*/
private record PendingRequest(
Node node,
List<TopicPartition> partitions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these are only required to log in case of error then you can write pendingRequest.futures.keySet() and avoid passing List<TopicPartition> partitions altogether.

for (ListOffsetsPartitionResponse partitionResponse : topicResponse.partitions()) {
TopicPartition tp = new TopicPartition(topicName, partitionResponse.partitionIndex());
// Remove the corresponding future from the map and complete it.
CompletableFuture<OffsetResponse> future = pendingRequest.futures.remove(tp);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do get why do you want to remove the topicPartition from the futures so can complete the pending ones later, but it's not a good idea to modify the futures list, as it can be passed as unmodifiable by some code in future. Hence, either keep a set of topic partitions which are completed and then while iterating pendingRequest.futures check if it's already contained in the set or a better one is to avoid set and just check if the future is still pending i.e.

pendingRequest.futures.forEach((tp, future) -> {
                // If future is not completed yet hence topic-partition was not included in the response, complete with error
               if (!future.isDone()) {
                    future.complete(new OffsetResponse(-1, Errors.UNKNOWN_TOPIC_OR_PARTITION));
               }
            });

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, looks good mostly. Some comments.

return this;
}

public static NetworkPartitionMetadataClientBuilder bulider() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static NetworkPartitionMetadataClientBuilder bulider() {
static NetworkPartitionMetadataClientBuilder builder() {

return new NetworkPartitionMetadataClientBuilder();
}

public NetworkPartitionMetadataClient build() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public NetworkPartitionMetadataClient build() {
NetworkPartitionMetadataClient build() {

Comment on lines +88 to +90
NetworkPartitionMetadataClientBuilder withTime(Time time) {
this.time = time;
return this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these methods used ever? If not then remove them.

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class NetworkPartitionMetadataClientTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing tests

  • Failure handling of condition: clientResponse.wasTimedOut() and clientResponse == null
  • Close method error condition - catch (InterruptedException e)


private final MetadataCache metadataCache;
private final Supplier<KafkaClient> networkClientSupplier;
private volatile SendThread sendThread;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move after the final variable declaration

this.networkClientSupplier = networkClientSupplier;
this.time = time;
this.listenerName = listenerName;
this.sendThread = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not required

Comment on lines +151 to +153
if (sendThread == null) {
synchronized (initializationLock) {
if (sendThread == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use AtomicBoolean with compareAndSet.

);
thread.start();
sendThread = thread;
log.debug("NetworkPartitionMetadataClient sendThread initialized and started");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.debug("NetworkPartitionMetadataClient sendThread initialized and started");
log.info("NetworkPartitionMetadataClient sendThread initialized and started");

Comment on lines 1949 to 1955
} catch (CompletionException e) {
// If fetching latest offset for a partition failed, return the error in the response for that partition.
partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setErrorCode(Errors.forException(e.getCause()).code())
.setErrorMessage(e.getCause().getMessage()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need catch block now?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants