From 6533faa95774853bc9dffeb78d6607b2faf8c6b3 Mon Sep 17 00:00:00 2001 From: wangd Date: Tue, 21 Oct 2025 11:59:31 +0800 Subject: [PATCH 1/6] Refactor Iceberg connector to support call distributed procedure --- .../CallDistributedProcedureSplitSource.java | 137 ++++++++++++++++++ .../iceberg/IcebergAbstractMetadata.java | 58 ++++++++ .../IcebergDistributedProcedureHandle.java | 53 +++++++ .../presto/iceberg/IcebergHandleResolver.java | 7 + .../presto/iceberg/IcebergHiveMetadata.java | 5 +- .../iceberg/IcebergHiveMetadataFactory.java | 5 + .../presto/iceberg/IcebergNativeMetadata.java | 5 +- .../iceberg/IcebergNativeMetadataFactory.java | 8 +- .../iceberg/IcebergPageSinkProvider.java | 7 + .../iceberg/IcebergProcedureContext.java | 95 ++++++++++++ .../presto/iceberg/IcebergSplitManager.java | 11 ++ .../InternalIcebergConnectorFactory.java | 2 + .../TestRenameTableOnFragileFileSystem.java | 2 + 13 files changed, 392 insertions(+), 3 deletions(-) create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java new file mode 100644 index 0000000000000..e8eeda5c97477 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.iceberg.delete.DeleteFile; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.SplitWeight; +import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; +import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; +import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterators.limit; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class CallDistributedProcedureSplitSource + implements ConnectorSplitSource +{ + private CloseableIterator fileScanTaskIterator; + private Optional> fileScanTaskConsumer; + + private final TableScan tableScan; + private final Closer closer = Closer.create(); + private final double minimumAssignedSplitWeight; + private final ConnectorSession session; + + public CallDistributedProcedureSplitSource( + ConnectorSession session, + TableScan tableScan, + CloseableIterable fileScanTaskIterable, + Optional> fileScanTaskConsumer, + double minimumAssignedSplitWeight) + { + this.session = requireNonNull(session, "session is null"); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.fileScanTaskIterator = fileScanTaskIterable.iterator(); + this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null"); + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + closer.register(fileScanTaskIterator); + } + + @Override + public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) + { + // TODO: move this to a background thread + List splits = new ArrayList<>(); + Iterator iterator = limit(fileScanTaskIterator, maxSize); + while (iterator.hasNext()) { + FileScanTask task = iterator.next(); + fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task)); + splits.add(toIcebergSplit(task)); + } + return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + } + + @Override + public boolean isFinished() + { + return !fileScanTaskIterator.hasNext(); + } + + @Override + public void close() + { + try { + closer.close(); + // TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose + // correct release resources holds by iterator. 
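+            // Swap in an empty iterator so any later isFinished() or getNextBatch() call sees an exhausted iterator rather than the already-closed one.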
+ fileScanTaskIterator = CloseableIterator.empty(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private ConnectorSplit toIcebergSplit(FileScanTask task) + { + PartitionSpec spec = task.spec(); + Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); + + // TODO: We should leverage residual expression and convert that to TupleDomain. + // The predicate here is used by readers for predicate push down at reader level, + // so when we do not use residual expression, we are just wasting CPU cycles + // on reader side evaluating a condition that we know will always be true. + + return new IcebergSplit( + task.file().path().toString(), + task.start(), + task.length(), + fromIcebergFileFormat(task.file().format()), + ImmutableList.of(), + getPartitionKeys(task), + PartitionSpecParser.toJson(spec), + partitionData.map(PartitionData::toJson), + getNodeSelectionStrategy(session), + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), + task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), + Optional.empty(), + getDataSequenceNumber(task.file()), + getAffinitySchedulingFileSectionSize(session).toBytes()); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index eb6ea59912b76..91f76baa793cd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -15,6 +15,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; @@ -35,10 +36,13 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayout; import com.facebook.presto.spi.ConnectorTableLayoutHandle; @@ -62,6 +66,9 @@ import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.Procedure; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.security.ViewSecurity; @@ -251,11 +258,13 @@ public abstract class IcebergAbstractMetadata protected static final int CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION = 1; protected final TypeManager typeManager; + protected final ProcedureRegistry procedureRegistry; 
protected final JsonCodec commitTaskCodec; protected final JsonCodec> columnMappingsCodec; protected final NodeVersion nodeVersion; protected final RowExpressionService rowExpressionService; protected final FilterStatsCalculatorService filterStatsCalculatorService; + protected Optional procedureContext = Optional.empty(); protected Transaction transaction; protected final StatisticsFileCache statisticsFileCache; protected final IcebergTableProperties tableProperties; @@ -265,6 +274,7 @@ public abstract class IcebergAbstractMetadata public IcebergAbstractMetadata( TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -275,6 +285,7 @@ public IcebergAbstractMetadata( IcebergTableProperties tableProperties) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.columnMappingsCodec = requireNonNull(columnMappingsCodec, "columnMappingsCodec is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); @@ -316,6 +327,11 @@ protected abstract void updateIcebergViewProperties( public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName); + public Optional getSplitSourceInCurrentCallProcedureTransaction() + { + return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource); + } + /** * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker */ @@ -1116,6 +1132,48 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa removeScanFiles(icebergTable, TupleDomain.all()); } + @Override + public ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable(); + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + + if (handle.isSnapshotSpecified()) { + throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot"); + } + + transaction = icebergTable.newTransaction(); + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext()); + procedureContext.get().setTable(icebergTable); + procedureContext.get().setTransaction(transaction); + return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments); + } + + @Override + public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments) + { + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof 
DistributedProcedure, "procedure must be DistributedProcedure"); + verify(procedureContext.isPresent(), "procedure context must be present"); + ((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments); + transaction.commitTransaction(); + procedureContext.get().destroy(); + } + @Override public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java new file mode 100644 index 0000000000000..0ae38a46e8946 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.hive.HiveCompressionCodec; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; + +public class IcebergDistributedProcedureHandle + extends IcebergWritableTableHandle + implements ConnectorDistributedProcedureHandle +{ + @JsonCreator + public IcebergDistributedProcedureHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") IcebergTableName tableName, + @JsonProperty("schema") PrestoIcebergSchema schema, + @JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec, + @JsonProperty("inputColumns") List inputColumns, + @JsonProperty("outputPath") String outputPath, + @JsonProperty("fileFormat") FileFormat fileFormat, + @JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec, + @JsonProperty("storageProperties") Map storageProperties) + { + super( + schemaName, + tableName, + schema, + partitionSpec, + inputColumns, + outputPath, + fileFormat, + compressionCodec, + storageProperties, + ImmutableList.of()); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java index 199939c6b7985..92d3d0e9fdeec 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java @@ -16,6 +16,7 @@ import com.facebook.presto.hive.HiveTransactionHandle; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -69,6 +70,12 @@ public Class 
getDeleteTableHandleClass() return IcebergTableHandle.class; } + @Override + public Class getDistributedProcedureHandleClass() + { + return IcebergDistributedProcedureHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 5fb9569b5222e..830279e61229a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -56,6 +56,7 @@ import com.facebook.presto.spi.ViewNotFoundException; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.security.PrestoPrincipal; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; @@ -177,6 +178,7 @@ public IcebergHiveMetadata( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -189,7 +191,8 @@ public IcebergHiveMetadata( IcebergTableProperties tableProperties, ConnectorSystemConfig connectorSystemConfig) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, + nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index d01ae7a33dab7..0c6bd7f411959 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import jakarta.inject.Inject; @@ -38,6 +39,7 @@ public class IcebergHiveMetadataFactory final ExtendedHiveMetastore metastore; final HdfsEnvironment hdfsEnvironment; final TypeManager typeManager; + final ProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final JsonCodec> columnMappingsCodec; final StandardFunctionResolution functionResolution; @@ -56,6 +58,7 @@ public IcebergHiveMetadataFactory( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ 
-72,6 +75,7 @@ public IcebergHiveMetadataFactory( this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -92,6 +96,7 @@ public ConnectorMetadata create() metastore, hdfsEnvironment, typeManager, + procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 52d59d7b367f7..a8a327ee69065 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -32,6 +32,7 @@ import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -109,6 +110,7 @@ public class IcebergNativeMetadata public IcebergNativeMetadata( IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -119,7 +121,8 @@ public IcebergNativeMetadata( StatisticsFileCache statisticsFileCache, IcebergTableProperties tableProperties) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, + nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java index b421a823efe2c..72cdebd15689d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import jakarta.inject.Inject; @@ -32,6 +33,7 @@ public class 
IcebergNativeMetadataFactory implements IcebergMetadataFactory { final TypeManager typeManager; + final ProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final JsonCodec> columnMappingsCodec; final IcebergNativeCatalogFactory catalogFactory; @@ -48,6 +50,7 @@ public IcebergNativeMetadataFactory( IcebergConfig config, IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -59,6 +62,7 @@ public IcebergNativeMetadataFactory( { this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -72,6 +76,8 @@ public IcebergNativeMetadataFactory( public ConnectorMetadata create() { - return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + return new IcebergNativeMetadata(catalogFactory, typeManager, procedureRegistry, functionResolution, rowExpressionService, + commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, + statisticsFileCache, tableProperties); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index e8e8db1163aed..e14d0178b153d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -16,6 +16,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -79,6 +80,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa return createPageSink(session, (IcebergWritableTableHandle) insertTableHandle); } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + return createPageSink(session, (IcebergWritableTableHandle) procedureHandle); + } + private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle) { HdfsContext hdfsContext = new HdfsContext(session, tableHandle.getSchemaName(), tableHandle.getTableName().getTableName()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java new file mode 100644 index 0000000000000..a0ce2e325959d --- /dev/null +++ 
b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.connector.ConnectorProcedureContext; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public class IcebergProcedureContext + implements ConnectorProcedureContext +{ + final Set scannedDataFiles = new HashSet<>(); + final Set fullyAppliedDeleteFiles = new HashSet<>(); + final Map relevantData = new HashMap<>(); + Optional table = Optional.empty(); + Transaction transaction; + Optional connectorSplitSource = Optional.empty(); + + public void setTable(Table table) + { + this.table = Optional.of(table); + } + + public void setTransaction(Transaction transaction) + { + this.transaction = transaction; + } + + public Optional
getTable() + { + return table; + } + + public Transaction getTransaction() + { + return transaction; + } + + public void setConnectorSplitSource(ConnectorSplitSource connectorSplitSource) + { + requireNonNull(connectorSplitSource, "connectorSplitSource is null"); + this.connectorSplitSource = Optional.of(connectorSplitSource); + } + + public Optional getConnectorSplitSource() + { + return this.connectorSplitSource; + } + + public Set getScannedDataFiles() + { + return scannedDataFiles; + } + + public Set getFullyAppliedDeleteFiles() + { + return fullyAppliedDeleteFiles; + } + + public Map getRelevantData() + { + return relevantData; + } + + public void destroy() + { + this.relevantData.clear(); + this.scannedDataFiles.clear(); + this.fullyAppliedDeleteFiles.clear(); + this.connectorSplitSource.ifPresent(ConnectorSplitSource::close); + this.connectorSplitSource = null; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 0ad3345b7ae9d..69141aa4c5df8 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; @@ -35,6 +36,7 @@ import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -82,6 +84,15 @@ public ConnectorSplitSource getSplits( TupleDomain predicate = getNonMetadataColumnConstraints(layoutHandle .getValidPredicate()); + ConnectorMetadata connectorMetadata = transactionManager.get(transaction); + if (connectorMetadata != null) { + IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata; + Optional connectorSplitSource = icebergMetadata.getSplitSourceInCurrentCallProcedureTransaction(); + if (connectorSplitSource.isPresent()) { + return connectorSplitSource.get(); + } + } + Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index 3b8994df46855..3fb8319865844 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -49,6 +49,7 @@ import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; import com.facebook.presto.spi.procedure.BaseProcedure; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.session.PropertyMetadata; import com.google.common.collect.ImmutableSet; @@ -94,6 +95,7 @@ public static Connector 
createConnector( binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(ProcedureRegistry.class).toInstance(context.getProcedureRegistry()); binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); binder.bind(PageSorter.class).toInstance(context.getPageSorter()); binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java index 46d4b7dc7478b..6b5e5782b6ba6 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java @@ -48,6 +48,7 @@ import com.facebook.presto.iceberg.IcebergTableType; import com.facebook.presto.iceberg.ManifestFileCache; import com.facebook.presto.iceberg.statistics.StatisticsFileCache; +import com.facebook.presto.metadata.BuiltInProcedureRegistry; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.spi.ConnectorSession; @@ -412,6 +413,7 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore metastore, hdfsEnvironment, FUNCTION_AND_TYPE_MANAGER, + new BuiltInProcedureRegistry(METADATA.getFunctionAndTypeManager()), FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE, jsonCodec(CommitTaskData.class), From 4cdbe2c973b8e3fdc9021391594759e9105b4be9 Mon Sep 17 00:00:00 2001 From: wangd Date: Tue, 4 Nov 2025 12:10:45 +0800 Subject: [PATCH 2/6] Support Iceberg procedure `rewrite_data_files` --- .../src/main/sphinx/connector/iceberg.rst | 41 ++ .../iceberg/IcebergAbstractMetadata.java | 6 +- .../presto/iceberg/IcebergCommonModule.java | 1 + .../iceberg/RewriteDataFilesProcedure.java | 206 +++++++ .../IcebergDistributedSmokeTestBase.java | 61 +++ .../iceberg/IcebergDistributedTestBase.java | 60 ++- .../iceberg/TestIcebergLogicalPlanner.java | 50 ++ .../TestRewriteDataFilesProcedure.java | 508 ++++++++++++++++++ .../iceberg/rest/TestIcebergSmokeRest.java | 6 - .../sql/analyzer/StatementAnalyzer.java | 11 +- .../planner/assertions/PlanMatchPattern.java | 12 + 11 files changed, 949 insertions(+), 13 deletions(-) create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 8897bfb777e1f..81532b4b59636 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1237,6 +1237,47 @@ Examples: CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10'); +Rewrite Data Files +^^^^^^^^^^^^^^^^^^ + +Iceberg tracks all data files under different partition specs in a table. More data files requires +more metadata to be stored in manifest files, and small data files can cause unnecessary amount metadata and +less efficient queries from file open costs. 
Also, data files under different partition specs can
+prevent Presto from performing metadata-level deletes or thorough predicate pushdown.
+
+Use `rewrite_data_files` to rewrite the data files of a specified table so that they are
+merged into fewer but larger files under the newest partition spec. If the table is partitioned,
+data file compaction can act separately on the selected partitions to improve read performance by reducing
+metadata overhead and runtime file open cost.
+
+The following arguments are available:
+
+===================== ========== =============== =======================================================================
+Argument Name         Required   Type            Description
+===================== ========== =============== =======================================================================
+``schema``            ✔️          string          Schema of the table to update.
+
+``table_name``        ✔️          string          Name of the table to update.
+
+``filter``                       string          Predicate, given as a string, used to select the files to rewrite.
+                                                 Currently only rewriting of whole partitions is supported, so the
+                                                 filter must reference partition columns only. The default value is
+                                                 `true`.
+
+``options``                      map             Options to be used for the data file rewrite. (to be expanded)
+===================== ========== =============== =======================================================================
+
+Examples:
+
+* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
+
+* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
+
 Presto C++ Support
 ^^^^^^^^^^^^^^^^^^

diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 91f76baa793cd..dbbe3ad8b5307 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -66,8 +66,8 @@
 import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
 import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
+import com.facebook.presto.spi.procedure.BaseProcedure;
 import com.facebook.presto.spi.procedure.DistributedProcedure;
-import com.facebook.presto.spi.procedure.Procedure;
 import com.facebook.presto.spi.procedure.ProcedureRegistry;
 import com.facebook.presto.spi.relation.RowExpression;
 import com.facebook.presto.spi.relation.RowExpressionService;
@@ -1147,7 +1147,7 @@ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
         }

         transaction = icebergTable.newTransaction();
-        Procedure procedure = procedureRegistry.resolve(
+        BaseProcedure procedure = procedureRegistry.resolve(
                 new ConnectorId(procedureName.getCatalogName()),
                 new SchemaTableName(
                         procedureName.getSchemaName(),
@@ -1162,7 +1162,7 @@ public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
     @Override
     public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection
fragments) { - Procedure procedure = procedureRegistry.resolve( + BaseProcedure procedure = procedureRegistry.resolve( new ConnectorId(procedureName.getCatalogName()), new SchemaTableName( procedureName.getSchemaName(), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 7220800e0959e..7d5154dca605f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -188,6 +188,7 @@ protected void setup(Binder binder) procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON); // for orc binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java new file mode 100644 index 0000000000000..93547a58b2cf3 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java @@ -0,0 +1,206 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.DistributedProcedure.Argument; +import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure; +import com.google.common.base.VerifyException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.TableScanUtil; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; +import static com.facebook.presto.iceberg.IcebergUtil.getColumns; +import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; +import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; +import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; +import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; +import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class RewriteDataFilesProcedure + implements Provider +{ + TypeManager typeManager; + JsonCodec commitTaskCodec; + + @Inject + public RewriteDataFilesProcedure( + TypeManager typeManager, + JsonCodec commitTaskCodec) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + } + + @Override + public DistributedProcedure get() + { + return new TableDataRewriteDistributedProcedure( + "system", + "rewrite_data_files", + ImmutableList.of( + new Argument(SCHEMA, VARCHAR), + new Argument(TABLE_NAME, VARCHAR), + new Argument("filter", VARCHAR, false, "TRUE"), + new Argument("options", "map(varchar, varchar)", false, null)), + (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), + ((procedureContext, tableHandle, fragments) -> 
finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)), + IcebergProcedureContext::new); + } + + private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + IcebergTableHandle tableHandle = layoutHandle.getTable(); + + ConnectorSplitSource splitSource; + if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) { + splitSource = new FixedSplitSource(ImmutableList.of()); + } + else { + TupleDomain predicate = layoutHandle.getValidPredicate(); + TableScan tableScan = icebergTable.newScan() + .filter(toIcebergExpression(predicate)) + .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get()); + + Consumer fileScanTaskConsumer = (task) -> { + procedureContext.getScannedDataFiles().add(task.file()); + if (!task.deletes().isEmpty()) { + task.deletes().forEach(deleteFile -> { + if (deleteFile.content() == FileContent.EQUALITY_DELETES && + !icebergTable.specs().get(deleteFile.specId()).isPartitioned() && + !predicate.isAll()) { + // Equality files with an unpartitioned spec are applied as global deletes + // So they should not be cleaned up unless the whole table is optimized + return; + } + procedureContext.getFullyAppliedDeleteFiles().add(deleteFile); + }); + } + }; + + splitSource = new CallDistributedProcedureSplitSource( + session, + tableScan, + TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()), + Optional.of(fileScanTaskConsumer), + getMinimumAssignedSplitWeight(session)); + } + procedureContext.setConnectorSplitSource(splitSource); + + return new IcebergDistributedProcedureHandle( + tableHandle.getSchemaName(), + tableHandle.getIcebergTableName(), + toPrestoSchema(icebergTable.schema(), typeManager), + toPrestoPartitionSpec(icebergTable.spec(), typeManager), + getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), + icebergTable.location(), + getFileFormat(icebergTable), + getCompressionCodec(session), + icebergTable.properties()); + } + } + + private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) + { + if (fragments.isEmpty() && + procedureContext.getScannedDataFiles().isEmpty() && + procedureContext.getFullyAppliedDeleteFiles().isEmpty()) { + return; + } + + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle; + Table icebergTable = procedureContext.getTransaction().table(); + + List commitTasks = fragments.stream() + .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) + .collect(toImmutableList()); + + org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream() + .map(field -> field.transform().getResultType( + icebergTable.schema().findType(field.sourceId()))) + .toArray(Type[]::new); + + Set newFiles = new HashSet<>(); + for (CommitTaskData task : commitTasks) { + DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) + .withPath(task.getPath()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + 
.withFormat(handle.getFileFormat().name()) + .withMetrics(task.getMetrics().metrics()); + + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + newFiles.add(builder.build()); + } + + RewriteFiles rewriteFiles = procedureContext.getTransaction().newRewrite(); + Set scannedDataFiles = procedureContext.getScannedDataFiles(); + Set fullyAppliedDeleteFiles = procedureContext.getFullyAppliedDeleteFiles(); + rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of()); + + // Table.snapshot method returns null if there is no matching snapshot + Snapshot snapshot = requireNonNull( + handle.getTableName() + .getSnapshotId() + .map(icebergTable::snapshot) + .orElse(null), + "snapshot is null"); + if (icebergTable.currentSnapshot() != null) { + rewriteFiles.validateFromSnapshot(snapshot.snapshotId()); + } + rewriteFiles.commit(); + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 39e13724d44e9..1101edd14f3b5 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -73,6 +73,7 @@ import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem; import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.resolveLatestMetadataLocation; import static com.facebook.presto.testing.MaterializedResult.resultBuilder; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.String.format; @@ -2013,6 +2014,66 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str } } + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode) + { + String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*"; + String schemaName = getSession().getSchema().get(); + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // Do not support metadata delete with filter on column `c`, because we have data with old partition spec + assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage); + + // Call procedure rewrite_data_files without filter to rewrite all data files + assertUpdate("call 
system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "')", 5); + + // Then we can do metadata delete on column `c`, because all data files are rewritten under new partition spec + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)"); + } + finally { + dropTable(getSession(), tableName); + } + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode) + { + String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*"; + String schemaName = getSession().getSchema().get(); + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // Do not support metadata delete with filter on column `c`, because we have data with old partition spec + assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage); + + // Call procedure rewrite_data_files with filter to rewrite data files under the prior partition spec + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "', filter => 'a in (1, 2)')", 2); + + // Then we can do metadata delete on column `c`, because all data files are now under new partition spec + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)"); + } + finally { + dropTable(getSession(), tableName); + } + } + @DataProvider(name = "version_and_mode") public Object[][] versionAndMode() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 8b3471229e4db..a781d28fdd012 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -2043,6 +2043,62 @@ public void testDecimal(boolean decimalVectorReaderEnabled) } } + public void testMetadataDeleteOnV2MorTableWithRewriteDataFiles() + { + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002')"); + + Table icebergTable = loadTable(tableName); + 
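+        // Expect one data file from the initial insert and one delete file written by the merge-on-read DELETE above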
assertHasDataFiles(icebergTable.currentSnapshot(), 1); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 1); + + // Evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 4); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 1); + + // Execute row level delete with filter on column `b` + assertUpdate("DELETE FROM " + tableName + " WHERE b = '1004'", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 4); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 2); + + assertQueryFails("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a > 3')", ".*"); + assertQueryFails("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 3')", ".*"); + + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 3); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 3); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + // Do metadata delete on column `a`, because all partition specs contains partition column `a` + assertUpdate("DELETE FROM " + tableName + " WHERE c = 5", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 2')", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + @Test public void testRefsTable() { @@ -2891,14 +2947,14 @@ private void testWithAllFileFormats(Session session, BiConsumer map = snapshot.summary(); int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP)); assertEquals(totalDataFiles, dataFilesCount); } - private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) + protected void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) { Map map = snapshot.summary(); int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP)); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java index ed274b55b7bf5..98e8521dbaf04 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java @@ -98,6 +98,7 @@ import static com.facebook.presto.sql.planner.assertions.MatchResult.match; import static 
com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.callDistributedProcedure; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; @@ -107,8 +108,13 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictProject; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableFinish; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -730,6 +736,50 @@ public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDelete } } + @Test + public void testCallDistributedProcedureOnPartitionedTable() + { + String tableName = "partition_table_for_call_distributed_procedure"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c1'])"); + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + + assertPlan(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, getSession().getSchema().get()), + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, + callDistributedProcedure( + exchange(LOCAL, GATHER, + exchange(REMOTE_STREAMING, REPARTITION, + strictTableScan(tableName, identityMap("c1", "c2"))))))))); + + // Do not support the filter that couldn't be enforced totally by tableScan + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 > ''bar''')", tableName, getSession().getSchema().get()), + "Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression"); + + // Support the filter that could be enforced totally by tableScan + assertPlan(getSession(), format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, getSession().getSchema().get()), + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, + callDistributedProcedure( + exchange(LOCAL, GATHER, + exchange(REMOTE_STREAMING, REPARTITION, + strictTableScan(tableName, identityMap("c1", "c2")))))))), + plan -> assertTableLayout( + plan, + tableName, + withColumnDomains(ImmutableMap.of( + new Subfield( + "c1", + ImmutableList.of()), + Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 3L)), false))), 
+ TRUE_CONSTANT, + ImmutableSet.of("c1"))); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + @DataProvider(name = "timezones") public Object[][] timezones() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java new file mode 100644 index 0000000000000..fb79f69618fca --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java @@ -0,0 +1,508 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableIterator; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; +import static java.lang.String.format; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; +import static org.testng.Assert.assertEquals; + +public class TestRewriteDataFilesProcedure + extends AbstractTestQueryFramework +{ + public static final String TEST_SCHEMA = "tpch"; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setCatalogType(HADOOP) + .setFormat(PARQUET) + .setNodeCount(OptionalInt.of(1)) + .setCreateTpchTables(false) + .setAddJmxPlugin(false) + .build().getQueryRunner(); + } + + public void dropTable(String tableName) + { + assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesInEmptyTable() + { + String tableName = "default_empty_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)"); + assertUpdate(format("CALL system.rewrite_data_files('%s', '%s')", TEST_SCHEMA, tableName), 0); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesOnPartitionTable() + { + String tableName = "example_partition_table"; + try { + assertUpdate("CREATE TABLE " 
+ tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])"); + + // create 5 files for each partition (c2 = 'foo' and c2 = 'bar') + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + //The number of data files is 10,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 10); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 10, 0); + + assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1); + assertUpdate("DELETE from " + tableName + " WHERE c1 in (8, 10)", 2); + + table.refresh(); + assertHasSize(table.snapshots(), 7); + //The number of data files is 10,and the number of delete files is 3 + assertHasDataFiles(table.currentSnapshot(), 10); + assertHasDeleteFiles(table.currentSnapshot(), 3); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(9, 'foo')"); + + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7); + + table.refresh(); + assertHasSize(table.snapshots(), 8); + //The number of data files is 2,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 2); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .filter(alwaysTrue()) + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 2, 0); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(9, 'foo')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesOnNonPartitionTable() + { + String tableName = "example_non_partition_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)"); + + // create 5 files + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + //The number of data files is 5,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 5, 0); + + assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1); + assertUpdate("DELETE from " + tableName + " WHERE c1 in (9, 10)", 2); + + table.refresh(); + assertHasSize(table.snapshots(), 7); + //The number of data files is 5,and the 
number of delete files is 2 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 2); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(8, 'bar')"); + + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7); + + table.refresh(); + assertHasSize(table.snapshots(), 8); + //The number of data files is 1,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 1); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .filter(alwaysTrue()) + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 1, 0); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(8, 'bar')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesWithFilter() + { + String tableName = "example_partition_filter_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])"); + + // create 5 files for each partition (c2 = 'foo' and c2 = 'bar') + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + //The number of data files is 10,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 10); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 10, 0); + + // do not support rewrite files filtered by non-identity columns + assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*"); + + // select 5 files to rewrite + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 = ''bar''')", tableName, TEST_SCHEMA), 5); + table.refresh(); + assertHasSize(table.snapshots(), 6); + //The number of data files is 6,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 6); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 6, 0); + + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(7, 'foo'), (8, 'bar'), " + + "(9, 'foo'), (10, 'bar')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesWithDeterministicTrueFilter() + { + String tableName = "example_non_partition_true_filter_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)"); + + // create 5 files + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT 
INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + //The number of data files is 5,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 5, 0); + + // do not support rewrite files filtered by non-identity columns + assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*"); + + // the filter is `true` means select all files to rewrite + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 1')", tableName, TEST_SCHEMA), 10); + + table.refresh(); + assertHasSize(table.snapshots(), 6); + //The number of data files is 1,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 1); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 1, 0); + + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(7, 'foo'), (8, 'bar'), " + + "(9, 'foo'), (10, 'bar')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesWithDeterministicFalseFilter() + { + String tableName = "example_non_partition_false_filter_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)"); + + // create 5 files + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + //The number of data files is 5,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 5, 0); + + // the filter is `false` means select no file to rewrite + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 0')", tableName, TEST_SCHEMA), 0); + + table.refresh(); + assertHasSize(table.snapshots(), 5); + //The number of data files is still 5,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 5, 0); + + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), 
(4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(7, 'foo'), (8, 'bar'), " + + "(9, 'foo'), (10, 'bar')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesWithDeleteAndPartitionEvolution() + { + String tableName = "example_partition_evolution_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar)"); + assertUpdate("INSERT INTO " + tableName + " values(1, '1001'), (2, '1002')", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1); + assertQuery("select * from " + tableName, "values(2, '1002')"); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 2); + //The number of data files is 1,and the number of delete files is 1 + assertHasDataFiles(table.currentSnapshot(), 1); + assertHasDeleteFiles(table.currentSnapshot(), 1); + + assertUpdate("alter table " + tableName + " add column c int with (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " values(5, '1005', 5), (6, '1006', 6), (7, '1007', 7)", 3); + assertUpdate("DELETE FROM " + tableName + " WHERE b = '1006'", 1); + assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)"); + + table.refresh(); + assertHasSize(table.snapshots(), 4); + //The number of data files is 4,and the number of delete files is 2 + assertHasDataFiles(table.currentSnapshot(), 4); + assertHasDeleteFiles(table.currentSnapshot(), 2); + + assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a > 3')", tableName, TEST_SCHEMA), ".*"); + assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c > 3')", tableName, TEST_SCHEMA), ".*"); + + assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 3); + table.refresh(); + assertHasSize(table.snapshots(), 5); + //The number of data files is 3,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 3); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 3, 0); + assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)"); + + assertUpdate("delete from " + tableName + " where b = '1002'", 1); + table.refresh(); + assertHasSize(table.snapshots(), 6); + //The number of data files is 3,and the number of delete files is 1 + assertHasDataFiles(table.currentSnapshot(), 3); + assertHasDeleteFiles(table.currentSnapshot(), 1); + assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c is null')", tableName, TEST_SCHEMA), 0); + + table.refresh(); + assertHasSize(table.snapshots(), 7); + //The number of data files is 2,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 2); + assertHasDeleteFiles(table.currentSnapshot(), 0); + assertQuery("select * from " + tableName, "values(5, '1005', 5), (7, '1007', 7)"); + + // This is a metadata delete + assertUpdate("delete from " + tableName + " where c = 7", 1); + table.refresh(); + assertHasSize(table.snapshots(), 8); + //The number of data files is 1,and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 1); + assertHasDeleteFiles(table.currentSnapshot(), 0); + assertQuery("select * from " + tableName, "values(5, '1005', 5)"); + } + finally { + 
dropTable(tableName); + } + } + + @Test + public void testInvalidParameterCases() + { + String tableName = "invalid_parameter_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar, c int)"); + assertQueryFails("CALL system.rewrite_data_files('n', table_name => 't')", ".*Named and positional arguments cannot be mixed"); + assertQueryFails("CALL custom.rewrite_data_files('n', 't')", "Procedure not registered: custom.rewrite_data_files"); + assertQueryFails("CALL system.rewrite_data_files()", ".*Required procedure argument 'schema' is missing"); + assertQueryFails("CALL system.rewrite_data_files('s', 'n')", "Schema s does not exist"); + assertQueryFails("CALL system.rewrite_data_files('', '')", "Table name is empty"); + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '''hello''')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type varchar\\(5\\)"); + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1001')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer"); + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer"); + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'n')", tableName, TEST_SCHEMA), ".*Column 'n' cannot be resolved"); + } + finally { + dropTable(tableName); + } + } + + private Table loadTable(String tableName) + { + Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration()); + return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName)); + } + + private Map getProperties() + { + File metastoreDir = getCatalogDirectory(); + return ImmutableMap.of("warehouse", metastoreDir.toString()); + } + + private File getCatalogDirectory() + { + Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false); + return catalogDirectory.toFile(); + } + + private void assertHasSize(Iterable iterable, int size) + { + AtomicInteger count = new AtomicInteger(0); + iterable.forEach(obj -> count.incrementAndGet()); + assertEquals(count.get(), size); + } + + private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount) + { + Map map = snapshot.summary(); + int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP)); + assertEquals(totalDataFiles, dataFilesCount); + } + + private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) + { + Map map = snapshot.summary(); + int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP)); + assertEquals(totalDeleteFiles, deleteFilesCount); + } + + private void assertFilesPlan(CloseableIterator iterator, int dataFileCount, int deleteFileCount) + { + AtomicInteger dataCount = new AtomicInteger(0); + AtomicInteger deleteCount = new AtomicInteger(0); + while (iterator.hasNext()) { + FileScanTask fileScanTask = iterator.next(); + dataCount.incrementAndGet(); + deleteCount.addAndGet(fileScanTask.deletes().size()); + } + assertEquals(dataCount.get(), dataFileCount); + assertEquals(deleteCount.get(), deleteFileCount); + + try { + iterator.close(); + iterator = CloseableIterator.empty(); + } + catch 
(Exception e) { + // do nothing + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java index 727076b088744..42e0df564402a 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java @@ -142,10 +142,4 @@ public void testSetOauth2ServerUriPropertyI() assertEquals(catalog.properties().get(OAUTH2_SERVER_URI), authEndpoint); } - - @Override - public void testDeprecatedTablePropertiesCreateTable() - { - // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) - } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java index e4925a46b5f84..fc383fc928d23 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java @@ -1285,8 +1285,15 @@ protected Scope visitCall(Call call, Optional scope) if (analysis.isDescribe()) { return createAndAssignScope(call, scope); } - QualifiedObjectName procedureName = analysis.getProcedureName() - .orElse(createQualifiedObjectName(session, call, call.getName(), metadata)); + Optional procedureNameOptional = analysis.getProcedureName(); + QualifiedObjectName procedureName; + if (!procedureNameOptional.isPresent()) { + procedureName = createQualifiedObjectName(session, call, call.getName(), metadata); + analysis.setProcedureName(Optional.of(procedureName)); + } + else { + procedureName = procedureNameOptional.get(); + } ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName()) .orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName())); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java index e5838185f495f..d882fe7e54a5f 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java @@ -43,6 +43,7 @@ import com.facebook.presto.spi.plan.SemiJoinNode; import com.facebook.presto.spi.plan.SortNode; import com.facebook.presto.spi.plan.SpatialJoinNode; +import com.facebook.presto.spi.plan.TableFinishNode; import com.facebook.presto.spi.plan.TableWriterNode; import com.facebook.presto.spi.plan.TopNNode; import com.facebook.presto.spi.plan.UnionNode; @@ -60,6 +61,7 @@ import com.facebook.presto.sql.planner.iterative.GroupReference; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.GroupIdNode; @@ -691,6 +693,16 @@ public static PlanMatchPattern enforceSingleRow(PlanMatchPattern source) return node(EnforceSingleRowNode.class, source); } + public static PlanMatchPattern 
callDistributedProcedure(PlanMatchPattern source) + { + return node(CallDistributedProcedureNode.class, source); + } + + public static PlanMatchPattern tableFinish(PlanMatchPattern source) + { + return node(TableFinishNode.class, source); + } + public static PlanMatchPattern tableWriter(List columns, List columnNames, PlanMatchPattern source) { return node(TableWriterNode.class, source).with(new TableWriterMatcher(columns, columnNames)); From a2eb3d0aae41d489bec776d60615f293bcf5bd28 Mon Sep 17 00:00:00 2001 From: wangd Date: Fri, 31 Oct 2025 08:38:26 +0800 Subject: [PATCH 3/6] [native] Relevant changes of presto protocol for iceberg --- .../iceberg/presto_protocol_iceberg.cpp | 141 ++++++++++++++++++ .../iceberg/presto_protocol_iceberg.h | 21 +++ .../iceberg/presto_protocol_iceberg.yml | 6 + .../IcebergDistributedProcedureHandle.hpp.inc | 35 +++++ 4 files changed, 203 insertions(+) create mode 100644 presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp index 228d31f569937..c7272c467980a 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp @@ -741,6 +741,147 @@ void from_json(const json& j, PrestoIcebergPartitionSpec& p) { } } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { +IcebergDistributedProcedureHandle:: + IcebergDistributedProcedureHandle() noexcept { + _type = "hive-iceberg"; +} + +void to_json(json& j, const IcebergDistributedProcedureHandle& p) { + j = json::object(); + j["@type"] = "hive-iceberg"; + to_json_key( + j, + "schemaName", + p.schemaName, + "IcebergDistributedProcedureHandle", + "String", + "schemaName"); + to_json_key( + j, + "tableName", + p.tableName, + "IcebergDistributedProcedureHandle", + "IcebergTableName", + "tableName"); + to_json_key( + j, + "schema", + p.schema, + "IcebergDistributedProcedureHandle", + "PrestoIcebergSchema", + "schema"); + to_json_key( + j, + "partitionSpec", + p.partitionSpec, + "IcebergDistributedProcedureHandle", + "PrestoIcebergPartitionSpec", + "partitionSpec"); + to_json_key( + j, + "inputColumns", + p.inputColumns, + "IcebergDistributedProcedureHandle", + "List", + "inputColumns"); + to_json_key( + j, + "outputPath", + p.outputPath, + "IcebergDistributedProcedureHandle", + "String", + "outputPath"); + to_json_key( + j, + "fileFormat", + p.fileFormat, + "IcebergDistributedProcedureHandle", + "FileFormat", + "fileFormat"); + to_json_key( + j, + "compressionCodec", + p.compressionCodec, + "IcebergDistributedProcedureHandle", + "HiveCompressionCodec", + "compressionCodec"); + to_json_key( + j, + "storageProperties", + p.storageProperties, + "IcebergDistributedProcedureHandle", + "Map", + "storageProperties"); +} + +void from_json(const json& j, IcebergDistributedProcedureHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "schemaName", + p.schemaName, + "IcebergDistributedProcedureHandle", + "String", + "schemaName"); + from_json_key( + j, + "tableName", + p.tableName, + "IcebergDistributedProcedureHandle", + "IcebergTableName", + "tableName"); + from_json_key( + j, + "schema", + p.schema, + "IcebergDistributedProcedureHandle", + "PrestoIcebergSchema", + 
"schema"); + from_json_key( + j, + "partitionSpec", + p.partitionSpec, + "IcebergDistributedProcedureHandle", + "PrestoIcebergPartitionSpec", + "partitionSpec"); + from_json_key( + j, + "inputColumns", + p.inputColumns, + "IcebergDistributedProcedureHandle", + "List", + "inputColumns"); + from_json_key( + j, + "outputPath", + p.outputPath, + "IcebergDistributedProcedureHandle", + "String", + "outputPath"); + from_json_key( + j, + "fileFormat", + p.fileFormat, + "IcebergDistributedProcedureHandle", + "FileFormat", + "fileFormat"); + from_json_key( + j, + "compressionCodec", + p.compressionCodec, + "IcebergDistributedProcedureHandle", + "HiveCompressionCodec", + "compressionCodec"); + from_json_key( + j, + "storageProperties", + p.storageProperties, + "IcebergDistributedProcedureHandle", + "Map", + "storageProperties"); +} +} // namespace facebook::presto::protocol::iceberg +namespace facebook::presto::protocol::iceberg { void to_json(json& j, const SortField& p) { j = json::object(); diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index a659dc24d103b..f6f5c8e773fe0 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -183,6 +183,27 @@ struct PrestoIcebergPartitionSpec { void to_json(json& j, const PrestoIcebergPartitionSpec& p); void from_json(const json& j, PrestoIcebergPartitionSpec& p); } // namespace facebook::presto::protocol::iceberg +// IcebergDistributedProcedureHandle is special since it needs an usage of +// hive::. + +namespace facebook::presto::protocol::iceberg { +struct IcebergDistributedProcedureHandle + : public ConnectorDistributedProcedureHandle { + String schemaName = {}; + IcebergTableName tableName = {}; + PrestoIcebergSchema schema = {}; + PrestoIcebergPartitionSpec partitionSpec = {}; + List inputColumns = {}; + String outputPath = {}; + FileFormat fileFormat = {}; + hive::HiveCompressionCodec compressionCodec = {}; + Map storageProperties = {}; + + IcebergDistributedProcedureHandle() noexcept; +}; +void to_json(json& j, const IcebergDistributedProcedureHandle& p); +void from_json(const json& j, IcebergDistributedProcedureHandle& p); +} // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { struct SortField { int sourceColumnId = {}; diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml index 1a8be3d90b3b6..9ceec008fc7c7 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml @@ -37,6 +37,11 @@ AbstractClasses: subclasses: - { name: IcebergInsertTableHandle, key: hive-iceberg } + ConnectorDistributedProcedureHandle: + super: JsonEncodedSubclass + subclasses: + - { name: IcebergDistributedProcedureHandle, key: hive-iceberg } + ConnectorTableLayoutHandle: super: JsonEncodedSubclass subclasses: @@ -62,6 +67,7 @@ JavaClasses: - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergOutputTableHandle.java - 
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergInsertTableHandle.java + - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java - presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionField.java diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc new file mode 100644 index 0000000000000..c5ab91c0416f8 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// IcebergDistributedProcedureHandle is special since it needs an usage of +// hive::. + +namespace facebook::presto::protocol::iceberg { +struct IcebergDistributedProcedureHandle + : public ConnectorDistributedProcedureHandle { + String schemaName = {}; + IcebergTableName tableName = {}; + PrestoIcebergSchema schema = {}; + PrestoIcebergPartitionSpec partitionSpec = {}; + List inputColumns = {}; + String outputPath = {}; + FileFormat fileFormat = {}; + hive::HiveCompressionCodec compressionCodec = {}; + Map storageProperties = {}; + + IcebergDistributedProcedureHandle() noexcept; +}; +void to_json(json& j, const IcebergDistributedProcedureHandle& p); +void from_json(const json& j, IcebergDistributedProcedureHandle& p); +} // namespace facebook::presto::protocol::iceberg From bd06a6b987dd2c3bfbc270bb6ff91e4e11eb7b90 Mon Sep 17 00:00:00 2001 From: wangd Date: Tue, 18 Nov 2025 08:51:09 +0800 Subject: [PATCH 4/6] Address review comments --- presto-docs/src/main/sphinx/connector/iceberg.rst | 8 ++++---- .../facebook/presto/iceberg/IcebergAbstractMetadata.java | 1 + .../facebook/presto/iceberg/IcebergProcedureContext.java | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 81532b4b59636..4619d783071ad 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1240,12 +1240,12 @@ Examples: Rewrite Data Files ^^^^^^^^^^^^^^^^^^ -Iceberg tracks all data files under different partition specs in a table. More data files requires -more metadata to be stored in manifest files, and small data files can cause unnecessary amount metadata and -less efficient queries from file open costs. Also, data files under different partition specs can +Iceberg tracks all data files under different partition specs in a table. 
More data files require +more metadata to be stored in manifest files, and small data files can cause an unnecessary amount of metadata and +less efficient queries due to file open costs. Also, data files under different partition specs can prevent metadata level deletion or thorough predicate push down for Presto. -Use `rewrite_data_files` to rewrite the data files of a specified table so that they are +Use ``rewrite_data_files`` to rewrite the data files of a specified table so that they are merged into fewer but larger files under the newest partition spec. If the table is partitioned, the data files compaction can act separately on the selected partitions to improve read performance by reducing metadata overhead and runtime file open cost. diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index dbbe3ad8b5307..febf84c189794 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -1172,6 +1172,7 @@ public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDi ((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments); transaction.commitTransaction(); procedureContext.get().destroy(); + procedureContext = Optional.empty(); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java index a0ce2e325959d..b2ffa32cfd650 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java @@ -90,6 +90,6 @@ public void destroy() this.scannedDataFiles.clear(); this.fullyAppliedDeleteFiles.clear(); this.connectorSplitSource.ifPresent(ConnectorSplitSource::close); - this.connectorSplitSource = null; + this.connectorSplitSource = Optional.empty(); } } From e823b207a4c008492ed8777ca995ad4e515fbaae Mon Sep 17 00:00:00 2001 From: wangd Date: Thu, 27 Nov 2025 12:39:53 +0800 Subject: [PATCH 5/6] Address comment: add a new method in ConnectorSplitSource to avoid adding a customized split source --- .../CallDistributedProcedureSplitSource.java | 137 ------------------ .../iceberg/IcebergAbstractMetadata.java | 5 +- .../iceberg/IcebergProcedureContext.java | 18 +-- .../presto/iceberg/IcebergSplitManager.java | 14 +- .../presto/iceberg/IcebergSplitSource.java | 12 ++ .../iceberg/RewriteDataFilesProcedure.java | 58 +++----- .../presto/spi/ConnectorSplitSource.java | 6 + 7 files changed, 51 insertions(+), 199 deletions(-) delete mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java deleted file mode 100644 index e8eeda5c97477..0000000000000 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.iceberg; - -import com.facebook.presto.iceberg.delete.DeleteFile; -import com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.ConnectorSplit; -import com.facebook.presto.spi.ConnectorSplitSource; -import com.facebook.presto.spi.SplitWeight; -import com.facebook.presto.spi.connector.ConnectorPartitionHandle; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Closer; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - -import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; -import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; -import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; -import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; -import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Iterators.limit; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; - -public class CallDistributedProcedureSplitSource - implements ConnectorSplitSource -{ - private CloseableIterator fileScanTaskIterator; - private Optional> fileScanTaskConsumer; - - private final TableScan tableScan; - private final Closer closer = Closer.create(); - private final double minimumAssignedSplitWeight; - private final ConnectorSession session; - - public CallDistributedProcedureSplitSource( - ConnectorSession session, - TableScan tableScan, - CloseableIterable fileScanTaskIterable, - Optional> fileScanTaskConsumer, - double minimumAssignedSplitWeight) - { - this.session = requireNonNull(session, "session is null"); - this.tableScan = requireNonNull(tableScan, "tableScan is null"); - this.fileScanTaskIterator = fileScanTaskIterable.iterator(); - this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null"); - this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; - closer.register(fileScanTaskIterator); - } - - @Override - public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) - { - // TODO: move this to a background thread - List splits = new ArrayList<>(); - Iterator iterator = limit(fileScanTaskIterator, maxSize); - while (iterator.hasNext()) { - FileScanTask task = iterator.next(); - fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task)); - 
splits.add(toIcebergSplit(task)); - } - return completedFuture(new ConnectorSplitBatch(splits, isFinished())); - } - - @Override - public boolean isFinished() - { - return !fileScanTaskIterator.hasNext(); - } - - @Override - public void close() - { - try { - closer.close(); - // TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose - // correct release resources holds by iterator. - fileScanTaskIterator = CloseableIterator.empty(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private ConnectorSplit toIcebergSplit(FileScanTask task) - { - PartitionSpec spec = task.spec(); - Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); - - // TODO: We should leverage residual expression and convert that to TupleDomain. - // The predicate here is used by readers for predicate push down at reader level, - // so when we do not use residual expression, we are just wasting CPU cycles - // on reader side evaluating a condition that we know will always be true. - - return new IcebergSplit( - task.file().path().toString(), - task.start(), - task.length(), - fromIcebergFileFormat(task.file().format()), - ImmutableList.of(), - getPartitionKeys(task), - PartitionSpecParser.toJson(spec), - partitionData.map(PartitionData::toJson), - getNodeSelectionStrategy(session), - SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), - task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), - Optional.empty(), - getDataSequenceNumber(task.file()), - getAffinitySchedulingFileSectionSize(session).toBytes()); - } -} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index febf84c189794..aeaf9340dfb3a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -42,7 +42,6 @@ import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayout; import com.facebook.presto.spi.ConnectorTableLayoutHandle; @@ -327,9 +326,9 @@ protected abstract void updateIcebergViewProperties( public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName); - public Optional getSplitSourceInCurrentCallProcedureTransaction() + public Optional getProcedureContext() { - return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource); + return this.procedureContext; } /** diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java index b2ffa32cfd650..a1af6fe19a372 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java @@ -13,10 +13,10 @@ */ package com.facebook.presto.iceberg; -import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.connector.ConnectorProcedureContext; import org.apache.iceberg.DataFile; 
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
@@ -36,7 +37,7 @@ public class IcebergProcedureContext
     final Map relevantData = new HashMap<>();
     Optional<Table> table = Optional.empty();
     Transaction transaction;
-    Optional<ConnectorSplitSource> connectorSplitSource = Optional.empty();
+    Optional<Consumer<FileScanTask>> fileScanTaskConsumer = Optional.empty();
 
     public void setTable(Table table)
     {
@@ -58,15 +59,15 @@ public Transaction getTransaction()
         return transaction;
     }
 
-    public void setConnectorSplitSource(ConnectorSplitSource connectorSplitSource)
+    public void setFileScanTaskConsumer(Consumer<FileScanTask> fileScanTaskConsumer)
     {
-        requireNonNull(connectorSplitSource, "connectorSplitSource is null");
-        this.connectorSplitSource = Optional.of(connectorSplitSource);
+        requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
+        this.fileScanTaskConsumer = Optional.of(fileScanTaskConsumer);
     }
 
-    public Optional<ConnectorSplitSource> getConnectorSplitSource()
+    public Optional<Consumer<FileScanTask>> getFileScanTaskConsumer()
     {
-        return this.connectorSplitSource;
+        return this.fileScanTaskConsumer;
     }
 
     public Set<DataFile> getScannedDataFiles()
@@ -89,7 +90,6 @@ public void destroy()
         this.relevantData.clear();
         this.scannedDataFiles.clear();
         this.fullyAppliedDeleteFiles.clear();
-        this.connectorSplitSource.ifPresent(ConnectorSplitSource::close);
-        this.connectorSplitSource = Optional.empty();
+        this.fileScanTaskConsumer = Optional.empty();
     }
 }
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
index 69141aa4c5df8..baa01e2e58b39 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
@@ -36,7 +36,6 @@
 import org.weakref.jmx.Managed;
 import org.weakref.jmx.Nested;
 
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -84,14 +83,6 @@ public ConnectorSplitSource getSplits(
         TupleDomain predicate = getNonMetadataColumnConstraints(layoutHandle
                 .getValidPredicate());
 
-        ConnectorMetadata connectorMetadata = transactionManager.get(transaction);
-        if (connectorMetadata != null) {
-            IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata;
-            Optional<ConnectorSplitSource> connectorSplitSource = icebergMetadata.getSplitSourceInCurrentCallProcedureTransaction();
-            if (connectorSplitSource.isPresent()) {
-                return connectorSplitSource.get();
-            }
-        }
 
         Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());
 
@@ -129,6 +120,11 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
                 session,
                 tableScan,
                 getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
+        ConnectorMetadata connectorMetadata = transactionManager.get(transaction);
+        if (connectorMetadata != null) {
+            IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata;
+            icebergMetadata.getProcedureContext().ifPresent(splitSource::initDistributedProcedureContext);
+        }
         return splitSource;
     }
 }
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
index 98ee9f2693450..03d530bd16586 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
@@ -20,6 +20,7 @@
 import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.SplitWeight;
 import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
+import 
com.facebook.presto.spi.connector.ConnectorProcedureContext; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; @@ -65,6 +66,8 @@ public class IcebergSplitSource private final TupleDomain metadataColumnConstraints; + private Optional procedureContext = Optional.empty(); + public IcebergSplitSource( ConnectorSession session, TableScan tableScan, @@ -91,6 +94,9 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan Iterator iterator = limit(fileScanTaskIterator, maxSize); while (iterator.hasNext()) { FileScanTask task = iterator.next(); + procedureContext.map(IcebergProcedureContext.class::cast) + .flatMap(IcebergProcedureContext::getFileScanTaskConsumer) + .ifPresent(consumer -> consumer.accept(task)); IcebergSplit icebergSplit = (IcebergSplit) toIcebergSplit(task); if (metadataColumnsMatchPredicates(metadataColumnConstraints, icebergSplit.getPath(), icebergSplit.getDataSequenceNumber())) { splits.add(icebergSplit); @@ -99,6 +105,12 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } + @Override + public void initDistributedProcedureContext(ConnectorProcedureContext procedureContext) + { + this.procedureContext = Optional.of(requireNonNull(procedureContext, "procedureContext is null")); + } + @Override public boolean isFinished() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java index 93547a58b2cf3..4197cac05aa41 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java @@ -18,8 +18,6 @@ import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.ConnectorSplitSource; -import com.facebook.presto.spi.FixedSplitSource; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.procedure.DistributedProcedure; import com.facebook.presto.spi.procedure.DistributedProcedure.Argument; @@ -36,9 +34,7 @@ import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; import org.apache.iceberg.types.Type; -import org.apache.iceberg.util.TableScanUtil; import javax.inject.Inject; import javax.inject.Provider; @@ -46,14 +42,11 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; -import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; -import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; @@ -100,40 +93,23 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec Table icebergTable = 
procedureContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table")); IcebergTableHandle tableHandle = layoutHandle.getTable(); - ConnectorSplitSource splitSource; - if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) { - splitSource = new FixedSplitSource(ImmutableList.of()); - } - else { - TupleDomain predicate = layoutHandle.getValidPredicate(); - TableScan tableScan = icebergTable.newScan() - .filter(toIcebergExpression(predicate)) - .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get()); - - Consumer fileScanTaskConsumer = (task) -> { - procedureContext.getScannedDataFiles().add(task.file()); - if (!task.deletes().isEmpty()) { - task.deletes().forEach(deleteFile -> { - if (deleteFile.content() == FileContent.EQUALITY_DELETES && - !icebergTable.specs().get(deleteFile.specId()).isPartitioned() && - !predicate.isAll()) { - // Equality files with an unpartitioned spec are applied as global deletes - // So they should not be cleaned up unless the whole table is optimized - return; - } - procedureContext.getFullyAppliedDeleteFiles().add(deleteFile); - }); - } - }; - - splitSource = new CallDistributedProcedureSplitSource( - session, - tableScan, - TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()), - Optional.of(fileScanTaskConsumer), - getMinimumAssignedSplitWeight(session)); - } - procedureContext.setConnectorSplitSource(splitSource); + TupleDomain predicate = layoutHandle.getValidPredicate(); + Consumer fileScanTaskConsumer = (task) -> { + procedureContext.getScannedDataFiles().add(task.file()); + if (!task.deletes().isEmpty()) { + task.deletes().forEach(deleteFile -> { + if (deleteFile.content() == FileContent.EQUALITY_DELETES && + !icebergTable.specs().get(deleteFile.specId()).isPartitioned() && + !predicate.isAll()) { + // Equality files with an unpartitioned spec are applied as global deletes + // So they should not be cleaned up unless the whole table is optimized + return; + } + procedureContext.getFullyAppliedDeleteFiles().add(deleteFile); + }); + } + }; + procedureContext.setFileScanTaskConsumer(fileScanTaskConsumer); return new IcebergDistributedProcedureHandle( tableHandle.getSchemaName(), diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java index a875a87d65173..6ec3c2a5e4e6a 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java @@ -14,6 +14,7 @@ package com.facebook.presto.spi; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.facebook.presto.spi.connector.ConnectorProcedureContext; import java.io.Closeable; import java.util.List; @@ -26,6 +27,11 @@ public interface ConnectorSplitSource { CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize); + default void initDistributedProcedureContext(ConnectorProcedureContext procedureContext) + { + throw new UnsupportedOperationException("distributed procedure is not supported in this Connector"); + } + default void rewind(ConnectorPartitionHandle partitionHandle) { throw new UnsupportedOperationException("rewind is not supported in this ConnectorSplitSource"); From 0d136f9fa71775bbed1b9550ac48c47ace16dddb Mon Sep 17 00:00:00 2001 From: wangd Date: Mon, 1 Dec 2025 14:26:33 +0800 Subject: [PATCH 6/6] Move RewriteDataFilesProcedure to procedure package And fix 
the error message thrown when target table does not exist --- .../facebook/presto/iceberg/IcebergCommonModule.java | 1 + .../{ => procedure}/RewriteDataFilesProcedure.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) rename presto-iceberg/src/main/java/com/facebook/presto/iceberg/{ => procedure}/RewriteDataFilesProcedure.java (94%) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 7d5154dca605f..06041ea9a8810 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -48,6 +48,7 @@ import com.facebook.presto.iceberg.procedure.ManifestFileCacheInvalidationProcedure; import com.facebook.presto.iceberg.procedure.RegisterTableProcedure; import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles; +import com.facebook.presto.iceberg.procedure.RewriteDataFilesProcedure; import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure; import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure; import com.facebook.presto.iceberg.procedure.SetCurrentSnapshotProcedure; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java similarity index 94% rename from presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java rename to presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java index 4197cac05aa41..c0908505fc729 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RewriteDataFilesProcedure.java @@ -11,11 +11,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.facebook.presto.iceberg; +package com.facebook.presto.iceberg.procedure; import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.iceberg.CommitTaskData; +import com.facebook.presto.iceberg.IcebergColumnHandle; +import com.facebook.presto.iceberg.IcebergDistributedProcedureHandle; +import com.facebook.presto.iceberg.IcebergProcedureContext; +import com.facebook.presto.iceberg.IcebergTableHandle; +import com.facebook.presto.iceberg.IcebergTableLayoutHandle; +import com.facebook.presto.iceberg.PartitionData; import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; @@ -90,7 +97,7 @@ public DistributedProcedure get() private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { - Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("Target table does not exist")); IcebergTableHandle tableHandle = layoutHandle.getTable(); TupleDomain predicate = layoutHandle.getValidPredicate();
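
For reference, the sketch below shows one way connector-side code could consume the SPI hook wired up in these patches: it accepts the ConnectorProcedureContext handed over through initDistributedProcedureContext and forwards each planned FileScanTask to the consumer registered on IcebergProcedureContext, mirroring what IcebergSplitSource does above. The class and method names ProcedureAwareScanTracker and onFileScanTask are illustrative only and are not part of the change.

import com.facebook.presto.iceberg.IcebergProcedureContext;
import com.facebook.presto.spi.connector.ConnectorProcedureContext;
import org.apache.iceberg.FileScanTask;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Illustrative sketch (not part of the patch): mirrors how IcebergSplitSource forwards
// planned file-scan tasks to the consumer registered by a distributed procedure.
public class ProcedureAwareScanTracker
{
    private Optional<ConnectorProcedureContext> procedureContext = Optional.empty();

    // Same contract as the ConnectorSplitSource#initDistributedProcedureContext default added above
    public void initDistributedProcedureContext(ConnectorProcedureContext procedureContext)
    {
        this.procedureContext = Optional.of(requireNonNull(procedureContext, "procedureContext is null"));
    }

    // Invoked once per planned FileScanTask while splits are being batched; the consumer
    // records scanned data files and fully applied delete files on the procedure context.
    public void onFileScanTask(FileScanTask task)
    {
        procedureContext.map(IcebergProcedureContext.class::cast)
                .flatMap(IcebergProcedureContext::getFileScanTaskConsumer)
                .ifPresent(consumer -> consumer.accept(task));
    }
}

Keeping the context optional means split sources created outside a distributed procedure call behave exactly as before.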