From c395358282111641cd8c03062e17c6bf1ab838b9 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 17 Jun 2026 17:40:55 +0530 Subject: [PATCH 1/2] Add periodic job generation test --- .../share/ShareCoordinatorServiceTest.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 4479d76d515dc..03ddec4aaf53e 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -2066,6 +2066,82 @@ public void testPeriodicJobsDoNotRunWhenShareGroupsDisabled() throws Interrupted service.shutdown(); } + @Test + public void testPeriodicJobsDoNotDuplicateAfterDisableEnableWithInFlightJobs() throws InterruptedException { + CoordinatorRuntime runtime = mockRuntime(); + PartitionWriter writer = mock(PartitionWriter.class); + MockTime time = new MockTime(); + MockTimer timer = spy(new MockTimer(time)); + + Metrics metrics = new Metrics(); + + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(metrics), + time, + timer, + writer + )); + + CompletableFuture> firstPruneFuture = new CompletableFuture<>(); + CompletableFuture> secondPruneFuture = new CompletableFuture<>(); + CompletableFuture firstSnapshotFuture = new CompletableFuture<>(); + CompletableFuture secondSnapshotFuture = new CompletableFuture<>(); + + when(runtime.>scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any() + )) + .thenReturn(firstPruneFuture) + .thenReturn(secondPruneFuture); + + when(runtime.scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any() + )) + .thenReturn(List.of(firstSnapshotFuture)) + .thenReturn(List.of(secondSnapshotFuture)); + + service.startup(() -> 1); + + MetadataImage disabledImage = mock(MetadataImage.class, RETURNS_DEEP_STUBS); + when(disabledImage.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME), anyShort())).thenReturn((short) 0); + + MetadataImage enabledImage = mockMetadataImageWithShareGroupsEnabled(); + service.onMetadataUpdate(mock(MetadataDelta.class), enabledImage); + + verify(timer, times(2)).add(any()); + timer.advanceClock(30001L); + verify(runtime, times(1)).scheduleWriteOperation( + eq("write-state-record-prune"), + any(), + any() + ); + verify(runtime, times(1)).scheduleWriteAllOperation( + eq("snapshot-cold-partitions"), + any() + ); + + service.onMetadataUpdate(mock(MetadataDelta.class), disabledImage); + assertFalse(service.shouldRunPeriodicJob()); + + service.onMetadataUpdate(mock(MetadataDelta.class), enabledImage); + assertTrue(service.shouldRunPeriodicJob()); + verify(timer, times(4)).add(any()); + + firstPruneFuture.complete(Optional.empty()); + firstSnapshotFuture.complete(null); + + verify(timer, times(4)).add(any()); + + checkMetrics(metrics); + + service.shutdown(); + } + @Test public void testShareStateTopicConfigs() { CoordinatorRuntime runtime = mockRuntime(); From 03e4e12061e6430b5c46548e038014e46f9a6c9e Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 17 Jun 2026 17:42:44 +0530 Subject: [PATCH 2/2] Fence stale periodic jobs --- .../share/ShareCoordinatorService.java | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 702d27b857e96..d45ab9326f3fe 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -76,6 +76,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntSupplier; import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException; @@ -141,6 +142,8 @@ public class ShareCoordinatorService implements ShareCoordinator { */ private volatile boolean shouldRunPeriodicJob; + private final AtomicLong periodicJobGeneration = new AtomicLong(); + public static class Builder { private final int nodeId; private final ShareCoordinatorConfig config; @@ -319,9 +322,9 @@ public void startup( log.info("Startup complete."); } - private void setupPeriodicJobs() { - setupRecordPruning(); - setupSnapshotColdPartitions(); + private void setupPeriodicJobs(long generation) { + setupRecordPruning(generation); + setupSnapshotColdPartitions(generation); } /** @@ -331,11 +334,15 @@ private void setupPeriodicJobs() { */ // Visibility for tests void setupRecordPruning() { + setupRecordPruning(periodicJobGeneration.get()); + } + + private void setupRecordPruning(long generation) { log.debug("Scheduling share-group state topic prune job."); timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { @Override public void run() { - if (!shouldRunPeriodicJob) { + if (!shouldRunPeriodicJob(generation)) { return; } List> futures = new ArrayList<>(); @@ -346,8 +353,11 @@ public void run() { if (exp != null) { log.error("Received error in share-group state topic prune.", exp); } + if (!shouldRunPeriodicJob(generation)) { + return; + } // Perpetual recursion, failure or not. - setupRecordPruning(); + setupRecordPruning(generation); }); } }); @@ -415,11 +425,15 @@ private CompletableFuture performRecordPruning(TopicPartition tp) { */ // Visibility for tests void setupSnapshotColdPartitions() { + setupSnapshotColdPartitions(periodicJobGeneration.get()); + } + + private void setupSnapshotColdPartitions(long generation) { log.debug("Scheduling cold share-partition snapshotting."); timer.add(new TimerTask(config.shareCoordinatorColdPartitionSnapshotIntervalMs()) { @Override public void run() { - if (!shouldRunPeriodicJob) { + if (!shouldRunPeriodicJob(generation)) { return; } List> futures = runtime.scheduleWriteAllOperation( @@ -432,7 +446,10 @@ public void run() { if (exp != null) { log.error("Received error while snapshotting cold partitions.", exp); } - setupSnapshotColdPartitions(); + if (!shouldRunPeriodicJob(generation)) { + return; + } + setupSnapshotColdPartitions(generation); }); } }); @@ -1137,8 +1154,9 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) { // 1 1 no op on flag, do not call jobs if (enabled ^ shouldRunPeriodicJob) { shouldRunPeriodicJob = enabled; + long generation = periodicJobGeneration.incrementAndGet(); if (enabled) { - setupPeriodicJobs(); + setupPeriodicJobs(generation); } } } @@ -1182,4 +1200,8 @@ private boolean isShareGroupsEnabled(MetadataImage image) { boolean shouldRunPeriodicJob() { return shouldRunPeriodicJob; } + + private boolean shouldRunPeriodicJob(long generation) { + return shouldRunPeriodicJob && periodicJobGeneration.get() == generation; + } }