Summary
dynamic_partition_overwrite produces incorrect results when a table has undergone
partition spec evolution. Manifests written under older specs are silently skipped
by the manifest pruning logic introduced in #3011, leaving stale data files that
should have been deleted.
Root cause
In Table.dynamic_partition_overwrite (table/__init__.py), the delete predicate
is built using only the current partition spec:
delete_filter = self._build_partition_predicate(
    partition_records=partitions_to_overwrite,
    spec=self.table_metadata.spec(),  # always the current spec
    schema=self.table_metadata.schema(),
)

A snapshot with mixed partition_spec_ids (spec-0 and spec-1 manifests) passes
this single predicate to _DeleteFiles. The manifest evaluator in _build_partition_projection
uses inclusive_projection(schema, spec) — when projecting a spec-1 predicate
(e.g. category=A AND region=us) through spec-0 (which only has category), the
region reference has no corresponding partition field, causing the evaluator to
incorrectly skip spec-0 manifests entirely.
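The failure mode can be modeled without Iceberg at all. The sketch below is illustrative only (plain dicts and hypothetical helper names; the real evaluator works on bound expressions and partition summaries): it contrasts treating an unknown partition field as "no match" with projecting it to "always true", which is what inclusive projection should do.

```python
# Model: a manifest's partition summary is a dict of the partition-field
# values its spec tracks; the delete predicate is an AND over the fields
# of the *current* spec.

def faulty_manifest_match(summary, predicate):
    # Bug: a predicate field the older spec never tracked makes the whole
    # manifest "not match", so it is pruned and its files survive deletion.
    return all(field in summary and summary[field] == value
               for field, value in predicate.items())

def fixed_manifest_match(summary, predicate):
    # Inclusive projection: a field unknown to this spec projects to
    # "always true", keeping the manifest as a deletion candidate.
    for field, value in predicate.items():
        if field not in summary:
            continue  # older spec never partitioned by this field
        if summary[field] != value:
            return False
    return True

spec0_manifest = {"category": "A"}             # written before spec evolution
predicate = {"category": "A", "region": "us"}  # built from the current spec only

print(faulty_manifest_match(spec0_manifest, predicate))  # False: manifest skipped
print(fixed_manifest_match(spec0_manifest, predicate))   # True: files get deleted
```

With the faulty behavior, the spec-0 manifest never even reaches the delete pass, which matches the stale `[1, 2]` rows seen in the reproduction below.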
Reproduction
import tempfile, pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
schema = Schema(
    NestedField(1, "category", StringType(), required=False),
    NestedField(2, "region", StringType(), required=False),
    NestedField(3, "value", LongType(), required=False),
)
spec_v0 = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category")
)

with tempfile.TemporaryDirectory() as warehouse:
    catalog = load_catalog(
        "test",
        **{"type": "sql", "uri": f"sqlite:///{warehouse}/catalog.db", "warehouse": f"file://{warehouse}"},
    )
    catalog.create_namespace("default")
    table = catalog.create_table("default.test", schema=schema, partition_spec=spec_v0)

    # Write under spec 0
    table.append(pa.table({"category": ["A", "A", "B"], "region": [None, None, None], "value": [1, 2, 10]}))

    # Evolve spec
    with table.update_spec() as u:
        u.add_field("region", IdentityTransform(), "region")
    table = catalog.load_table("default.test")

    # Write under spec 1
    table.append(pa.table({"category": ["A", "B"], "region": ["us", "us"], "value": [100, 200]}))

    # Overwrite category=A: should delete ALL A rows, from both specs
    table.dynamic_partition_overwrite(
        pa.table({"category": ["A"], "region": ["us"], "value": [999]})
    )

    result = table.scan().to_arrow().to_pydict()
    a_values = [v for c, v in zip(result["category"], result["value"]) if c == "A"]
    print(a_values)  # BUG: prints [1, 2, 100, 999]; stale rows from spec-0 not deleted
    # EXPECTED: [999]

Fix
Build the delete predicate per historical spec present in the snapshot, projecting
the new data files' partition values into each spec's coordinate space before evaluating.
PR with fix and regression tests to follow.
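As a rough illustration of that direction (a plain-Python sketch with hypothetical names; the actual fix must build pyiceberg expressions per spec and evaluate each spec's manifests against its own predicate), the per-spec step amounts to projecting each partition record by dropping the fields a historical spec does not track:

```python
def per_spec_delete_predicates(partitions_to_overwrite, specs_in_snapshot):
    """Build one delete predicate per historical spec (hypothetical helper).

    partitions_to_overwrite: partition values of the new data files in the
        current spec's coordinates, e.g. {"category": "A", "region": "us"}.
    specs_in_snapshot: mapping of spec_id -> list of that spec's field names.
    """
    predicates = {}
    for spec_id, field_names in specs_in_snapshot.items():
        # Project each record into the older spec's coordinate space by
        # keeping only the fields that spec actually tracks; deduplicate,
        # since several records may collapse to the same projection.
        projected = {
            tuple(sorted((f, rec[f]) for f in field_names if f in rec))
            for rec in partitions_to_overwrite
        }
        predicates[spec_id] = [dict(p) for p in projected]
    return predicates

specs = {0: ["category"], 1: ["category", "region"]}
records = [{"category": "A", "region": "us"}]
preds = per_spec_delete_predicates(records, specs)
print(preds[0])  # [{'category': 'A'}]: spec-0 manifests delete all of category=A
print(preds[1])  # [{'category': 'A', 'region': 'us'}]
```

The key property is that the spec-0 predicate never mentions `region`, so spec-0 manifests are evaluated against a predicate they can actually answer instead of being skipped.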
Related
- Optimization: Prune manifest in snapshot overwrite operations #3011 (introduced the manifest pruning optimization)
- Issue when overwriting data with row filter #1108 (prior related fix by @Fokko for spec evolution in manifest rewriting)