Commit 86f3d3c

[FLINK-34466] Support dataset type facet for Avro

Signed-off-by: Pawel Leszczynski <[email protected]>

1 parent: 2bc999b

6 files changed: 70 additions & 10 deletions


flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java

Lines changed: 17 additions & 2 deletions
@@ -1,9 +1,11 @@
 package org.apache.flink.connector.kafka.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /** Default implementation of {@link KafkaDatasetFacet}. */
 @PublicEvolving
@@ -13,14 +15,26 @@ public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
 
     private final TypeInformation typeInformation;
 
+    private final Optional<SerializationSchema> serializationSchema;
+
     public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
         this.typeInformation = typeInformation;
+        this.serializationSchema = Optional.empty();
+    }
+
+    public DefaultTypeDatasetFacet(SerializationSchema serializationSchema) {
+        this.serializationSchema = Optional.of(serializationSchema);
+        this.typeInformation = null;
     }
 
     public TypeInformation getTypeInformation() {
         return typeInformation;
     }
 
+    public Optional<SerializationSchema> getSerializationSchema() {
+        return serializationSchema;
+    }
+
     public boolean equals(Object o) {
         if (this == o) {
             return true;
@@ -29,12 +43,13 @@ public boolean equals(Object o) {
             return false;
         }
         DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
-        return Objects.equals(typeInformation, that.typeInformation);
+        return Objects.equals(typeInformation, that.typeInformation)
+                && Objects.equals(serializationSchema, that.serializationSchema);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(typeInformation);
+        return Objects.hash(typeInformation, serializationSchema);
     }
 
     @Override
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java

Lines changed: 15 additions & 0 deletions
@@ -1,11 +1,26 @@
 package org.apache.flink.connector.kafka.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 
+import java.util.Optional;
+
 /** Facet definition to contain type information of source and sink. */
 @PublicEvolving
 public interface TypeDatasetFacet extends LineageDatasetFacet {
     TypeInformation getTypeInformation();
+
+    /**
+     * Sometimes a sink implementing {@link TypeDatasetFacetProvider} is not able to extract a
+     * type. This happens for AvroSerializationSchema because of the type erasure problem. In
+     * this case, it makes sense to expose the SerializationSchema to the lineage consumer so
+     * that it can be used to extract type information.
+     *
+     * @return the serialization schema, if type information could not be extracted
+     */
+    default Optional<SerializationSchema> getSerializationSchema() {
+        return Optional.empty();
+    }
 }
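
The javadoc above points at the type erasure problem; here is a short sketch of why reflective extraction fails, using a lambda in place of AvroSerializationSchema (same erasure effect). The demo class is an assumption for illustration, not part of the commit:

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.lang.reflect.Type;

public class ErasureSketch {
    public static void main(String[] args) {
        // The element type of the lambda is erased at runtime ...
        SerializationSchema<String> schema = element -> new byte[0];
        try {
            Type type =
                    TypeExtractor.getParameterType(
                            SerializationSchema.class, schema.getClass(), 0);
            System.out.println("extracted: " + type);
        } catch (InvalidTypesException e) {
            // ... so extraction fails and the new default method lets the
            // facet expose the SerializationSchema itself instead.
            System.out.println("type erased; expose the SerializationSchema");
        }
    }
}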

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java

Lines changed: 9 additions & 4 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -432,13 +433,17 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
                                 ((ResultTypeQueryable<?>) this.valueSerializationSchema)
                                         .getProducedType()));
         } else {
-            // gets type information from serialize method signature
-            Type type =
-                    TypeExtractor.getParameterType(
-                            SerializationSchema.class, valueSerializationSchema.getClass(), 0);
             try {
+                Type type =
+                        TypeExtractor.getParameterType(
+                                SerializationSchema.class,
+                                valueSerializationSchema.getClass(),
+                                0);
+
                 return Optional.of(
                         new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (InvalidTypesException e) {
+                return Optional.of(new DefaultTypeDatasetFacet(valueSerializationSchema));
             } catch (Exception e) {
                 LOG.info(
                         "Could not extract type information from {}",

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 4 additions & 0 deletions
@@ -213,6 +213,10 @@ public LineageVertex getLineageVertex() {
         Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
         if (recordSerializer instanceof TypeDatasetFacetProvider) {
             typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
+        } else {
+            LOG.info(
+                    "recordSerializer does not implement TypeDatasetFacetProvider: {}",
+                    recordSerializer);
         }
 
         if (typeDatasetFacet.isPresent()) {
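
The new else branch fires for any record serializer that skips the provider interface, which previously produced no type facet silently. A hypothetical example of such a serializer (class name, topic, and encoding are assumptions for illustration):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Implements only the serialization contract, not TypeDatasetFacetProvider,
// so KafkaSink#getLineageVertex now logs that no type facet is available.
public class PlainSerializer implements KafkaRecordSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        return new ProducerRecord<>("some-topic", element.getBytes(StandardCharsets.UTF_8));
    }
}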

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java

Lines changed: 7 additions & 4 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -211,12 +212,14 @@ public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
                     new DefaultTypeDatasetFacet(
                             ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
         } else {
-            // gets type information from serialize method signature
-            Type type =
-                    TypeExtractor.getParameterType(
-                            SerializationSchema.class, valueSerialization.getClass(), 0);
             try {
+                Type type =
+                        TypeExtractor.getParameterType(
+                                SerializationSchema.class, valueSerialization.getClass(), 0);
+
                 return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (InvalidTypesException e) {
+                return Optional.of(new DefaultTypeDatasetFacet(valueSerialization));
             } catch (Exception e) {
                 LOG.info(
                         "Could not extract type information from {}",

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java

Lines changed: 18 additions & 0 deletions
@@ -400,6 +400,24 @@ public String apply(Object o) {
                 .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
     }
 
+    @Test
+    public void testTypeDatasetFacetWithErasureProblem() {
+        SerializationSchema serializationSchema = element -> new byte[0];
+
+        KafkaRecordSerializationSchema<String> schema =
+                KafkaRecordSerializationSchema.builder()
+                        .setTopic("some-topic")
+                        .setValueSerializationSchema(serializationSchema)
+                        .setKeySerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getSerializationSchema)
+                .isEqualTo(Optional.of(serializationSchema));
+    }
+
     private static void assertOnlyOneSerializerAllowed(
             List<
                     Function<
