
KAFKA-17783: Adding listeners to remove share partition on partition changes #17796

Merged: 9 commits, Dec 4, 2024
24 changes: 21 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -17,6 +17,7 @@
package kafka.server.share;

import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -267,6 +268,11 @@ public static RecordState forId(byte id) {
*/
private final Persister persister;

/**
* The listener is used to notify the share partition manager when the share partition state changes.
*/
private final SharePartitionListener listener;

/**
* The share partition start offset specifies the partition start offset from which the records
* are cached in the cachedState of the sharePartition.
@@ -311,10 +317,11 @@ public static RecordState forId(byte id) {
Time time,
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager
GroupConfigManager groupConfigManager,
SharePartitionListener listener
) {
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY);
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener);
}

SharePartition(
@@ -329,7 +336,8 @@ public static RecordState forId(byte id) {
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager,
SharePartitionState sharePartitionState
SharePartitionState sharePartitionState,
SharePartitionListener listener
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
@@ -348,6 +356,7 @@ public static RecordState forId(byte id) {
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
this.fetchOffsetMetadata = new OffsetMetadata();
this.listener = listener;
}

/**
@@ -1114,6 +1123,15 @@ void markFenced() {
}
}

/**
* Returns the share partition listener.
*
* @return The share partition listener.
*/
SharePartitionListener listener() {
return this.listener;
}

private boolean stateNotActive() {
return partitionState() != SharePartitionState.ACTIVE;
}
87 changes: 78 additions & 9 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;

import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -609,6 +610,12 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
k -> {
long start = time.hiResClockMs();
int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
// Attach a listener to the Partition which shall invoke the partition change handlers.
// As there can be multiple share partitions (one per group) for a single topic-partition,
// create a separate listener per share partition, holding the share partition key
// to identify the respective share partition.
SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCacheMap);
replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener);
SharePartition partition = new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
@@ -620,7 +627,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
time,
persister,
replicaManager,
groupConfigManager
groupConfigManager,
listener
);
this.shareGroupMetrics.partitionLoadTime(start);
return partition;
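
The comment above motivates creating one listener per share partition rather than per topic-partition. A minimal, self-contained sketch of that pattern, with hypothetical names (Key, Listener, cache) standing in for SharePartitionKey, SharePartitionListener and partitionCacheMap:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ListenerPerKeySketch {
    // Hypothetical stand-in for SharePartitionKey: group + topic-partition.
    record Key(String groupId, String topicPartition) {}

    // Hypothetical stand-in for a partition listener callback.
    interface Listener { void onPartitionEvent(String topicPartition); }

    static final Map<Key, Listener> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // Two groups consume the same topic-partition "foo-0"; each gets its own
        // listener capturing its own key, so an event evicts only that entry.
        for (String group : new String[]{"grpA", "grpB"}) {
            Key key = new Key(group, "foo-0");
            cache.put(key, tp -> {
                if (tp.equals(key.topicPartition())) cache.remove(key);
            });
        }
        cache.values().forEach(l -> l.onPartitionEvent("foo-0"));
        System.out.println(cache.isEmpty()); // prints: true
    }
}
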
@@ -640,10 +648,7 @@ private void handleInitializationException(
}

// Remove the partition from the cache as it's failed to initialize.
SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
}
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
// The partition initialization failed, so add the partition to the erroneous partitions.
log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable);
shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
@@ -665,17 +670,81 @@ public void handleFencedSharePartitionException(
// The share partition is fenced, hence remove the partition from the map and let the client retry.
// But surface the error to the client so the client can take action, i.e. re-fetch
// the metadata and retry the fetch on the new leader.
SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
}
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
}
}
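
The comment above expects the client to react to the fenced error by refreshing metadata and retrying on the new leader. An illustrative, non-Kafka sketch of that retry loop (all names, and the use of IllegalStateException as the fenced error, are stand-ins):

class FencedRetrySketch {
    private static int calls = 0;

    // Stand-in for a fetch that is fenced twice before the new leader is found.
    static String fetchOnce() {
        if (++calls <= 2) throw new IllegalStateException("share partition fenced");
        return "records";
    }

    static void refreshMetadata() { /* re-discover the partition leader */ }

    static String fetchWithRetry(int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetchOnce();
            } catch (IllegalStateException fenced) {
                refreshMetadata(); // then retry against the new leader
            }
        }
        throw new RuntimeException("fetch failed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        System.out.println(fetchWithRetry(5)); // prints: records
    }
}
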

private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
return new SharePartitionKey(groupId, topicIdPartition);
}

private static void removeSharePartitionFromCache(
SharePartitionKey sharePartitionKey,
Map<SharePartitionKey, SharePartition> map,
ReplicaManager replicaManager
) {
SharePartition sharePartition = map.remove(sharePartitionKey);
[Review comment — Member] Nit: A line break before the ) would separate the argument declarations from the statements more clearly.

[Reply — Contributor Author] Done.

if (sharePartition != null) {
sharePartition.markFenced();
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
}
}

/**
 * The SharePartitionListener is used to listen for partition events. As the share partition is associated
 * with a topic-partition, the partition events for that topic-partition need to be handled for the share partition.
 * <p>
 * The partition cache map stores share partitions against the share partition key, which comprises the
 * group and the topic-partition. Instead of maintaining a separate map from topic-partition to share
 * partitions, the share partition key is kept in each listener, and a new listener is created for each
 * share partition.
 */
static class SharePartitionListener implements PartitionListener {
[Review comment — Contributor] Would it be better to make this a private class? This way, we don't need to pass in the states in SharePartitionManager.

[Reply — Contributor Author] That's what I was initially thinking of doing, but it made writing tests hard: I would have had to create an instance of SharePartitionManager and invoke calls to create the SharePartition as well, which would then provide the listener. Hence I decided to keep it static.

private final SharePartitionKey sharePartitionKey;
private final ReplicaManager replicaManager;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;

SharePartitionListener(
SharePartitionKey sharePartitionKey,
ReplicaManager replicaManager,
Map<SharePartitionKey, SharePartition> partitionCacheMap
) {
this.sharePartitionKey = sharePartitionKey;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
}

@Override
public void onFailed(TopicPartition topicPartition) {
log.debug("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}",
topicPartition, sharePartitionKey);
onUpdate(topicPartition);
}

@Override
public void onDeleted(TopicPartition topicPartition) {
log.debug("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}",
topicPartition, sharePartitionKey);
onUpdate(topicPartition);
}

@Override
public void onBecomingFollower(TopicPartition topicPartition) {
log.debug("The share partition becoming follower listener is invoked for the topic-partition: {}, share-partition: {}",
topicPartition, sharePartitionKey);
onUpdate(topicPartition);
}

private void onUpdate(TopicPartition topicPartition) {
if (!sharePartitionKey.topicIdPartition().topicPartition().equals(topicPartition)) {
log.error("The share partition listener is invoked for the wrong topic-partition: {}, share-partition: {}",
topicPartition, sharePartitionKey);
return;
}
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
}
}
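
The javadoc above rejects keeping a second, reverse map from topic-partition to share partitions. A hedged sketch of that rejected alternative (all names illustrative), showing the extra bookkeeping the key-in-listener design avoids:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ReverseIndexSketch {
    // topic-partition -> keys of all share partitions hosted on it.
    private final Map<String, Set<String>> byTopicPartition = new HashMap<>();

    void add(String groupId, String topicPartition) {
        byTopicPartition
            .computeIfAbsent(topicPartition, tp -> new HashSet<>())
            .add(groupId + ":" + topicPartition);
    }

    // On a partition event, every affected share partition must be looked up here,
    // and this index must be kept in sync with the partition cache map on every
    // insert and eviction.
    Set<String> affectedBy(String topicPartition) {
        return byTopicPartition.getOrDefault(topicPartition, Set.of());
    }
}

Embedding the key in a per-share-partition listener removes this second structure and the synchronization between the two maps.
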

static class ShareGroupMetrics {
/**
* share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack).
14 changes: 14 additions & 0 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -82,6 +82,11 @@ trait PartitionListener {
* that the partition was deleted but only that this broker does not host a replica of it any more.
*/
def onDeleted(partition: TopicPartition): Unit = {}

/**
* Called when the Partition on this broker is transitioned to follower.
*/
def onBecomingFollower(partition: TopicPartition): Unit = {}
}
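
Since Java classes can implement this Scala trait (the new SharePartitionListener in SharePartitionManager does exactly that), a minimal illustrative Java listener overriding only the new hook could look as follows; LoggingFollowerListener is a hypothetical example, not part of the PR:

import kafka.cluster.PartitionListener;
import org.apache.kafka.common.TopicPartition;

public class LoggingFollowerListener implements PartitionListener {
    @Override
    public void onBecomingFollower(TopicPartition partition) {
        // React to losing leadership, e.g. drop per-leader state for the partition.
        System.out.println("Transitioned to follower for " + partition);
    }
    // onFailed, onDeleted and the remaining callbacks keep the trait's no-op defaults.
}
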

trait AlterPartitionListener {
Expand Down Expand Up @@ -701,6 +706,15 @@ class Partition(val topicPartition: TopicPartition,
}
}

/**
* Invoke the partition listeners when the partition has been transitioned to follower.
*/
def invokeOnBecomingFollowerListeners(): Unit = {
listeners.forEach { listener =>
listener.onBecomingFollower(topicPartition)
}
}

private def clear(): Unit = {
remoteReplicasMap.clear()
assignmentState = SimpleAssignmentState(Seq.empty)
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2455,6 +2455,7 @@ class ReplicaManager(val config: KafkaConfig,
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
// Only change partition state when the leader is available
if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
// Skip invoking onBecomingFollower listeners as the listeners are not registered for zk-based features.
partitionsToMakeFollower += partition
}
} else {
@@ -3013,6 +3014,8 @@ class ReplicaManager(val config: KafkaConfig,
// where this broker is not in the ISR are stopped.
partitionsToStopFetching.put(tp, false)
} else if (isNewLeaderEpoch) {
// Invoke the follower transition listeners for the partition.
partition.invokeOnBecomingFollowerListeners()
// Otherwise, fetcher is restarted if the leader epoch has changed.
partitionsToStartFetching.put(tp, partition)
}
@@ -21,6 +21,7 @@
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.MetricName;
@@ -132,6 +133,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Timeout(120)
@@ -2496,6 +2498,98 @@ public void testReplicaManagerFetchMultipleSharePartitionsException() {
assertTrue(partitionCacheMap.isEmpty());
}

@Test
public void testListenerRegistration() {
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();

TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);

ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
Partition partition = mockPartition();
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);

SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();

sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
// Validate that the listener is registered.
verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
}

@Test
public void testSharePartitionListenerOnFailed() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);

SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onFailed);
}

@Test
public void testSharePartitionListenerOnDeleted() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);

SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onDeleted);
}

@Test
public void testSharePartitionListenerOnBecomingFollower() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);

SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower);
}

private void testSharePartitionListener(
SharePartitionKey sharePartitionKey,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ReplicaManager mockReplicaManager,
Consumer<TopicPartition> listenerConsumer
) {
// Add another share partition to the cache.
TopicPartition tp = new TopicPartition("foo", 1);
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
SharePartitionKey spk = new SharePartitionKey("grp", tpId);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
partitionCacheMap.put(sharePartitionKey, sp0);
partitionCacheMap.put(spk, sp1);

// Invoke listener for first share partition.
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());

// Validate that the share partition is removed from the cache.
assertEquals(1, partitionCacheMap.size());
assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
verify(sp0, times(1)).markFenced();
verify(mockReplicaManager, times(1)).removeListener(any(), any());

// Invoke listener for non-matching share partition.
listenerConsumer.accept(tp);
// The non-matching share partition should not be removed as the listener is attached to a different topic partition.
assertEquals(1, partitionCacheMap.size());
verify(sp1, times(0)).markFenced();
// Verify the remove listener is not called for the second share partition.
verify(mockReplicaManager, times(1)).removeListener(any(), any());
}

private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
}
@@ -21,6 +21,7 @@
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -5534,7 +5535,8 @@ public static SharePartitionBuilder builder() {

public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount,
defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, state);
defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager,
state, Mockito.mock(SharePartitionListener.class));
}
}
}