Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
*
* <p>The commit is gated on the plan's done-timestamp watermark.
*
* <p>Watermarks are forwarded only after the cycle commits, never mid-cycle. The {@link
* LockRemover} releases the maintenance lock once a watermark past the trigger's start epoch
* reaches it. The planner emits phase watermarks in the middle of a cycle; forwarding those would
* release the lock before this commit, letting the TriggerManager start a concurrent cycle that
* re-processes the same uncommitted staging snapshot.
*
* <p>Emits a {@link Trigger} after each cycle (commit, no-op, or error) so the downstream {@link
* TaskResultAggregator} can track task completion. This is the sole source of Trigger records for
* the Aggregator.
Expand Down Expand Up @@ -139,27 +145,29 @@ public void processElement2(StreamRecord<EqualityConvertPlan> record) {

@Override
public void processWatermark(Watermark mark) throws Exception {
if (planResult != null && mark.getTimestamp() >= planResult.doneTimestamp()) {
try {
commitIfNeeded();
} catch (Exception e) {
LOG.error(
"Failed to commit equality convert result for table {} task {}",
tableName,
taskName,
e);
output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
errorCounter.inc();
}

// Emit Trigger for the Aggregator (even on error or no-op).
output.collect(new StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), 0)));
if (planResult == null || mark.getTimestamp() < planResult.doneTimestamp()) {
// Hold back watermarks until the cycle commits so the LockRemover keeps the maintenance lock
// for the whole cycle. Forwarding the planner's mid-cycle phase watermarks will release the
// lock early and could let the next trigger run a concurrent cycle on the same staging
// snapshot.
return;
}

bufferedResults.clear();
planResult = null;
try {
commitIfNeeded();
} catch (Exception e) {
LOG.error(
"Failed to commit equality convert result for table {} task {}", tableName, taskName, e);
output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
errorCounter.inc();
}

// Always forward watermarks to prevent stalling downstream.
// Emit Trigger for the Aggregator (even on error or no-op).
output.collect(new StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), 0)));

bufferedResults.clear();
planResult = null;

super.processWatermark(mark);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -83,6 +84,43 @@ void commitsDataFilesToMainBranch() throws Exception {
}
}

/**
 * Checks that nothing (neither records nor watermarks) leaves the operator for a watermark below
 * the plan's done timestamp, and that the done-timestamp watermark produces one record and is the
 * only watermark forwarded.
 */
@Test
void holdsBackWatermarkUntilCommit() throws Exception {
  Table table = createTable(3, FileFormat.PARQUET);
  insert(table, 1, "a");

  DataFile stagedFile = getFirstDataFile(table);
  long baseSnapshotId = table.currentSnapshot().snapshotId();

  try (TwoInputStreamOperatorTestHarness<DVWriteResult, EqualityConvertPlan, Trigger> harness =
      createHarness()) {
    harness.open();

    long doneTs = System.currentTimeMillis();
    long planTs = doneTs - 2;
    EqualityConvertPlan plan =
        new EqualityConvertPlan(
            Lists.newArrayList(stagedFile),
            Lists.newArrayList(),
            42L,
            baseSnapshotId,
            planTs,
            doneTs);
    harness.processElement2(new StreamRecord<>(plan, planTs));

    // Mid-cycle: a watermark one millisecond short of the done timestamp must be swallowed,
    // otherwise the LockRemover could release the maintenance lock before the commit.
    Watermark midCycle = new Watermark(doneTs - 1);
    harness.processBothWatermarks(midCycle);
    assertThat(harness.extractOutputValues()).isEmpty();
    assertThat(watermarks(harness)).isEmpty();

    // End of cycle: the done-timestamp watermark triggers the commit and is forwarded.
    Watermark cycleDone = new Watermark(doneTs);
    harness.processBothWatermarks(cycleDone);
    assertThat(harness.extractOutputValues()).hasSize(1);
    assertThat(watermarks(harness)).containsExactly(cycleDone);
  }
}

@Test
void skipsCommitForEmptyCycle() throws Exception {
Table table = createTable(3, FileFormat.PARQUET);
Expand Down Expand Up @@ -476,6 +514,13 @@ void removesRewrittenStagingDvOnSharedBranch() throws Exception {
SnapshotRef.MAIN_BRANCH));
}

/**
 * Returns the {@link Watermark} elements found in the harness output, in output order; all other
 * output elements are ignored.
 */
private static List<Watermark> watermarks(TwoInputStreamOperatorTestHarness<?, ?, ?> harness) {
  return harness.getOutput().stream()
      .filter(element -> element instanceof Watermark)
      .map(element -> (Watermark) element)
      .collect(Collectors.toList());
}

