diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index 62a4a4ced45..f492ca4f1a5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -66,7 +66,7 @@ public EventSourceProvider getEventSourceProvider() { .getBoolean( RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(), false); - + boolean isTableIdCaseInsensitive = MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig); MySqlEventDeserializer deserializer = new MySqlEventDeserializer( DebeziumChangelogMode.ALL, @@ -74,7 +74,7 @@ public EventSourceProvider getEventSourceProvider() { readableMetadataList, includeComments, sourceConfig.isTreatTinyInt1AsBoolean(), - MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig)); + isTableIdCaseInsensitive); MySqlSource source = new MySqlSource<>( @@ -82,7 +82,10 @@ public EventSourceProvider getEventSourceProvider() { deserializer, (sourceReaderMetrics, sourceConfig) -> new MySqlPipelineRecordEmitter( - deserializer, sourceReaderMetrics, sourceConfig)); + deserializer, + sourceReaderMetrics, + sourceConfig, + isTableIdCaseInsensitive)); return FlinkSourceProvider.of(source); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index eb9e658cf6f..41befa1f3d6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -55,9 +55,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId; @@ -80,6 +82,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { // Used when startup mode is snapshot private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true; private boolean isBounded = false; + private final boolean isTableIdCaseInsensitive; private final DebeziumDeserializationSchema debeziumDeserializationSchema; @@ -88,7 +91,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { public MySqlPipelineRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, - MySqlSourceConfig sourceConfig) { + MySqlSourceConfig sourceConfig, + boolean isTableIdCaseInsensitive) { super( debeziumDeserializationSchema, sourceReaderMetrics, @@ -102,6 +106,7 @@ public MySqlPipelineRecordEmitter( ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema) .getCreateTableEventCache(); this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions()); + this.isTableIdCaseInsensitive = isTableIdCaseInsensitive; } @Override @@ -261,7 +266,10 @@ private Schema buildSchemaFromTable(Table table) { for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); - String colName = column.name(); + String colName = + this.isTableIdCaseInsensitive + ? column.name().toLowerCase(Locale.ROOT) + : column.name(); DataType dataType = MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean()); if (!column.isOptional()) { @@ -277,6 +285,12 @@ private Schema buildSchemaFromTable(Table table) { List primaryKey = table.primaryKeyColumnNames(); if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) { + if (this.isTableIdCaseInsensitive) { + primaryKey = + primaryKey.stream() + .map(key -> key.toLowerCase(Locale.ROOT)) + .collect(Collectors.toList()); + } tableBuilder.primaryKey(primaryKey); } return tableBuilder.build(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 74f3ef52a3c..75fb6590d34 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -135,7 +135,8 @@ void testExcludeTable() { Arrays.asList( inventoryDatabase.getDatabaseName() + ".customers", inventoryDatabase.getDatabaseName() + ".multi_max_table", - inventoryDatabase.getDatabaseName() + ".products")); + inventoryDatabase.getDatabaseName() + ".products", + inventoryDatabase.getDatabaseName() + ".uppercase_products")); } @Test diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 2c45c14f4bd..8934f03de33 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -540,8 +540,8 @@ void testExcludeTables(boolean inBatch) throws Exception { .tableList(databaseName + ".*") .excludeTableList( String.format( - "%s.customers, %s.orders, %s.multi_max_table", - databaseName, databaseName, databaseName)) + "%s.customers, %s.orders, %s.multi_max_table, %s.uppercase_products", + databaseName, databaseName, databaseName, databaseName)) .startupOptions(StartupOptions.initial()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java index a1b9935aeac..ac126682b0d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java @@ -44,7 +44,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.lifecycle.Startables; import java.sql.Connection; @@ -96,8 +97,10 @@ public void before() { env.setRestartStrategy(RestartStrategies.noRestart()); } - @Test - public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"products", "uppercase_products"}) + public void testParseAlterStatementWhenTableNameAndColumnIsUpper(String tableName) + throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); MySqlSourceConfigFactory configFactory = @@ -107,7 +110,7 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except .username(TEST_USER) .password(TEST_PASSWORD) .databaseList(inventoryDatabase.getDatabaseName()) - .tableList(inventoryDatabase.getDatabaseName() + "\\.products") + .tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName) .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") @@ -124,17 +127,17 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except .executeAndCollect(); Thread.sleep(5_000); - TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products"); + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName); List expected = new ArrayList<>(); expected.add(getProductsCreateTableEvent(tableId)); try (Connection connection = inventoryDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { - expected.addAll(executeAlterAndProvideExpected(tableId, statement)); + expected.addAll(executeAlterAndProvideExpected(tableId, statement, tableName)); statement.execute( String.format( - "ALTER TABLE `%s`.`PRODUCTS` ADD `cols1` VARCHAR(45);", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` ADD `COLS1` VARCHAR(45);", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AddColumnEvent( tableId, @@ -147,6 +150,42 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except assertThat(actual).isEqualTo(expected); } + @ParameterizedTest + @ValueSource(strings = {"products", "uppercase_products"}) + public void testSnapshotModeWhenTableNameAndColumnIsUpper(String tableName) throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName) + .startupOptions(StartupOptions.snapshot()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName); + List expected = new ArrayList<>(); + expected.add(getProductsCreateTableEvent(tableId)); + List actual = fetchResults(events, expected.size()); + assertThat(actual).isEqualTo(expected); + } + private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, @@ -172,13 +211,13 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { * ); * */ - private List executeAlterAndProvideExpected(TableId tableId, Statement statement) - throws SQLException { + private List executeAlterAndProvideExpected( + TableId tableId, Statement statement, String tableName) throws SQLException { List expected = new ArrayList<>(); statement.execute( String.format( - "ALTER TABLE `%s`.`products` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AlterColumnTypeEvent( tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255)))); @@ -187,8 +226,8 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st statement.execute( String.format( - "ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AlterColumnTypeEvent( tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400)))); @@ -196,8 +235,8 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st statement.execute( String.format( - "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AddColumnEvent( tableId, @@ -209,8 +248,8 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st statement.execute( String.format( - "ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AddColumnEvent( tableId, @@ -230,8 +269,8 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st statement.execute( String.format( - "ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2"))); expected.add( new AlterColumnTypeEvent( @@ -240,22 +279,22 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st // Only available in mysql 8.0 statement.execute( String.format( - "ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` RENAME COLUMN `desc1` TO `desc3`;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3"))); statement.execute( String.format( - "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add( new AlterColumnTypeEvent( tableId, Collections.singletonMap("desc3", DataTypes.VARCHAR(255)))); statement.execute( String.format( - "ALTER TABLE `%s`.`products` DROP COLUMN `desc3`;", - inventoryDatabase.getDatabaseName())); + "ALTER TABLE `%s`.`%s` DROP COLUMN `desc3`;", + inventoryDatabase.getDatabaseName(), tableName)); expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc3"))); // Should not catch SchemaChangeEvent of tables other than `products` diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql index 626107f5cf0..e9386693f2c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql @@ -34,7 +34,27 @@ VALUES (default,"scooter","Small 2-wheel scooter",3.14), (default,"hammer","14oz carpenter's hammer",0.875), (default,"hammer","16oz carpenter's hammer",1.0), (default,"rocks","box of assorted rocks",5.3), - (default,"jacket","water resistent black wind breaker",0.1), + (default,"jacket","water resistant black wind breaker",0.1), + (default,"spare tire","24 inch spare tire",22.2); + +-- Create a table where all fields are in uppercase. +CREATE TABLE uppercase_products ( + ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + NAME VARCHAR(255) NOT NULL DEFAULT 'flink', + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT(6) +); +ALTER TABLE uppercase_products AUTO_INCREMENT = 101; + +INSERT INTO uppercase_products +VALUES (default,"scooter","Small 2-wheel scooter",3.14), + (default,"car battery","12V car battery",8.1), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (default,"hammer","12oz carpenter's hammer",0.75), + (default,"hammer","14oz carpenter's hammer",0.875), + (default,"hammer","16oz carpenter's hammer",1.0), + (default,"rocks","box of assorted rocks",5.3), + (default,"jacket","water resistant black wind breaker",0.1), (default,"spare tire","24 inch spare tire",22.2); -- Create some customers ...