[FLINK-34466][LINEAGE] Support dataset type facet for Avro #171
pawel-big-lebowski wants to merge 1 commit into
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1b75ef0 to 86f3d3c
     *
     * @return
     */
    default Optional<SerializationSchema> getSerializationSchema() {
Shouldn't this interface live in Flink? The logic to fall back to the serialization schema should also be in Flink, as I assume a File connector using the Avro format could hit this issue as well.
I strongly agree. The whole TypeDatasetFacet should be removed from here. As far as I can tell, this PR should be revived and merged beforehand: apache/flink#25712
            return Optional.of(
                    new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
        } catch (InvalidTypesException e) {
Wouldn't a type check beforehand be less expensive resource-wise? (Depending on how often you expect to hit this fallback.)
                            SerializationSchema.class, valueSerialization.getClass(), 0);

            return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
        } catch (InvalidTypesException e) {
This PR is being marked as stale since it has not had any activity in the last 90 days. If you are having difficulty finding a reviewer, please reach out to the If this PR is no longer valid or desired, please feel free to close it.
This PR has been closed since it has not had any activity in 120 days.
With an implementation of lineage interfaces, Kafka source and sink provide TypeDatasetFacet, which contains type information about the data processed. This worked by extracting type information from SerializationSchema. However, this does not work for ConfluentRegistryAvroSerializationSchema due to type erasure.
As a workaround, TypeDatasetFacet can be extended to contain SerializationSchema when direct extraction of TypeInformation is not possible. This allows handling this case on the lineage listener side, while keeping flink-connector-kafka unaware of Avro type extraction logic.
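To illustrate the erasure problem and the proposed fallback, here is a minimal, dependency-free sketch. It does not use Flink classes: Schema, StringSchema, ErasedSchema, TypeFacet and ErasureDemo are hypothetical stand-ins for SerializationSchema, a concrete schema, a generic schema such as the Confluent Avro one, TypeDatasetFacet and the extraction logic. It only inspects directly implemented interfaces, which is an assumption made for brevity and not how Flink's TypeExtractor works.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for a generic serialization schema interface.
interface Schema<T> {
    byte[] serialize(T element);
}

// The type argument is fixed in the class declaration, so reflection can recover it.
class StringSchema implements Schema<String> {
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}

// The type argument is chosen by the caller and erased at runtime.
class ErasedSchema<T> implements Schema<T> {
    public byte[] serialize(T element) {
        return String.valueOf(element).getBytes(StandardCharsets.UTF_8);
    }
}

final class ErasureDemo {
    // Returns the element type if it is declared as a concrete class,
    // or an empty Optional if only a type variable is visible.
    // Uses a type check instead of relying on an exception.
    static Optional<Class<?>> resolveElementType(Schema<?> schema) {
        for (Type iface : schema.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Schema.class) {
                    Type arg = p.getActualTypeArguments()[0];
                    if (arg instanceof Class) {
                        return Optional.<Class<?>>of((Class<?>) arg);
                    }
                }
            }
        }
        return Optional.empty();
    }
}

// Hypothetical facet: carries the resolved type when available,
// otherwise the schema itself so a listener can inspect it later.
final class TypeFacet {
    final Optional<Class<?>> type;
    final Optional<Schema<?>> schema;

    private TypeFacet(Optional<Class<?>> type, Optional<Schema<?>> schema) {
        this.type = type;
        this.schema = schema;
    }

    static TypeFacet of(Schema<?> schema) {
        Optional<Class<?>> resolved = ErasureDemo.resolveElementType(schema);
        if (resolved.isPresent()) {
            return new TypeFacet(resolved, Optional.<Schema<?>>empty());
        }
        return new TypeFacet(Optional.<Class<?>>empty(), Optional.<Schema<?>>of(schema));
    }
}
```

With these definitions, TypeFacet.of(new StringSchema()) carries String.class, while TypeFacet.of(new ErasedSchema<String>()) carries only the schema instance, leaving any format-specific type extraction to whoever consumes the facet.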