Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ public class DorisEventSerializer implements DorisRecordSerializer<Event> {
public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

/** Format TIME type data without precision. */
public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");

/** Format TIME type data with millisecond precision. */
public static final DateTimeFormatter TIME_WITH_MILLISECOND_FORMATTER =
DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,18 @@ static SerializationConverter createExternalConverter(DataType type, ZoneId pipe
final int zonedP = ((ZonedTimestampType) type).getPrecision();
return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
case TIME_WITHOUT_TIME_ZONE:
return (index, val) -> val.getTime(index).toLocalTime();
return (index, val) -> {
int precision = DataTypeChecks.getPrecision(type);
if (precision == 0) {
return val.getTime(index)
.toLocalTime()
.format(DorisEventSerializer.TIME_FORMATTER);
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DorisRowConverter depending on DorisEventSerializer.TIME_FORMATTER couples conversion logic to the serializer class. To reduce cross-class coupling, consider moving TIME formatting constants/utilities into a dedicated shared utility (or into DorisRowConverter if only used there), keeping DorisEventSerializer focused on event serialization.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do that, just keep consistent with other parts of the code.

} else {
return val.getTime(index)
.toLocalTime()
.format(DorisEventSerializer.TIME_WITH_MILLISECOND_FORMATTER);
}
};
case ARRAY:
return (index, val) -> convertArrayData(val.getArray(index), type);
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand All @@ -41,6 +42,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -67,6 +69,38 @@ public class DorisEventSerializerTest {
private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR =
new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType()));

@Test
public void testDataChangeEventWithTimeDataType() throws IOException {
Schema schema =
Schema.newBuilder()
.physicalColumn("id_", DataTypes.BIGINT().notNull())
.physicalColumn("time_0_", DataTypes.TIME(0))
.physicalColumn("time_3_", DataTypes.TIME(3))
.primaryKey("id_")
.build();
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(((RowType) schema.toRowDataType()));
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, schema);
DataChangeEvent dataChangeEvent =
DataChangeEvent.insertEvent(
TABLE_ID,
generator.generate(
new Object[] {
1L,
TimeData.fromLocalTime(LocalTime.of(19, 43, 17)),
TimeData.fromLocalTime(LocalTime.of(21, 45, 3, 123000000)),
}));

dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), new Configuration());
dorisEventSerializer.serialize(createTableEvent);
DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());

Assertions.assertThat(jsonNode.get("id_").asLong()).isEqualTo(1L);
Assertions.assertThat(jsonNode.get("time_0_").asText()).isEqualTo("19:43:17");
Assertions.assertThat(jsonNode.get("time_3_").asText()).isEqualTo("21:45:03.123");
}

@Test
public void testDataChangeEventWithDateTimePartitionColumn() throws IOException {
Map<String, String> configMap = new HashMap<>();
Expand Down
Loading