diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 3d49da365374..0563b79af7df 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -38,6 +38,8 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; @@ -836,7 +838,10 @@ protected void validateAddedDVs( List<ManifestFile> newDeleteManifests = history.first(); Set<Long> newSnapshotIds = history.second(); - Tasks.foreach(newDeleteManifests) + Iterable<ManifestFile> matchingManifests = + filterManifestsByPartition(base, conflictDetectionFilter, newDeleteManifests); + + Tasks.foreach(matchingManifests) .stopOnFailure() .throwFailureWhenFinished() .executeWith(workerPool()) @@ -866,6 +871,25 @@ private void validateAddedDVs( } } + private Iterable<ManifestFile> filterManifestsByPartition( + TableMetadata base, Expression conflictDetectionFilter, List<ManifestFile> manifests) { + if (conflictDetectionFilter == null || conflictDetectionFilter == Expressions.alwaysTrue()) { + return manifests; + } + + Map<Integer, PartitionSpec> specsById = base.specsById(); + return Iterables.filter( + manifests, + manifest -> { + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + Expression partitionFilter = + Projections.inclusive(spec, caseSensitive).project(conflictDetectionFilter); + ManifestEvaluator evaluator = + ManifestEvaluator.forPartitionFilter(partitionFilter, spec, caseSensitive); + return evaluator.eval(manifest); + }); + } + // returns newly added manifests and snapshot IDs between the starting and parent snapshots private Pair<List<ManifestFile>, Set<Long>> 
validationHistory( TableMetadata base, diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 749dbdc6fe53..3a724c7b5c50 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -48,6 +48,8 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException { assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES); } + @TestTemplate + public void testConcurrentDVsInDifferentPartitionsWithFilter() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + // bucket16("u") -> 0, bucket16("a") -> 2 + DataFile dataFileInBucket0 = newDataFile("data_bucket=0"); + DataFile dataFileInBucket2 = newDataFile("data_bucket=2"); + commit( + table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch); + + Snapshot base = latestSnapshot(table, branch); + + // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0 + DeleteFile dvBucket0 = newDeletes(dataFileInBucket0); + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(dvBucket0) + .validateFromSnapshot(base.snapshotId()) + .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0 + + // concurrently commit a DV in bucket 2 + DeleteFile dvBucket2 = newDeletes(dataFileInBucket2); + commit(table, table.newRowDelta().addDeletes(dvBucket2), branch); + + // commit should succeed because the concurrent DV is in 
bucket 2 + // which does not overlap the conflict detection filter + commit(table, rowDelta, branch); + } + + @TestTemplate + public void testConcurrentDVsInSamePartitionWithFilter() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + // bucket16("u") -> 0 + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + Snapshot base = latestSnapshot(table, branch); + + // prepare a DV for dataFile with a conflict detection filter scoped to bucket 0 + DeleteFile dv1 = newDeletes(dataFile); + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(dv1) + .validateFromSnapshot(base.snapshotId()) + .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0 + + // concurrently commit another DV for the same data file in bucket 0 + DeleteFile dv2 = newDeletes(dataFile); + commit(table, table.newRowDelta().addDeletes(dv2), branch); + + // must be conflict because the concurrent DV is in the same partition + assertThatThrownBy(() -> commit(table, rowDelta, branch)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Found concurrently added DV for %s", dataFile.location()); + } + + @TestTemplate + public void testDVValidationPartitionPruningManifestCount() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + // disable manifest merging so each commit produces a separate delete manifest + table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit(); + + // create data files and DVs across 10 different partitions (buckets 0-9) + int numPartitions = 10; + DataFile[] dataFiles = new DataFile[numPartitions]; + for (int bucket = 0; bucket < numPartitions; bucket++) { + dataFiles[bucket] = newDataFile("data_bucket=" + bucket); + commit(table, table.newRowDelta().addRows(dataFiles[bucket]), branch); + DeleteFile dv = newDeletes(dataFiles[bucket]); + commit(table, table.newRowDelta().addDeletes(dv), branch); + } + + Snapshot base = 
latestSnapshot(table, branch); + List deleteManifests = base.deleteManifests(table.io()); + + // there should be one delete manifest per partition since we disabled merging + assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(numPartitions); + + // count how many manifests match a filter scoped to bucket 0 + // bucket16("u") -> 0 + Expression filter = Expressions.equal("data", "u"); + int matching = 0; + for (ManifestFile manifest : deleteManifests) { + PartitionSpec spec = table.specs().get(manifest.partitionSpecId()); + Expression partitionFilter = Projections.inclusive(spec, true).project(filter); + ManifestEvaluator evaluator = + ManifestEvaluator.forPartitionFilter(partitionFilter, spec, true); + if (evaluator.eval(manifest)) { + matching = matching + 1; + } + } + + // only 1 out of N manifests should match the filter for bucket 0 + assertThat(matching).isEqualTo(1); + assertThat(deleteManifests.size() - matching) + .as("pruned manifests") + .isGreaterThanOrEqualTo(numPartitions - 1); + + // verify the DV manifest pruning works: commit a new DV in bucket 0 + // while concurrent DVs exist in all other partitions + DataFile newDataFileInBucket0 = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(newDataFileInBucket0), branch); + + Snapshot preCommit = latestSnapshot(table, branch); + DeleteFile newDV = newDeletes(newDataFileInBucket0); + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(newDV) + .validateFromSnapshot(preCommit.snapshotId()) + .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0 + + // concurrently add a DV in a different partition (bucket 5) + // bucket16("v") -> 5 + DataFile newDataFileInBucket5 = newDataFile("data_bucket=5"); + commit(table, table.newRowDelta().addRows(newDataFileInBucket5), branch); + DeleteFile concurrentDV = newDeletes(newDataFileInBucket5); + commit(table, table.newRowDelta().addDeletes(concurrentDV), branch); + + // commit should succeed: the concurrent DV 
is in bucket 5, pruned by the filter + commit(table, rowDelta, branch); + } + @TestTemplate public void testManifestMergingAfterUpgradeToV3() { assumeThat(formatVersion).isEqualTo(2);