Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ static class ChangelogMetadata {

private long restoreStartTimeNs;

// the consumer position at which restoration started; it is assigned in prepareChangelogs
// before the task-type check, so it is not limited to active tasks. together with the store
// offset it lets restoration progress be measured in offset slots rather than record counts,
// so the remaining-records metric stays consistent with its offset-based initial value even
// when the consumer skips transaction markers or compacted-away offsets
private long restoreStartOffset;

private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) {
this.changelogState = ChangelogState.REGISTERED;
this.storeMetadata = storeMetadata;
Expand Down Expand Up @@ -651,6 +657,9 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM

if (numRecords != 0) {
final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
// the store offset before this batch marks where restoration had previously reached; a null
// value means nothing has been restored yet, so progress is measured from restoreStartOffset
final Long offsetBeforeRestore = storeMetadata.offset();
final OptionalLong optionalLag = restoreConsumer.currentLag(partition);
stateManager.restore(storeMetadata, records, optionalLag);

Expand All @@ -663,9 +672,12 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
changelogMetadata.bufferedRecords.clear();
}

task.recordRestoration(time, numRecords, false);

// restoring a batch advances the store offset to the last restored record's offset
final Long currentOffset = storeMetadata.offset();

// advance the restoration metrics by the changelog offset slots this batch covered
recordRestorationProgress(task, changelogMetadata, offsetBeforeRestore, currentOffset);

log.trace("Restored {} records from changelog {} to store {}, end offset is {}, current offset is {}",
numRecords, partition, storeName, recordEndOffset(changelogMetadata.restoreEndOffset), currentOffset);

Expand Down Expand Up @@ -693,6 +705,11 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
log.info("Finished restoring changelog {} to store {} with a total number of {} records",
partition, storeName, changelogMetadata.totalRestored);

// any offset slots between the last restored record and the end offset (e.g. trailing
// transaction markers) were never seen as records; account for them now so the
// remaining-records metric reaches exactly zero on completion
recordRestorationProgress(task, changelogMetadata, storeMetadata.offset(), changelogMetadata.restoreEndOffset - 1);

changelogMetadata.transitTo(ChangelogState.COMPLETED);
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
if (storeMetadata.store() instanceof MeteredStateStore) {
Expand All @@ -713,6 +730,28 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
return numRecords;
}

/**
 * Reports restoration progress to the given task in units of changelog offset slots instead of
 * a record count.
 *
 * <p>The amount reported is the distance between the offset restoration had previously reached
 * and the offset it has reached now. Because that distance also spans offsets the restore consumer
 * never hands back (transaction markers, compacted-away records), the restore/update total and rate
 * metrics (and, for active tasks, the remaining-records metric) stay in step with the offset-based
 * initial remaining-records value, which can then reach exactly zero on completion. Nothing is
 * reported when no forward progress was made.
 *
 * @param task              the task whose {@code recordRestoration} is invoked with the covered slots
 * @param changelogMetadata metadata supplying {@code restoreStartOffset}, used as the baseline when
 *                          {@code previousOffset} is null
 * @param previousOffset    the store offset before this progress, or null if nothing has been
 *                          restored yet
 * @param restoredToOffset  the offset restoration has now reached
 */
private void recordRestorationProgress(final Task task,
                                       final ChangelogMetadata changelogMetadata,
                                       final Long previousOffset,
                                       final long restoredToOffset) {
    final long baseline;
    if (previousOffset != null) {
        baseline = previousOffset;
    } else {
        // nothing restored so far: the slot just before the start position acts as the
        // "last restored" offset, so that the start offset itself is counted
        baseline = changelogMetadata.restoreStartOffset - 1;
    }

    // no forward movement means there is nothing to report
    if (restoredToOffset <= baseline) {
        return;
    }

    task.recordRestoration(time, restoredToOffset - baseline, false);
}

private Set<Task> getTasksFromPartitions(final Map<TaskId, Task> tasks,
final Set<TopicPartition> partitions) {
final Set<Task> result = new HashSet<>();
Expand Down Expand Up @@ -1016,6 +1055,9 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
" of " + partition, e);
}
// remember where restoration began so progress can be measured in offset slots; for an
// active task this also seeds the remaining-records metric below
changelogMetadata.restoreStartOffset = startOffset;
if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
try {
stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -94,6 +95,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -663,6 +665,78 @@ public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
}