private static List<DeleteFile> deletesForDataFile(Table table, String dataFilePath) {
List<DeleteFile> deletes = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().deleteManifests(table.io())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
*
* <p>The commit is gated on the plan's done-timestamp watermark.
*
* <p>Watermarks are forwarded only after the cycle commits, never mid-cycle. The {@link
* LockRemover} releases the maintenance lock once a watermark past the trigger's start epoch
* reaches it. The planner emits phase watermarks in the middle of a cycle; forwarding those would
* release the lock before this commit, letting the TriggerManager start a concurrent cycle that
* re-processes the same uncommitted staging snapshot.
*
* <p>Emits a {@link Trigger} after each cycle (commit, no-op, or error) so the downstream {@link
* TaskResultAggregator} can track task completion. This is the sole source of Trigger records for
* the Aggregator.
Expand Down Expand Up @@ -139,27 +145,29 @@ public void processElement2(StreamRecord<EqualityConvertPlan> record) {

@Override
public void processWatermark(Watermark mark) throws Exception {
if (planResult != null && mark.getTimestamp() >= planResult.doneTimestamp()) {
try {
commitIfNeeded();
} catch (Exception e) {
LOG.error(
"Failed to commit equality convert result for table {} task {}",
tableName,
taskName,
e);
output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
errorCounter.inc();
}

// Emit Trigger for the Aggregator (even on error or no-op).
output.collect(new StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), 0)));
if (planResult == null || mark.getTimestamp() < planResult.doneTimestamp()) {
// Hold back watermarks until the cycle commits so the LockRemover keeps the maintenance lock
// for the whole cycle. Forwarding the planner's mid-cycle phase watermarks will release the
// lock early and could let the next trigger run a concurrent cycle on the same staging
// snapshot.
return;
}

bufferedResults.clear();
planResult = null;
try {
commitIfNeeded();
} catch (Exception e) {
LOG.error(
"Failed to commit equality convert result for table {} task {}", tableName, taskName, e);
output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
errorCounter.inc();
}

// Always forward watermarks to prevent stalling downstream.
// Emit Trigger for the Aggregator (even on error or no-op).
output.collect(new StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), 0)));

bufferedResults.clear();
planResult = null;

super.processWatermark(mark);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -83,6 +84,43 @@ void commitsDataFilesToMainBranch() throws Exception {
}
}

/**
 * Checks that nothing (neither records nor watermarks) leaves the operator for a watermark below
 * the plan's done timestamp, and that the done-timestamp watermark produces one record and is the
 * only watermark forwarded.
 */
@Test
void holdsBackWatermarkUntilCommit() throws Exception {
  Table table = createTable(3, FileFormat.PARQUET);
  insert(table, 1, "a");

  DataFile stagedFile = getFirstDataFile(table);
  long baseSnapshotId = table.currentSnapshot().snapshotId();

  try (TwoInputStreamOperatorTestHarness<DVWriteResult, EqualityConvertPlan, Trigger> harness =
      createHarness()) {
    harness.open();

    long doneTs = System.currentTimeMillis();
    long planTs = doneTs - 2;
    EqualityConvertPlan plan =
        new EqualityConvertPlan(
            Lists.newArrayList(stagedFile),
            Lists.newArrayList(),
            42L,
            baseSnapshotId,
            planTs,
            doneTs);
    harness.processElement2(new StreamRecord<>(plan, planTs));

    // Mid-cycle: a watermark one millisecond short of the done timestamp must be swallowed,
    // otherwise the LockRemover could release the maintenance lock before the commit.
    Watermark midCycle = new Watermark(doneTs - 1);
    harness.processBothWatermarks(midCycle);
    assertThat(harness.extractOutputValues()).isEmpty();
    assertThat(watermarks(harness)).isEmpty();

    // End of cycle: the done-timestamp watermark triggers the commit and is forwarded.
    Watermark cycleDone = new Watermark(doneTs);
    harness.processBothWatermarks(cycleDone);
    assertThat(harness.extractOutputValues()).hasSize(1);
    assertThat(watermarks(harness)).containsExactly(cycleDone);
  }
}

@Test
void skipsCommitForEmptyCycle() throws Exception {
Table table = createTable(3, FileFormat.PARQUET);
Expand Down Expand Up @@ -476,6 +514,13 @@ void removesRewrittenStagingDvOnSharedBranch() throws Exception {
SnapshotRef.MAIN_BRANCH));
}

/**
 * Returns the {@link Watermark} elements found in the harness output, in output order; all other
 * output elements are ignored.
 */
private static List<Watermark> watermarks(TwoInputStreamOperatorTestHarness<?, ?, ?> harness) {
  return harness.getOutput().stream()
      .filter(element -> element instanceof Watermark)
      .map(element -> (Watermark) element)
      .collect(Collectors.toList());
}

private static List<DeleteFile> deletesForDataFile(Table table, String dataFilePath) {
List<DeleteFile> deletes = Lists.newArrayList();
for (ManifestFile manifest : table.currentSnapshot().deleteManifests(table.io())) {
Expand Down