Skip to content

Commit 2645d51

Browse files
committed
Add support for deletion vector in Iceberg
1 parent f5c429b commit 2645d51

File tree

45 files changed

+1454
-60
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1454
-60
lines changed

docs/src/main/sphinx/connector/iceberg.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
945945
- Optionally specifies the file system location URI for the table.
946946
* - `format_version`
947947
- Optionally specifies the format version of the Iceberg specification to use
948-
for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
948+
for new tables; either `1`, `2` or `3`. Defaults to `2`. Version `2` is required
949949
for row level deletes.
950950
* - `max_commit_retry`
951951
- Number of times to retry a commit before failing. Defaults to the value of

plugin/trino-iceberg/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,12 @@
223223
<artifactId>iceberg-core</artifactId>
224224
</dependency>
225225

226+
<dependency>
227+
<groupId>org.apache.iceberg</groupId>
228+
<artifactId>iceberg-data</artifactId>
229+
<version>${dep.iceberg.version}</version>
230+
</dependency>
231+
226232
<dependency>
227233
<groupId>org.apache.iceberg</groupId>
228234
<artifactId>iceberg-nessie</artifactId>

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,28 @@
1313
*/
1414
package io.trino.plugin.iceberg;
1515

16+
import com.google.common.collect.ImmutableList;
1617
import org.apache.iceberg.FileContent;
18+
import org.apache.iceberg.FileFormat;
1719

1820
import java.util.List;
1921
import java.util.Optional;
22+
import java.util.OptionalLong;
2023

2124
import static java.util.Objects.requireNonNull;
2225

2326
public record CommitTaskData(
2427
String path,
25-
IcebergFileFormat fileFormat,
28+
FileFormat fileFormat,
2629
long fileSizeInBytes,
2730
MetricsWrapper metrics,
2831
String partitionSpecJson,
2932
Optional<String> partitionDataJson,
3033
FileContent content,
3134
Optional<String> referencedDataFile,
35+
List<String> deletionVectorFiles,
36+
OptionalLong deletionVectorContentOffset,
37+
OptionalLong deletionVectorContentSize,
3238
Optional<List<Long>> fileSplitOffsets)
3339
{
3440
public CommitTaskData
@@ -40,6 +46,9 @@ public record CommitTaskData(
4046
requireNonNull(partitionDataJson, "partitionDataJson is null");
4147
requireNonNull(content, "content is null");
4248
requireNonNull(referencedDataFile, "referencedDataFile is null");
49+
deletionVectorFiles = ImmutableList.copyOf(deletionVectorFiles);
4350
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
51+
requireNonNull(deletionVectorContentOffset, "deletionVectorContentOffset is null");
52+
requireNonNull(deletionVectorContentSize, "deletionVectorContentSize is null");
4453
}
4554
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.trino.spi.Page;
1919
import io.trino.spi.TrinoException;
2020
import io.trino.spi.type.Type;
21+
import org.apache.iceberg.FileFormat;
2122
import org.apache.iceberg.Schema;
2223
import org.apache.iceberg.avro.Avro;
2324
import org.apache.iceberg.data.Record;
@@ -36,6 +37,7 @@
3637
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
3738
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
3839
import static java.util.Objects.requireNonNull;
40+
import static org.apache.iceberg.FileFormat.AVRO;
3941
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
4042

4143
public final class IcebergAvroFileWriter
@@ -46,6 +48,7 @@ public final class IcebergAvroFileWriter
4648
// Use static table name instead of the actual name because it becomes outdated once the table is renamed
4749
public static final String AVRO_TABLE_NAME = "table";
4850

51+
private final String location;
4952
private final Schema icebergSchema;
5053
private final List<Type> types;
5154
private final FileAppender<Record> avroWriter;
@@ -58,6 +61,7 @@ public IcebergAvroFileWriter(
5861
List<Type> types,
5962
HiveCompressionCodec hiveCompressionCodec)
6063
{
64+
this.location = requireNonNull(file.location(), "location is null");
6165
this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
6266
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
6367
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
@@ -71,10 +75,22 @@ public IcebergAvroFileWriter(
7175
.build();
7276
}
7377
catch (IOException e) {
74-
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + file.location(), e);
78+
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + location, e);
7579
}
7680
}
7781

