From 373ecb8537e5eeaa4420a48b01b24a0b52b23979 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Wed, 13 Nov 2024 13:32:06 +0000 Subject: [PATCH 1/6] KAFKA-17783: Adding listeners to remove share partition on partition changes --- .../server/share/SharePartitionManager.java | 83 +++++++++++++++++-- .../main/scala/kafka/cluster/Partition.scala | 16 ++++ 2 files changed, 94 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 4288dd55703d7..773bfeb210bbe 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.cluster.PartitionListener; import kafka.server.ReplicaManager; import org.apache.kafka.clients.consumer.AcknowledgeType; @@ -639,6 +640,12 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio k -> { long start = time.hiResClockMs(); int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition()); + // Attach listener to Partition which shall invoke partition changes handlers. + // However, as there could be multiple share partitions (per group name) for a single topic-partition, + // hence create separate listener per share group which holds the share partition key + // to identify the share partition. + replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), + new SharePartitionListener(sharePartitionKey)); SharePartition partition = new SharePartition( sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), @@ -670,7 +677,7 @@ private void maybeCompleteInitializationWithException( } // Remove the partition from the cache as it's failed to initialize. - partitionCacheMap.remove(sharePartitionKey); + removeSharePartitionFromCache(sharePartitionKey); // The partition initialization failed, so complete the request with the exception. // The server should not be in this state, so log the error on broker and surface the same // to the client. The broker should not be in this state, investigate the root cause of the error. @@ -688,10 +695,7 @@ private void handleFencedSharePartitionException( // The share partition is fenced hence remove the partition from map and let the client retry. // But surface the error to the client so client might take some action i.e. re-fetch // the metadata and retry the fetch on new leader. - SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey); - if (sharePartition != null) { - sharePartition.markFenced(); - } + removeSharePartitionFromCache(sharePartitionKey); } } @@ -716,6 +720,75 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } + private void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey) { + SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey); + if (sharePartition != null) { + sharePartition.markFenced(); + } + } + + /** + * The SharePartitionListener is used to listen for partition events. The share partition is associated with + * the topic-partition, we need to handle the partition events for the share partition. + *

