41 changes: 41 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -1233,6 +1233,47 @@ Examples:

CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10');

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

Iceberg tracks every data file in a table, including files written under older partition specs. More data
files require more metadata to be stored in manifest files, and many small data files inflate that metadata
and make queries less efficient because of per-file open costs. In addition, data files spread across
different partition specs can prevent metadata-level deletes and thorough predicate pushdown in Presto.

Use ``rewrite_data_files`` to rewrite the data files of a specified table, merging them into fewer but
larger files under the newest partition spec. If the table is partitioned, compaction can run separately
on the selected partitions, improving read performance by reducing metadata overhead and the runtime cost
of opening files.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         Required   Type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update.

``table_name``        ✔️         string          Name of the table to update.

``filter``                       string          Predicate, as a string, used to filter which data files are rewritten.
                                                 Currently only rewriting of whole partitions is supported, so the
                                                 filter may only reference partition columns. The default value is
                                                 ``true``.

``options``                      map             Options to be used for the data files rewrite (to be expanded).
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table ``db.sample`` to the newest partition spec and combine small files into larger ones::

CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in the partitions selected by a filter in table ``db.sample`` to the newest partition spec::

CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');

Presto C++ Support
^^^^^^^^^^^^^^^^^^

@@ -0,0 +1,137 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class CallDistributedProcedureSplitSource
implements ConnectorSplitSource
{
private CloseableIterator<FileScanTask> fileScanTaskIterator;
private Optional<Consumer<FileScanTask>> fileScanTaskConsumer;

private final TableScan tableScan;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;
private final ConnectorSession session;

public CallDistributedProcedureSplitSource(
ConnectorSession session,
TableScan tableScan,
CloseableIterable<FileScanTask> fileScanTaskIterable,
Optional<Consumer<FileScanTask>> fileScanTaskConsumer,
double minimumAssignedSplitWeight)
{
this.session = requireNonNull(session, "session is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null");
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
closer.register(fileScanTaskIterator);
}

@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
{
// TODO: move this to a background thread
List<ConnectorSplit> splits = new ArrayList<>();
Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
while (iterator.hasNext()) {
FileScanTask task = iterator.next();
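// Let the registered consumer observe each task before it becomes a split,
// e.g. so that the calling procedure can track the files being processed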
fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task));
splits.add(toIcebergSplit(task));
}
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
}

@Override
public boolean isFinished()
{
return !fileScanTaskIterator.hasNext();
}

@Override
public void close()
{
try {
closer.close();
// TODO: remove this once org.apache.iceberg.io.CloseableIterator#withClose
// correctly releases the resources held by the iterator.
fileScanTaskIterator = CloseableIterator.empty();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private ConnectorSplit toIcebergSplit(FileScanTask task)
{
PartitionSpec spec = task.spec();
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());

// TODO: We should leverage the residual expression and convert it to a TupleDomain.
// The predicate here is used by readers for predicate pushdown at the reader level,
// so when we do not use the residual expression, we waste CPU cycles on the reader
// side evaluating a condition that we already know is always true.

return new IcebergSplit(
task.file().path().toString(),
task.start(),
task.length(),
fromIcebergFileFormat(task.file().format()),
ImmutableList.of(),
getPartitionKeys(task),
PartitionSpecParser.toJson(spec),
partitionData.map(PartitionData::toJson),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
getDataSequenceNumber(task.file()),
getAffinitySchedulingFileSectionSize(session).toBytes());
}
}
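A minimal sketch of how this split source might be wired up (not part of the diff above): icebergTable, session, and the 0.05 minimum split weight are illustrative assumptions, and the task-collecting consumer stands in for whatever bookkeeping the calling procedure needs:

    // Plan the file scan tasks for the table's current snapshot and turn them into splits.
    TableScan tableScan = icebergTable.newScan();
    List<FileScanTask> scannedTasks = new ArrayList<>();
    Consumer<FileScanTask> taskCollector = scannedTasks::add;   // e.g. remember every file that will be rewritten
    ConnectorSplitSource splitSource = new CallDistributedProcedureSplitSource(
            session,
            tableScan,
            tableScan.planFiles(),
            Optional.of(taskCollector),
            0.05);                                              // minimumAssignedSplitWeight (illustrative)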
@@ -15,6 +15,7 @@

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.TupleDomain;
@@ -35,10 +36,13 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -59,6 +63,9 @@
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.procedure.BaseProcedure;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.ProcedureRegistry;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
@@ -219,10 +226,12 @@ public abstract class IcebergAbstractMetadata
protected static final String INFORMATION_SCHEMA = "information_schema";

protected final TypeManager typeManager;
protected final ProcedureRegistry procedureRegistry;
protected final JsonCodec<CommitTaskData> commitTaskCodec;
protected final NodeVersion nodeVersion;
protected final RowExpressionService rowExpressionService;
protected final FilterStatsCalculatorService filterStatsCalculatorService;
protected Optional<IcebergProcedureContext> procedureContext = Optional.empty();
protected Transaction transaction;
protected final StatisticsFileCache statisticsFileCache;
protected final IcebergTableProperties tableProperties;
@@ -232,6 +241,7 @@ public abstract class IcebergAbstractMetadata

public IcebergAbstractMetadata(
TypeManager typeManager,
ProcedureRegistry procedureRegistry,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
@@ -241,6 +251,7 @@ public IcebergAbstractMetadata(
IcebergTableProperties tableProperties)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
@@ -267,6 +278,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName

public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName);

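/**
 * Returns the split source prepared by the distributed procedure call running in the
 * current transaction, if any.
 */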
public Optional<ConnectorSplitSource> getSplitSourceInCurrentCallProcedureTransaction()
{
return procedureContext.flatMap(IcebergProcedureContext::getConnectorSplitSource);
}

/**
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
*/
@@ -1041,6 +1057,49 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
removeScanFiles(icebergTable, TupleDomain.all());
}

@Override
public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
ConnectorSession session,
QualifiedObjectName procedureName,
ConnectorTableLayoutHandle tableLayoutHandle,
Object[] arguments)
{
IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable();
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

if (handle.isSnapshotSpecified()) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not allow table execute at a specified snapshot");
}

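// Start an Iceberg transaction for the procedure; it is committed in finishCallDistributedProcedure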
transaction = icebergTable.newTransaction();
BaseProcedure<?> procedure = procedureRegistry.resolve(
new ConnectorId(procedureName.getCatalogName()),
new SchemaTableName(
procedureName.getSchemaName(),
procedureName.getObjectName()));
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
procedureContext = Optional.of((IcebergProcedureContext) ((DistributedProcedure) procedure).createContext());
procedureContext.get().setTable(icebergTable);
procedureContext.get().setTransaction(transaction);
return ((DistributedProcedure) procedure).begin(session, procedureContext.get(), tableLayoutHandle, arguments);
}

@Override
public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
{
BaseProcedure<?> procedure = procedureRegistry.resolve(
new ConnectorId(procedureName.getCatalogName()),
new SchemaTableName(
procedureName.getSchemaName(),
procedureName.getObjectName()));
verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure");
verify(procedureContext.isPresent(), "procedure context must be present");
((DistributedProcedure) procedure).finish(procedureContext.get(), procedureHandle, fragments);
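// Commit the transaction opened in beginCallDistributedProcedure, then release the procedure context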
transaction.commitTransaction();
procedureContext.get().destroy();
procedureContext = Optional.empty();
}

@Override
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -186,6 +186,7 @@ protected void setup(Binder binder)
procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;

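/**
 * Handle describing the write target of a distributed procedure call (for example,
 * rewrite_data_files). It carries the same write-path information as
 * {@link IcebergWritableTableHandle}, which it extends.
 */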
public class IcebergDistributedProcedureHandle
extends IcebergWritableTableHandle
implements ConnectorDistributedProcedureHandle
{
@JsonCreator
public IcebergDistributedProcedureHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") IcebergTableName tableName,
@JsonProperty("schema") PrestoIcebergSchema schema,
@JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec,
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
@JsonProperty("outputPath") String outputPath,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("compressionCodec") HiveCompressionCodec compressionCodec,
@JsonProperty("storageProperties") Map<String, String> storageProperties)
{
super(
schemaName,
tableName,
schema,
partitionSpec,
inputColumns,
outputPath,
fileFormat,
compressionCodec,
storageProperties,
ImmutableList.of());
}
}
@@ -16,6 +16,7 @@
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -69,6 +70,12 @@ public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
return IcebergTableHandle.class;
}

@Override
public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass()
{
return IcebergDistributedProcedureHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{