diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java index 983a2c91690..f74cfebebdd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java @@ -67,6 +67,13 @@ public class DorisEventSerializer implements DorisRecordSerializer { public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + /** Format TIME type data without precision. */ + public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); + + /** Format TIME type data with millisecond precision. */ + public static final DateTimeFormatter TIME_WITH_MILLISECOND_FORMATTER = + DateTimeFormatter.ofPattern("HH:mm:ss.SSS"); + /** ZoneId from pipeline config to support timestamp with local time zone. */ public final ZoneId pipelineZoneId; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java index b1b67a1e29a..205c94b62e4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java @@ -117,7 +117,18 @@ static SerializationConverter createExternalConverter(DataType type, ZoneId pipe final int zonedP = ((ZonedTimestampType) type).getPrecision(); return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp(); case TIME_WITHOUT_TIME_ZONE: - return (index, val) -> val.getTime(index).toLocalTime(); + return (index, val) -> { + int precision = DataTypeChecks.getPrecision(type); + if (precision == 0) { + return val.getTime(index) + .toLocalTime() + .format(DorisEventSerializer.TIME_FORMATTER); + } else { + return val.getTime(index) + .toLocalTime() + .format(DorisEventSerializer.TIME_WITH_MILLISECOND_FORMATTER); + } + }; case ARRAY: return (index, val) -> convertArrayData(val.getArray(index), type); case MAP: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java index e64643226d9..5bee9799506 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.TimeData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -41,6 +42,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; @@ -67,6 +69,38 @@ public class DorisEventSerializerTest { private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR = new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType())); + @Test + public void testDataChangeEventWithTimeDataType() throws IOException { + Schema schema = + Schema.newBuilder() + .physicalColumn("id_", DataTypes.BIGINT().notNull()) + .physicalColumn("time_0_", DataTypes.TIME(0)) + .physicalColumn("time_3_", DataTypes.TIME(3)) + .primaryKey("id_") + .build(); + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator(((RowType) schema.toRowDataType())); + CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, schema); + DataChangeEvent dataChangeEvent = + DataChangeEvent.insertEvent( + TABLE_ID, + generator.generate( + new Object[] { + 1L, + TimeData.fromLocalTime(LocalTime.of(19, 43, 17)), + TimeData.fromLocalTime(LocalTime.of(21, 45, 3, 123000000)), + })); + + dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), new Configuration()); + dorisEventSerializer.serialize(createTableEvent); + DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent); + JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow()); + + Assertions.assertThat(jsonNode.get("id_").asLong()).isEqualTo(1L); + Assertions.assertThat(jsonNode.get("time_0_").asText()).isEqualTo("19:43:17"); + Assertions.assertThat(jsonNode.get("time_3_").asText()).isEqualTo("21:45:03.123"); + } + @Test public void testDataChangeEventWithDateTimePartitionColumn() throws IOException { Map configMap = new HashMap<>(); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 434ccbcf62c..64cd0e18345 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -473,6 +473,8 @@ void testComplexDataTypes() throws Exception { "datetime3_c | DATETIME(3) | Yes | false | null", "datetime6_c | DATETIME(6) | Yes | false | null", "timestamp_c | DATETIME | Yes | false | null", + "time_c | TEXT | Yes | false | null", + "time3_c | TEXT | Yes | false | null", "text_c | TEXT | Yes | false | null", "tiny_blob_c | TEXT | Yes | false | null", "blob_c | TEXT | Yes | false | null", @@ -492,9 +494,9 @@ void testComplexDataTypes() throws Exception { validateSinkResult( databaseName, "DATA_TYPES_TABLE", - 52, + 54, Collections.singletonList( - "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); + "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); LOG.info("Verifying streaming stage of DATA_TYPES_TABLE..."); // generate binlogs @@ -517,7 +519,7 @@ void testComplexDataTypes() throws Exception { + " 4294967295, 4294967295, 2147483647, 9223372036854775807,\n" + " 'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,\n" + " 123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,\n" - + " '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',\n" + + " '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', '14:38:07', '21:49:13.123',\n" + " 'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,\n" + " 'red',\n" + " ST_GeomFromText('POINT(1 1)'),\n" @@ -533,11 +535,11 @@ void testComplexDataTypes() throws Exception { validateSinkResult( databaseName, "DATA_TYPES_TABLE", - 52, + 54, Arrays.asList( - "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine", - "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine", - "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); + "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine", + "2 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine", + "5 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); } catch (SQLException e) { LOG.error("Update table for CDC failed.", e); throw e; @@ -629,6 +631,8 @@ public void testComplexDataTypesInBatchMode() throws Exception { "datetime3_c | DATETIME(3) | Yes | false | null", "datetime6_c | DATETIME(6) | Yes | false | null", "timestamp_c | DATETIME | Yes | false | null", + "time_c | TEXT | Yes | false | null", + "time3_c | TEXT | Yes | false | null", "text_c | TEXT | Yes | false | null", "tiny_blob_c | TEXT | Yes | false | null", "blob_c | TEXT | Yes | false | null", @@ -648,9 +652,9 @@ public void testComplexDataTypesInBatchMode() throws Exception { validateSinkResult( databaseName, "DATA_TYPES_TABLE", - 52, + 54, Collections.singletonList( - "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); + "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | 14:38:07 | 21:49:13.123 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); } @Test diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql index 4c7621250f9..80bd4366a23 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql @@ -55,6 +55,8 @@ CREATE TABLE DATA_TYPES_TABLE datetime3_c DATETIME(3), datetime6_c DATETIME(6), timestamp_c TIMESTAMP, + time_c TIME(0), + time3_c TIME(3), text_c TEXT, tiny_blob_c TINYBLOB, blob_c BLOB, @@ -78,7 +80,7 @@ VALUES (1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147 4294967295, 4294967295, 2147483647, 9223372036854775807, 'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true, - '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', + '2020-07-17', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', '14:38:07', '21:49:13.123', 'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021, 'red', ST_GeomFromText('POINT(1 1)'),