Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>>
if (partitionResponse.errorCode() == Errors.NONE.code()) {
final long startOffset = partitionResponse.startOffset();
final Optional<Integer> leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch());
final Optional<Long> lag = partitionResponse.lag() < 0 ? Optional.empty() : Optional.of(partitionResponse.lag());
// Negative offset indicates there is no start offset for this partition
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, lag));
}
} else {
log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "DescribeShareGroupOffsetsRequest",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces Lag in the response (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
"apiKey": 90,
"type": "response",
"name": "DescribeShareGroupOffsetsResponse",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces Lag (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
Expand Down Expand Up @@ -48,6 +51,8 @@
"about": "The share-partition start offset." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the partition." },
{ "name": "Lag", "type": "int64", "versions": "1+", "ignorable": "true", "default": -1,
"about": "The share-partition lag." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3841,6 +3841,7 @@ private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsRespons
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(0)))))));
return new DescribeShareGroupOffsetsResponse(data);
}
Expand Down
26 changes: 25 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService, PartitionMetadataClient}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
Expand Down Expand Up @@ -119,6 +119,8 @@ class BrokerServer(
var credentialProvider: CredentialProvider = _
var tokenCache: DelegationTokenCache = _

var partitionMetadataClient: PartitionMetadataClient = _

@volatile var groupCoordinator: GroupCoordinator = _

var groupConfigManager: GroupConfigManager = _
Expand Down Expand Up @@ -371,6 +373,8 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()

partitionMetadataClient = createPartitionMetadataClient()

groupCoordinator = createGroupCoordinator()

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
Expand Down Expand Up @@ -620,6 +624,25 @@ class BrokerServer(
}
}

private def createPartitionMetadataClient(): PartitionMetadataClient = {
// This is a no-op implementation of PartitionMetadataClient. It always returns -1 as the latest offset for any
// requested topic partition.
// TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can fetch latest offsets via InterBrokerSendThread.
new PartitionMetadataClient {
override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
): util.Map[TopicPartition, util.concurrent.CompletableFuture[java.lang.Long]] = {
topicPartitions.asScala
.map { tp =>
tp -> CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
}
.toMap
.asJava
Comment on lines +634 to +639
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation will always retutn -1, am I reading right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Actually, this is a placeholder. I am working in parallel on a separate PR that will create an implementation class of PartitionMetadataClient using the InterBrokerSendThread to fetch the partition end offsets. Once that PR is completed, I can simply plug in the instance of the new impl class here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, but it should be written as comment in BrokerServer.scala and in PR description. Can you please do that.

}

override def close(): Unit = {}
}
}

private def createGroupCoordinator(): GroupCoordinator = {
// Create group coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
Expand Down Expand Up @@ -651,6 +674,7 @@ class BrokerServer(
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.withAuthorizerPlugin(authorizerPlugin.toJava)
.withPartitionMetadataClient(partitionMetadataClient)
.build()
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3817,6 +3817,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicResponse.partitions.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionIndex)
.setStartOffset(-1)
.setLag(-1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
}
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12635,18 +12635,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -12658,12 +12661,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -12680,6 +12685,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand Down Expand Up @@ -12779,18 +12785,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -12802,12 +12811,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(-1)
.setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(-1)
.setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
Expand All @@ -12824,6 +12835,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(-1)
.setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
Expand All @@ -12844,18 +12856,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand Down Expand Up @@ -12940,18 +12955,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -12973,18 +12991,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -12996,12 +13017,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -13018,6 +13041,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand Down Expand Up @@ -13078,18 +13102,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -13101,12 +13128,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand All @@ -13123,6 +13152,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
.setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
Expand Down
Loading