From e9abd18d926490c47854acbf8f231883c49fff35 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Fri, 25 Apr 2025 10:03:41 +0200 Subject: [PATCH] [FLINK-37729][avro] Implement ResultTypeQueryable for Avro serialization schema Signed-off-by: Pawel Leszczynski --- .../avro/AvroRowDataSerializationSchema.java | 16 +++++++++++- .../formats/avro/AvroSerializationSchema.java | 26 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java index b88979cfc8887..e5f415eb7e181 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java @@ -19,8 +19,11 @@ package org.apache.flink.formats.avro; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; @@ -38,7 +41,8 @@ *

Note: Changes in this class need to be kept in sync with the corresponding runtime class * {@link AvroRowDataDeserializationSchema} and schema converter {@link AvroSchemaConverter}. */ -public class AvroRowDataSerializationSchema implements SerializationSchema { +public class AvroRowDataSerializationSchema + implements SerializationSchema, ResultTypeQueryable { private static final long serialVersionUID = 1L; @@ -139,4 +143,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(nestedSchema, rowType); } + + @Override + public TypeInformation getProducedType() { + if (schema == null) { /* type info is built from the schema, which is not initialized yet */ + throw new IllegalStateException( + "The produced type is not available before the schema is initialized."); + } else { + return new GenericRecordAvroTypeInfo(schema); + } + } } diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroSerializationSchema.java index 11aae1ce64a51..95889583d7262 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroSerializationSchema.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroSerializationSchema.java @@ -19,12 +19,17 @@ package org.apache.flink.formats.avro; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.util.Preconditions; import org.apache.flink.util.WrappingRuntimeException; import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericData; import
org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -45,7 +50,8 @@ * * @param the type to be serialized */ -public class AvroSerializationSchema implements SerializationSchema { +public class AvroSerializationSchema + implements SerializationSchema, ResultTypeQueryable { /** * Creates {@link AvroSerializationSchema} that serializes {@link SpecificRecord} using provided @@ -220,4 +226,22 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(recordClazz, schema); } + + /** + * Returns the type information of the produced type. A {@link SpecificRecord} class yields an + * {@link AvroTypeInfo} for that class; otherwise (including when no record class is set) a + * {@link GenericRecordAvroTypeInfo} built from the schema is returned. + * + * @return TypeInformation of the produced type + */ + @Override + public TypeInformation getProducedType() { + if (recordClazz != null && SpecificRecord.class.isAssignableFrom(recordClazz)) { + // specific record: the type info is derived from the record class itself + return new AvroTypeInfo(recordClazz); + } else { + // no record class, or not a specific record: build the type info from the schema + return new GenericRecordAvroTypeInfo(schema); + } + } }