Skip to content

Commit 8a47d2b

Browse files
authored
Optimization: Prune manifest in snapshot overwrite operations (#3011)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Doing some performance tests for overwriting partitions, we noticed that PyIceberg took double the time it usually takes java based implementation, we noticed that `_exisiting_manifests` does not take advantage of manifest pruning before reading all Manifest Entries In this PR I: - Moved methods from _DeleteFiles to _SnapshotProducer parent class to share with other classes (_OverwriteFiles) - Implemented manifest pruning over all deleted files partitions to not read manifests that do not match file partitions - Refactored the method to only iterate once over all files (instead of multiple) ## Are these changes tested? I believe current tests in tests/integration/test_writes.py cover all cases ## Are there any user-facing changes? Nope <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 4034455 commit 8a47d2b

File tree

2 files changed

+111
-161
lines changed

2 files changed

+111
-161
lines changed

pyiceberg/table/__init__.py

Lines changed: 20 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,12 @@
2626
from functools import cached_property
2727
from itertools import chain
2828
from types import TracebackType
29-
from typing import (
30-
TYPE_CHECKING,
31-
Any,
32-
TypeVar,
33-
)
29+
from typing import TYPE_CHECKING, Any, TypeVar
3430

3531
from pydantic import Field
3632

3733
import pyiceberg.expressions.parser as parser
38-
from pyiceberg.expressions import (
39-
AlwaysFalse,
40-
AlwaysTrue,
41-
And,
42-
BooleanExpression,
43-
EqualTo,
44-
IsNull,
45-
Or,
46-
Reference,
47-
)
34+
from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, IsNull, Or, Reference
4835
from pyiceberg.expressions.visitors import (
4936
ResidualEvaluator,
5037
_InclusiveMetricsEvaluator,
@@ -54,36 +41,17 @@
5441
manifest_evaluator,
5542
)
5643
from pyiceberg.io import FileIO, load_file_io
57-
from pyiceberg.manifest import (
58-
DataFile,
59-
DataFileContent,
60-
ManifestContent,
61-
ManifestEntry,
62-
ManifestFile,
63-
)
64-
from pyiceberg.partitioning import (
65-
PARTITION_FIELD_ID_START,
66-
UNPARTITIONED_PARTITION_SPEC,
67-
PartitionKey,
68-
PartitionSpec,
69-
)
44+
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile
45+
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec
7046
from pyiceberg.schema import Schema
7147
from pyiceberg.table.delete_file_index import DeleteFileIndex
7248
from pyiceberg.table.inspect import InspectTable
7349
from pyiceberg.table.locations import LocationProvider, load_location_provider
7450
from pyiceberg.table.maintenance import MaintenanceTable
75-
from pyiceberg.table.metadata import (
76-
INITIAL_SEQUENCE_NUMBER,
77-
TableMetadata,
78-
)
79-
from pyiceberg.table.name_mapping import (
80-
NameMapping,
81-
)
51+
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
52+
from pyiceberg.table.name_mapping import NameMapping
8253
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
83-
from pyiceberg.table.snapshots import (
84-
Snapshot,
85-
SnapshotLogEntry,
86-
)
54+
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
8755
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8856
from pyiceberg.table.update import (
8957
AddPartitionSpecUpdate,
@@ -107,11 +75,7 @@
10775
update_table_metadata,
10876
)
10977
from pyiceberg.table.update.schema import UpdateSchema
110-
from pyiceberg.table.update.snapshot import (
111-
ManageSnapshots,
112-
UpdateSnapshot,
113-
_FastAppendFiles,
114-
)
78+
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
11579
from pyiceberg.table.update.sorting import UpdateSortOrder
11680
from pyiceberg.table.update.spec import UpdateSpec
11781
from pyiceberg.table.update.statistics import UpdateStatistics
@@ -126,9 +90,7 @@
12690
Record,
12791
TableVersion,
12892
)
129-
from pyiceberg.types import (
130-
strtobool,
131-
)
93+
from pyiceberg.types import strtobool
13294
from pyiceberg.utils.concurrent import ExecutorFactory
13395
from pyiceberg.utils.config import Config
13496
from pyiceberg.utils.properties import property_as_bool
@@ -144,11 +106,7 @@
144106
from pyiceberg_core.datafusion import IcebergDataFusionTable
145107

146108
from pyiceberg.catalog import Catalog
147-
from pyiceberg.catalog.rest.scan_planning import (
148-
RESTContentFile,
149-
RESTDeleteFile,
150-
RESTFileScanTask,
151-
)
109+
from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask
152110

153111
ALWAYS_TRUE = AlwaysTrue()
154112
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -396,17 +354,19 @@ def _set_ref_snapshot(
396354

397355
return updates, requirements
398356

399-
def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression:
357+
def _build_partition_predicate(
358+
self, partition_records: set[Record], spec: PartitionSpec, schema: Schema
359+
) -> BooleanExpression:
400360
"""Build a filter predicate matching any of the input partition records.
401361
402362
Args:
403363
partition_records: A set of partition records to match
364+
spec: An optional partition spec, if none then defaults to current
365+
schema: An optional schema, if none then defaults to current
404366
Returns:
405367
A predicate matching any of the input partition records.
406368
"""
407-
partition_spec = self.table_metadata.spec()
408-
schema = self.table_metadata.schema()
409-
partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
369+
partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]
410370

411371
expr: BooleanExpression = AlwaysFalse()
412372
for partition_record in partition_records:
@@ -583,7 +543,9 @@ def dynamic_partition_overwrite(
583543
)
584544

585545
partitions_to_overwrite = {data_file.partition for data_file in data_files}
586-
delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
546+
delete_filter = self._build_partition_predicate(
547+
partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
548+
)
587549
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
588550

589551
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -673,11 +635,7 @@ def delete(
673635
case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
674636
branch: Branch Reference to run the delete operation
675637
"""
676-
from pyiceberg.io.pyarrow import (
677-
ArrowScan,
678-
_dataframe_to_data_files,
679-
_expression_to_complementary_pyarrow,
680-
)
638+
from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow
681639

682640
if (
683641
self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)

0 commit comments

Comments
 (0)