[FLINK-37729][flink-formats] ResultTypeQueryable for Avro serialization schema #26507

Open
wants to merge 1 commit into
base: master
@@ -19,8 +19,11 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;

@@ -38,7 +41,8 @@
  * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime class
  * {@link AvroRowDataDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
-public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
+public class AvroRowDataSerializationSchema
+        implements SerializationSchema<RowData>, ResultTypeQueryable<GenericRecord> {
 
     private static final long serialVersionUID = 1L;

@@ -139,4 +143,14 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(nestedSchema, rowType);
     }
+
+    @Override
+    public TypeInformation<GenericRecord> getProducedType() {
+        if (schema == null) {
+            throw new IllegalStateException(
+                    "The produced type is not available before the schema is initialized.");
+        } else {
+            return new GenericRecordAvroTypeInfo(schema);
+        }
+    }
Comment on lines +149 to +155
Contributor

Shouldn't the produced type be a row type? Naively, I'd expect SerializationSchema and ResultTypeQueryable to be bound to the same type, whereas the declaration here is
SerializationSchema<RowData>, ResultTypeQueryable<GenericRecord>

Author

Do you know of a convenient helper that converts RowData into something extending TypeInformation?

Contributor

I see that the Avro deserialization schema returns the type as TypeInformation<RowData>.

I can see that the Parquet Avro and ORC code also return a type.

It would be good if what we do here were consistent with the Parquet Avro case.

I also see a comment at the top of this class saying to keep it in sync with AvroRowDataDeserializationSchema and the schema converter AvroSchemaConverter. AvroSchemaConverter does deal with type info.

It would be useful for me to understand how lineage calls this and why we end up in this class. A lineage unit test that brings in an Avro schema and a Parquet Avro schema would be helpful.

}
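
The null check in getProducedType() in this hunk implies that the produced type can only be queried once the schema has been set. A minimal sketch of that behavior follows, using hypothetical stand-in classes (SchemaStub, ProducedTypeStub, LazySchemaSerializer) rather than the real Flink or Avro types. It assumes the schema is populated by an initialization step, called open() in this sketch; that name and timing are assumptions, not something the diff shows.

```java
// Hypothetical stand-ins for illustration only; these are not Flink or Avro classes.
final class SchemaStub {
    final String name;

    SchemaStub(String name) {
        this.name = name;
    }
}

// Plays the role of the type information built from the schema.
final class ProducedTypeStub {
    final SchemaStub schema;

    ProducedTypeStub(SchemaStub schema) {
        this.schema = schema;
    }
}

class LazySchemaSerializer {
    // Stays null until open() runs (assumed initialization step).
    private SchemaStub schema;

    void open() {
        this.schema = new SchemaStub("record");
    }

    ProducedTypeStub getProducedType() {
        if (schema == null) {
            // Same guard as in the hunk: callers must initialize first.
            throw new IllegalStateException(
                    "The produced type is not available before the schema is initialized.");
        }
        return new ProducedTypeStub(schema);
    }
}
```

If a caller such as a lineage collector asks for the produced type before initialization, it would have to handle this exception, which may be worth covering in the unit test requested in the thread.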
@@ -19,12 +19,17 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Parser;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -45,7 +50,8 @@
  *
  * @param <T> the type to be serialized
  */
-public class AvroSerializationSchema<T> implements SerializationSchema<T> {
+public class AvroSerializationSchema<T>
+        implements SerializationSchema<T>, ResultTypeQueryable<GenericContainer> {
 
     /**
      * Creates {@link AvroSerializationSchema} that serializes {@link SpecificRecord} using provided
@@ -220,4 +226,22 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(recordClazz, schema);
     }
+
+    /**
+     * Returns the type information of the produced type. Depending on the type of the record,
+     * whether it is a specific or generic record, the type information will be different. It can be
+     * either a {@link AvroTypeInfo} or a {@link GenericRecordAvroTypeInfo}.
+     *
+     * @return TypeInformation of the produced type
+     */
+    @Override
+    public TypeInformation getProducedType() {
Contributor

This should be TypeInformation<T>. You will probably need to cast the return values, but I think it makes things more explicit.

Author

pawel-big-lebowski, Apr 25, 2025

The common type for both cases is ResultTypeQueryable<GenericContainer>, since both SpecificRecord and GenericRecord extend GenericContainer. However, I could not get GenericRecordAvroTypeInfo to cast to TypeInformation<GenericContainer>.

+        if (recordClazz != null && SpecificRecord.class.isAssignableFrom(recordClazz)) {
+            // if the record class is a specific record, we can use the schema to get the type info
+            return new AvroTypeInfo(recordClazz);
+        } else {
+            // generic record
+            return new GenericRecordAvroTypeInfo(schema);
+        }
+    }
 }
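
On the cast problem raised in the review thread: Java generics are invariant, so a type parameterized with a subtype is not a subtype of the same type parameterized with the supertype, and a direct cast between the two is rejected at compile time. The sketch below illustrates this with hypothetical stand-ins (Container, Generic, TypeInfo, GenericTypeInfo) that only mimic the shape of GenericContainer, GenericRecord, TypeInformation and GenericRecordAvroTypeInfo; they are not the real classes. It shows two possible workarounds, a bounded wildcard and an unchecked cast through a wildcard type. Whether either one is appropriate for this PR is not something the thread settles.

```java
// Hypothetical stand-ins; none of these are the real Flink or Avro types.
interface Container {}                  // plays the role of GenericContainer

interface Generic extends Container {}  // plays the role of GenericRecord

// Plays the role of TypeInformation<T>.
class TypeInfo<T> {
    private final Class<T> typeClass;

    TypeInfo(Class<T> typeClass) {
        this.typeClass = typeClass;
    }

    Class<T> getTypeClass() {
        return typeClass;
    }
}

// Plays the role of GenericRecordAvroTypeInfo, which is fixed to the generic record type.
class GenericTypeInfo extends TypeInfo<Generic> {
    GenericTypeInfo() {
        super(Generic.class);
    }
}

class VarianceSketch {
    // Option 1: widen the declared type with a bounded wildcard; no cast is needed.
    static TypeInfo<? extends Container> viaWildcard() {
        return new GenericTypeInfo();
    }

    // Option 2: cast through a wildcard type. A direct cast
    // (TypeInfo<Container>) new GenericTypeInfo() does not compile, because
    // TypeInfo<Generic> and TypeInfo<Container> are unrelated parameterizations.
    @SuppressWarnings("unchecked")
    static TypeInfo<Container> viaUncheckedCast() {
        return (TypeInfo<Container>) (TypeInfo<?>) new GenericTypeInfo();
    }

    public static void main(String[] args) {
        System.out.println(viaWildcard().getTypeClass().getSimpleName());
        System.out.println(viaUncheckedCast().getTypeClass().getSimpleName());
    }
}
```

The unchecked cast compiles only because type arguments are erased at runtime, so it trades compile-time safety for a matching signature. The wildcard form stays type-safe but changes the declared return type, which may not fit an interface that fixes the type parameter.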