Use renamed column name in $partitions table in Iceberg #24069

Open · wants to merge 1 commit into base: master

@@ -17,6 +17,7 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.transforms.Transforms;
 
 import java.util.List;
 import java.util.function.Consumer;
@@ -25,6 +26,7 @@
 import java.util.regex.Pattern;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.iceberg.PartitionTransforms.partitionNameTransform;
 import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
 import static java.lang.Integer.parseInt;
 import static java.lang.String.format;
@@ -97,31 +99,33 @@ public static void parsePartitionField(PartitionSpec.Builder builder, String field
         }) ||
                 tryMatch(field, YEAR_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.year(column, column + "_year" + suffix);
+                    builder.year(column, partitionNameTransform(Transforms.year(), column) + suffix);
                 }) ||
                 tryMatch(field, MONTH_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.month(column, column + "_month" + suffix);
+                    builder.month(column, partitionNameTransform(Transforms.month(), column) + suffix);
                 }) ||
                 tryMatch(field, DAY_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.day(column, column + "_day" + suffix);
+                    builder.day(column, partitionNameTransform(Transforms.day(), column) + suffix);
                 }) ||
                 tryMatch(field, HOUR_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.hour(column, column + "_hour" + suffix);
+                    builder.hour(column, partitionNameTransform(Transforms.hour(), column) + suffix);
                 }) ||
                 tryMatch(field, BUCKET_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.bucket(column, parseInt(match.group(2)), column + "_bucket" + suffix);
+                    int bucketCount = parseInt(match.group(2));
+                    builder.bucket(column, bucketCount, partitionNameTransform(Transforms.bucket(bucketCount), column) + suffix);
                 }) ||
                 tryMatch(field, TRUNCATE_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.truncate(column, parseInt(match.group(2)), column + "_trunc" + suffix);
+                    int width = parseInt(match.group(2));
+                    builder.truncate(column, width, partitionNameTransform(Transforms.truncate(width), column) + suffix);
                 }) ||
                 tryMatch(field, VOID_PATTERN, match -> {
                     String column = fromIdentifierToColumn(match.group(1));
-                    builder.alwaysNull(column, column + "_null" + suffix);
+                    builder.alwaysNull(column, partitionNameTransform(Transforms.alwaysNull(), column) + suffix);
                 });
         if (!matched) {
             throw new IllegalArgumentException("Invalid partition field declaration: " + field);
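
For illustration, a minimal sketch of what the refactored parser produces; the schema, field ID, and class name are invented for the example, but parsePartitionField and the Iceberg builder API are the ones used above, and the derived name matches the previously hard-coded "_bucket" suffix:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static io.trino.plugin.iceberg.PartitionFields.parsePartitionField;

class ParsePartitionFieldExample
{
    public static void main(String[] args)
    {
        // Invented single-column schema; the field ID (1) is arbitrary.
        Schema schema = new Schema(Types.NestedField.required(1, "par", Types.IntegerType.get()));
        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
        parsePartitionField(builder, "bucket(par, 4)");
        // The resulting spec still names the partition field "par_bucket".
        System.out.println(builder.build());
    }
}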
@@ -48,11 +48,13 @@
 import java.util.Set;
 import java.util.stream.Stream;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
 import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions;
 import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
+import static io.trino.plugin.iceberg.PartitionTransforms.partitionNameTransform;
 import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
 import static io.trino.spi.block.RowValueBuilder.buildRowValue;
@@ -166,15 +168,34 @@ private Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionFi
         if (fields.isEmpty()) {
             return Optional.empty();
         }
-        List<RowType.Field> partitionFields = fields.stream()
-                .map(field -> RowType.field(
-                        field.name(),
-                        toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
-                .collect(toImmutableList());
+
+        Map<Integer, String> fieldNames = TypeUtil.indexNameById(icebergTable.schema().asStruct());
+        ImmutableList.Builder<RowType.Field> fieldsBuilder = ImmutableList.builder();
+        Map<String, Integer> nameSuffix = new HashMap<>();
+        schema.columns().stream().map(NestedField::name).forEach(name -> nameSuffix.put(name, 1));
+
+        for (PartitionField field : fields) {
+            String sourceName = fieldNames.getOrDefault(field.sourceId(), field.name());
+            String targetName;
+            if (field.transform().isIdentity()) {
+                checkState(schema.findField(sourceName).fieldId() == field.sourceId(), "Partition field %s is not identity", field);
+                targetName = partitionNameTransform(field.transform(), sourceName);
+            }
+            else {
+                targetName = partitionNameTransform(field.transform(), sourceName);
+                int suffix = nameSuffix.compute(targetName, (_, v) -> v == null ? 1 : v + 1);
+                if (suffix > 1) {
+                    targetName = targetName + "_" + suffix;
+                }
+            }
+            io.trino.spi.type.Type type = toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager);
+            fieldsBuilder.add(RowType.field(targetName, type));
+        }
+
         List<Integer> fieldIds = fields.stream()
                 .map(PartitionField::fieldId)
                 .collect(toImmutableList());
-        return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
+        return Optional.of(new IcebergPartitionColumn(RowType.from(fieldsBuilder.build()), fieldIds));
     }
 
     private Optional<RowType> getMetricsColumnType(List<NestedField> columns)
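
To make the suffix handling above concrete, a standalone sketch (table layout invented for the example) of how the nameSuffix map disambiguates a derived partition field name that collides with an existing column:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NameSuffixExample
{
    public static void main(String[] args)
    {
        // Invented table: columns "ts" and "ts_day", partitioned by day(ts).
        // Every existing column name is pre-seeded with 1, as in the loop above.
        Map<String, Integer> nameSuffix = new HashMap<>();
        List.of("ts", "ts_day").forEach(name -> nameSuffix.put(name, 1));

        // day(ts) derives the target name "ts_day", which clashes with the column.
        String targetName = "ts_day";
        int suffix = nameSuffix.compute(targetName, (name, count) -> count == null ? 1 : count + 1);
        if (suffix > 1) {
            targetName = targetName + "_" + suffix;
        }
        System.out.println(targetName); // prints "ts_day_2"
    }
}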
@@ -28,6 +28,7 @@
 import io.trino.spi.type.VarcharType;
 import jakarta.annotation.Nullable;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.transforms.Transform;
 import org.joda.time.DateTimeField;
 import org.joda.time.chrono.ISOChronology;
@@ -160,6 +161,41 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type sour
         throw new UnsupportedOperationException("Unsupported partition transform: " + field);
     }
 
+    public static String partitionNameTransform(Transform<?, ?> transform, String columnName)
+    {
+        String transformString = transform.toString();
+        switch (transformString) {
+            case "identity" -> {
+                return columnName;
+            }
+            case "void" -> {
+                return columnName + "_null";
+            }
+            case "year" -> {
+                return columnName + "_year";
+            }
+            case "month" -> {
+                return columnName + "_month";
+            }
+            case "day" -> {
+                return columnName + "_day";
+            }
+            case "hour" -> {
+                return columnName + "_hour";
+            }
+        }
+
+        Matcher matcher = BUCKET_PATTERN.matcher(transformString);
+        if (matcher.matches()) {
+            return columnName + "_bucket";
+        }
+        matcher = TRUNCATE_PATTERN.matcher(transformString);
+        if (matcher.matches()) {
+            return columnName + "_trunc";
+        }
+        throw new UnsupportedOperationException("Unsupported partition transform: " + transform);
+    }
+
     private static ColumnTransform identity(Type type)
     {
         return new ColumnTransform(type, false, true, false, Function.identity(), ValueTransform.identity(type));
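
A short usage sketch of the new helper; the class name is invented, the Transforms factory methods are the same ones used in PartitionFields above, and the expected results follow from the switch and pattern matches in the method:

import org.apache.iceberg.transforms.Transforms;

import static io.trino.plugin.iceberg.PartitionTransforms.partitionNameTransform;

class PartitionNameTransformExample
{
    public static void main(String[] args)
    {
        System.out.println(partitionNameTransform(Transforms.identity(), "c1"));   // c1
        System.out.println(partitionNameTransform(Transforms.year(), "d1"));       // d1_year
        System.out.println(partitionNameTransform(Transforms.bucket(4), "b1"));    // b1_bucket
        System.out.println(partitionNameTransform(Transforms.truncate(2), "t1"));  // t1_trunc
        System.out.println(partitionNameTransform(Transforms.alwaysNull(), "v1")); // v1_null
    }
}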
@@ -35,8 +35,14 @@
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RowType.field;
+import static io.trino.spi.type.RowType.rowType;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
 import static io.trino.testing.MaterializedResult.resultBuilder;
+import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.util.Objects.requireNonNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -381,6 +387,73 @@ public void testFilesTableWithDelete()
         assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
     }
 
+    @Test
+    public void testPartitionRename()
+    {
+        String tableName = "test_partition_rename_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['par1', '\"par2.f1\"']) AS SELECT 1 as v, 11 as par1, CAST(ROW(21, 22) AS ROW(f1 integer, f2 integer)) as par2", 1);
+
+        assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME COLUMN par1 to par1_renamed");
+        assertQuery("SELECT partition.par1_renamed FROM \"" + tableName + "$partitions\"", "VALUES 11");
+
+        assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME COLUMN par2 to par2_renamed");
+        assertQuery("SELECT partition.\"par2_renamed.f1\" FROM \"" + tableName + "$partitions\"", "VALUES 21");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    @Test
+    public void testBucketingRename()
+    {
+        String tableName = "test_bucketing_rename_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['bucket(par, 4)']) AS SELECT 1 as v, 11 as par", 1);
+
+        assertQuery("SELECT partition.par_bucket FROM \"" + tableName + "$partitions\"", "VALUES 3");
+        assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME COLUMN par to par_renamed");
+        assertQuery("SELECT partition.par_renamed_bucket FROM \"" + tableName + "$partitions\"", "VALUES 3");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    @Test
+    public void testPartitionColumns()
+    {
+        String tableName = "test_partition_columns_" + randomNameSuffix();
+        assertUpdate(String.format("""
+                CREATE TABLE %s WITH (partitioning = ARRAY[
+                    'c1',
+                    '"r1.f1"',
+                    'year(d1)',
+                    'month(d2)',
+                    'day(d3)',
+                    'hour(d4)',
+                    'bucket(b1, 4)',
+                    'truncate(t1, 2)']) AS
+                SELECT
+                    CAST('c1' AS VARCHAR) as c1
+                    , CAST(ROW(1, 2) AS ROW(f1 integer, f2 integer)) as r1
+                    , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d1
+                    , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d2
+                    , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d3
+                    , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d4
+                    , 1 as b1
+                    , CAST('12345678' AS VARCHAR) as t1""", tableName), 1);
+
+
+        assertThat(query("SELECT partition FROM \"" + tableName + "$partitions\""))
+                .result()
+                .hasTypes(ImmutableList.of(rowType(
+                        field("c1", VARCHAR),
+                        field("r1.f1", INTEGER),
+                        field("d1_year", INTEGER),
+                        field("d2_month", INTEGER),
+                        field("d3_day", DATE),
+                        field("d4_hour", INTEGER),
+                        field("b1_bucket", INTEGER),
+                        field("t1_trunc", VARCHAR))))
+                .matches("SELECT CAST(ROW('c1', 1, 52, 624, DATE '2022-01-01', 455833, 0, '12') AS ROW(c1 varchar, \"r1.f1\" integer, d1_year integer, d2_month integer, d3_day date, d4_hour integer, b1_bucket integer, t1_trunc varchar))");
+    }
+
     private Long nanCount(long value)
     {
         // Parquet does not have nan count metrics