From 4301e3ad58c4a9554b212d45a5996d252c40d7d0 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Fri, 6 Mar 2026 14:37:44 +0800 Subject: [PATCH 1/2] [FLINK-39204] Fluss yaml sink support add column at last --- .../sink/FlussEventSerializationSchema.java | 55 +++-- .../fluss/sink/FlussMetaDataApplier.java | 75 ++++--- .../sink/row/{ => row}/CdcAsFlussRow.java | 0 .../fluss/utils/FlussConversions.java | 30 ++- .../connectors/fluss/FlussPipelineITCase.java | 211 ++++++++++++++++-- .../FlussEventSerializationSchemaTest.java | 76 ++++++- .../fluss/sink/FlussMetadataApplierTest.java | 91 +++++++- .../fluss/sink/v2/FlussSinkITCase.java | 43 +++- .../fluss/utils/FlussConversionsTest.java | 12 +- 9 files changed, 489 insertions(+), 104 deletions(-) rename flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/{ => row}/CdcAsFlussRow.java (100%) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java index 086da58946d..6d457538a2f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java @@ -17,13 +17,16 @@ package org.apache.flink.cdc.connectors.fluss.sink; +import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; 
import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow; import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent; import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer; @@ -43,19 +46,19 @@ import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.APPEND; import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.DELETE; import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.UPSERT; -import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue; +import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameSchemaIgnoreCommentAndDefaultValue; import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussSchema; /** Serialization schema that converts a CDC data record to a Fluss event. */ public class FlussEventSerializationSchema implements FlussEventSerializer { private static final long serialVersionUID = 1L; - private transient Map tableInfoMap; + private transient Map schemaMaps; private transient Connection connection; @Override public void open(Connection connection) { - this.tableInfoMap = new HashMap<>(); + this.schemaMaps = new HashMap<>(); this.connection = connection; } @@ -82,29 +85,45 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) { org.apache.flink.cdc.common.schema.Schema newSchema = ((CreateTableEvent) event).getSchema(); // if the table is not exist or the schema is changed, update the table info. 
- if (!tableInfoMap.containsKey(tableId) - || !sameCdcColumnsIgnoreCommentAndDefaultValue( - tableInfoMap.get(tableId).upstreamCdcSchema, newSchema)) { + if (!schemaMaps.containsKey(tableId) + || !sameSchemaIgnoreCommentAndDefaultValue( + schemaMaps.get(tableId).upstreamCdcSchema, newSchema)) { Table table = connection.getTable(getTablePath(tableId)); TableSchemaInfo newSchemaInfo = new TableSchemaInfo(newSchema, table.getTableInfo().getSchema()); - tableInfoMap.put(tableId, newSchemaInfo); + schemaMaps.put(tableId, newSchemaInfo); + } + } else if (event instanceof AddColumnEvent) { + TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId()); + if (schemaInfo == null) { + throw new IllegalStateException( + "Cannot apply AddColumnEvent for table " + + event.tableId() + + ": table schema not found. Ensure CreateTableEvent is processed before AddColumnEvent."); + } + Schema schema = schemaInfo.upstreamCdcSchema; + if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) { + Table table = connection.getTable(getTablePath(tableId)); + TableSchemaInfo newSchemaInfo = + new TableSchemaInfo( + SchemaUtils.applySchemaChangeEvent(schema, event), + table.getTableInfo().getSchema()); + schemaMaps.put(tableId, newSchemaInfo); } } else { - // TODO: Logics for altering tables are not supported yet. - // This is anticipated to be supported in Fluss version 0.8.0. - throw new RuntimeException( - "Schema change type not supported. Only CreateTableEvent is allowed at the moment."); + throw new UnsupportedOperationException( + String.format( + "Schema change type %s not supported. 
Only CreateTableEvent and AddColumnEvent are allowed at the moment.", + event.getClass())); } } private FlussRowWithOp applyDataChangeEvent(DataChangeEvent record) { OperationType op = record.op(); - TableSchemaInfo tableSchemaInfo = tableInfoMap.get(record.tableId()); + TableSchemaInfo tableSchemaInfo = schemaMaps.get(record.tableId()); Preconditions.checkNotNull( tableSchemaInfo, "Table schema not found for table " + record.tableId()); - int flussFieldCount = - tableSchemaInfo.downStreamFlusstreamSchema.getRowType().getFieldCount(); + int flussFieldCount = tableSchemaInfo.downstreamFlussSchema.getRowType().getFieldCount(); boolean hasPrimaryKey = !tableSchemaInfo.upstreamCdcSchema.primaryKeys().isEmpty(); switch (op) { case INSERT: @@ -130,17 +149,17 @@ private TablePath getTablePath(TableId tableId) { private static class TableSchemaInfo { org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema; - org.apache.fluss.metadata.Schema downStreamFlusstreamSchema; + org.apache.fluss.metadata.Schema downstreamFlussSchema; Map indexMapping; private TableSchemaInfo( org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema, - org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) { + org.apache.fluss.metadata.Schema downstreamFlussSchema) { this.upstreamCdcSchema = upstreamCdcSchema; - this.downStreamFlusstreamSchema = downStreamFlusstreamSchema; + this.downstreamFlussSchema = downstreamFlussSchema; this.indexMapping = sanityCheckAndGenerateIndexMapping( - toFlussSchema(upstreamCdcSchema), downStreamFlusstreamSchema); + toFlussSchema(upstreamCdcSchema), downstreamFlussSchema); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java index 4e28623e210..5a0749b378a 100644 
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java @@ -17,12 +17,14 @@ package org.apache.flink.cdc.connectors.fluss.sink; +import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.table.api.ValidationException; @@ -31,12 +33,14 @@ import org.apache.fluss.client.admin.Admin; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -47,6 +51,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable; +import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType; /** {@link MetadataApplier} for fluss. 
*/ public class FlussMetaDataApplier implements MetadataApplier { @@ -58,9 +63,6 @@ public class FlussMetaDataApplier implements MetadataApplier { private Set enabledEventTypes = new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE)); - private transient Connection connection; - private transient Admin admin; - public FlussMetaDataApplier( Configuration flussClientConfig, Map tableProperties, @@ -92,22 +94,25 @@ public Set getSupportedSchemaEvolutionTypes() { @Override public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent); - Admin admin = getAdmin(); if (schemaChangeEvent instanceof CreateTableEvent) { CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent; - applyCreateTable(admin, createTableEvent); + applyCreateTable(createTableEvent); } else if (schemaChangeEvent instanceof DropTableEvent) { DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent; - applyDropTable(admin, dropTableEvent); + applyDropTable(dropTableEvent); + } else if (schemaChangeEvent instanceof AddColumnEvent) { + AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent; + applyAddColumnTable(addColumnEvent); } else { throw new IllegalArgumentException( - "fluss metadata applier only support CreateTableEvent now but receives " + "fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives " + schemaChangeEvent); } } - private void applyCreateTable(Admin admin, CreateTableEvent event) { - try { + private void applyCreateTable(CreateTableEvent event) { + try (Connection connection = ConnectionFactory.createConnection(flussClientConfig); + Admin admin = connection.getAdmin()) { TableId tableId = event.tableId(); TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName()); String tableIdentifier = tablePath.getDatabaseName() + "." 
+ tablePath.getTableName(); @@ -129,8 +134,9 @@ private void applyCreateTable(Admin admin, CreateTableEvent event) { } } - private void applyDropTable(Admin admin, DropTableEvent event) { - try { + private void applyDropTable(DropTableEvent event) { + try (Connection connection = ConnectionFactory.createConnection(flussClientConfig); + Admin admin = connection.getAdmin()) { TableId tableId = event.tableId(); TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName()); admin.dropTable(tablePath, true).get(); @@ -140,21 +146,36 @@ private void applyDropTable(Admin admin, DropTableEvent event) { } } - private Admin getAdmin() { - if (connection == null) { - connection = ConnectionFactory.createConnection(flussClientConfig); - admin = connection.getAdmin(); - } - return admin; - } - - @Override - public void close() throws Exception { - if (admin != null) { - admin.close(); - } - if (connection != null) { - connection.close(); + private void applyAddColumnTable(AddColumnEvent event) { + List tableChanges = new ArrayList<>(); + event.getAddedColumns() + .forEach( + columnWithPosition -> { + if (columnWithPosition.getPosition() + != AddColumnEvent.ColumnPosition.LAST) { + throw new IllegalArgumentException( + "Fluss metadata applier only supports LAST position for adding columns now but receives " + + columnWithPosition.getPosition() + + ". 
+ Consider using 'schema.change.behavior' configuration with 'LENIENT' mode to handle schema changes more flexibly."); + } + + Column column = columnWithPosition.getAddColumn(); + tableChanges.add( + TableChange.addColumn( + column.getName(), + toFlussType(column.getType()), + column.getComment(), + TableChange.ColumnPosition.last())); + }); + + try (Connection connection = ConnectionFactory.createConnection(flussClientConfig); + Admin admin = connection.getAdmin()) { + TableId tableId = event.tableId(); + TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName()); + admin.alterTable(tablePath, tableChanges, true).get(); + } catch (Exception e) { + LOG.error("Failed to apply schema change {}", event, e); + throw new RuntimeException(e); + } } @@ -191,7 +212,7 @@ private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTa List currentPartitionKeys = currentTableInfo.getPartitionKeys(); if (!inferredPartitionKeys.equals(currentPartitionKeys)) { throw new ValidationException( - "The table schema inferred by Flink CDC is not matched with current Fluss table schema. " + "The table schema inferred by Flink CDC is not matched with current Fluss table schema. 
" + "\n New Fluss table's partition keys : " + inferredPartitionKeys + "\n Current Fluss's partition keys: " diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java similarity index 100% rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java index 25a9f53b676..6858c3e6580 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java @@ -41,7 +41,6 @@ import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.util.CollectionUtil; -import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; @@ -94,28 +93,25 @@ public static org.apache.fluss.metadata.Schema toFlussSchema( schemBuilder.primaryKey(cdcSchema.primaryKeys()); } - Schema schema = - schemBuilder - .fromColumns( - 
cdcSchema.getColumns().stream() - .map( - column -> - new Schema.Column( - column.getName(), - toFlussType(column.getType()), - column.getComment())) - .collect(Collectors.toList())) - .build(); - return schema; + // use schemBuilder.column rather than schemBuilder.fromColumns to reassign nested row id. + cdcSchema + .getColumns() + .forEach( + column -> + schemBuilder + .column( + column.getName(), + column.getType().accept(TO_FLUSS_TYPE_INSTANCE)) + .withComment(column.getComment())); + return schemBuilder.build(); } - @VisibleForTesting - private static org.apache.fluss.types.DataType toFlussType( + public static org.apache.fluss.types.DataType toFlussType( org.apache.flink.cdc.common.types.DataType flinkDataType) { return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE); } - public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue( + public static Boolean sameSchemaIgnoreCommentAndDefaultValue( org.apache.flink.cdc.common.schema.Schema oldSchema, org.apache.flink.cdc.common.schema.Schema newSchema) { List upstreamColumns = oldSchema.getColumns(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java index bfc71a1adde..602b32680f6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java @@ -19,12 +19,13 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; import 
org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; -import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; @@ -64,6 +65,10 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR; +import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.EVOLVE; +import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.IGNORE; +import static org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior.LENIENT; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; @@ -233,7 +238,7 @@ void testSinglePrimaryTable() throws Exception { eventOfSplits.add(Collections.singletonList(updateEvent)); eventOfSplits.add(Collections.singletonList(deleteEvent)); - composeAndExecute(eventOfSplits); + composeAndExecuteInEvolveMode(eventOfSplits); checkResult(TABLE_1, Arrays.asList("+I[2, b2]", "+I[3, c]")); } @@ -279,16 +284,114 @@ void testSingleLogTable() throws Exception { split1.add(insertEvent3); eventOfSplits.add(split1); - composeAndExecute(eventOfSplits); + composeAndExecuteInEvolveMode(eventOfSplits); checkResult(TABLE_1, Arrays.asList("+I[1, a]", "+I[2, b]", "+I[3, c]")); } @Test - void testSingleLogTableWithSchemaChange() { - assertThatThrownBy(() -> composeAndExecute(ValuesDataSourceHelper.singleSplitSingleTable())) 
+ void testSingleLogTableWithNotSupportedSchemaChange() throws Exception { + assertThatThrownBy( + () -> + composeAndExecuteInEvolveMode( + ValuesDataSourceHelper.singleSplitSingleTable())) .rootCause() .hasMessageContaining( - "fluss metadata applier only support CreateTableEvent now but receives AddColumnEvent"); + "fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives RenameColumnEvent"); + } + + @Test + void testSingleLogTableInLenientMode() throws Exception { + // test add/drop/rename column in lenient mode + composeAndExecute( + ValuesDataSourceHelper.singleSplitSingleTable(), + new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, LENIENT)); + checkResult( + TABLE_1, Arrays.asList("+I[2, null, null, null, x]", "+I[3, 3, null, null, null]")); + } + + @Test + void testSingleLogTableInIgnoreMode() throws Exception { + // test add/drop/rename column in ignore mode + composeAndExecute( + ValuesDataSourceHelper.singleSplitSingleTable(), + new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, IGNORE)); + checkResult(TABLE_1, Arrays.asList("+I[2, null]", "+I[3, 3]")); + } + + @Test + void testSingleLogTableWithAddColumn() throws Exception { + List> eventOfSplits = new ArrayList<>(); + List split1 = new ArrayList<>(); + Schema schema = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema); + split1.add(createTableEvent); + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + TABLE_1, + new BinaryRecordDataGenerator( + RowType.of(DataTypes.STRING(), DataTypes.STRING())) + .generate( + new Object[] { + BinaryStringData.fromString("1"), + BinaryStringData.fromString("a") + })); + split1.add(insertEvent1); + + // add column at last. 
+ split1.add( + new AddColumnEvent( + TABLE_1, + Collections.singletonList( + AddColumnEvent.last( + Column.physicalColumn("newColumn", DataTypes.STRING()))))); + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + TABLE_1, + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())) + .generate( + new Object[] { + BinaryStringData.fromString("2"), + BinaryStringData.fromString("b"), + BinaryStringData.fromString("bb") + })); + split1.add(insertEvent2); + split1.add( + new AddColumnEvent( + TABLE_1, + Collections.singletonList( + AddColumnEvent.last( + Column.physicalColumn("newColumn2", DataTypes.STRING()))))); + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + TABLE_1, + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())) + .generate( + new Object[] { + BinaryStringData.fromString("3"), + BinaryStringData.fromString("c"), + BinaryStringData.fromString("cc"), + BinaryStringData.fromString("ccc") + })); + split1.add(insertEvent3); + eventOfSplits.add(split1); + + composeAndExecuteInEvolveMode(eventOfSplits); + checkResult( + TABLE_1, + Arrays.asList("+I[1, a, null, null]", "+I[2, b, bb, null]", "+I[3, c, cc, ccc]")); } @Test @@ -298,7 +401,8 @@ void testLackUsernameAndPassword() { composeAndExecute( ValuesDataSourceHelper.singleSplitSingleTable(), Collections.singletonMap( - BOOTSTRAP_SERVERS.key(), getBootstrapServers()))) + BOOTSTRAP_SERVERS.key(), getBootstrapServers()), + new Configuration())) .rootCause() .hasMessageContaining( "The connection has not completed authentication yet. 
This may be caused by a missing or incorrect configuration of 'client.security.protocol' on the client side."); @@ -318,7 +422,8 @@ void testWrongTableOptions() { () -> composeAndExecute( ValuesDataSourceHelper.singleSplitSingleTable(), - sinkOption)) + sinkOption, + new Configuration())) .rootCause() .hasMessageContaining("'table.non-key' is not a recognized Fluss table property"); } @@ -398,7 +503,7 @@ void testMultiTables() throws Exception { split1.add(insertTabl2Event3); eventOfSplits.add(split1); - composeAndExecute(eventOfSplits); + composeAndExecuteInEvolveMode(eventOfSplits); checkResult(TABLE_1, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3, 3]")); checkResult(TABLE_2, Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[3, 3]")); } @@ -438,7 +543,7 @@ void testInsertExistTableWithMoreColumns() throws Exception { new Object[] { BinaryStringData.fromString("1"), BinaryStringData.fromString("a"), - BinaryStringData.fromString("1") + BinaryStringData.fromString("aa") })); split1.add(insertEvent1); DataChangeEvent insertEvent2 = @@ -447,14 +552,39 @@ void testInsertExistTableWithMoreColumns() throws Exception { generator.generate( new Object[] { BinaryStringData.fromString("2"), - BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") + BinaryStringData.fromString("b"), + BinaryStringData.fromString("bb") })); split1.add(insertEvent2); eventOfSplits.add(split1); - composeAndExecute(eventOfSplits); - checkResult(TABLE_1, Arrays.asList("+I[a, 1]", "+I[2, 2]")); + // add column at last + split1.add( + new AddColumnEvent( + TABLE_1, + Collections.singletonList( + AddColumnEvent.last( + Column.physicalColumn("newColumn", DataTypes.STRING()))))); + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + TABLE_1, + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())) + .generate( + new Object[] { + BinaryStringData.fromString("3"), + BinaryStringData.fromString("c"), + 
BinaryStringData.fromString("cc"), + BinaryStringData.fromString("ccc") + })); + split1.add(insertEvent3); + + composeAndExecuteInEvolveMode(eventOfSplits); + checkResult(TABLE_1, Arrays.asList("+I[a, 1, null]", "+I[b, 2, null]", "+I[c, 3, ccc]")); } @Test @@ -499,27 +629,64 @@ void testInsertExistTableWithLessColumns() throws Exception { generator.generate( new Object[] { BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") + BinaryStringData.fromString("b") })); split1.add(insertEvent2); eventOfSplits.add(split1); - composeAndExecute(eventOfSplits); - checkResult(TABLE_1, Arrays.asList("+I[1, a, null]", "+I[2, 2, null]")); + // add column at last + split1.add( + new AddColumnEvent( + TABLE_1, + Collections.singletonList( + AddColumnEvent.last( + Column.physicalColumn("newColumn", DataTypes.STRING()))))); + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + TABLE_1, + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())) + .generate( + new Object[] { + BinaryStringData.fromString("3"), + BinaryStringData.fromString("c"), + BinaryStringData.fromString("cc") + })); + split1.add(insertEvent3); + + composeAndExecuteInEvolveMode(eventOfSplits); + checkResult( + TABLE_1, + Arrays.asList( + "+I[1, a, null, null]", "+I[2, b, null, null]", "+I[3, c, null, cc]")); } - private void composeAndExecute(List> customSourceEvents) throws Exception { + private void composeAndExecuteInEvolveMode(List> customSourceEvents) + throws Exception { + composeAndExecute( + customSourceEvents, + new Configuration().set(PIPELINE_SCHEMA_CHANGE_BEHAVIOR, EVOLVE)); + } + + private void composeAndExecute( + List> customSourceEvents, Configuration pipelineConfig) throws Exception { Map sinkOption = new HashMap<>(); sinkOption.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers()); sinkOption.put("properties.client.security.protocol", "sasl"); sinkOption.put("properties.client.security.sasl.mechanism", "PLAIN"); 
sinkOption.put("properties.client.security.sasl.username", "guest"); sinkOption.put("properties.client.security.sasl.password", "password2"); - composeAndExecute(customSourceEvents, sinkOption); + composeAndExecute(customSourceEvents, sinkOption, pipelineConfig); } private void composeAndExecute( - List> customSourceEvents, Map sinkOption) throws Exception { + List> customSourceEvents, + Map sinkOption, + Configuration pipelineConfig) + throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); // Setup value source @@ -535,10 +702,8 @@ private void composeAndExecute( SinkDef sinkDef = new SinkDef("fluss", "Fluss Sink", Configuration.fromMap(sinkOption)); // Setup pipeline - Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 4); - pipelineConfig.set( - PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.addAll(pipelineConfig); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java index 1af59aa9792..663cb08305b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java @@ -23,12 +23,15 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; 
+import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.BooleanType; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DateType; import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.FloatType; @@ -97,7 +100,7 @@ static void tearDown() throws Exception { } @Test - void testMixedSchemaAndDataChanges() throws Exception { + void testSchemaAndDataChangesInMultiTables() throws Exception { // 1. create table1, and insert/delete/update data TableId table1 = TableId.parse("test.tbl1"); Schema schema1 = @@ -223,6 +226,77 @@ record = Objects.requireNonNull(serializer.serialize(deleteEvent2))); } + @Test + void testAddColumn() throws Exception { + // 1. 
create table1, and insert/delete/update data + TableId table1 = TableId.parse("test.tbl1"); + Schema schemaBefore = + Schema.newBuilder() + .physicalColumn("col1", new IntType()) + .physicalColumn("col2", new BooleanType()) + .physicalColumn("col3", new TimestampType()) + .primaryKey("col1") + .build(); + AddColumnEvent addColumnEvent = + new AddColumnEvent( + table1, + Collections.singletonList( + AddColumnEvent.last( + Column.physicalColumn("newColumn", DataTypes.STRING())))); + Schema schemaAfter = + Schema.newBuilder() + .physicalColumn("col1", new IntType()) + .physicalColumn("col2", new BooleanType()) + .physicalColumn("col3", new TimestampType()) + .physicalColumn("newColumn", new VarCharType()) + .primaryKey("col1") + .build(); + CreateTableEvent createTableEvent1 = new CreateTableEvent(table1, schemaBefore); + flussMetaDataApplier.applySchemaChange(createTableEvent1); + verifySchemaChangeEvent(table1, serializer.serialize(createTableEvent1)); + + BinaryRecordDataGenerator generator1 = + new BinaryRecordDataGenerator( + schemaBefore.getColumnDataTypes().toArray(new DataType[0])); + + RecordData record = + generator1.generate( + new Object[] { + 1, + true, + TimestampData.fromMillis( + Timestamp.valueOf("2023-11-27 18:00:00").getTime()) + }); + DataChangeEvent insertEvent1 = DataChangeEvent.insertEvent(table1, record); + + verifyDataChangeEvent( + table1, + new FlussRowWithOp(CdcAsFlussRow.replace(record), FlussOperationType.UPSERT), + serializer.serialize(insertEvent1)); + + // 2. add column at last. 
+ flussMetaDataApplier.applySchemaChange(addColumnEvent); + verifySchemaChangeEvent(table1, serializer.serialize(addColumnEvent)); + BinaryRecordDataGenerator generator2 = + new BinaryRecordDataGenerator( + schemaAfter.getColumnDataTypes().toArray(new DataType[0])); + RecordData newRecord = + generator2.generate( + new Object[] { + 3, + false, + TimestampData.fromMillis( + Timestamp.valueOf("2023-11-27 20:00:00").getTime()), + new BinaryStringData("insert new column") + }); + DataChangeEvent updateEvent1 = DataChangeEvent.updateEvent(table1, record, newRecord); + + verifyDataChangeEvent( + table1, + new FlussRowWithOp(CdcAsFlussRow.replace(newRecord), FlussOperationType.UPSERT), + serializer.serialize(updateEvent1)); + } + private void verifySchemaChangeEvent(TableId tableId, FlussEvent flussEvent) throws Exception { verifySerializeResult( new FlussEvent( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java index c95aa786818..2ff64765cc4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java @@ -17,9 +17,11 @@ package org.apache.flink.cdc.connectors.fluss.sink; +import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; import 
org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.IntType; @@ -48,6 +50,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR; +import static org.apache.fluss.types.DataTypeChecks.equalsWithFieldId; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -105,6 +108,9 @@ void testCreateTableAllTypes(boolean primaryKeyTable) throws Exception { "time_col", "timestamp_col", "timestamp_ltz_col", + "array_col", + "map_col", + "row_col" }; org.apache.flink.cdc.common.types.DataType[] cdcDataTypes = @@ -126,7 +132,12 @@ void testCreateTableAllTypes(boolean primaryKeyTable) throws Exception { DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(3), - DataTypes.TIMESTAMP_LTZ(6) + DataTypes.TIMESTAMP_LTZ(6), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("age", DataTypes.INT())) }; org.apache.fluss.types.DataType[] flussDataTypes = @@ -150,7 +161,16 @@ void testCreateTableAllTypes(boolean primaryKeyTable) throws Exception { org.apache.fluss.types.DataTypes.DATE(), org.apache.fluss.types.DataTypes.TIME(), org.apache.fluss.types.DataTypes.TIMESTAMP(3), - org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6) + org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6), + org.apache.fluss.types.DataTypes.ARRAY(org.apache.fluss.types.DataTypes.INT()), + org.apache.fluss.types.DataTypes.MAP( + org.apache.fluss.types.DataTypes.STRING(), + org.apache.fluss.types.DataTypes.INT()), + org.apache.fluss.types.DataTypes.ROW( + org.apache.fluss.types.DataTypes.FIELD( + "name", org.apache.fluss.types.DataTypes.STRING()), + org.apache.fluss.types.DataTypes.FIELD( + "age", org.apache.fluss.types.DataTypes.INT())) }; try (FlussMetaDataApplier applier = 
@@ -183,6 +203,21 @@ void testCreateTableAllTypes(boolean primaryKeyTable) throws Exception { if (primaryKeyTable) { assertThat(tableInfo.getPrimaryKeys()).containsExactly("int_col"); } + + // check field of nested row. + assertThat( + equalsWithFieldId( + flussRowType.getTypeAt(flussRowType.getFieldCount() - 1), + org.apache.fluss.types.DataTypes.ROW( + org.apache.fluss.types.DataTypes.FIELD( + "name", + org.apache.fluss.types.DataTypes.STRING(), + flussRowType.getFieldCount()), + org.apache.fluss.types.DataTypes.FIELD( + "age", + org.apache.fluss.types.DataTypes.INT(), + flussRowType.getFieldCount() + 1)))) + .isTrue(); } } @@ -457,6 +492,58 @@ void testBucketKeyNotSetOfPrimaryKeys(boolean partitionTables) throws Exception } } + @Test + void testAddColumnAtLast() throws Exception { + TablePath tablePath = new TablePath("fluss", "add_column_table"); + TableId tableId = TableId.tableId("default_namespace", "fluss", "add_column_table"); + admin.createTable( + tablePath, + TableDescriptor.builder() + .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column( + "id", + org.apache.fluss.types.DataTypes.INT()) + .column( + "name", + org.apache.fluss.types.DataTypes.INT()) + .build()) + .build(), + true) + .get(); + + Column oldColumn = Column.physicalColumn("name", DataTypes.STRING()); + Column newColumn = Column.physicalColumn("newColumn", DataTypes.STRING()); + + try (FlussMetaDataApplier applier = + new FlussMetaDataApplier( + FLUSS_CLUSTER_EXTENSION.getClientConfig(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap())) { + assertThatThrownBy( + () -> + applier.applySchemaChange( + new AddColumnEvent( + tableId, + Collections.singletonList( + AddColumnEvent.last(oldColumn))))) + .rootCause() + .hasMessageContaining("Column name already exists."); + applier.applySchemaChange( + new AddColumnEvent( + tableId, Collections.singletonList(AddColumnEvent.last(newColumn)))); + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + 
assertThat(tableInfo.getSchema()) + .isEqualTo( + org.apache.fluss.metadata.Schema.newBuilder() + .column("id", org.apache.fluss.types.DataTypes.INT()) + .column("name", org.apache.fluss.types.DataTypes.INT()) + .column("newColumn", org.apache.fluss.types.DataTypes.STRING()) + .build()); + } + } + @Test void testRecreateTableWithDifferentSchema() throws Exception { TableId tableId = TableId.tableId("default_namespace", DATABASE_NAME, "table1"); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java index 1cd80c6a6b5..2e408e40ae8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java @@ -17,9 +17,11 @@ package org.apache.flink.cdc.connectors.fluss.sink.v2; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericArrayData; +import org.apache.flink.cdc.common.data.GenericMapData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -31,6 +33,7 @@ import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.connectors.fluss.sink.FlussEventSerializationSchema; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; 
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -147,7 +150,10 @@ void testAllSinkType(boolean primaryKeyTable) throws Exception { + " date_type DATE,\n" + " time_type TIME,\n" + " timestamp_type TIMESTAMP,\n" - + " timestamp_ltz_type TIMESTAMP_LTZ(8)\n" + + " timestamp_ltz_type TIMESTAMP_LTZ(8),\n" + + " array_type ARRAY,\n" + + " map_type MAP,\n" + + " row_type ROW< f0 INT, f1 STRING >\n" + (primaryKeyTable ? " ,PRIMARY KEY (int_type) NOT ENFORCED \n" : "") @@ -171,7 +177,10 @@ void testAllSinkType(boolean primaryKeyTable) throws Exception { "date_type", "time_type", "timestamp_type", - "timestamp_ltz_type" + "timestamp_ltz_type", + "array_type", + "map_type", + "row_type" }; String[] pkFieldNames = primaryKeyTable ? new String[] {"int_type"} : new String[0]; @@ -191,9 +200,20 @@ void testAllSinkType(boolean primaryKeyTable) throws Exception { DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(), - DataTypes.TIMESTAMP_LTZ(8) + DataTypes.TIMESTAMP_LTZ(8), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f1", DataTypes.STRING())) }; + // Currently, AbstractBinaryWriter.writeRecord only support BinaryRecordData but not + // GenericRecordData + RecordData nestedRowData = + new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT(), DataTypes.STRING()}) + .generate(new Object[] {1, BinaryStringData.fromString("hello")}); + Object[][] insertedValues = new Object[][] { new Object[] { @@ -216,22 +236,26 @@ void testAllSinkType(boolean primaryKeyTable) throws Exception { LocalZonedTimestampData.fromInstant( LocalDateTime.of(2023, 11, 11, 11, 11, 11, 11) .atZone(ZoneId.of("GMT+05:00")) - .toInstant()) + .toInstant()), + new 
GenericArrayData(new Object[] {1, 2, 3}), + new GenericMapData( + Collections.singletonMap(BinaryStringData.fromString("key"), 123)), + nestedRowData }, new Object[] { null, null, null, null, null, null, null, 0, null, null, null, null, null, - null, null + null, null, null, null, null } }; // default timezone is asian/shanghai List expectedRows = Arrays.asList( String.format( - "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z]", + "+I[a, test character, test text, false, 8119.21, 1, 32767, 32768, 652482, 20.2007, 8.58965, 2023-11-12, 08:30:15, %s, 2023-11-11T06:11:11.000000011Z, [1, 2, 3], {key=123}, +I[1, hello]]", primaryKeyTable ? "2023-11-11T11:11:11.000000011" : "2023-11-11T11:11:11"), - "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null]"); + "+I[null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null, null, null, null]"); testInsertSingleTable( tableId, @@ -576,8 +600,7 @@ private void submitJob(List events) throws Exception { StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); - DataStreamSource source = - environment.fromData(events, TypeInformation.of(Event.class)); + DataStreamSource source = environment.fromData(events, new EventTypeInfo()); FlussEventSerializationSchema flussRecordSerializer = new FlussEventSerializationSchema(); FlussSink flussSink = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java index eddaeb6ff59..d4e1e9a33bc 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java @@ -294,7 +294,7 @@ void testSameCdcColumnsWithSameColumns() { .physicalColumn("name", DataTypes.STRING()) .build(); - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isTrue(); } @@ -312,7 +312,7 @@ void testSameCdcColumnsWithDifferentColumnNames() { .physicalColumn("age", DataTypes.INT()) .build(); - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isFalse(); } @@ -330,7 +330,7 @@ void testSameCdcColumnsWithDifferentTypes() { .physicalColumn("name", DataTypes.STRING()) .build(); - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isFalse(); } @@ -344,7 +344,7 @@ void testSameCdcColumnsWithDifferentColumnCount() { Schema schema2 = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build(); - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isFalse(); } @@ -363,7 +363,7 @@ void testSameCdcColumnsIgnoresComment() { .build(); // Should be true because comments are ignored - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isTrue(); } @@ -382,7 +382,7 @@ void 
testSameCdcColumnsIgnoresDefaultValue() { .physicalColumn("name", DataTypes.STRING()) .build(); - assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, schema2)) + assertThat(FlussConversions.sameSchemaIgnoreCommentAndDefaultValue(schema1, schema2)) .isTrue(); } } From 83b6af34c32a8976933de4bcf8753ae54aad994e Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Fri, 6 Mar 2026 16:40:05 +0800 Subject: [PATCH 2/2] fix syntax --- .../connectors/fluss/sink/FlussEventSerializationSchema.java | 2 +- .../flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java index 6d457538a2f..c8ac822c2b3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java @@ -113,7 +113,7 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) { } else { throw new UnsupportedOperationException( String.format( - "Schema change type %s not supported. Only CreateTableEvent and AddColumnEvent is allowed at the moment.", + "Schema change type %s not supported. 
Only CreateTableEvent and AddColumnEvent are allowed at the moment.", event.getClass())); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java index 5a0749b378a..e2cda4bd0ca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java @@ -212,7 +212,7 @@ private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTa List currentPartitionKeys = currentTableInfo.getPartitionKeys(); if (!inferredPartitionKeys.equals(currentPartitionKeys)) { throw new ValidationException( - "The table schema inffered by Flink CDC is not matched with current Fluss table schema. " + "The table schema inferred by Flink CDC is not matched with current Fluss table schema. " + "\n New Fluss table's partition keys : " + inferredPartitionKeys + "\n Current Fluss's partition keys: "