82+
@Override
83+
public FileFormat fileFormat()
84+
{
85+
return AVRO;
86+
}
87+
88+
@Override
89+
public String location()
90+
{
91+
return location;
92+
}
93+
7894
@Override
7995
public long getWrittenBytes()
8096
{

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
public class IcebergConfig
5353
{
5454
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
55-
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
55+
private static final int FORMAT_VERSION_DEFAULT = 2;
56+
public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
5657
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
5758
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
5859
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
@@ -74,7 +75,7 @@ public class IcebergConfig
7475
private boolean registerTableProcedureEnabled;
7576
private boolean addFilesProcedureEnabled;
7677
private Optional<String> hiveCatalogName = Optional.empty();
77-
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
78+
private int formatVersion = FORMAT_VERSION_DEFAULT;
7879
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
7980
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
8081
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java

+11
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
*/
1414
package io.trino.plugin.iceberg;
1515

16+
import com.google.common.collect.ImmutableList;
1617
import io.trino.plugin.hive.FileWriter;
18+
import org.apache.iceberg.FileFormat;
1719
import org.apache.iceberg.Metrics;
1820

1921
import java.util.List;
@@ -22,6 +24,15 @@
2224
public interface IcebergFileWriter
2325
extends FileWriter
2426
{
27+
FileFormat fileFormat();
28+
29+
String location();
30+
31+
default List<String> rewrittenDeleteFiles()
32+
{
33+
return ImmutableList.of();
34+
}
35+
2536
FileMetrics getFileMetrics();
2637

2738
record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,32 @@
3333
import io.trino.plugin.hive.HiveCompressionCodec;
3434
import io.trino.plugin.hive.NodeVersion;
3535
import io.trino.plugin.hive.orc.OrcWriterConfig;
36+
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
3637
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
3738
import io.trino.spi.TrinoException;
3839
import io.trino.spi.connector.ConnectorSession;
3940
import io.trino.spi.type.Type;
4041
import io.trino.spi.type.TypeManager;
42+
import org.apache.iceberg.FileFormat;
4143
import org.apache.iceberg.MetricsConfig;
44+
import org.apache.iceberg.PartitionSpec;
4245
import org.apache.iceberg.Schema;
46+
import org.apache.iceberg.deletes.PositionDeleteIndex;
4347
import org.apache.iceberg.types.Types;
48+
import org.apache.iceberg.util.DeleteFileSet;
4449
import org.weakref.jmx.Managed;
4550

4651
import java.io.Closeable;
4752
import java.io.IOException;
4853
import java.util.List;
4954
import java.util.Map;
5055
import java.util.Optional;
56+
import java.util.function.Function;
5157
import java.util.function.Supplier;
5258
import java.util.stream.IntStream;
5359

5460
import static com.google.common.base.Preconditions.checkArgument;
61+
import static com.google.common.base.Preconditions.checkState;
5562
import static com.google.common.collect.ImmutableList.toImmutableList;
5663
import static io.trino.plugin.hive.HiveCompressionCodecs.toCompressionCodec;
5764
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
@@ -82,6 +89,7 @@
8289
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
8390
import static java.lang.String.format;
8491
import static java.util.Objects.requireNonNull;
92+
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
8593
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
8694
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
8795
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
@@ -139,16 +147,37 @@ public IcebergFileWriter createPositionDeleteWriter(
139147
TrinoFileSystem fileSystem,
140148
Location outputPath,
141149
ConnectorSession session,
142-
IcebergFileFormat fileFormat,
143-
Map<String, String> storageProperties)
150+
String dataFilePath,
151+
FileFormat fileFormat,
152+
PartitionSpec partitionSpec,
153+
Optional<PartitionData> partition,
154+
Map<String, String> storageProperties,
155+
Map<String, DeleteFileSet> previousDeleteFiles)
144156
{
145157
return switch (fileFormat) {
158+
case PUFFIN -> createDeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteFiles);
146159
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
147160
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
148161
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
162+
case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
149163
};
150164
}
151165

166+
private static DeletionVectorWriter createDeletionVectorWriter(
167+
NodeVersion nodeVersion,
168+
TrinoFileSystem fileSystem,
169+
Location outputPath,
170+
String dataFilePath,
171+
PartitionSpec partitionSpec,
172+
Optional<PartitionData> partition,
173+
Map<String, DeleteFileSet> previousDeleteFiles)
174+
{
175+
Function<CharSequence, PositionDeleteIndex> previousDeleteLoader = DeletionVectorWriter.create(fileSystem, previousDeleteFiles);
176+
int positionChannel = POSITION_DELETE_SCHEMA.columns().indexOf(DELETE_FILE_POS);
177+
checkState(positionChannel != -1, "positionChannel not found");
178+
return new DeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteLoader::apply, positionChannel);
179+
}
180+
152181
private IcebergFileWriter createParquetWriter(
153182
MetricsConfig metricsConfig,
154183
TrinoFileSystem fileSystem,
@@ -234,6 +263,7 @@ private IcebergFileWriter createOrcWriter(
234263
}
235264

236265
return new IcebergOrcFileWriter(
266+
outputPath,
237267
metricsConfig,
238268
icebergSchema,
239269
orcDataSink,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iceberg.Schema;
3131
import org.apache.iceberg.io.LocationProvider;
3232
import org.apache.iceberg.types.Type;
33+
import org.apache.iceberg.util.DeleteFileSet;
3334
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
3435
import org.roaringbitmap.longlong.LongBitmapDataProvider;
3536
import org.roaringbitmap.longlong.Roaring64Bitmap;
@@ -55,8 +56,10 @@ public class IcebergMergeSink
5556
private final LocationProvider locationProvider;
5657
private final IcebergFileWriterFactory fileWriterFactory;
5758
private final TrinoFileSystem fileSystem;
59+
private final Map<String, DeleteFileSet> previousDeleteFiles;
5860
private final JsonCodec<CommitTaskData> jsonCodec;
5961
private final ConnectorSession session;
62+
private final int formatVersion;
6063
private final IcebergFileFormat fileFormat;
6164
private final Map<String, String> storageProperties;
6265
private final Schema schema;
@@ -69,8 +72,10 @@ public IcebergMergeSink(
6972
LocationProvider locationProvider,
7073
IcebergFileWriterFactory fileWriterFactory,
7174
TrinoFileSystem fileSystem,
75+
Map<String, DeleteFileSet> previousDeleteFiles,
7276
JsonCodec<CommitTaskData> jsonCodec,
7377
ConnectorSession session,
78+
int formatVersion,
7479
IcebergFileFormat fileFormat,
7580
Map<String, String> storageProperties,
7681
Schema schema,
@@ -81,8 +86,10 @@ public IcebergMergeSink(
8186
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
8287
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
8388
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
89+
this.previousDeleteFiles = ImmutableMap.copyOf(previousDeleteFiles);
8490
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
8591
this.session = requireNonNull(session, "session is null");
92+
this.formatVersion = formatVersion;
8693
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
8794
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
8895
this.schema = requireNonNull(schema, "schema is null");
@@ -162,8 +169,10 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
162169
fileSystem,
163170
jsonCodec,
164171
session,
172+
formatVersion,
165173
fileFormat,
166-
storageProperties);
174+
storageProperties,
175+
previousDeleteFiles);
167176
}
168177

169178
private static Collection<Slice> writePositionDeletes(PositionDeleteWriter writer, ImmutableLongBitmapDataProvider rowsToDelete)

0 commit comments

Comments
 (0)