Skip to content

Commit 10ba4ee

Browse files
Flink: Support writing shredded variant (#15596)
1 parent 7bb0fa2 commit 10ba4ee

11 files changed

Lines changed: 1164 additions & 14 deletions

File tree

docs/docs/flink-configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
160160
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
161161
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
162162
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table |
163+
| shred-variants | Table write.parquet.shred-variants | Overrides this table's variant shredding setting for Parquet tables for this write |
164+
| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size for this write |
163165

164166
#### Range distribution statistics type
165167

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,4 +262,22 @@ public Duration tableRefreshInterval() {
262262
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
263263
.parseOptional();
264264
}
265+
266+
public boolean parquetShredVariants() {
267+
return confParser
268+
.booleanConf()
269+
.option(FlinkWriteOptions.SHRED_VARIANTS.key())
270+
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
271+
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
272+
.parse();
273+
}
274+
275+
public int parquetVariantInferenceBufferSize() {
276+
return confParser
277+
.intConf()
278+
.option(FlinkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE.key())
279+
.tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE)
280+
.defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT)
281+
.parse();
282+
}
265283
}

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,10 @@ private FlinkWriteOptions() {}
105105
// specify the uidSuffix to be used for the underlying IcebergSink
106106
public static final ConfigOption<String> UID_SUFFIX =
107107
ConfigOptions.key("uid-suffix").stringType().defaultValue("");
108+
109+
// specify whether variants are shredded for this write; documented as overriding the
// table property write.parquet.shred-variants
// NOTE(review): this option declares a default of false while VARIANT_INFERENCE_BUFFER_SIZE
// declares none; FlinkWriteConf looks the option up by key() -- confirm the default is intended
public static final ConfigOption<Boolean> SHRED_VARIANTS =
110+
ConfigOptions.key("shred-variants").booleanType().defaultValue(false);
111+
112+
// specify the variant inference buffer size for this write; documented as overriding the
// table property write.parquet.variant-inference-buffer-size (no default declared here)
public static final ConfigOption<Integer> VARIANT_INFERENCE_BUFFER_SIZE =
113+
ConfigOptions.key("variant-inference-buffer-size").intType().noDefaultValue();
108114
}

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
*/
1919
package org.apache.iceberg.flink.data;
2020

21+
import java.util.function.Function;
22+
import java.util.function.UnaryOperator;
2123
import org.apache.flink.table.data.RowData;
24+
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
2225
import org.apache.flink.table.types.logical.RowType;
2326
import org.apache.iceberg.avro.AvroFormatModel;
2427
import org.apache.iceberg.formats.FormatModelRegistry;
@@ -33,7 +36,10 @@ public static void register() {
3336
RowType.class,
3437
FlinkParquetWriters::buildWriter,
3538
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
36-
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
39+
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
40+
new FlinkVariantShreddingAnalyzer(),
41+
(Function<RowType, UnaryOperator<RowData>>)
42+
rowType -> new RowDataSerializer(rowType)::copy));
3743

3844
FormatModelRegistry.register(
3945
AvroFormatModel.create(
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.flink.data;
20+
21+
import java.nio.ByteBuffer;
22+
import java.nio.ByteOrder;
23+
import java.util.List;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.table.types.logical.RowType;
26+
import org.apache.flink.types.variant.BinaryVariant;
27+
import org.apache.flink.types.variant.Variant;
28+
import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
29+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
30+
import org.apache.iceberg.variants.VariantMetadata;
31+
import org.apache.iceberg.variants.VariantValue;
32+
33+
/**
34+
* Analyzes Variant fields in Flink {@link RowData} and converts Flink's binary Variant
35+
* representation to Iceberg {@link VariantValue} instances for Variant shredding.
36+
*/
37+
public class FlinkVariantShreddingAnalyzer extends VariantShreddingAnalyzer<RowData, RowType> {
38+
39+
@Override
40+
protected List<VariantValue> extractVariantValues(
41+
List<RowData> bufferedRows, int variantFieldIndex) {
42+
List<VariantValue> values = Lists.newArrayList();
43+
44+
for (RowData row : bufferedRows) {
45+
if (!row.isNullAt(variantFieldIndex)) {
46+
Variant flinkVariant = row.getVariant(variantFieldIndex);
47+
if (flinkVariant != null) {
48+
if (flinkVariant instanceof BinaryVariant binaryVariant) {
49+
VariantValue variantValue =
50+
VariantValue.from(
51+
VariantMetadata.from(
52+
ByteBuffer.wrap(binaryVariant.getMetadata())
53+
.order(ByteOrder.LITTLE_ENDIAN)),
54+
ByteBuffer.wrap(binaryVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN));
55+
56+
values.add(variantValue);
57+
} else {
58+
throw new UnsupportedOperationException(
59+
"Not a supported type: " + flinkVariant.getClass());
60+
}
61+
}
62+
}
63+
}
64+
65+
return values;
66+
}
67+
68+
@Override
69+
protected int resolveColumnIndex(RowType flinkSchema, String columnName) {
70+
return flinkSchema.getFieldIndex(columnName);
71+
}
72+
}

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
2525
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
2626
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
27+
import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS;
28+
import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE;
2729

2830
import java.util.List;
2931
import java.util.Map;
@@ -128,6 +130,10 @@ public static Map<String, String> writeProperties(
128130
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
129131
}
130132

133+
writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.parquetShredVariants()));
134+
writeProperties.put(
135+
PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.parquetVariantInferenceBufferSize()));
136+
131137
break;
132138
case AVRO:
133139
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());

0 commit comments

Comments
 (0)