@Test
public void shouldDecrementRemainingRecordsToZeroWithOffsetGaps() {
    // Scenario: the restore-remaining-records metric starts out as the size of the offset range
    // (restoreEndOffset - startOffset). That range includes slots the restore consumer never
    // returns (transaction markers, compacted-away records), so the decrements applied while
    // restoring have to be counted in offset slots as well; otherwise the metric cannot land
    // on exactly zero when restoration finishes.
    setupActiveStateManager();
    setupStoreMetadata();
    setupStore();
    final TaskId id = new TaskId(0, 0);
    final Task task = mock(Task.class);

    // the store offset is null during preparation (which forces a seek-to-beginning, i.e. the
    // startOffset == 0 edge case) and is still null right before the first batch; after the
    // batch has been applied it is the offset of the last restored record (7)
    when(storeMetadata.offset()).thenReturn(null, null, 7L);
    when(activeStateManager.taskId()).thenReturn(id);

    consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
    // the end offset is 10 although only 8 data records exist (offsets 0..7); offsets 8 and 9
    // hold transaction markers, hence the metric is seeded with 10
    adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

    changelogReader.register(tp, activeStateManager);

    // pass 1: initializes the changelog and records the initial remaining value (10 slots)
    changelogReader.restore(Collections.singletonMap(id, task));
    assertEquals(0L, consumer.position(tp));
    assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());

    // queue 8 contiguous data records at offsets 0..7
    final int dataRecords = 8;
    for (int recordOffset = 0; recordOffset < dataRecords; recordOffset++) {
        consumer.addRecord(new ConsumerRecord<>(topicName, 0, recordOffset, "key".getBytes(), "value".getBytes()));
    }

    // pass 2: the 8 data records are applied to the store
    changelogReader.restore(Collections.singletonMap(id, task));
    assertEquals(8L, changelogReader.changelogMetadata(tp).totalRestored());
    assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());

    // step over the trailing transaction-marker offsets (8, 9) so the position hits the end
    consumer.seek(tp, 10L);

    // pass 3: position >= endOffset is observed and restoration completes
    changelogReader.restore(Collections.singletonMap(id, task));
    assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
    // totalRestored keeps counting real records rather than offset slots
    assertEquals(8L, changelogReader.changelogMetadata(tp).totalRestored());

    // collect every recordRestoration invocation: calls with initRemaining=true seed the
    // metric, calls with initRemaining=false are the decrements applied to it
    final ArgumentCaptor<Long> amountCaptor = ArgumentCaptor.forClass(Long.class);
    final ArgumentCaptor<Boolean> initFlagCaptor = ArgumentCaptor.forClass(Boolean.class);
    verify(task, atLeastOnce()).recordRestoration(any(), amountCaptor.capture(), initFlagCaptor.capture());

    long seeded = 0L;
    long drained = 0L;
    final int invocations = amountCaptor.getAllValues().size();
    for (int call = 0; call < invocations; call++) {
        final boolean seedsMetric = initFlagCaptor.getAllValues().get(call);
        final long amount = amountCaptor.getAllValues().get(call);
        if (seedsMetric) {
            seeded += amount;
        } else {
            drained += amount;
        }
    }

    assertEquals(10L, seeded, "metric should be initialized from the offset range");
    // the sum of the decrements has to bring the metric back to exactly zero
    assertEquals(seeded, drained, "remaining-records metric should decrement to exactly zero");
}

@Test
public void shouldRequestPositionAndHandleTimeoutException() {
setupActiveStateManager();
Expand Down