Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -836,7 +838,10 @@ protected void validateAddedDVs(
List<ManifestFile> newDeleteManifests = history.first();
Set<Long> newSnapshotIds = history.second();

Tasks.foreach(newDeleteManifests)
Iterable<ManifestFile> matchingManifests =
filterManifestsByPartition(base, conflictDetectionFilter, newDeleteManifests);

Tasks.foreach(matchingManifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(workerPool())
Expand Down Expand Up @@ -866,6 +871,25 @@ private void validateAddedDVs(
}
}

/**
 * Lazily filters delete manifests down to those whose partition summaries may overlap the
 * conflict detection filter, so DV validation only scans relevant manifests.
 *
 * @param base table metadata providing the partition specs referenced by the manifests
 * @param conflictDetectionFilter row filter scoping the validation; may be null
 * @param manifests candidate delete manifests
 * @return a lazy view of manifests that can contain matching partitions; the input list itself
 *     when the filter cannot prune anything
 */
private Iterable<ManifestFile> filterManifestsByPartition(
    TableMetadata base, Expression conflictDetectionFilter, List<ManifestFile> manifests) {
  // a null or always-true filter matches every manifest, so skip the evaluator machinery
  if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) {
    return manifests;
  }

  Map<Integer, PartitionSpec> specsById = base.specsById();
  // cache one evaluator per spec ID: manifests overwhelmingly share specs, and projecting the
  // filter plus building a ManifestEvaluator per manifest is redundant work; a concurrent map
  // keeps this safe even if the lazy view is iterated from another thread
  Map<Integer, ManifestEvaluator> evaluatorsBySpecId = new ConcurrentHashMap<>();
  return Iterables.filter(
      manifests,
      manifest ->
          evaluatorsBySpecId
              .computeIfAbsent(
                  manifest.partitionSpecId(),
                  specId -> {
                    PartitionSpec spec = specsById.get(specId);
                    Expression partitionFilter =
                        Projections.inclusive(spec, caseSensitive).project(conflictDetectionFilter);
                    return ManifestEvaluator.forPartitionFilter(
                        partitionFilter, spec, caseSensitive);
                  })
              .eval(manifest));
}

// returns newly added manifests and snapshot IDs between the starting and parent snapshots
private Pair<List<ManifestFile>, Set<Long>> validationHistory(
TableMetadata base,
Expand Down
129 changes: 129 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
}

@TestTemplate
public void testConcurrentDVsInDifferentPartitionsWithFilter() {
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // seed two data files in distinct buckets: bucket16("u") -> 0, bucket16("a") -> 2
  DataFile fileBucket0 = newDataFile("data_bucket=0");
  DataFile fileBucket2 = newDataFile("data_bucket=2");
  commit(table, table.newRowDelta().addRows(fileBucket0).addRows(fileBucket2), branch);

  Snapshot startSnapshot = latestSnapshot(table, branch);

  // stage a DV against the bucket-0 file, scoping conflict detection to bucket 0 only
  DeleteFile stagedDV = newDeletes(fileBucket0);
  RowDelta pendingDelta =
      table
          .newRowDelta()
          .addDeletes(stagedDV)
          .validateFromSnapshot(startSnapshot.snapshotId())
          .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0

  // another writer lands a DV in bucket 2 before the staged delta commits
  DeleteFile concurrentDV = newDeletes(fileBucket2);
  commit(table, table.newRowDelta().addDeletes(concurrentDV), branch);

  // no conflict expected: the concurrent DV lives outside the filtered partition
  commit(table, pendingDelta, branch);
}

