-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Flink-38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled #4246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2785bf2
aa9dfb0
ea64d7e
b548110
723f1a3
bf93c21
d2c10e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -24,6 +24,8 @@ | |||||||||||||||||
| import org.apache.flink.table.catalog.ObjectPath; | ||||||||||||||||||
|
|
||||||||||||||||||
| import io.debezium.config.CommonConnectorConfig; | ||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||||||
|
|
||||||||||||||||||
| import java.io.Serializable; | ||||||||||||||||||
| import java.time.Duration; | ||||||||||||||||||
|
|
@@ -34,6 +36,7 @@ | |||||||||||||||||
| import java.util.Map; | ||||||||||||||||||
| import java.util.Properties; | ||||||||||||||||||
| import java.util.UUID; | ||||||||||||||||||
| import java.util.stream.Collectors; | ||||||||||||||||||
|
|
||||||||||||||||||
| import static org.apache.flink.cdc.connectors.mysql.source.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; | ||||||||||||||||||
| import static org.apache.flink.util.Preconditions.checkNotNull; | ||||||||||||||||||
|
|
@@ -43,6 +46,7 @@ | |||||||||||||||||
| public class MySqlSourceConfigFactory implements Serializable { | ||||||||||||||||||
|
|
||||||||||||||||||
| private static final long serialVersionUID = 1L; | ||||||||||||||||||
| private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceConfigFactory.class); | ||||||||||||||||||
|
|
||||||||||||||||||
| private int port = 3306; // default 3306 port | ||||||||||||||||||
| private String hostname; | ||||||||||||||||||
|
|
@@ -68,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable { | |||||||||||||||||
| private boolean includeHeartbeatEvents = false; | ||||||||||||||||||
| private boolean includeTransactionMetadataEvents = false; | ||||||||||||||||||
| private boolean scanNewlyAddedTableEnabled = false; | ||||||||||||||||||
| private boolean scanBinlogNewlyAddedTableEnabled = false; | ||||||||||||||||||
| private boolean closeIdleReaders = false; | ||||||||||||||||||
| private Properties jdbcProperties; | ||||||||||||||||||
| private Duration heartbeatInterval = MySqlSourceOptions.HEARTBEAT_INTERVAL.defaultValue(); | ||||||||||||||||||
|
|
@@ -258,6 +263,17 @@ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdde | |||||||||||||||||
| return this; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Whether to capture newly added tables in binlog reading phase without snapshot. This option | ||||||||||||||||||
| * can only be used with stream-only startup modes. Cannot be enabled together with {@link | ||||||||||||||||||
| * #scanNewlyAddedTableEnabled(boolean)}. | ||||||||||||||||||
| */ | ||||||||||||||||||
| public MySqlSourceConfigFactory scanBinlogNewlyAddedTableEnabled( | ||||||||||||||||||
| boolean scanBinlogNewlyAddedTableEnabled) { | ||||||||||||||||||
| this.scanBinlogNewlyAddedTableEnabled = scanBinlogNewlyAddedTableEnabled; | ||||||||||||||||||
| return this; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** Custom properties that will overwrite the default JDBC connection URL. */ | ||||||||||||||||||
| public MySqlSourceConfigFactory jdbcProperties(Properties jdbcProperties) { | ||||||||||||||||||
| this.jdbcProperties = jdbcProperties; | ||||||||||||||||||
|
|
@@ -397,8 +413,26 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { | |||||||||||||||||
| if (databaseList != null) { | ||||||||||||||||||
| props.setProperty("database.include.list", String.join(",", databaseList)); | ||||||||||||||||||
| } | ||||||||||||||||||
| // Validate: Two modes are mutually exclusive | ||||||||||||||||||
| if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) { | ||||||||||||||||||
| throw new IllegalArgumentException( | ||||||||||||||||||
| "Cannot enable both 'scan.binlog.newly-added-table.enabled' and " | ||||||||||||||||||
| + "'scan.newly-added-table.enabled' as they may cause duplicate data"); | ||||||||||||||||||
| } | ||||||||||||||||||
|
||||||||||||||||||
| } | |
| } | |
| // Validate: binlog-only newly-added-table scan requires a stream-only startup mode | |
| if (scanBinlogNewlyAddedTableEnabled && !startupOptions.isStreamOnly()) { | |
| throw new IllegalArgumentException( | |
| "'scan.binlog.newly-added-table.enabled' can only be enabled when using a " | |
| + "stream-only startup mode (latest-offset, earliest-offset, specific-offset, or timestamp)."); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description states "Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern", and the comment here claims "BinlogSplitReader will handle the auto-capture", but there are no changes to BinlogSplitReader.java in this PR. The existing logic in BinlogSplitReader.hasEnterPureBinlogPhase() (line 318-323) only checks isScanNewlyAddedTableEnabled(), not isScanBinlogNewlyAddedTableEnabled(). This means the feature will not actually work as described. BinlogSplitReader needs to be enhanced to check isScanBinlogNewlyAddedTableEnabled() and handle newly added tables accordingly.