KAFKA-20710: Share coordinator - Fence stale periodic jobs#22603
Shekharrajak wants to merge 2 commits.

 */
private volatile boolean shouldRunPeriodicJob;

private final AtomicLong periodicJobGeneration = new AtomicLong();
Protects against duplicate timer chains, similar to the epoch concept.
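A minimal sketch of the epoch-style idea, assuming that both enabling and disabling bump the generation. The class `PeriodicJobFence` and its methods are hypothetical, not the PR's code. A job carries the generation it was created under, and any transition makes that token stale:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: an epoch-style generation counter. Every enable or
// disable transition bumps the generation, so a token captured before the
// transition no longer matches.
class PeriodicJobFence {
    private final AtomicLong generation = new AtomicLong();
    private volatile boolean enabled;

    // Enable periodic jobs and return the generation new jobs should carry.
    long enable() {
        enabled = true;
        return generation.incrementAndGet();
    }

    // Disable periodic jobs; bumping the generation invalidates in-flight jobs.
    void disable() {
        enabled = false;
        generation.incrementAndGet();
    }

    // A job may proceed only while jobs are enabled and it belongs to the current generation.
    boolean isCurrent(long jobGeneration) {
        return enabled && generation.get() == jobGeneration;
    }

    public static void main(String[] args) {
        PeriodicJobFence fence = new PeriodicJobFence();
        long first = fence.enable();   // generation 1
        fence.disable();               // generation 2
        long second = fence.enable();  // generation 3
        System.out.println(fence.isCurrent(first));  // false: stale token
        System.out.println(fence.isCurrent(second)); // true
    }
}
```

Like a leader epoch, the counter only moves forward, so a late task can never accidentally match a newer generation.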

}

private boolean shouldRunPeriodicJob(long generation) {
    return shouldRunPeriodicJob && periodicJobGeneration.get() == generation;
A job must not start work if it belongs to an old generation.
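To show where this check sits in a self-rescheduling job, here is a sketch under simplifying assumptions: the "timer" is a plain queue drained by hand, and `GuardedPeriodicJob`, `fireAll`, and `workRuns` are hypothetical names. A stale task neither does work nor reschedules, so only one timer chain survives a disable/re-enable cycle:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the ShareCoordinatorService code: a self-rescheduling
// periodic job that checks its generation before doing any work.
class GuardedPeriodicJob {
    private final AtomicLong generation = new AtomicLong();
    private volatile boolean enabled;
    final Deque<Runnable> pendingTimerTasks = new ArrayDeque<>(); // stand-in for a timer
    int workRuns = 0;

    void enable() {
        enabled = true;
        schedule(generation.incrementAndGet());
    }

    void disable() {
        enabled = false;
        generation.incrementAndGet();
    }

    private boolean shouldRun(long jobGeneration) {
        return enabled && generation.get() == jobGeneration;
    }

    private void schedule(long jobGeneration) {
        pendingTimerTasks.add(() -> run(jobGeneration));
    }

    private void run(long jobGeneration) {
        if (!shouldRun(jobGeneration)) {
            return; // stale task: no work and no reschedule
        }
        workRuns++;
        schedule(jobGeneration); // keep the chain alive for this generation only
    }

    // Fire every task that was pending when this call started.
    void fireAll() {
        int n = pendingTimerTasks.size();
        for (int i = 0; i < n; i++) {
            pendingTimerTasks.poll().run();
        }
    }

    public static void main(String[] args) {
        GuardedPeriodicJob job = new GuardedPeriodicJob();
        job.enable();   // queues a task for generation 1
        job.disable();  // generation 2: the queued task is now stale
        job.enable();   // generation 3: queues a fresh task
        job.fireAll();  // stale task is skipped; fresh task runs and reschedules
        System.out.println(job.workRuns + " " + job.pendingTimerTasks.size()); // 1 1
    }
}
```

Without the generation comparison in `shouldRun`, the stale task would also run and reschedule, leaving two parallel chains.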

service.onMetadataUpdate(mock(MetadataDelta.class), enabledImage);
assertTrue(service.shouldRunPeriodicJob());
verify(timer, times(4)).add(any());
Without the fix, this count became 6, because the old prune and old snapshot completions each rescheduled one stale job.
That is, 6 = 2 (old, stale reschedules) + 4 (new).
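The arithmetic can be reproduced with a small simulation. It is a hypothetical reconstruction, not the actual test. It assumes each enable schedules both periodic jobs (two timer adds), that each fired job starts async work and reschedules itself only when that work completes, and that the old work completes after the disable/re-enable. `StaleCompletionCount` and its methods are illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reconstruction: counts timer adds with and without generation fencing
// when async completions from a previous generation arrive late.
class StaleCompletionCount {
    private final boolean fenceByGeneration;
    private final AtomicLong generation = new AtomicLong();
    private volatile boolean enabled;
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    int timerAdds = 0;

    StaleCompletionCount(boolean fenceByGeneration) {
        this.fenceByGeneration = fenceByGeneration;
    }

    private boolean shouldRun(long gen) {
        return enabled && (!fenceByGeneration || generation.get() == gen);
    }

    // Enabling schedules both periodic jobs (prune + cold-partition snapshot).
    void enable() {
        enabled = true;
        generation.incrementAndGet();
        timerAdds += 2;
    }

    void disable() {
        enabled = false;
        generation.incrementAndGet();
    }

    // Both jobs of the current generation fire and start async work. Each job
    // reschedules itself (one more timer add) only when its work completes.
    void fireJobs() {
        long gen = generation.get();
        for (int job = 0; job < 2; job++) {
            CompletableFuture<Void> work = new CompletableFuture<>();
            work.thenRun(() -> {
                if (shouldRun(gen)) {
                    timerAdds++;
                }
            });
            inFlight.add(work);
        }
    }

    void completeInFlightWork() {
        inFlight.forEach(f -> f.complete(null)); // dependents run synchronously here
        inFlight.clear();
    }

    static int scenario(boolean fenceByGeneration) {
        StaleCompletionCount service = new StaleCompletionCount(fenceByGeneration);
        service.enable();               // 2 timer adds
        service.fireJobs();             // both async operations still pending
        service.disable();
        service.enable();               // 2 more timer adds -> 4
        service.completeInFlightWork(); // old completions arrive late
        return service.timerAdds;
    }

    public static void main(String[] args) {
        System.out.println(scenario(true));  // 4
        System.out.println(scenario(false)); // 6
    }
}
```

With fencing, the late completions see a newer generation and do nothing; without it, `enabled` is true again, so each one adds a stale reschedule.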

    .thenReturn(List.of(firstSnapshotFuture))
    .thenReturn(List.of(secondSnapshotFuture));

service.startup(() -> 1);
There are two periodic jobs:
- record prune job: `write-state-record-prune`
- cold partition snapshot job: `snapshot-cold-partitions`

service.onMetadataUpdate(mock(MetadataDelta.class), enabledImage);

verify(timer, times(2)).add(any());
timer.advanceClock(30001L);
Manually advances the mock time so that the scheduled timer tasks fire.
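For readers unfamiliar with mock timers, here is a minimal sketch of what advancing a fake clock does. `FakeTimer` is a hypothetical stand-in, not Kafka's test timer class, and the 30,000 ms delay is an assumption chosen only so that advancing by 30001 ms crosses the deadline:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for a mock timer: nothing runs until the test moves the clock.
class FakeTimer {
    private static final class Entry {
        final long deadlineMs;
        final Runnable task;

        Entry(long deadlineMs, Runnable task) {
            this.deadlineMs = deadlineMs;
            this.task = task;
        }
    }

    private final PriorityQueue<Entry> queue =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadlineMs));
    private long nowMs = 0;

    void add(long delayMs, Runnable task) {
        queue.add(new Entry(nowMs + delayMs, task));
    }

    // Move the fake clock forward and run every task whose deadline has passed,
    // in deadline order. Tasks added while running are considered too.
    void advanceClock(long deltaMs) {
        nowMs += deltaMs;
        while (!queue.isEmpty() && queue.peek().deadlineMs <= nowMs) {
            queue.poll().task.run();
        }
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        FakeTimer timer = new FakeTimer();
        int[] runs = new int[2];
        timer.add(30_000L, () -> runs[0]++); // e.g. prune job (delay assumed)
        timer.add(30_000L, () -> runs[1]++); // e.g. snapshot job (delay assumed)
        timer.advanceClock(30_001L);
        System.out.println(runs[0] + " " + runs[1]); // 1 1
    }
}
```

Because time only moves when the test calls `advanceClock`, assertions such as "both delayed tasks ran exactly once" are deterministic.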

verify(timer, times(2)).add(any());
timer.advanceClock(30001L);
verify(runtime, times(1)).scheduleWriteOperation(
Both delayed tasks run once when we advance the timer.
Ref https://issues.apache.org/jira/browse/KAFKA-20710
`ShareCoordinatorService` with a periodic-job generation guard, so that stale timer tasks and stale async completions cannot reschedule after a disable/re-enable cycle.