@TestTemplate
public void testConcurrentDVsInSamePartitionWithFilter() {
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // seed a single data file in bucket 0: bucket16("u") -> 0
  DataFile targetFile = newDataFile("data_bucket=0");
  commit(table, table.newRowDelta().addRows(targetFile), branch);

  Snapshot startSnapshot = latestSnapshot(table, branch);

  // stage a DV for the file, scoping conflict detection to bucket 0
  DeleteFile stagedDV = newDeletes(targetFile);
  RowDelta pendingDelta =
      table
          .newRowDelta()
          .addDeletes(stagedDV)
          .validateFromSnapshot(startSnapshot.snapshotId())
          .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0

  // a concurrent writer commits another DV for the very same data file
  DeleteFile concurrentDV = newDeletes(targetFile);
  commit(table, table.newRowDelta().addDeletes(concurrentDV), branch);

  // the concurrent DV is inside the filtered partition, so validation must reject the commit
  assertThatThrownBy(() -> commit(table, pendingDelta, branch))
      .isInstanceOf(ValidationException.class)
      .hasMessageContaining("Found concurrently added DV for %s", targetFile.location());
}

@TestTemplate
public void testDVValidationPartitionPruningManifestCount() {
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // disable manifest merging so each commit produces a separate delete manifest
  table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();

  // create data files and DVs across 10 different partitions (buckets 0-9)
  int numPartitions = 10;
  DataFile[] dataFiles = new DataFile[numPartitions];
  for (int bucket = 0; bucket < numPartitions; bucket++) {
    dataFiles[bucket] = newDataFile("data_bucket=" + bucket);
    commit(table, table.newRowDelta().addRows(dataFiles[bucket]), branch);
    DeleteFile dv = newDeletes(dataFiles[bucket]);
    commit(table, table.newRowDelta().addDeletes(dv), branch);
  }

  Snapshot base = latestSnapshot(table, branch);
  List<ManifestFile> deleteManifests = base.deleteManifests(table.io());

  // there should be one delete manifest per partition since we disabled merging
  assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(numPartitions);

  // count manifests matching a filter scoped to bucket 0 (bucket16("u") -> 0), mirroring the
  // inclusive-projection + ManifestEvaluator pruning performed during DV validation
  Expression filter = Expressions.equal("data", "u");
  long matching =
      deleteManifests.stream()
          .filter(
              manifest -> {
                PartitionSpec spec = table.specs().get(manifest.partitionSpecId());
                Expression partitionFilter = Projections.inclusive(spec, true).project(filter);
                return ManifestEvaluator.forPartitionFilter(partitionFilter, spec, true)
                    .eval(manifest);
              })
          .count();

  // only 1 out of N manifests should match the filter for bucket 0
  assertThat(matching).isEqualTo(1);
  assertThat(deleteManifests.size() - matching)
      .as("pruned manifests")
      .isGreaterThanOrEqualTo(numPartitions - 1);

  // verify the DV manifest pruning works end-to-end: commit a new DV in bucket 0
  // while concurrent DVs exist in all other partitions
  DataFile newDataFileInBucket0 = newDataFile("data_bucket=0");
  commit(table, table.newRowDelta().addRows(newDataFileInBucket0), branch);

  Snapshot preCommit = latestSnapshot(table, branch);
  DeleteFile newDV = newDeletes(newDataFileInBucket0);
  RowDelta rowDelta =
      table
          .newRowDelta()
          .addDeletes(newDV)
          .validateFromSnapshot(preCommit.snapshotId())
          .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0

  // concurrently add a DV in a different partition (bucket 5): bucket16("v") -> 5
  DataFile newDataFileInBucket5 = newDataFile("data_bucket=5");
  commit(table, table.newRowDelta().addRows(newDataFileInBucket5), branch);
  DeleteFile concurrentDV = newDeletes(newDataFileInBucket5);
  commit(table, table.newRowDelta().addDeletes(concurrentDV), branch);

  // commit should succeed: the concurrent DV is in bucket 5, pruned by the filter
  commit(table, rowDelta, branch);
}

@TestTemplate
public void testManifestMergingAfterUpgradeToV3() {
assumeThat(formatVersion).isEqualTo(2);
Expand Down
Loading