Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,26 @@ public EventSourceProvider getEventSourceProvider() {
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);

boolean isTableIdCaseInsensitive = MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig);
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
readableMetadataList,
includeComments,
sourceConfig.isTreatTinyInt1AsBoolean(),
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
isTableIdCaseInsensitive);

MySqlSource<Event> source =
new MySqlSource<>(
configFactory,
deserializer,
(sourceReaderMetrics, sourceConfig) ->
new MySqlPipelineRecordEmitter(
deserializer, sourceReaderMetrics, sourceConfig));
deserializer,
sourceReaderMetrics,
sourceConfig,
isTableIdCaseInsensitive));

return FlinkSourceProvider.of(source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
Expand All @@ -80,6 +82,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
// Used when startup mode is snapshot
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
private boolean isBounded = false;
private final boolean isTableIdCaseInsensitive;

private final DebeziumDeserializationSchema<Event> debeziumDeserializationSchema;

Expand All @@ -88,7 +91,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
MySqlSourceConfig sourceConfig) {
MySqlSourceConfig sourceConfig,
boolean isTableIdCaseInsensitive) {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
Expand All @@ -102,6 +106,7 @@ public MySqlPipelineRecordEmitter(
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
.getCreateTableEventCache();
this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
}

@Override
Expand Down Expand Up @@ -261,7 +266,10 @@ private Schema buildSchemaFromTable(Table table) {
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);

String colName = column.name();
String colName =
this.isTableIdCaseInsensitive
? column.name().toLowerCase(Locale.ROOT)
: column.name();
DataType dataType =
Comment on lines +269 to 273
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new case-normalization logic for column names/primary keys is gated by isTableIdCaseInsensitive, but there doesn't appear to be test coverage for the bounded snapshot (StartupOptions.snapshot()) path where schemas are fetched via SHOW CREATE TABLE/DESC. Consider adding an IT that runs with StartupOptions.snapshot() against a table with uppercase column names (and PK) to exercise this end-to-end.

Copilot uses AI. Check for mistakes.
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
if (!column.isOptional()) {
Expand All @@ -277,6 +285,12 @@ private Schema buildSchemaFromTable(Table table) {

List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
if (this.isTableIdCaseInsensitive) {
primaryKey =
primaryKey.stream()
.map(key -> key.toLowerCase(Locale.ROOT))
.collect(Collectors.toList());
}
tableBuilder.primaryKey(primaryKey);
}
return tableBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ void testExcludeTable() {
Arrays.asList(
inventoryDatabase.getDatabaseName() + ".customers",
inventoryDatabase.getDatabaseName() + ".multi_max_table",
inventoryDatabase.getDatabaseName() + ".products"));
inventoryDatabase.getDatabaseName() + ".products",
inventoryDatabase.getDatabaseName() + ".uppercase_products"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,8 @@ void testExcludeTables(boolean inBatch) throws Exception {
.tableList(databaseName + ".*")
.excludeTableList(
String.format(
"%s.customers, %s.orders, %s.multi_max_table",
databaseName, databaseName, databaseName))
"%s.customers, %s.orders, %s.multi_max_table, %s.uppercase_products",
databaseName, databaseName, databaseName, databaseName))
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
Expand Down Expand Up @@ -96,8 +97,10 @@ public void before() {
env.setRestartStrategy(RestartStrategies.noRestart());
}

@Test
public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"products", "uppercase_products"})
public void testParseAlterStatementWhenTableNameAndColumnIsUpper(String tableName)
throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
Expand All @@ -107,7 +110,7 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + "\\.products")
.tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName)
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
Expand All @@ -124,17 +127,17 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
.executeAndCollect();
Thread.sleep(5_000);

TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName);
List<Event> expected = new ArrayList<>();
expected.add(getProductsCreateTableEvent(tableId));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
expected.addAll(executeAlterAndProvideExpected(tableId, statement));
expected.addAll(executeAlterAndProvideExpected(tableId, statement, tableName));

statement.execute(
String.format(
"ALTER TABLE `%s`.`PRODUCTS` ADD `cols1` VARCHAR(45);",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` ADD `COLS1` VARCHAR(45);",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
Expand All @@ -147,6 +150,42 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
assertThat(actual).isEqualTo(expected);
}

@ParameterizedTest
@ValueSource(strings = {"products", "uppercase_products"})
public void testSnapshotModeWhenTableNameAndColumnIsUpper(String tableName) throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName)
.startupOptions(StartupOptions.snapshot())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);

TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName);
List<Event> expected = new ArrayList<>();
expected.add(getProductsCreateTableEvent(tableId));
List<Event> actual = fetchResults(events, expected.size());
assertThat(actual).isEqualTo(expected);
}

private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
Expand All @@ -172,13 +211,13 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
* );
* </pre>
*/
private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement statement)
throws SQLException {
private List<Event> executeAlterAndProvideExpected(
TableId tableId, Statement statement, String tableName) throws SQLException {
List<Event> expected = new ArrayList<>();
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255))));
Expand All @@ -187,17 +226,17 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400))));
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc", "desc2")));

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
Expand All @@ -209,8 +248,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
Expand All @@ -230,8 +269,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
expected.add(
new AlterColumnTypeEvent(
Expand All @@ -240,22 +279,22 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
// Only available in mysql 8.0
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` RENAME COLUMN `desc1` TO `desc3`;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3")));

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("desc3", DataTypes.VARCHAR(255))));

statement.execute(
String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `desc3`;",
inventoryDatabase.getDatabaseName()));
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc3`;",
inventoryDatabase.getDatabaseName(), tableName));
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc3")));

// Should not catch SchemaChangeEvent of tables other than `products`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,27 @@ VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistent black wind breaker",0.1),
(default,"jacket","water resistant black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);

-- Create a table where all fields are in uppercase.
CREATE TABLE uppercase_products (
ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
NAME VARCHAR(255) NOT NULL DEFAULT 'flink',
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT(6)
);
ALTER TABLE uppercase_products AUTO_INCREMENT = 101;

INSERT INTO uppercase_products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"car battery","12V car battery",8.1),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(default,"hammer","12oz carpenter's hammer",0.75),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistant black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);

-- Create some customers ...
Expand Down