diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 8897bfb777e1f..4619d783071ad 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1237,6 +1237,47 @@ Examples: CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10'); +Rewrite Data Files +^^^^^^^^^^^^^^^^^^ + +Iceberg tracks all data files in a table, including files written under earlier partition specs. More data files require +more metadata to be stored in manifest files, and many small data files cause unnecessary metadata overhead and +less efficient queries due to file open costs. In addition, data files under different partition specs can +prevent metadata-level deletes and thorough predicate pushdown in Presto. + +Use ``rewrite_data_files`` to rewrite the data files of a specified table so that they are +merged into fewer but larger files under the newest partition spec. If the table is partitioned, the +compaction can act separately on the selected partitions to improve read performance by reducing +metadata overhead and runtime file open costs. + +The following arguments are available: + +===================== ========== =============== ======================================================================= +Argument Name Required Type Description +===================== ========== =============== ======================================================================= +``schema`` ✔️ string Schema of the table to update. + +``table_name`` ✔️ string Name of the table to update. + +``filter`` string Predicate as a string used to select the files to rewrite. Currently + only whole partitions can be rewritten, so the filter must reference + partition columns only. The default value is ``true``. + +``options`` map Options to be used for the data files rewrite
(to be expanded). +===================== ========== =============== ======================================================================= + +Examples: + +* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files into larger ones:: + + CALL iceberg.system.rewrite_data_files('db', 'sample'); + CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample'); + +* Rewrite the data files in partitions specified by a filter in table ``db.sample`` to the newest partition spec:: + + CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1'); + CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1'); + Presto C++ Support ^^^^^^^^^^^^^^^^^^ diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index eb6ea59912b76..aeaf9340dfb3a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -15,6 +15,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; @@ -35,6 +36,8 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -62,6 +65,9 @@ import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.BaseProcedure; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.security.ViewSecurity; @@ -251,11 +257,13 @@ public abstract class IcebergAbstractMetadata protected static final int CURRENT_MATERIALIZED_VIEW_FORMAT_VERSION = 1; protected final TypeManager typeManager; + protected final ProcedureRegistry procedureRegistry; protected final JsonCodec commitTaskCodec; protected final JsonCodec> columnMappingsCodec; protected final NodeVersion nodeVersion; protected final RowExpressionService rowExpressionService; protected final FilterStatsCalculatorService filterStatsCalculatorService; + protected Optional procedureContext = Optional.empty(); protected Transaction transaction; protected final StatisticsFileCache statisticsFileCache; protected final IcebergTableProperties tableProperties; @@ -265,6 +273,7 @@ public abstract class IcebergAbstractMetadata public IcebergAbstractMetadata( TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -275,6 +284,7 @@ public 
IcebergAbstractMetadata( IcebergTableProperties tableProperties) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.columnMappingsCodec = requireNonNull(columnMappingsCodec, "columnMappingsCodec is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); @@ -316,6 +326,11 @@ protected abstract void updateIcebergViewProperties( public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName); + public Optional getProcedureContext() + { + return this.procedureContext; + } + /** * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker */ @@ -1116,6 +1131,49 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa removeScanFiles(icebergTable, TupleDomain.all()); } + @Override + public ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable(); + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + + if (handle.isSnapshotSpecified()) { + throw new PrestoException(NOT_SUPPORTED, "This connector does not allow table execute at a specified snapshot"); + } + + transaction = icebergTable.newTransaction(); + BaseProcedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext()); + procedureContext.get().setTable(icebergTable); + procedureContext.get().setTransaction(transaction); + return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments); + } + + @Override + public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments) + { + BaseProcedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + verify(procedureContext.isPresent(), "procedure context must be present"); + ((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments); + transaction.commitTransaction(); + procedureContext.get().destroy(); + procedureContext = Optional.empty(); + } + @Override + public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 7220800e0959e..7d5154dca605f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ 
b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -188,6 +188,7 @@ protected void setup(Binder binder) procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON); // for orc binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java new file mode 100644 index 0000000000000..0ae38a46e8946 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.hive.HiveCompressionCodec; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; + +public class IcebergDistributedProcedureHandle + extends IcebergWritableTableHandle + implements ConnectorDistributedProcedureHandle +{ + @JsonCreator + public IcebergDistributedProcedureHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") IcebergTableName tableName, + @JsonProperty("schema") PrestoIcebergSchema schema, + @JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec, + @JsonProperty("inputColumns") List inputColumns, + @JsonProperty("outputPath") String outputPath, + @JsonProperty("fileFormat") FileFormat fileFormat, + @JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec, + @JsonProperty("storageProperties") Map storageProperties) + { + super( + schemaName, + tableName, + schema, + partitionSpec, + inputColumns, + outputPath, + fileFormat, + compressionCodec, + storageProperties, + ImmutableList.of()); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java index 199939c6b7985..92d3d0e9fdeec 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java @@ -16,6 +16,7 @@ import com.facebook.presto.hive.HiveTransactionHandle; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorDeleteTableHandle; +import 
com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -69,6 +70,12 @@ public Class getDeleteTableHandleClass() return IcebergTableHandle.class; } + @Override + public Class getDistributedProcedureHandleClass() + { + return IcebergDistributedProcedureHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 5fb9569b5222e..830279e61229a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -56,6 +56,7 @@ import com.facebook.presto.spi.ViewNotFoundException; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.security.PrestoPrincipal; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; @@ -177,6 +178,7 @@ public IcebergHiveMetadata( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -189,7 +191,8 @@ public IcebergHiveMetadata( IcebergTableProperties tableProperties, ConnectorSystemConfig connectorSystemConfig) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, + nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index d01ae7a33dab7..0c6bd7f411959 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import jakarta.inject.Inject; @@ -38,6 +39,7 @@ public class IcebergHiveMetadataFactory final ExtendedHiveMetastore metastore; final HdfsEnvironment hdfsEnvironment; final TypeManager typeManager; + final ProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final JsonCodec> columnMappingsCodec; final StandardFunctionResolution functionResolution; @@ -56,6 +58,7 @@ public 
IcebergHiveMetadataFactory( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -72,6 +75,7 @@ public IcebergHiveMetadataFactory( this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -92,6 +96,7 @@ public ConnectorMetadata create() metastore, hdfsEnvironment, typeManager, + procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 52d59d7b367f7..a8a327ee69065 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -32,6 +32,7 @@ import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -109,6 +110,7 @@ public class IcebergNativeMetadata public IcebergNativeMetadata( IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -119,7 +121,8 @@ public IcebergNativeMetadata( StatisticsFileCache statisticsFileCache, IcebergTableProperties tableProperties) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, + nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java index b421a823efe2c..72cdebd15689d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import 
com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import jakarta.inject.Inject; @@ -32,6 +33,7 @@ public class IcebergNativeMetadataFactory implements IcebergMetadataFactory { final TypeManager typeManager; + final ProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final JsonCodec> columnMappingsCodec; final IcebergNativeCatalogFactory catalogFactory; @@ -48,6 +50,7 @@ public IcebergNativeMetadataFactory( IcebergConfig config, IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + ProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -59,6 +62,7 @@ public IcebergNativeMetadataFactory( { this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -72,6 +76,8 @@ public IcebergNativeMetadataFactory( public ConnectorMetadata create() { - return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties); + return new IcebergNativeMetadata(catalogFactory, typeManager, procedureRegistry, functionResolution, rowExpressionService, + commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, + statisticsFileCache, tableProperties); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index e8e8db1163aed..e14d0178b153d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -16,6 +16,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -79,6 +80,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa return createPageSink(session, (IcebergWritableTableHandle) insertTableHandle); } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + return createPageSink(session, (IcebergWritableTableHandle) procedureHandle); + } + private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle) { HdfsContext hdfsContext = new HdfsContext(session, tableHandle.getSchemaName(), 
tableHandle.getTableName().getTableName()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java new file mode 100644 index 0000000000000..a1af6fe19a372 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergProcedureContext.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.connector.ConnectorProcedureContext; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +import static java.util.Objects.requireNonNull; + +public class IcebergProcedureContext + implements ConnectorProcedureContext +{ + final Set scannedDataFiles = new HashSet<>(); + final Set fullyAppliedDeleteFiles = new HashSet<>(); + final Map relevantData = new HashMap<>(); + Optional table = Optional.empty(); + Transaction transaction; + Optional> fileScanTaskConsumer = Optional.empty(); + + public void setTable(Table table) + { + this.table = Optional.of(table); + } + + public void setTransaction(Transaction transaction) + { + this.transaction = transaction; + } + + public Optional
getTable() + { + return table; + } + + public Transaction getTransaction() + { + return transaction; + } + + public void setFileScanTaskConsumer(Consumer fileScanTaskConsumer) + { + requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null"); + this.fileScanTaskConsumer = Optional.of(fileScanTaskConsumer); + } + + public Optional> getFileScanTaskConsumer() + { + return this.fileScanTaskConsumer; + } + + public Set getScannedDataFiles() + { + return scannedDataFiles; + } + + public Set getFullyAppliedDeleteFiles() + { + return fullyAppliedDeleteFiles; + } + + public Map getRelevantData() + { + return relevantData; + } + + public void destroy() + { + this.relevantData.clear(); + this.scannedDataFiles.clear(); + this.fullyAppliedDeleteFiles.clear(); + this.fileScanTaskConsumer = Optional.empty(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 0ad3345b7ae9d..baa01e2e58b39 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; @@ -82,6 +83,7 @@ public ConnectorSplitSource getSplits( TupleDomain predicate = getNonMetadataColumnConstraints(layoutHandle .getValidPredicate()); + Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { @@ -118,6 +120,11 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { session, tableScan, getMetadataColumnConstraints(layoutHandle.getValidPredicate())); + ConnectorMetadata connectorMetadata = transactionManager.get(transaction); + if (connectorMetadata != null) { + IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata; + icebergMetadata.getProcedureContext().ifPresent(splitSource::initDistributedProcedureContext); + } return splitSource; } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index 98ee9f2693450..03d530bd16586 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.facebook.presto.spi.connector.ConnectorProcedureContext; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; @@ -65,6 +66,8 @@ public class IcebergSplitSource private final TupleDomain metadataColumnConstraints; + private Optional procedureContext = Optional.empty(); + public IcebergSplitSource( ConnectorSession session, TableScan tableScan, @@ -91,6 +94,9 @@ public CompletableFuture 
getNextBatch(ConnectorPartitionHan Iterator iterator = limit(fileScanTaskIterator, maxSize); while (iterator.hasNext()) { FileScanTask task = iterator.next(); + procedureContext.map(IcebergProcedureContext.class::cast) + .flatMap(IcebergProcedureContext::getFileScanTaskConsumer) + .ifPresent(consumer -> consumer.accept(task)); IcebergSplit icebergSplit = (IcebergSplit) toIcebergSplit(task); if (metadataColumnsMatchPredicates(metadataColumnConstraints, icebergSplit.getPath(), icebergSplit.getDataSequenceNumber())) { splits.add(icebergSplit); @@ -99,6 +105,12 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } + @Override + public void initDistributedProcedureContext(ConnectorProcedureContext procedureContext) + { + this.procedureContext = Optional.of(requireNonNull(procedureContext, "procedureContext is null")); + } + @Override public boolean isFinished() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index 3b8994df46855..3fb8319865844 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -49,6 +49,7 @@ import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; import com.facebook.presto.spi.procedure.BaseProcedure; +import com.facebook.presto.spi.procedure.ProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.session.PropertyMetadata; import com.google.common.collect.ImmutableSet; @@ -94,6 +95,7 @@ public static Connector createConnector( binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(ProcedureRegistry.class).toInstance(context.getProcedureRegistry()); binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); binder.bind(PageSorter.class).toInstance(context.getPageSorter()); binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java new file mode 100644 index 0000000000000..4197cac05aa41 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java @@ -0,0 +1,182 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.DistributedProcedure.Argument; +import com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure; +import com.google.common.base.VerifyException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Type; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; + +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; +import static com.facebook.presto.iceberg.IcebergUtil.getColumns; +import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; +import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; +import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; +import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.SCHEMA; +import static com.facebook.presto.spi.procedure.TableDataRewriteDistributedProcedure.TABLE_NAME; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class RewriteDataFilesProcedure + implements Provider +{ + TypeManager typeManager; + JsonCodec commitTaskCodec; + + @Inject + public RewriteDataFilesProcedure( + TypeManager typeManager, + JsonCodec commitTaskCodec) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + } + + @Override + public DistributedProcedure get() + { + return new TableDataRewriteDistributedProcedure( + "system", + "rewrite_data_files", + ImmutableList.of( + new Argument(SCHEMA, VARCHAR), + new Argument(TABLE_NAME, VARCHAR), + new Argument("filter", VARCHAR, false, "TRUE"), + new Argument("options", "map(varchar, varchar)", false, null)), + (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments), + ((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)), + IcebergProcedureContext::new); + } + + private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + 
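// Descriptive note (not in the original patch): the try-with-resources above installs the connector's classloader as + // the thread context classloader, so that Iceberg's reflective and service-loader lookups resolve against the plugin + // classpath rather than the engine classpath while the procedure handle is built. +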
Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("Iceberg table is not set in procedure context")); + IcebergTableHandle tableHandle = layoutHandle.getTable(); + + TupleDomain predicate = layoutHandle.getValidPredicate(); + Consumer fileScanTaskConsumer = (task) -> { + procedureContext.getScannedDataFiles().add(task.file()); + if (!task.deletes().isEmpty()) { + task.deletes().forEach(deleteFile -> { + if (deleteFile.content() == FileContent.EQUALITY_DELETES && + !icebergTable.specs().get(deleteFile.specId()).isPartitioned() && + !predicate.isAll()) { + // Equality delete files with an unpartitioned spec are applied as global deletes, + // so they should not be cleaned up unless the whole table is optimized + return; + } + procedureContext.getFullyAppliedDeleteFiles().add(deleteFile); + }); + } + }; + procedureContext.setFileScanTaskConsumer(fileScanTaskConsumer); + + return new IcebergDistributedProcedureHandle( + tableHandle.getSchemaName(), + tableHandle.getIcebergTableName(), + toPrestoSchema(icebergTable.schema(), typeManager), + toPrestoPartitionSpec(icebergTable.spec(), typeManager), + getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), + icebergTable.location(), + getFileFormat(icebergTable), + getCompressionCodec(session), + icebergTable.properties()); + } + } + + private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection fragments) + { + if (fragments.isEmpty() && + procedureContext.getScannedDataFiles().isEmpty() && + procedureContext.getFullyAppliedDeleteFiles().isEmpty()) { + return; + } + + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle; + Table icebergTable = procedureContext.getTransaction().table(); + + List commitTasks = fragments.stream() + .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) + .collect(toImmutableList()); + + org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream() + .map(field -> field.transform().getResultType( + icebergTable.schema().findType(field.sourceId()))) + .toArray(Type[]::new); + + Set newFiles = new HashSet<>(); + for (CommitTaskData task : commitTasks) { + DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) + .withPath(task.getPath()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withFormat(handle.getFileFormat().name()) + .withMetrics(task.getMetrics().metrics()); + + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + newFiles.add(builder.build()); + } + + RewriteFiles rewriteFiles = procedureContext.getTransaction().newRewrite(); + Set scannedDataFiles = procedureContext.getScannedDataFiles(); + Set fullyAppliedDeleteFiles = procedureContext.getFullyAppliedDeleteFiles(); + rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of()); + + // Table.snapshot method returns null if there is no matching snapshot + Snapshot snapshot = requireNonNull( + handle.getTableName() + .getSnapshotId() + .map(icebergTable::snapshot) + .orElse(null), + "snapshot is null"); + if (icebergTable.currentSnapshot() != null) { 
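// Descriptive note (not in the original patch): validate only against commits made after the snapshot this rewrite + // was planned from, so a conflicting concurrent commit fails the rewrite instead of being silently overwritten. +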
rewriteFiles.validateFromSnapshot(snapshot.snapshotId()); + } + rewriteFiles.commit(); + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 39e13724d44e9..1101edd14f3b5 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -73,6 +73,7 @@ import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem; import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.resolveLatestMetadataLocation; import static com.facebook.presto.testing.MaterializedResult.resultBuilder; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.String.format; @@ -2013,6 +2014,66 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str } } + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode) + { + String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*"; + String schemaName = getSession().getSchema().get(); + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create an unpartitioned table, and insert some data under its initial partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // Metadata delete with a filter on column `c` is not supported yet, because some data files remain under the old partition spec + assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage); + + // Call procedure rewrite_data_files without a filter to rewrite all data files + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "')", 5); + + // Then we can do metadata delete on column `c`, because all data files are rewritten under the new partition spec + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)"); + } + finally { + dropTable(getSession(), tableName); + } + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode) + { + String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*"; + String schemaName = getSession().getSchema().get(); + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH 
(format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // Metadata delete with a filter on column `c` is not supported yet, because some data files remain under the old partition spec + assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage); + + // Call procedure rewrite_data_files with a filter to rewrite the data files under the prior partition spec + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => '" + schemaName + "', filter => 'a in (1, 2)')", 2); + + // Then we can do metadata delete on column `c`, because all data files are now under the new partition spec + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)"); + } + finally { + dropTable(getSession(), tableName); + } + } + @DataProvider(name = "version_and_mode") public Object[][] versionAndMode() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 8b3471229e4db..a781d28fdd012 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -2043,6 +2043,62 @@ public void testDecimal(boolean decimalVectorReaderEnabled) } } + @Test + public void testMetadataDeleteOnV2MorTableWithRewriteDataFiles() + { + String tableName = "test_rewrite_data_files_table_" + randomTableSuffix(); + try { + // Create an unpartitioned table, and insert some data under its initial partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002')"); + + Table icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 1); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 1); + + // Evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 4); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 1); + + // Execute a row-level delete with a filter on column `b` + assertUpdate("DELETE FROM " + tableName + " WHERE b = '1004'", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 4); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 2); + + assertQueryFails("call 
system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a > 3')", ".*"); + assertQueryFails("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 3')", ".*"); + + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 3); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 3); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + // Do metadata delete with a filter on column `c`, because all data files are now under the partition spec that contains `c` + assertUpdate("DELETE FROM " + tableName + " WHERE c = 5", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 2')", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + @Test public void testRefsTable() { @@ -2891,14 +2947,14 @@ private void testWithAllFileFormats(Session session, BiConsumer - private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount) + protected void assertHasDataFiles(Snapshot snapshot, int dataFilesCount) { Map map = snapshot.summary(); int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP)); assertEquals(totalDataFiles, dataFilesCount); } - private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) + protected void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) { Map map = snapshot.summary(); int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP)); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java index ed274b55b7bf5..98e8521dbaf04 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java @@ -98,6 +98,7 @@ import static com.facebook.presto.sql.planner.assertions.MatchResult.match; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.callDistributedProcedure; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; @@ -107,8 +108,13 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictProject; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableFinish; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static 
com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -730,6 +736,50 @@ public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDelete } } + @Test + public void testCallDistributedProcedureOnPartitionedTable() + { + String tableName = "partition_table_for_call_distributed_procedure"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c1'])"); + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + + assertPlan(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, getSession().getSchema().get()), + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, + callDistributedProcedure( + exchange(LOCAL, GATHER, + exchange(REMOTE_STREAMING, REPARTITION, + strictTableScan(tableName, identityMap("c1", "c2"))))))))); + + // A filter that cannot be entirely enforced by the table scan is not supported + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 > ''bar''')", tableName, getSession().getSchema().get()), + "Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression"); + + // A filter that can be entirely enforced by the table scan is supported + assertPlan(getSession(), format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, getSession().getSchema().get()), + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, + callDistributedProcedure( + exchange(LOCAL, GATHER, + exchange(REMOTE_STREAMING, REPARTITION, + strictTableScan(tableName, identityMap("c1", "c2")))))))), + plan -> assertTableLayout( + plan, + tableName, + withColumnDomains(ImmutableMap.of( + new Subfield( + "c1", + ImmutableList.of()), + Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 3L)), false))), + TRUE_CONSTANT, + ImmutableSet.of("c1"))); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + @DataProvider(name = "timezones") public Object[][] timezones() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java new file mode 100644 index 0000000000000..fb79f69618fca --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java @@ -0,0 +1,508 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableIterator; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; +import static java.lang.String.format; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; +import static org.testng.Assert.assertEquals; + +public class TestRewriteDataFilesProcedure + extends AbstractTestQueryFramework +{ + public static final String TEST_SCHEMA = "tpch"; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setCatalogType(HADOOP) + .setFormat(PARQUET) + .setNodeCount(OptionalInt.of(1)) + .setCreateTpchTables(false) + .setAddJmxPlugin(false) + .build().getQueryRunner(); + } + + public void dropTable(String tableName) + { + assertQuerySucceeds("DROP TABLE IF EXISTS " + tableName); + } + + @Test + public void testRewriteDataFilesInEmptyTable() + { + String tableName = "default_empty_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)"); + assertUpdate(format("CALL system.rewrite_data_files('%s', '%s')", TEST_SCHEMA, tableName), 0); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesOnPartitionTable() + { + String tableName = "example_partition_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])"); + + // create 5 files for each partition (c2 = 'foo' and c2 = 'bar') + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + // The number of data files is 10, and the number of delete files is 0 + 
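// Descriptive note (not in the original patch): assertHasDataFiles and assertHasDeleteFiles read the totals from + // the snapshot summary ("total-data-files" / "total-delete-files"), so they count all live files in the table, + // not only the files touched by the latest commit. +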
assertHasDataFiles(table.currentSnapshot(), 10); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 10, 0); + + assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1); + assertUpdate("DELETE from " + tableName + " WHERE c1 in (8, 10)", 2); + + table.refresh(); + assertHasSize(table.snapshots(), 7); + // The number of data files is 10, and the number of delete files is 3 + assertHasDataFiles(table.currentSnapshot(), 10); + assertHasDeleteFiles(table.currentSnapshot(), 3); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(9, 'foo')"); + + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7); + + table.refresh(); + assertHasSize(table.snapshots(), 8); + // The number of data files is 2, and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 2); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .filter(alwaysTrue()) + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 2, 0); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(9, 'foo')"); + } + finally { + dropTable(tableName); + } + } + + @Test + public void testRewriteDataFilesOnNonPartitionTable() + { + String tableName = "example_non_partition_table"; + try { + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)"); + + // create 5 files + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); + assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); + + Table table = loadTable(tableName); + assertHasSize(table.snapshots(), 5); + // The number of data files is 5, and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 0); + CloseableIterator fileScanTasks = table.newScan() + .useSnapshot(table.currentSnapshot().snapshotId()) + .planFiles().iterator(); + assertFilesPlan(fileScanTasks, 5, 0); + + assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1); + assertUpdate("DELETE from " + tableName + " WHERE c1 in (9, 10)", 2); + + table.refresh(); + assertHasSize(table.snapshots(), 7); + // The number of data files is 5, and the number of delete files is 2 + assertHasDataFiles(table.currentSnapshot(), 5); + assertHasDeleteFiles(table.currentSnapshot(), 2); + assertQuery("select * from " + tableName, + "values(1, 'foo'), (2, 'bar'), " + + "(3, 'foo'), (4, 'bar'), " + + "(5, 'foo'), (6, 'bar'), " + + "(8, 'bar')"); + + assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7); + + table.refresh(); + assertHasSize(table.snapshots(), 8); + // The number of data files is 1, and the number of delete files is 0 + assertHasDataFiles(table.currentSnapshot(), 1); + assertHasDeleteFiles(table.currentSnapshot(), 0); + fileScanTasks = table.newScan() + .filter(alwaysTrue()) 
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 1, 0);
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(8, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithFilter()
+    {
+        String tableName = "example_partition_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])");
+
+            // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 10, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 10);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 10, 0);
+
+            // rewriting files filtered by non-identity partition columns is not supported
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*");
+
+            // select 5 files to rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 = ''bar''')", tableName, TEST_SCHEMA), 5);
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 6, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 6);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 6, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeterministicTrueFilter()
+    {
+        String tableName = "example_non_partition_true_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)");
+
+            // create 5 files
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            // rewriting files filtered by non-identity partition columns is not supported
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*");
+
+            // a filter that always evaluates to true selects every file for rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 1')", tableName, TEST_SCHEMA), 10);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 1, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 1, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeterministicFalseFilter()
+    {
+        String tableName = "example_non_partition_false_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)");
+
+            // create 5 files
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            // a filter that always evaluates to false selects no files to rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 0')", tableName, TEST_SCHEMA), 0);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is still 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeleteAndPartitionEvolution()
+    {
+        String tableName = "example_partition_evolution_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar)");
+            assertUpdate("INSERT INTO " + tableName + " values(1, '1001'), (2, '1002')", 2);
+            assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
+            assertQuery("select * from " + tableName, "values(2, '1002')");
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 2);
+            // The number of data files is 1, and the number of delete files is 1
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 1);
+
+            assertUpdate("alter table " + tableName + " add column c int with (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " values(5, '1005', 5), (6, '1006', 6), (7, '1007', 7)", 3);
+            assertUpdate("DELETE FROM " + tableName + " WHERE b = '1006'", 1);
+            assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)");
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 4);
+            // The number of data files is 4, and the number of delete files is 2
+            assertHasDataFiles(table.currentSnapshot(), 4);
+            assertHasDeleteFiles(table.currentSnapshot(), 2);
+
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a > 3')", tableName, TEST_SCHEMA), ".*");
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c > 3')", tableName, TEST_SCHEMA), ".*");
+
+            assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 3);
+            table.refresh();
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 3, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 3);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 3, 0);
+            assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)");
+
+            assertUpdate("delete from " + tableName + " where b = '1002'", 1);
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 3, and the number of delete files is 1
+            assertHasDataFiles(table.currentSnapshot(), 3);
+            assertHasDeleteFiles(table.currentSnapshot(), 1);
+            assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c is null')", tableName, TEST_SCHEMA), 0);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 7);
+            // The number of data files is 2, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 2);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            assertQuery("select * from " + tableName, "values(5, '1005', 5), (7, '1007', 7)");
+
+            // This is a metadata delete
+            assertUpdate("delete from " + tableName + " where c = 7", 1);
+            table.refresh();
+            assertHasSize(table.snapshots(), 8);
+            // The number of data files is 1, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            assertQuery("select * from " + tableName, "values(5, '1005', 5)");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testInvalidParameterCases()
+    {
+        String tableName = "invalid_parameter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar, c int)");
+            assertQueryFails("CALL system.rewrite_data_files('n', table_name => 't')", ".*Named and positional arguments cannot be mixed");
+            assertQueryFails("CALL custom.rewrite_data_files('n', 't')", "Procedure not registered: custom.rewrite_data_files");
+            assertQueryFails("CALL system.rewrite_data_files()", ".*Required procedure argument 'schema' is missing");
+            assertQueryFails("CALL system.rewrite_data_files('s', 'n')", "Schema s does not exist");
+            assertQueryFails("CALL system.rewrite_data_files('', '')", "Table name is empty");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '''hello''')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type varchar\\(5\\)");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1001')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'n')", tableName, TEST_SCHEMA), ".*Column 'n' cannot be resolved");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
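+    // The helpers below assert on table state that is not visible through SQL alone: they load
+    // the table through a HadoopCatalog rooted at the query runner's warehouse directory and
+    // inspect snapshot history, snapshot summary counters, and planned scan tasks.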
+    private Table loadTable(String tableName)
+    {
+        Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration());
+        return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName));
+    }
+
+    private Map<String, String> getProperties()
+    {
+        File metastoreDir = getCatalogDirectory();
+        return ImmutableMap.of("warehouse", metastoreDir.toString());
+    }
+
+    private File getCatalogDirectory()
+    {
+        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false);
+        return catalogDirectory.toFile();
+    }
+
+    private void assertHasSize(Iterable<?> iterable, int size)
+    {
+        AtomicInteger count = new AtomicInteger(0);
+        iterable.forEach(obj -> count.incrementAndGet());
+        assertEquals(count.get(), size);
+    }
+
+    private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDataFiles = Integer.parseInt(map.get(TOTAL_DATA_FILES_PROP));
+        assertEquals(totalDataFiles, dataFilesCount);
+    }
+
+    private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDeleteFiles = Integer.parseInt(map.get(TOTAL_DELETE_FILES_PROP));
+        assertEquals(totalDeleteFiles, deleteFilesCount);
+    }
+
+    private void assertFilesPlan(CloseableIterator<FileScanTask> iterator, int dataFileCount, int deleteFileCount)
+    {
+        AtomicInteger dataCount = new AtomicInteger(0);
+        AtomicInteger deleteCount = new AtomicInteger(0);
+        while (iterator.hasNext()) {
+            FileScanTask fileScanTask = iterator.next();
+            dataCount.incrementAndGet();
+            deleteCount.addAndGet(fileScanTask.deletes().size());
+        }
+        assertEquals(dataCount.get(), dataFileCount);
+        assertEquals(deleteCount.get(), deleteFileCount);
+
+        try {
+            iterator.close();
+        }
+        catch (Exception e) {
+            // ignore failures while closing the scan iterator
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
index 46d4b7dc7478b..6b5e5782b6ba6 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java
@@ -48,6 +48,7 @@
 import com.facebook.presto.iceberg.IcebergTableType;
 import com.facebook.presto.iceberg.ManifestFileCache;
 import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
+import com.facebook.presto.metadata.BuiltInProcedureRegistry;
 import com.facebook.presto.metadata.FunctionAndTypeManager;
 import com.facebook.presto.metadata.MetadataManager;
 import com.facebook.presto.spi.ConnectorSession;
@@ -412,6 +413,7 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore
                 metastore,
                 hdfsEnvironment,
                 FUNCTION_AND_TYPE_MANAGER,
+                new BuiltInProcedureRegistry(METADATA.getFunctionAndTypeManager()),
                 FUNCTION_RESOLUTION,
                 ROW_EXPRESSION_SERVICE,
                 jsonCodec(CommitTaskData.class),
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
index 727076b088744..42e0df564402a 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
@@ -142,10 +142,4 @@ public void testSetOauth2ServerUriPropertyI()
 
         assertEquals(catalog.properties().get(OAUTH2_SERVER_URI), authEndpoint);
     }
-
-    @Override
-    public void testDeprecatedTablePropertiesCreateTable()
-    {
-        // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756)
-    }
 }
diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java
index cace6c11606af..7719add441bcb 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java
@@ -1285,8 +1285,15 @@ protected Scope visitCall(Call call, Optional<Scope> scope)
         if (analysis.isDescribe()) {
             return createAndAssignScope(call, scope);
         }
-        QualifiedObjectName procedureName = analysis.getProcedureName()
-                .orElse(createQualifiedObjectName(session, call, call.getName(), metadata));
+        Optional<QualifiedObjectName> procedureNameOptional = analysis.getProcedureName();
+        QualifiedObjectName procedureName;
+        if (!procedureNameOptional.isPresent()) {
+            procedureName = createQualifiedObjectName(session, call, call.getName(), metadata);
+            analysis.setProcedureName(Optional.of(procedureName));
+        }
+        else {
+            procedureName = procedureNameOptional.get();
+        }
 
         ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName())
                 .orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java
index e5838185f495f..d882fe7e54a5f 100644
--- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java
+++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java
@@ -43,6 +43,7 @@
 import com.facebook.presto.spi.plan.SemiJoinNode;
 import com.facebook.presto.spi.plan.SortNode;
 import com.facebook.presto.spi.plan.SpatialJoinNode;
+import com.facebook.presto.spi.plan.TableFinishNode;
 import com.facebook.presto.spi.plan.TableWriterNode;
 import com.facebook.presto.spi.plan.TopNNode;
 import com.facebook.presto.spi.plan.UnionNode;
@@ -60,6 +61,7 @@
 import com.facebook.presto.sql.planner.iterative.GroupReference;
 import com.facebook.presto.sql.planner.plan.ApplyNode;
 import com.facebook.presto.sql.planner.plan.AssignUniqueId;
+import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode;
 import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
 import com.facebook.presto.sql.planner.plan.ExchangeNode;
 import com.facebook.presto.sql.planner.plan.GroupIdNode;
@@ -691,6 +693,16 @@ public static PlanMatchPattern enforceSingleRow(PlanMatchPattern source)
         return node(EnforceSingleRowNode.class, source);
     }
 
+    public static PlanMatchPattern callDistributedProcedure(PlanMatchPattern source)
+    {
+        return node(CallDistributedProcedureNode.class, source);
+    }
+
+    public static PlanMatchPattern tableFinish(PlanMatchPattern source)
+    {
+        return node(TableFinishNode.class, source);
+    }
+
     public static PlanMatchPattern tableWriter(List<String> columns, List<String> columnNames, PlanMatchPattern source)
     {
         return node(TableWriterNode.class, source).with(new TableWriterMatcher(columns, columnNames));
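The two matchers added above give planner tests a way to assert the shape of a distributed-procedure plan. A hypothetical assertion (sketch only: ``assertDistributedPlan`` comes from ``BasePlanTest``, and the exact nodes between ``TableFinishNode`` and ``CallDistributedProcedureNode``, such as exchanges, are assumptions rather than part of this change) might look like::

    assertDistributedPlan(
            "CALL iceberg.system.rewrite_data_files('db', 'sample')",
            anyTree(
                    tableFinish(
                            anyTree(
                                    callDistributedProcedure(
                                            anyTree(tableScan("sample")))))));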
"partitionSpec", + p.partitionSpec, + "IcebergDistributedProcedureHandle", + "PrestoIcebergPartitionSpec", + "partitionSpec"); + from_json_key( + j, + "inputColumns", + p.inputColumns, + "IcebergDistributedProcedureHandle", + "List", + "inputColumns"); + from_json_key( + j, + "outputPath", + p.outputPath, + "IcebergDistributedProcedureHandle", + "String", + "outputPath"); + from_json_key( + j, + "fileFormat", + p.fileFormat, + "IcebergDistributedProcedureHandle", + "FileFormat", + "fileFormat"); + from_json_key( + j, + "compressionCodec", + p.compressionCodec, + "IcebergDistributedProcedureHandle", + "HiveCompressionCodec", + "compressionCodec"); + from_json_key( + j, + "storageProperties", + p.storageProperties, + "IcebergDistributedProcedureHandle", + "Map", + "storageProperties"); +} +} // namespace facebook::presto::protocol::iceberg +namespace facebook::presto::protocol::iceberg { void to_json(json& j, const SortField& p) { j = json::object(); diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index a659dc24d103b..f6f5c8e773fe0 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -183,6 +183,27 @@ struct PrestoIcebergPartitionSpec { void to_json(json& j, const PrestoIcebergPartitionSpec& p); void from_json(const json& j, PrestoIcebergPartitionSpec& p); } // namespace facebook::presto::protocol::iceberg +// IcebergDistributedProcedureHandle is special since it needs an usage of +// hive::. + +namespace facebook::presto::protocol::iceberg { +struct IcebergDistributedProcedureHandle + : public ConnectorDistributedProcedureHandle { + String schemaName = {}; + IcebergTableName tableName = {}; + PrestoIcebergSchema schema = {}; + PrestoIcebergPartitionSpec partitionSpec = {}; + List inputColumns = {}; + String outputPath = {}; + FileFormat fileFormat = {}; + hive::HiveCompressionCodec compressionCodec = {}; + Map storageProperties = {}; + + IcebergDistributedProcedureHandle() noexcept; +}; +void to_json(json& j, const IcebergDistributedProcedureHandle& p); +void from_json(const json& j, IcebergDistributedProcedureHandle& p); +} // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { struct SortField { int sourceColumnId = {}; diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml index 1a8be3d90b3b6..9ceec008fc7c7 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.yml @@ -37,6 +37,11 @@ AbstractClasses: subclasses: - { name: IcebergInsertTableHandle, key: hive-iceberg } + ConnectorDistributedProcedureHandle: + super: JsonEncodedSubclass + subclasses: + - { name: IcebergDistributedProcedureHandle, key: hive-iceberg } + ConnectorTableLayoutHandle: super: JsonEncodedSubclass subclasses: @@ -62,6 +67,7 @@ JavaClasses: - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergOutputTableHandle.java - 
   - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergInsertTableHandle.java
+  - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java
   - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
   - presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java
   - presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionField.java
diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc
new file mode 100644
index 0000000000000..c5ab91c0416f8
--- /dev/null
+++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/special/IcebergDistributedProcedureHandle.hpp.inc
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// IcebergDistributedProcedureHandle is special since it uses types from the
+// hive:: namespace.
+
+namespace facebook::presto::protocol::iceberg {
+struct IcebergDistributedProcedureHandle
+    : public ConnectorDistributedProcedureHandle {
+  String schemaName = {};
+  IcebergTableName tableName = {};
+  PrestoIcebergSchema schema = {};
+  PrestoIcebergPartitionSpec partitionSpec = {};
+  List<IcebergColumnHandle> inputColumns = {};
+  String outputPath = {};
+  FileFormat fileFormat = {};
+  hive::HiveCompressionCodec compressionCodec = {};
+  Map<String, String> storageProperties = {};
+
+  IcebergDistributedProcedureHandle() noexcept;
+};
+void to_json(json& j, const IcebergDistributedProcedureHandle& p);
+void from_json(const json& j, IcebergDistributedProcedureHandle& p);
+} // namespace facebook::presto::protocol::iceberg
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java
index a875a87d65173..6ec3c2a5e4e6a 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorSplitSource.java
@@ -14,6 +14,7 @@
 package com.facebook.presto.spi;
 
 import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
+import com.facebook.presto.spi.connector.ConnectorProcedureContext;
 
 import java.io.Closeable;
 import java.util.List;
@@ -26,6 +27,11 @@ public interface ConnectorSplitSource
 {
     CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize);
 
+    default void initDistributedProcedureContext(ConnectorProcedureContext procedureContext)
+    {
+        throw new UnsupportedOperationException("distributed procedure is not supported in this ConnectorSplitSource");
+    }
+
     default void rewind(ConnectorPartitionHandle partitionHandle)
     {
         throw new UnsupportedOperationException("rewind is not supported in this ConnectorSplitSource");