+ * The partition cache map stores share partitions against share partition key which comprises + * group and topic-partition. Instead of maintaining a separate map for topic-partition to share partitions, + * we can maintain the share partition key in the listener and create a new listener for each share partition. + */ + private class SharePartitionListener implements PartitionListener { + + private final SharePartitionKey sharePartitionKey; + + private SharePartitionListener(SharePartitionKey sharePartitionKey) { + this.sharePartitionKey = sharePartitionKey; + } + + /** + * The onFailed method is called when a Partition is marked offline. + * + * @param topicPartition The topic-partition that has been marked offline. + */ + @Override + public void onFailed(TopicPartition topicPartition) { + log.info("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", + topicPartition, sharePartitionKey); + onUpdate(topicPartition); + } + + /** + * The onDeleted method is called when a Partition is deleted. + * + * @param topicPartition The topic-partition that has been deleted. + */ + @Override + public void onDeleted(TopicPartition topicPartition) { + log.info("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", + topicPartition, sharePartitionKey); + onUpdate(topicPartition); + } + + /** + * The onFollower method is called when a Partition is marked follower. + * + * @param topicPartition The topic-partition that has been marked as follower. + */ + @Override + public void onFollower(TopicPartition topicPartition) { + log.info("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}", + topicPartition, sharePartitionKey); + onUpdate(topicPartition); + } + + private void onUpdate(TopicPartition topicPartition) { + if (!sharePartitionKey.topicIdPartition().topicPartition().equals(topicPartition)) { + log.error("The share partition listener is invoked for the wrong topic-partition: {}, share-partition: {}", + topicPartition, sharePartitionKey); + return; + } + removeSharePartitionFromCache(sharePartitionKey); + } + } + static class ShareGroupMetrics { /** * share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack). diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e432ead8edb27..1892cc9f61dfa 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -82,6 +82,11 @@ trait PartitionListener { * that the partition was deleted but only that this broker does not host a replica of it any more. */ def onDeleted(partition: TopicPartition): Unit = {} + + /** + * Called when the Partition on this broker is marked as follower. + */ + def onFollower(partition: TopicPartition): Unit = {} } trait AlterPartitionListener { @@ -701,6 +706,15 @@ class Partition(val topicPartition: TopicPartition, } } + /** + * Invoke the partition listeners when the partition has been marked as follower. + */ + def invokeFollowerListeners(): Unit = { + listeners.forEach { listener => + listener.onFollower(topicPartition) + } + } + private def clear(): Unit = { remoteReplicasMap.clear() assignmentState = SimpleAssignmentState(Seq.empty) @@ -891,6 +905,8 @@ class Partition(val topicPartition: TopicPartition, s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") } + // Invoke the follower transition listeners for the partition. + invokeFollowerListeners() // We must restart the fetchers when the leader epoch changed regardless of // whether the leader changed as well. isNewLeaderEpoch From 9e0efbb445cef7fa4cebd02980cd94f5d473cc65 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Wed, 13 Nov 2024 17:03:33 +0000 Subject: [PATCH 2/6] Correcting comments --- .../main/java/kafka/server/share/SharePartitionManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 773bfeb210bbe..95c93df26c21a 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -640,10 +640,10 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio k -> { long start = time.hiResClockMs(); int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition()); - // Attach listener to Partition which shall invoke partition changes handlers. + // Attach listener to Partition which shall invoke partition change handlers. // However, as there could be multiple share partitions (per group name) for a single topic-partition, - // hence create separate listener per share group which holds the share partition key - // to identify the share partition. + // hence create separate listeners per share partition which holds the share partition key + // to identify the respective share partition. replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), new SharePartitionListener(sharePartitionKey)); SharePartition partition = new SharePartition( From 5e1d78f25d048498142a7088124c8cc8e05465f5 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Thu, 28 Nov 2024 11:52:31 +0000 Subject: [PATCH 3/6] Correcting listener name, added tests --- .../server/share/SharePartitionManager.java | 36 +++----- .../main/scala/kafka/cluster/Partition.scala | 12 ++- .../scala/kafka/server/ReplicaManager.scala | 4 + .../share/SharePartitionManagerTest.java | 84 +++++++++++++++++++ .../unit/kafka/cluster/PartitionTest.scala | 10 ++- .../kafka/server/ReplicaManagerTest.scala | 56 +++++++++++++ 6 files changed, 170 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 9929e43a33378..67cb135909c49 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -615,7 +615,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio // hence create separate listeners per share partition which holds the share partition key // to identify the respective share partition. replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), - new SharePartitionListener(sharePartitionKey)); + new SharePartitionListener(sharePartitionKey, partitionCacheMap)); SharePartition partition = new SharePartition( sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), @@ -647,7 +647,7 @@ private void handleInitializationException( } // Remove the partition from the cache as it's failed to initialize. - removeSharePartitionFromCache(sharePartitionKey); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); // The partition initialization failed, so add the partition to the erroneous partitions. log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable); shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable); @@ -669,7 +669,7 @@ public void handleFencedSharePartitionException( // The share partition is fenced hence remove the partition from map and let the client retry. // But surface the error to the client so client might take some action i.e. re-fetch // the metadata and retry the fetch on new leader. - removeSharePartitionFromCache(sharePartitionKey); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); } } @@ -677,8 +677,8 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } - private void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey) { - SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey); + private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, Map map) { + SharePartition sharePartition = map.remove(sharePartitionKey); if (sharePartition != null) { sharePartition.markFenced(); } @@ -692,19 +692,17 @@ private void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey) * group and topic-partition. Instead of maintaining a separate map for topic-partition to share partitions, * we can maintain the share partition key in the listener and create a new listener for each share partition. */ - private class SharePartitionListener implements PartitionListener { + // Visible for testing. + static class SharePartitionListener implements PartitionListener { private final SharePartitionKey sharePartitionKey; + private final Map partitionCacheMap; - private SharePartitionListener(SharePartitionKey sharePartitionKey) { + SharePartitionListener(SharePartitionKey sharePartitionKey, Map partitionCacheMap) { this.sharePartitionKey = sharePartitionKey; + this.partitionCacheMap = partitionCacheMap; } - /** - * The onFailed method is called when a Partition is marked offline. - * - * @param topicPartition The topic-partition that has been marked offline. - */ @Override public void onFailed(TopicPartition topicPartition) { log.info("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", @@ -712,11 +710,6 @@ public void onFailed(TopicPartition topicPartition) { onUpdate(topicPartition); } - /** - * The onDeleted method is called when a Partition is deleted. - * - * @param topicPartition The topic-partition that has been deleted. - */ @Override public void onDeleted(TopicPartition topicPartition) { log.info("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", @@ -724,13 +717,8 @@ public void onDeleted(TopicPartition topicPartition) { onUpdate(topicPartition); } - /** - * The onFollower method is called when a Partition is marked follower. - * - * @param topicPartition The topic-partition that has been marked as follower. - */ @Override - public void onFollower(TopicPartition topicPartition) { + public void onLeaderToFollower(TopicPartition topicPartition) { log.info("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, sharePartitionKey); onUpdate(topicPartition); @@ -742,7 +730,7 @@ private void onUpdate(TopicPartition topicPartition) { topicPartition, sharePartitionKey); return; } - removeSharePartitionFromCache(sharePartitionKey); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1892cc9f61dfa..12c389d8a773b 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -84,9 +84,9 @@ trait PartitionListener { def onDeleted(partition: TopicPartition): Unit = {} /** - * Called when the Partition on this broker is marked as follower. + * Called when the Partition on this broker is transitioned to follower from leader. */ - def onFollower(partition: TopicPartition): Unit = {} + def onLeaderToFollower(partition: TopicPartition): Unit = {} } trait AlterPartitionListener { @@ -707,11 +707,11 @@ class Partition(val topicPartition: TopicPartition, } /** - * Invoke the partition listeners when the partition has been marked as follower. + * Invoke the partition listeners when the partition has been transitioned to follower from leader. */ - def invokeFollowerListeners(): Unit = { + def invokeLeaderToFollowerListeners(): Unit = { listeners.forEach { listener => - listener.onFollower(topicPartition) + listener.onLeaderToFollower(topicPartition) } } @@ -905,8 +905,6 @@ class Partition(val topicPartition: TopicPartition, s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") } - // Invoke the follower transition listeners for the partition. - invokeFollowerListeners() // We must restart the fetchers when the leader epoch changed regardless of // whether the leader changed as well. isNewLeaderEpoch diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b986ee20cdba6..0b0d6636d0a60 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2455,6 +2455,8 @@ class ReplicaManager(val config: KafkaConfig, if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { // Only change partition state when the leader is available if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) { + // Invoke the follower transition listeners for the partition. + partition.invokeLeaderToFollowerListeners() partitionsToMakeFollower += partition } } else { @@ -3013,6 +3015,8 @@ class ReplicaManager(val config: KafkaConfig, // where this broker is not in the ISR are stopped. partitionsToStopFetching.put(tp, false) } else if (isNewLeaderEpoch) { + // Invoke the follower transition listeners for the partition. + partition.invokeLeaderToFollowerListeners() // Otherwise, fetcher is restarted if the leader epoch has changed. partitionsToStartFetching.put(tp, partition) } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index eea0a75c56c6e..2bb9f20db118f 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -21,6 +21,7 @@ import kafka.server.LogReadResult; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; +import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.common.MetricName; @@ -132,6 +133,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @Timeout(120) @@ -2496,6 +2498,88 @@ public void testReplicaManagerFetchMultipleSharePartitionsException() { assertTrue(partitionCacheMap.isEmpty()); } + @Test + public void testListenerRegistration() { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + ReplicaManager mockReplicaManager = mock(ReplicaManager.class); + Partition partition = mockPartition(); + when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(mockReplicaManager) + .withTimer(mockTimer) + .build(); + + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + // Validate that the listener is registered. + verify(mockReplicaManager, times(2)).maybeAddListener(any(), any()); + } + + @Test + public void testSharePartitionListenerOnFailed() { + SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); + Map partitionCacheMap = new HashMap<>(); + + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onFailed); + } + + @Test + public void testSharePartitionListenerOnDeleted() { + SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); + Map partitionCacheMap = new HashMap<>(); + + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onDeleted); + } + + @Test + public void testSharePartitionListenerOnLeaderToFollower() { + SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); + Map partitionCacheMap = new HashMap<>(); + + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onLeaderToFollower); + } + + private void testSharePartitionListener(SharePartitionKey sharePartitionKey, + Map partitionCacheMap, Consumer listenerConsumer) { + // Add another share partition to the cache. + TopicPartition tp = new TopicPartition("foo", 1); + TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); + SharePartitionKey spk = new SharePartitionKey("grp", tpId); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + partitionCacheMap.put(sharePartitionKey, sp0); + partitionCacheMap.put(spk, sp1); + + // Invoke listener for first share partition. + listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition()); + + // Validate that the share partition is removed from the cache. + assertEquals(1, partitionCacheMap.size()); + assertFalse(partitionCacheMap.containsKey(sharePartitionKey)); + verify(sp0, times(1)).markFenced(); + + // Invoke listener for second share partition. + listenerConsumer.accept(tp); + // The second share partition should not be removed as the listener is attached to single topic partition. + assertEquals(1, partitionCacheMap.size()); + verify(sp1, times(0)).markFenced(); + } + private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { return new ShareFetchResponseData.PartitionData().setPartitionIndex(0); } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index b6804d46bfd6b..cc8c7376209fb 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -77,6 +77,7 @@ object PartitionTest { private var highWatermark: Long = -1L private var failed: Boolean = false private var deleted: Boolean = false + private var leaderToFollower: Boolean = false override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = { highWatermark = offset @@ -90,6 +91,10 @@ object PartitionTest { deleted = true } + override def onLeaderToFollower(partition: TopicPartition): Unit = { + leaderToFollower = true + } + private def clear(): Unit = { highWatermark = -1L failed = false @@ -104,7 +109,8 @@ object PartitionTest { def verify( expectedHighWatermark: Long = -1L, expectedFailed: Boolean = false, - expectedDeleted: Boolean = false + expectedDeleted: Boolean = false, + expectedLeaderToFollower: Boolean = false ): Unit = { assertEquals(expectedHighWatermark, highWatermark, "Unexpected high watermark") @@ -112,6 +118,8 @@ object PartitionTest { "Unexpected failed") assertEquals(expectedDeleted, deleted, "Unexpected deleted") + assertEquals(expectedLeaderToFollower, leaderToFollower, + "Unexpected leaderToFollower") clear() } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1ce3c7de5255a..4d1c0deb00ef1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6798,6 +6798,62 @@ class ReplicaManagerTest { assertEquals(Double.NaN, maxMetric.metricValue) } + @Test + def testBecomeFollowerInvokeLeaderToFollowerListener(): Unit = { + val mockTimer = new MockTimer(time) + val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) + + try { + val tp0 = new TopicPartition(topic, 0) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + + val listener = new MockPartitionListener + listener.verify() + // Register listener for the partition. + assertTrue(replicaManager.maybeAddListener(tp0, listener)) + + val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(partition0Replicas) + .setPartitionEpoch(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + + // None of the listener methods should be invoked yet. + listener.verify() + + // Become a follower and ensure that the delayed fetch returns immediately + val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(2) + .setIsr(partition0Replicas) + .setPartitionEpoch(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + // Leader to follower listener should be invoked. + listener.verify(expectedLeaderToFollower = true) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) try { From ad68da8068e5f0150908a5f0402c409593b51cbe Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 2 Dec 2024 23:19:15 +0000 Subject: [PATCH 4/6] Addressing review comments --- .../kafka/server/share/SharePartition.java | 24 +++++++-- .../server/share/SharePartitionManager.java | 34 ++++++++----- .../main/scala/kafka/cluster/Partition.scala | 10 ++-- .../scala/kafka/server/ReplicaManager.scala | 5 +- .../share/SharePartitionManagerTest.java | 28 +++++++---- .../server/share/SharePartitionTest.java | 4 +- .../unit/kafka/cluster/PartitionTest.scala | 13 ++--- .../kafka/server/ReplicaManagerTest.scala | 49 +++++++++++++++++-- 8 files changed, 124 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index ddc023a53151e..de6b0ca406de9 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -17,6 +17,7 @@ package kafka.server.share; import kafka.server.ReplicaManager; +import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; @@ -267,6 +268,11 @@ public static RecordState forId(byte id) { */ private final Persister persister; + /** + * The listener is used to notify the share partition manager when the share partition state changes. + */ + private final SharePartitionListener listener; + /** * The share partition start offset specifies the partition start offset from which the records * are cached in the cachedState of the sharePartition. @@ -311,10 +317,11 @@ public static RecordState forId(byte id) { Time time, Persister persister, ReplicaManager replicaManager, - GroupConfigManager groupConfigManager + GroupConfigManager groupConfigManager, + SharePartitionListener listener ) { this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, - timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY); + timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener); } SharePartition( @@ -329,7 +336,8 @@ public static RecordState forId(byte id) { Persister persister, ReplicaManager replicaManager, GroupConfigManager groupConfigManager, - SharePartitionState sharePartitionState + SharePartitionState sharePartitionState, + SharePartitionListener listener ) { this.groupId = groupId; this.topicIdPartition = topicIdPartition; @@ -348,6 +356,7 @@ public static RecordState forId(byte id) { this.replicaManager = replicaManager; this.groupConfigManager = groupConfigManager; this.fetchOffsetMetadata = new OffsetMetadata(); + this.listener = listener; } /** @@ -1114,6 +1123,15 @@ void markFenced() { } } + /** + * Returns the share partition listener. + * + * @return The share partition listener. + */ + SharePartitionListener listener() { + return this.listener; + } + private boolean stateNotActive() { return partitionState() != SharePartitionState.ACTIVE; } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 67cb135909c49..732c275c69aee 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -614,8 +614,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio // However, as there could be multiple share partitions (per group name) for a single topic-partition, // hence create separate listeners per share partition which holds the share partition key // to identify the respective share partition. - replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), - new SharePartitionListener(sharePartitionKey, partitionCacheMap)); + SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCacheMap); + replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener); SharePartition partition = new SharePartition( sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), @@ -627,7 +627,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio time, persister, replicaManager, - groupConfigManager + groupConfigManager, + listener ); this.shareGroupMetrics.partitionLoadTime(start); return partition; @@ -647,7 +648,7 @@ private void handleInitializationException( } // Remove the partition from the cache as it's failed to initialize. - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); // The partition initialization failed, so add the partition to the erroneous partitions. log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable); shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable); @@ -669,7 +670,7 @@ public void handleFencedSharePartitionException( // The share partition is fenced hence remove the partition from map and let the client retry. // But surface the error to the client so client might take some action i.e. re-fetch // the metadata and retry the fetch on new leader. - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); } } @@ -677,10 +678,12 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } - private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, Map map) { + private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, + Map map, ReplicaManager replicaManager) { SharePartition sharePartition = map.remove(sharePartitionKey); if (sharePartition != null) { sharePartition.markFenced(); + replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener()); } } @@ -692,34 +695,39 @@ private static void removeSharePartitionFromCache(SharePartitionKey sharePartiti * group and topic-partition. Instead of maintaining a separate map for topic-partition to share partitions, * we can maintain the share partition key in the listener and create a new listener for each share partition. */ - // Visible for testing. static class SharePartitionListener implements PartitionListener { private final SharePartitionKey sharePartitionKey; + private final ReplicaManager replicaManager; private final Map partitionCacheMap; - SharePartitionListener(SharePartitionKey sharePartitionKey, Map partitionCacheMap) { + SharePartitionListener( + SharePartitionKey sharePartitionKey, + ReplicaManager replicaManager, + Map partitionCacheMap + ) { this.sharePartitionKey = sharePartitionKey; + this.replicaManager = replicaManager; this.partitionCacheMap = partitionCacheMap; } @Override public void onFailed(TopicPartition topicPartition) { - log.info("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", + log.debug("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, sharePartitionKey); onUpdate(topicPartition); } @Override public void onDeleted(TopicPartition topicPartition) { - log.info("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", + log.debug("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, sharePartitionKey); onUpdate(topicPartition); } @Override - public void onLeaderToFollower(TopicPartition topicPartition) { - log.info("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}", + public void onBecomingFollower(TopicPartition topicPartition) { + log.debug("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, sharePartitionKey); onUpdate(topicPartition); } @@ -730,7 +738,7 @@ private void onUpdate(TopicPartition topicPartition) { topicPartition, sharePartitionKey); return; } - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap); + removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 12c389d8a773b..a76104c79ce24 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -84,9 +84,9 @@ trait PartitionListener { def onDeleted(partition: TopicPartition): Unit = {} /** - * Called when the Partition on this broker is transitioned to follower from leader. + * Called when the Partition on this broker is transitioned to follower. */ - def onLeaderToFollower(partition: TopicPartition): Unit = {} + def onBecomingFollower(partition: TopicPartition): Unit = {} } trait AlterPartitionListener { @@ -707,11 +707,11 @@ class Partition(val topicPartition: TopicPartition, } /** - * Invoke the partition listeners when the partition has been transitioned to follower from leader. + * Invoke the partition listeners when the partition has been transitioned to follower. */ - def invokeLeaderToFollowerListeners(): Unit = { + def invokeOnBecomingFollowerListeners(): Unit = { listeners.forEach { listener => - listener.onLeaderToFollower(topicPartition) + listener.onBecomingFollower(topicPartition) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0b0d6636d0a60..85efaf4029acc 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2455,8 +2455,7 @@ class ReplicaManager(val config: KafkaConfig, if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { // Only change partition state when the leader is available if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) { - // Invoke the follower transition listeners for the partition. - partition.invokeLeaderToFollowerListeners() + // Skip invoking onBecomingFollower listeners as the listeners are not registered for zk-based features. partitionsToMakeFollower += partition } } else { @@ -3016,7 +3015,7 @@ class ReplicaManager(val config: KafkaConfig, partitionsToStopFetching.put(tp, false) } else if (isNewLeaderEpoch) { // Invoke the follower transition listeners for the partition. - partition.invokeLeaderToFollowerListeners() + partition.invokeOnBecomingFollowerListeners() // Otherwise, fetcher is restarted if the leader epoch has changed. partitionsToStartFetching.put(tp, partition) } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 2bb9f20db118f..91e9787b19508 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -2528,9 +2528,10 @@ public void testSharePartitionListenerOnFailed() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); Map partitionCacheMap = new HashMap<>(); + ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onFailed); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onFailed); } @Test @@ -2538,23 +2539,29 @@ public void testSharePartitionListenerOnDeleted() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); Map partitionCacheMap = new HashMap<>(); + ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onDeleted); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onDeleted); } @Test - public void testSharePartitionListenerOnLeaderToFollower() { + public void testSharePartitionListenerOnBecomingFollower() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); Map partitionCacheMap = new HashMap<>(); + ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, partitionListener::onLeaderToFollower); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); + testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower); } - private void testSharePartitionListener(SharePartitionKey sharePartitionKey, - Map partitionCacheMap, Consumer listenerConsumer) { + private void testSharePartitionListener( + SharePartitionKey sharePartitionKey, + Map partitionCacheMap, + ReplicaManager mockReplicaManager, + Consumer listenerConsumer + ) { // Add another share partition to the cache. TopicPartition tp = new TopicPartition("foo", 1); TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); @@ -2572,12 +2579,15 @@ private void testSharePartitionListener(SharePartitionKey sharePartitionKey, assertEquals(1, partitionCacheMap.size()); assertFalse(partitionCacheMap.containsKey(sharePartitionKey)); verify(sp0, times(1)).markFenced(); + verify(mockReplicaManager, times(1)).removeListener(any(), any()); // Invoke listener for second share partition. listenerConsumer.accept(tp); // The second share partition should not be removed as the listener is attached to single topic partition. assertEquals(1, partitionCacheMap.size()); verify(sp1, times(0)).markFenced(); + // Verify the remove listener is not called for the second share partition. + verify(mockReplicaManager, times(1)).removeListener(any(), any()); } private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 3e90005902e10..a8359d239b8fd 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -21,6 +21,7 @@ import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; import kafka.server.share.SharePartition.SharePartitionState; +import kafka.server.share.SharePartitionManager.SharePartitionListener; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -5534,7 +5535,8 @@ public static SharePartitionBuilder builder() { public SharePartition build() { return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount, - defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, state); + defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, + state, Mockito.mock(SharePartitionListener.class)); } } } diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index cc8c7376209fb..0cb298eda1c38 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -77,7 +77,7 @@ object PartitionTest { private var highWatermark: Long = -1L private var failed: Boolean = false private var deleted: Boolean = false - private var leaderToFollower: Boolean = false + private var follower: Boolean = false override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = { highWatermark = offset @@ -91,14 +91,15 @@ object PartitionTest { deleted = true } - override def onLeaderToFollower(partition: TopicPartition): Unit = { - leaderToFollower = true + override def onBecomingFollower(partition: TopicPartition): Unit = { + follower = true } private def clear(): Unit = { highWatermark = -1L failed = false deleted = false + follower = false } /** @@ -110,7 +111,7 @@ object PartitionTest { expectedHighWatermark: Long = -1L, expectedFailed: Boolean = false, expectedDeleted: Boolean = false, - expectedLeaderToFollower: Boolean = false + expectedFollower: Boolean = false ): Unit = { assertEquals(expectedHighWatermark, highWatermark, "Unexpected high watermark") @@ -118,8 +119,8 @@ object PartitionTest { "Unexpected failed") assertEquals(expectedDeleted, deleted, "Unexpected deleted") - assertEquals(expectedLeaderToFollower, leaderToFollower, - "Unexpected leaderToFollower") + assertEquals(expectedFollower, follower, + "Unexpected follower") clear() } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 4d1c0deb00ef1..a9be0c288d2b5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6799,7 +6799,50 @@ class ReplicaManagerTest { } @Test - def testBecomeFollowerInvokeLeaderToFollowerListener(): Unit = { + def testBecomeFollowerInvokeOnBecomingFollowerListener(): Unit = { + val localId = 1 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + // Attach listener to partition. + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) + replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val listener = new MockPartitionListener + assertTrue(replicaManager.maybeAddListener(topicPartition, listener)) + listener.verify() + + try { + // Make the local replica the leader + val leaderTopicsDelta = topicsCreateDelta(localId, true) + val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) + + replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) + + // Check the state of that partition and fetcher + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(0, leaderPartition.getLeaderEpoch) + // On becoming follower listener should not be invoked yet. + listener.verify() + + // Change the local replica to follower + val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) + + // On becoming follower listener should be invoked. + listener.verify(expectedFollower = true) + + // Check the state of that partition. + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(1, followerPartition.getLeaderEpoch) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testBecomeFollowerNotInvokeOnBecomingFollowerListenerOnZk(): Unit = { val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) @@ -6847,8 +6890,8 @@ class ReplicaManagerTest { topicIds.asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - // Leader to follower listener should be invoked. - listener.verify(expectedLeaderToFollower = true) + // On becoming follower listener should not be invoked. + listener.verify() } finally { replicaManager.shutdown(checkpointHW = false) } From fa6b01e420eb033b3227a178e7339effb7e2fdb3 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 3 Dec 2024 23:40:57 +0000 Subject: [PATCH 5/6] Addressing review comments --- .../server/share/SharePartitionManager.java | 2 +- .../share/SharePartitionManagerTest.java | 4 +- .../kafka/server/ReplicaManagerTest.scala | 56 ------------------- 3 files changed, 3 insertions(+), 59 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 732c275c69aee..4847298e1e200 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -727,7 +727,7 @@ public void onDeleted(TopicPartition topicPartition) { @Override public void onBecomingFollower(TopicPartition topicPartition) { - log.debug("The share partition leader change listener is invoked for the topic-partition: {}, share-partition: {}", + log.debug("The share partition becoming follower listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, sharePartitionKey); onUpdate(topicPartition); } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 91e9787b19508..62680bd3c0cec 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -2581,9 +2581,9 @@ private void testSharePartitionListener( verify(sp0, times(1)).markFenced(); verify(mockReplicaManager, times(1)).removeListener(any(), any()); - // Invoke listener for second share partition. + // Invoke listener for non-matching share partition. listenerConsumer.accept(tp); - // The second share partition should not be removed as the listener is attached to single topic partition. + // The non-matching share partition should not be removed as the listener is attached to a different topic partition. assertEquals(1, partitionCacheMap.size()); verify(sp1, times(0)).markFenced(); // Verify the remove listener is not called for the second share partition. diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a9be0c288d2b5..9b6e2aac3fb82 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6841,62 +6841,6 @@ class ReplicaManagerTest { } } - @Test - def testBecomeFollowerNotInvokeOnBecomingFollowerListenerOnZk(): Unit = { - val mockTimer = new MockTimer(time) - val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) - - try { - val tp0 = new TopicPartition(topic, 0) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - - val listener = new MockPartitionListener - listener.verify() - // Register listener for the partition. - assertTrue(replicaManager.maybeAddListener(tp0, listener)) - - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) - - // None of the listener methods should be invoked yet. - listener.verify() - - // Become a follower and ensure that the delayed fetch returns immediately - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(2) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - // On becoming follower listener should not be invoked. - listener.verify() - } finally { - replicaManager.shutdown(checkpointHW = false) - } - } - private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) try { From 1a1eacef27492618dbbd9d7bbd4be8d064875f8f Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 3 Dec 2024 23:44:43 +0000 Subject: [PATCH 6/6] Addressing review comments --- .../java/kafka/server/share/SharePartitionManager.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 4847298e1e200..ac96fe348b3b5 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -678,8 +678,11 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } - private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, - Map map, ReplicaManager replicaManager) { + private static void removeSharePartitionFromCache( + SharePartitionKey sharePartitionKey, + Map map, + ReplicaManager replicaManager + ) { SharePartition sharePartition = map.remove(sharePartitionKey); if (sharePartition != null) { sharePartition.markFenced();