diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java index 5b75a482aaeae..2d033510d7f5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java @@ -133,11 +133,12 @@ public ApiResult> if (partitionResponse.errorCode() == Errors.NONE.code()) { final long startOffset = partitionResponse.startOffset(); final Optional leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch()); + final Optional lag = partitionResponse.lag() < 0 ? Optional.empty() : Optional.of(partitionResponse.lag()); // Negative offset indicates there is no start offset for this partition if (partitionResponse.startOffset() < 0) { groupOffsetsListing.put(tp, null); } else { - groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty())); + groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, lag)); } } else { log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage()); diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json index f87c1fc394ce8..c1a9544a82a41 100644 --- a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json +++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json @@ -18,7 +18,10 @@ "type": "request", "listeners": ["broker"], "name": "DescribeShareGroupOffsetsRequest", - "validVersions": "0", + // Version 0 is the initial version (KIP-932). + // + // Version 1 introduces Lag in the response (KIP-1226). 
+ "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+", diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json index 692a265742485..f036de04e1a0b 100644 --- a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json +++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json @@ -17,7 +17,10 @@ "apiKey": 90, "type": "response", "name": "DescribeShareGroupOffsetsResponse", - "validVersions": "0", + // Version 0 is the initial version (KIP-932). + // + // Version 1 introduces Lag (KIP-1226). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -48,6 +51,8 @@ "about": "The share-partition start offset." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the partition." }, + { "name": "Lag", "type": "int64", "versions": "1+", "ignorable": "true", "default": -1, + "about": "The share-partition lag." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The partition-level error code, or 0 if there was no error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 596828a08391b..7834db2e5fa31 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -3841,6 +3841,7 @@ private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsRespons .setPartitionIndex(0) .setErrorCode(Errors.NONE.code()) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(0))))))); return new DescribeShareGroupOffsetsResponse(data); } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 25a46d4f37dd5..5772a2879bc29 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid} import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord} import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics} -import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService} +import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService, PartitionMetadataClient} import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics} import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService} import org.apache.kafka.coordinator.transaction.ProducerIdManager @@ -119,6 
+119,8 @@ class BrokerServer( var credentialProvider: CredentialProvider = _ var tokenCache: DelegationTokenCache = _ + var partitionMetadataClient: PartitionMetadataClient = _ + @volatile var groupCoordinator: GroupCoordinator = _ var groupConfigManager: GroupConfigManager = _ @@ -371,6 +373,8 @@ class BrokerServer( /* create persister */ persister = createShareStatePersister() + partitionMetadataClient = createPartitionMetadataClient() + groupCoordinator = createGroupCoordinator() val producerIdManagerSupplier = () => ProducerIdManager.rpc( @@ -620,6 +624,25 @@ class BrokerServer( } } + private def createPartitionMetadataClient(): PartitionMetadataClient = { + // This is a no-op implementation of PartitionMetadataClient. It always returns -1 as the latest offset for any + // requested topic partition. + // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can fetch latest offsets via InterBrokerSendThread. + new PartitionMetadataClient { + override def listLatestOffsets(topicPartitions: util.Set[TopicPartition] + ): util.Map[TopicPartition, util.concurrent.CompletableFuture[java.lang.Long]] = { + topicPartitions.asScala + .map { tp => + tp -> CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L)) + } + .toMap + .asJava + } + + override def close(): Unit = {} + } + } + private def createGroupCoordinator(): GroupCoordinator = { // Create group coordinator, but don't start it until we've started replica manager. 
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good @@ -651,6 +674,7 @@ class BrokerServer( .withGroupConfigManager(groupConfigManager) .withPersister(persister) .withAuthorizerPlugin(authorizerPlugin.toJava) + .withPartitionMetadataClient(partitionMetadataClient) .build() } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1f1cae7a70bb5..674e379a13acc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3817,6 +3817,7 @@ class KafkaApis(val requestChannel: RequestChannel, topicResponse.partitions.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partitionIndex) .setStartOffset(-1) + .setLag(-1) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 52a25e75ce382..7a5fa3793eb09 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -12635,18 +12635,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12658,12 +12661,14 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(10) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) 
.setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(20) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12680,6 +12685,7 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(0) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12779,18 +12785,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12802,12 +12811,14 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(10) .setStartOffset(-1) + .setLag(-1) .setLeaderEpoch(0) .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(20) .setStartOffset(-1) + .setLag(-1) .setLeaderEpoch(0) .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) @@ -12824,6 +12835,7 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(0) .setStartOffset(-1) + .setLag(-1) .setLeaderEpoch(0) .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message) .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) @@ -12844,18 +12856,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) 
.setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12940,18 +12955,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12973,18 +12991,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -12996,12 +13017,14 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(10) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(20) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -13018,6 +13041,7 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() 
.setPartitionIndex(0) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -13078,18 +13102,21 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(1) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(2) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(3) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -13101,12 +13128,14 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(10) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0), new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(20) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) @@ -13123,6 +13152,7 @@ class KafkaApisTest extends Logging { new DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(0) .setStartOffset(0) + .setLag(0) .setLeaderEpoch(1) .setErrorMessage(null) .setErrorCode(0) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 3cb59ca3f81ec..9b1a4793cd870 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -107,8 +107,10 @@ import org.apache.kafka.server.share.persister.PartitionErrorData; import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionStateData; +import 
org.apache.kafka.server.share.persister.PartitionStateSummaryData; import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters; +import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult; import org.apache.kafka.server.share.persister.TopicData; import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.timer.Timer; @@ -163,6 +165,7 @@ public static class Builder { private GroupConfigManager groupConfigManager; private Persister persister; private Optional> authorizerPlugin; + private PartitionMetadataClient partitionMetadataClient; public Builder( int nodeId, @@ -217,6 +220,11 @@ public Builder withAuthorizerPlugin(Optional> authorizerPlugi return this; } + public Builder withPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) { + this.partitionMetadataClient = partitionMetadataClient; + return this; + } + public GroupCoordinatorService build() { requireNonNull(config, "Config must be set."); requireNonNull(writer, "Writer must be set."); @@ -228,6 +236,7 @@ public GroupCoordinatorService build() { requireNonNull(groupConfigManager, "GroupConfigManager must be set."); requireNonNull(persister, "Persister must be set."); requireNonNull(authorizerPlugin, "Authorizer must be set."); + requireNonNull(partitionMetadataClient, "PartitionMetadataClient must be set."); String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); @@ -270,7 +279,8 @@ public GroupCoordinatorService build() { groupCoordinatorMetrics, groupConfigManager, persister, - timer + timer, + partitionMetadataClient ); } } @@ -320,6 +330,11 @@ public GroupCoordinatorService build() { */ private final Set consumerGroupAssignors; + /** + * The client used for getting partition end offsets + */ + private final PartitionMetadataClient partitionMetadataClient; + /** * The number 
of partitions of the __consumer_offsets topics. This is provided * when the component is started. @@ -349,7 +364,8 @@ public GroupCoordinatorService build() { GroupCoordinatorMetrics groupCoordinatorMetrics, GroupConfigManager groupConfigManager, Persister persister, - Timer timer + Timer timer, + PartitionMetadataClient partitionMetadataClient ) { this.log = logContext.logger(GroupCoordinatorService.class); this.config = config; @@ -363,6 +379,7 @@ public GroupCoordinatorService build() { .stream() .map(ConsumerGroupPartitionAssignor::name) .collect(Collectors.toSet()); + this.partitionMetadataClient = partitionMetadataClient; } /** @@ -1835,27 +1852,122 @@ private CompletableFuture - describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() - .setTopicId(topicData.topicId()) - .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId())) - .setPartitions(topicData.partitions().stream().map( - partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + // Now compute lag for each partition and build the final response. + computeShareGroupLagAndBuildResponse( + result, + requestTopicIdToNameMapping, + describeShareGroupOffsetsResponseTopicList, + future, + readSummaryRequestData.groupId() + ); + }); + return future; + } + + private void computeShareGroupLagAndBuildResponse( + ReadShareGroupStateSummaryResult readSummaryResult, + Map requestTopicIdToNameMapping, + List describeShareGroupOffsetsResponseTopicList, + CompletableFuture responseFuture, + String groupId + ) { + // This set keeps track of the partitions for which lag computation is needed. + Set partitionsToComputeLag = new HashSet<>(); + + readSummaryResult.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + if (shouldComputeSharePartitionLag(partitionData)) { + // If the readSummaryResult is successful for a partition, we need to compute lag. 
+ partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())); + } + }); + }); + + // Fetch latest offsets for all partitions that need lag computation. + Map> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? Map.of() : + partitionMetadataClient.listLatestOffsets(partitionsToComputeLag); + + // Final response object to be built. It will include lag information computed from partitionMetadataClient. + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseGroup = + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() + .setGroupId(groupId); + + // List of response topics to be set in the response group. + List responseTopics = new ArrayList<>(); + + CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture[0])) + .whenComplete((result, error) -> { + // The error variable will not be null when one or more of the partitionLatestOffsets futures get completed exceptionally. + // If that is the case, then the same exception would be caught in the try catch executed below when .join() is called. + // Thus, we do not need to check error != null here. + readSummaryResult.topicsData().forEach(topicData -> { + // Build response for each topic. + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic = + new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicId(topicData.topicId()) + .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId())); + + // Build response for each partition within the topic. + List partitionResponses = new ArrayList<>(); + + topicData.partitions().forEach(partitionData -> { + TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()); + // For the partitions where lag computation is not needed, a partitionResponse is built directly. 
+ // The lag is set to -1 (uninitialized lag) in these cases. If the persister returned an error for a + // partition, the startOffset is set to -1 (uninitialized offset) and the leaderEpoch is set to 0 + // (default epoch). This is consistent with OffsetFetch for situations in which there is no offset + // information to fetch. It's treated as absence of data, rather than an error + if (!shouldComputeSharePartitionLag(partitionData)) { + partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partitionData.partition()) .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET) .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH) - ).toList()) - )); + .setLag(PartitionFactory.UNINITIALIZED_LAG)); + } else { + try { + // This code is reached when allOf above is complete, which happens when all the + // individual futures are complete. Thus, the call to join() here is safe. + long partitionLatestOffset = partitionLatestOffsets.get(tp).join(); + // Compute lag as (partition end offset - startOffset - deliveryCompleteCount). + // Note, partition end offset, which is retrieved from partitionMetadataClient, is the offset of + // the next message to be produced, not the last message offset. Thus, the formula for lag computation + // does not need a +1 adjustment. 
+ long lag = partitionLatestOffset - partitionData.startOffset() - partitionData.deliveryCompleteCount(); + partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partitionData.partition()) + .setStartOffset(partitionData.startOffset()) + .setLeaderEpoch(partitionData.leaderEpoch()) + .setLag(lag)); + } catch (CompletionException e) { + // If fetching latest offset for a partition failed, return the error in the response for that partition. + partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partitionData.partition()) + .setErrorCode(Errors.forException(e.getCause()).code()) + .setErrorMessage(e.getCause().getMessage())); + } + } + }); + topic.setPartitions(partitionResponses); + responseTopics.add(topic); + }); - future.complete( - new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() - .setGroupId(readSummaryRequestData.groupId()) - .setTopics(describeShareGroupOffsetsResponseTopicList)); + // Add topics which did not exist in the metadata image and were handled earlier. + responseTopics.addAll(describeShareGroupOffsetsResponseTopicList); + // Set topics in the response group. + responseGroup.setTopics(responseTopics); + // Complete the future with the built response. + responseFuture.complete(responseGroup); }); - return future; + } + + private boolean shouldComputeSharePartitionLag(PartitionStateSummaryData partitionData) { + // The share partition lag would be computed for a share partition only if - + // 1. The read summary result for the partition is successful. + // 2. The start offset is initialized. + // 3. The delivery complete count is initialized. 
+ return partitionData.errorCode() == Errors.NONE.code() && + partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET && + partitionData.deliveryCompleteCount() != PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java new file mode 100644 index 0000000000000..ea6d940143f4c --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Client interface for retrieving latest offsets for topic partitions. + */ +public interface PartitionMetadataClient extends AutoCloseable { + /** + * Lists the latest offsets for the provided topic partitions. + * + * @param topicPartitions A set of topic partitions. 
+ * @return A map of topic partitions to the completableFuture of their latest offsets + */ + Map> listLatestOffsets( + Set topicPartitions + ); +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 01832467b0fbc..1a4efc6d98976 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -128,6 +128,7 @@ import java.net.InetAddress; import java.time.Duration; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -3714,7 +3715,8 @@ public void testDescribeShareGroupOffsetsWithNoOpPersister() throws InterruptedE .setTopicId(TOPIC_ID) .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partition) - .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)))) + .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) + .setLag(PartitionFactory.UNINITIALIZED_LAG)))) ); CompletableFuture future = @@ -3727,13 +3729,21 @@ public void testDescribeShareGroupOffsetsWithNoOpPersister() throws InterruptedE public void testDescribeShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); + + PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() .setConfig(createConfig()) .setRuntime(runtime) .setPersister(persister) + .setPartitionMetadataClient(partitionMetadataClient) .build(true); service.startup(() -> 1); + Set partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1))); + when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag)) + .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.completedFuture(41L))); + int partition = 1; DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() .setGroupId("share-group-id") @@ -3757,7 +3767,8 @@ public void testDescribeShareGroupOffsetsWithDefaultPersister() throws Interrupt .setTopicId(TOPIC_ID) .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partition) - .setStartOffset(21)))) + .setStartOffset(21) + .setLag(10L)))) ); ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() @@ -3767,6 +3778,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersister() throws Interrupt .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setStartOffset(21) + .setDeliveryCompleteCount(10) .setStateEpoch(1))) ) ); @@ -3903,6 +3915,144 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() { assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary"); } + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterReadSummaryPartitionError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() + .setGroupId("share-group-id") + .setTopics( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) + .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH) + .setLag(PartitionFactory.UNINITIALIZED_LAG)))) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()) + )) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + 
ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + + PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class); + + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .setPartitionMetadataClient(partitionMetadataClient) + .build(true); + service.startup(() -> 1); + + Exception ex = new Exception("failure"); + + Set partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1))); + when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag)) + .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.failedFuture(ex))); + + int partition = 1; + DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() + .setGroupId("share-group-id") + .setTopics( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.forException(ex).code()) + .setErrorMessage(ex.getMessage()) + )) + ) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(21) + .setDeliveryCompleteCount(10) + .setStateEpoch(1))) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + @Test public void testDescribeShareGroupOffsetsCoordinatorNotActive() 
throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); @@ -3964,12 +4114,20 @@ public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionExc public void testDescribeShareGroupAllOffsets() throws InterruptedException, ExecutionException { CoordinatorRuntime runtime = mockRuntime(); Persister persister = mock(DefaultStatePersister.class); + + PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() .setConfig(createConfig()) .setRuntime(runtime) .setPersister(persister) + .setPartitionMetadataClient(partitionMetadataClient) .build(true); + Set partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1))); + when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag)) + .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.completedFuture(41L))); + MetadataImage image = new MetadataImageBuilder() .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); @@ -4003,7 +4161,8 @@ public void testDescribeShareGroupAllOffsets() throws InterruptedException, Exec .setTopicId(TOPIC_ID) .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() .setPartitionIndex(partition) - .setStartOffset(21)))) + .setStartOffset(21) + .setLag(10L)))) ); ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() @@ -4013,6 +4172,7 @@ public void testDescribeShareGroupAllOffsets() throws InterruptedException, Exec .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setStartOffset(21) + .setDeliveryCompleteCount(10) .setStateEpoch(1))) ) ); @@ -4101,6 +4261,162 @@ public void testDescribeShareGroupAllOffsetsNullResult() { assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary"); } + 
@Test + public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(TOPIC_ID, TOPIC_NAME, 3) + .build(); + + service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + + int partition = 1; + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-initialized-partitions"), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition)))); + + DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() + .setGroupId("share-group-id") + .setTopics(null); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() + .setGroupId("share-group-id") + .setTopics( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + 
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) + .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH) + .setLag(PartitionFactory.UNINITIALIZED_LAG)))) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()) + )) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture future = + service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws InterruptedException, ExecutionException { + CoordinatorRuntime runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + + PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class); + + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .setPartitionMetadataClient(partitionMetadataClient) + .build(true); + + Exception ex = new Exception("failure"); + + Set partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1))); + when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag)) + .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.failedFuture(ex))); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(TOPIC_ID, TOPIC_NAME, 3) + .build(); + + service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + + int partition = 1; + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("share-group-initialized-partitions"), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition)))); + + DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() + .setGroupId("share-group-id") + .setTopics(null); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup() + .setGroupId("share-group-id") + .setTopics( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.forException(ex).code()) + .setErrorMessage(ex.getMessage()) + )) + ) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + 
.setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(21) + .setDeliveryCompleteCount(10) + .setStateEpoch(1))) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture future = + service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + @Test public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); @@ -5650,6 +5966,7 @@ private static class GroupCoordinatorServiceBuilder { private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics(); private Persister persister = new NoOpStatePersister(); private CoordinatorMetadataImage metadataImage = null; + private PartitionMetadataClient partitionMetadataClient = null; GroupCoordinatorService build() { return build(false); @@ -5667,7 +5984,8 @@ GroupCoordinatorService build(boolean serviceStartup) { metrics, configManager, persister, - new MockTimer() + new MockTimer(), + partitionMetadataClient ); if (serviceStartup) { @@ -5703,6 +6021,11 @@ public GroupCoordinatorServiceBuilder setMetrics(GroupCoordinatorMetrics metrics this.metrics = metrics; return this; } + + public GroupCoordinatorServiceBuilder setPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) 
{ + this.partitionMetadataClient = partitionMetadataClient; + return this; + } } private static DeleteShareGroupStateParameters createDeleteShareRequest(String groupId, Uuid topic, List partitions) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index ed998bb0b1f30..215e95ff085b9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -28,6 +28,7 @@ public class PartitionFactory { public static final int DEFAULT_STATE_EPOCH = 0; public static final int UNINITIALIZED_START_OFFSET = -1; public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1; + public static final long UNINITIALIZED_LAG = -1; public static final short DEFAULT_ERROR_CODE = Errors.NONE.code(); public static final int DEFAULT_LEADER_EPOCH = 0; public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();