From 803a85f2a3b10a7f82b4c65dc2d01ffefdc7f98e Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Thu, 18 Jun 2026 11:02:57 +0100 Subject: [PATCH] KAFKA-20711: Measure restore progress in offset slots The restore-remaining-records metric never reached zero when a changelog contained offsets the restore consumer does not return as records, such as transaction markers or compacted-away records. The metric is initialized from the offset range (restoreEndOffset minus startOffset), which counts every offset slot, but it was decremented by the number of records actually restored. Those two quantities diverge whenever the changelog has gaps, leaving the metric stuck at a positive value after restoration completed. The decrements are now expressed in offset slots rather than record counts. Each batch advances the metric by the change in store offset, and on completion any trailing slots between the last restored record and the end offset are accounted for, so the metric lands at exactly zero. Because the same recordRestoration call also feeds the restore/update total and rate metrics, those now advance in offset slots as well, while totalRestored continues to count actual records. The store offset before each batch is captured to measure the slots covered, and the consumer position at which restoration began is recorded so progress can be tracked from the first record onward. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../internals/StoreChangelogReader.java | 46 +++++++++++- .../internals/StoreChangelogReaderTest.java | 74 +++++++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index ebd8d6b97cd76..2e04489030d4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -141,6 +141,12 @@ static class ChangelogMetadata { private long restoreStartTimeNs; + // the consumer position at which restoration started (recorded in prepareChangelogs once that position has been read). together + // with the store offset it lets restoration progress be measured in offset slots rather than + // record counts, so the remaining-records metric stays consistent with its offset-based + // initial value even when the consumer skips transaction markers or compacted-away offsets + private long restoreStartOffset; + private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) { this.changelogState = ChangelogState.REGISTERED; this.storeMetadata = storeMetadata; @@ -651,6 +657,9 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM if (numRecords != 0) { final List> records = changelogMetadata.bufferedRecords.subList(0, numRecords); + // the store offset before this batch marks where restoration had previously reached; a null + // value means nothing has been restored yet, so progress is measured from restoreStartOffset + final Long offsetBeforeRestore = storeMetadata.offset(); final OptionalLong optionalLag = restoreConsumer.currentLag(partition); stateManager.restore(storeMetadata, records, optionalLag); @@ -663,9 +672,12 @@ private int 
restoreChangelog(final Task task, final ChangelogMetadata changelogM changelogMetadata.bufferedRecords.clear(); } - task.recordRestoration(time, numRecords, false); - + // restoring a batch advances the store offset to the last restored record's offset (NOTE(review): it is unboxed into a long below, so it is assumed to be non-null here - confirm) final Long currentOffset = storeMetadata.offset(); + + // advance the restoration metrics by the changelog offset slots this batch covered + recordRestorationProgress(task, changelogMetadata, offsetBeforeRestore, currentOffset); + log.trace("Restored {} records from changelog {} to store {}, end offset is {}, current offset is {}", numRecords, partition, storeName, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset); @@ -693,6 +705,11 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM log.info("Finished restoring changelog {} to store {} with a total number of {} records", partition, storeName, changelogMetadata.totalRestored); + // any offset slots between the last restored record and the end offset (e.g. trailing + // transaction markers) were never seen as records; account for them now so the + // remaining-records metric reaches exactly zero on completion + recordRestorationProgress(task, changelogMetadata, storeMetadata.offset(), changelogMetadata.restoreEndOffset - 1); + changelogMetadata.transitTo(ChangelogState.COMPLETED); pauseChangelogsFromRestoreConsumer(Collections.singleton(partition)); if (storeMetadata.store() instanceof MeteredStateStore) { @@ -713,6 +730,28 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM return numRecords; } + /** + * Advance the restore/update total and rate metrics (and, for active tasks, the remaining-records + * metric) by the number of changelog offset slots covered between {@code previousOffset} (exclusive) and + * {@code restoredToOffset} (inclusive), rather than by a record count; nothing is recorded when that range is empty. 
Measuring in offset slots accounts for + * offsets occupied by transaction markers or compacted-away records that the restore consumer never + * returns, keeping the metrics consistent with the offset-based remaining-records value so it reaches + * exactly zero on completion. + * + * @param previousOffset the store offset before this progress (null if nothing has been restored yet, + * in which case restoration is measured from {@code restoreStartOffset}) + * @param restoredToOffset the offset restoration has now reached + */ + private void recordRestorationProgress(final Task task, + final ChangelogMetadata changelogMetadata, + final Long previousOffset, + final long restoredToOffset) { + final long restoredFrom = previousOffset == null ? changelogMetadata.restoreStartOffset - 1 : previousOffset; + if (restoredToOffset > restoredFrom) { + task.recordRestoration(time, restoredToOffset - restoredFrom, false); + } + } + private Set getTasksFromPartitions(final Map tasks, final Set partitions) { final Set result = new HashSet<>(); @@ -1016,6 +1055,9 @@ private void prepareChangelogs(final Map tasks, throw new StreamsException("Restore consumer get unexpected error trying to get the position " + " of " + partition, e); } + // remember where restoration began so progress can be measured in offset slots; for an + // active task this also seeds the remaining-records metric below + changelogMetadata.restoreStartOffset = startOffset; if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) { try { stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index ad81af114946a..f114f002027a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -94,6 +95,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -663,6 +665,78 @@ public void shouldCheckCompletionIfPositionLargerThanEndOffset() { assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); } + @Test + public void shouldDecrementRemainingRecordsToZeroWithOffsetGaps() { + // The restore-remaining-records metric is initialized from the offset range + // (restoreEndOffset - startOffset), which counts every offset slot including those + // occupied by transaction markers / compacted-away records that the restore consumer + // never returns. The per-batch decrements must therefore also be expressed in offset + // slots, so that the metric reaches exactly zero once restoration completes. 
+ setupActiveStateManager(); + setupStoreMetadata(); + setupStore(); + final TaskId taskId = new TaskId(0, 0); + + // offset is null while preparing (forcing a seek-to-beginning so startOffset == 0, the + // from-the-beginning edge case) and still null immediately before the first batch is + // restored; once a batch is applied it reflects the last restored record's offset (7). NOTE(review): this chained stubbing is order-sensitive - it relies on offset() being read exactly twice before the first batch is applied; confirm if the call pattern changes + when(storeMetadata.offset()).thenReturn(null).thenReturn(null).thenReturn(7L); + when(activeStateManager.taskId()).thenReturn(taskId); + + consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L)); + // end offset is 10, but the changelog only contains 8 data records (offsets 0..7); + // offsets 8 and 9 are occupied by transaction markers, so the metric is initialized to 10 + adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); + + final Task mockTask = mock(Task.class); + + changelogReader.register(tp, activeStateManager); + + // first restore initializes the changelog and records the initial remaining (= 10 slots) + changelogReader.restore(Collections.singletonMap(taskId, mockTask)); + assertEquals(0L, consumer.position(tp)); + assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); + + // 8 contiguous data records at offsets 0..7 + for (int offset = 0; offset < 8; offset++) { + consumer.addRecord(new ConsumerRecord<>(topicName, 0, offset, "key".getBytes(), "value".getBytes())); + } + + // second restore applies the 8 data records + changelogReader.restore(Collections.singletonMap(taskId, mockTask)); + assertEquals(8L, changelogReader.changelogMetadata(tp).totalRestored()); + assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); + + // bypass the trailing transaction-marker offsets (8, 9) so the consumer reaches the end + consumer.seek(tp, 10L); + + // third restore observes position >= endOffset and completes restoration + 
changelogReader.restore(Collections.singletonMap(taskId, mockTask)); + assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state()); + // totalRestored continues to count actual records, not offset slots + assertEquals(8L, changelogReader.changelogMetadata(tp).totalRestored()); + + // capture every recordRestoration call: the single initRemaining=true call provides the + // initial value, and the initRemaining=false calls are decrements applied to the metric + final ArgumentCaptor numCaptor = ArgumentCaptor.forClass(Long.class); + final ArgumentCaptor initCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(mockTask, atLeastOnce()).recordRestoration(any(), numCaptor.capture(), initCaptor.capture()); + + long initial = 0L; + long decremented = 0L; + for (int i = 0; i < numCaptor.getAllValues().size(); i++) { + if (initCaptor.getAllValues().get(i)) { + initial += numCaptor.getAllValues().get(i); + } else { + decremented += numCaptor.getAllValues().get(i); + } + } + + assertEquals(10L, initial, "metric should be initialized from the offset range"); + // the decrements must net the metric back to exactly zero + assertEquals(initial, decremented, "remaining-records metric should decrement to exactly zero"); + } + @Test public void shouldRequestPositionAndHandleTimeoutException() { setupActiveStateManager();