[FLINK-39209][doris] Fix time data type serialiazation when sink to doris with pipeline connector#4312
[FLINK-39209][doris] Fix time data type serialiazation when sink to doris with pipeline connector#4312chengcongchina wants to merge 5 commits intoapache:masterfrom
Conversation
…ris with pipeline connector
There was a problem hiding this comment.
Pull request overview
Fixes Doris sink JSON serialization for TIME columns in the Flink CDC Pipeline connector by converting TIME_WITHOUT_TIME_ZONE values into formatted strings before Jackson serialization.
Changes:
- Add a
TIME_FORMATTERto standardizeTIMEserialization. - Convert
TIME_WITHOUT_TIME_ZONEfromLocalTimeto formattedStringinDorisRowConverter. - Extend unit and e2e tests to cover
TIME(0)/TIME(3)columns end-to-end.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql | Adds TIME columns and values to the e2e DDL/data set. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java | Updates expected schema/rows to include serialized TIME fields. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java | Adds a focused unit test asserting TIME serialization output. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java | Formats TIME_WITHOUT_TIME_ZONE to a String during conversion. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java | Introduces a shared TIME_FORMATTER constant. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return (index, val) -> | ||
| val.getTime(index) | ||
| .toLocalTime() | ||
| .format(DorisEventSerializer.TIME_FORMATTER); |
There was a problem hiding this comment.
The conversion currently forces a millisecond suffix for all TIME values via HH:mm:ss.SSS, so TIME(0) becomes HH:mm:ss.000. This changes the textual representation compared to typical TIME(0) formatting and ignores the declared TIME precision. Consider formatting based on the TimeType precision (e.g., HH:mm:ss for precision 0; fractional seconds only when precision > 0, padded to the precision), or use an optional-fraction formatter to avoid emitting .000 when not needed.
| return (index, val) -> | ||
| val.getTime(index) | ||
| .toLocalTime() | ||
| .format(DorisEventSerializer.TIME_FORMATTER); |
There was a problem hiding this comment.
DorisRowConverter depending on DorisEventSerializer.TIME_FORMATTER couples conversion logic to the serializer class. To reduce cross-class coupling, consider moving TIME formatting constants/utilities into a dedicated shared utility (or into DorisRowConverter if only used there), keeping DorisEventSerializer focused on event serialization.
There was a problem hiding this comment.
No need to do that, just keep consistent with other parts of the code.
| import org.apache.flink.api.java.tuple.Tuple2; | ||
| import org.apache.flink.cdc.common.configuration.Configuration; | ||
| import org.apache.flink.cdc.common.data.RecordData; | ||
| import org.apache.flink.cdc.common.data.TimeData; |
There was a problem hiding this comment.
TimeData appears to be imported only for the Javadoc link. Many build setups enforce no-unused-imports via Checkstyle/Spotless, and Javadoc references typically don’t count as usage. To avoid a style failure, remove the import and reference the class in Javadoc with its fully-qualified name (or otherwise ensure the import is used in code).
This closes FLINK-39209.
What is the purpose of the change
This PR fixes an issue where sinking data to Doris using the Flink CDC Pipeline connector with
TIMEdata type (e.g.,time(0)from MySQL) would throw a serialization exception:Java 8 date/time type java.time.LocalTime not supported by default.Previously, the
DorisRowConverterreturned rawLocalTimeobjects forTIME_WITHOUT_TIME_ZONEtypes. Since the default JacksonObjectMapperconfiguration inDorisEventSerializerdoes not support Java 8 time types without thejackson-datatype-jsr310module, this caused runtime failures. This PR introduces a dedicatedTIME_FORMATTER(pattern:HH:mm:ss.SSS) to serializeTIMEdata into formatted strings before passing them to the Jackson serializer.Brief change log
DorisEventSerializerto include aTIME_FORMATTERconstant with the patternHH:mm:ss.SSS.DorisRowConverterto formatTIME_WITHOUT_TIME_ZONEdata usingDorisEventSerializer.TIME_FORMATTERinstead of returning rawLocalTimeobjects.DorisEventSerializerTestto include a new test casetestDataChangeEventWithTimeDataTypecoveringTIME(0)andTIME(3)types.MySqlToDorisE2eITCaseanddata_types_test.sqlto includeTIMEcolumns in the end-to-end test scenario.Verifying this change
This change is verified by the following tests:
DorisEventSerializerTest#testDataChangeEventWithTimeDataType: Verifies thatTIMEdata with different precisions are correctly serialized to the expected JSON string format without exceptions.MySqlToDorisE2eITCase: Verifies the end-to-end data synchronization correctness forTIMEtype columns from MySQL to Doris.Does this pull request potentially affect one of the following parts:
Documentation