-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
What happened:
When running a Flink job with MySQL CDC source and JDBC Exactly-Once sink connector with schema evolution enabled, the job hangs indefinitely at certain checkpoints during DDL operations. The issue occurs specifically when DDL events (ALTER TABLE) are processed concurrently with XA transactions used by the exactly-once sink.
2026-03-24 11:43:03,144 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774345383143 for job 4dabfd84185b4f842ef1738f89d84465.
# Job hangs here indefinitely, no further checkpoint progress
Root Cause:
The hang is caused by a self-deadlock in JdbcExactlyOnceSinkWriter.reOpenOutputFormat() method:
When a DDL event occurs, reOpenOutputFormat() calls this.prepareCommit() first, which puts an XA transaction into PREPARED state
The PREPARED XA transaction holds MySQL Metadata Locks (MDL) on the table
Then reOpenOutputFormat() tries to execute ALTER TABLE on the same connection
The ALTER TABLE gets blocked by the MDL locks held by the prepared XA transaction on the same connection
This creates a self-deadlock situation where the connection waits for itself
The issue occurs in translation layer when SchemaOperator forwards DDL events to JdbcExactlyOnceSinkWriter
MySQL's XA transactions acquire MDL locks that conflict with DDL operations
The prepared XA transaction from previous checkpoint blocks the current DDL execution
SeaTunnel Version
All of them have this problem.
SeaTunnel Config
source {
MySQL-CDC {
server-id = 5652-5657
username = "mysqluser"
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
}
}
sink {
Jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
driver = "com.mysql.cj.jdbc.Driver"
user = "mysqluser"
password = "mysqlpw"
generate_sink_sql = true
database = "shop"
table = "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once"
support_upsert_by_query_primary_key_exist = true
is_exactly_once = true
}
}
Running Command
-------Error Exception
**Error Logs:**
2026-03-23 21:06:13,458 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to MySQL binlog at localhost:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=DESKTOP-HNU0D1U-bin.000008, currentBinlogPosition=6577, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=DESKTOP-HNU0D1U-bin.000008, restartBinlogPosition=6577, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2026-03-23 21:06:13,458 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Waiting for keepalive thread to start
2026-03-23 21:06:13,459 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2026-03-23 21:06:13,499 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is:
INSERT INTO `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` (`id`, `name`, `description`, `weight`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `description`=VALUES(`description`), `weight`=VALUES(`weight`)
2026-03-23 21:06:13,503 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is:
DELETE FROM `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` WHERE `id` = ?
2026-03-23 21:06:13,562 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread is running
2026-03-23 21:06:13,650 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271173638 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:13,710 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job b1fbcd22a40d4a881708ad26b7a86a1b (16073 bytes, checkpointDuration=68 ms, finalizationTime=4 ms).
2026-03-23 21:06:13,712 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 1 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:18,638 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271178637 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:18,648 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job b1fbcd22a40d4a881708ad26b7a86a1b (17820 bytes, checkpointDuration=1 ms, finalizationTime=10 ms).
2026-03-23 21:06:18,648 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 2 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:18,670 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000007f93cd1a9d010000:74eef017 transaction
2026-03-23 21:06:23,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271183643 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:23,649 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, checkpointDuration=6 ms, finalizationTime=0 ms).
2026-03-23 21:06:23,650 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 3 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:23,650 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000008093cd1a9d010000:74eef017 transaction
2026-03-23 21:06:28,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271188636 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:28,643 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job b1fbcd22a40d4a881708ad26b7a86a1b (17852 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2026-03-23 21:06:28,644 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 4 as completed for source Source: MySQL-CDC-Source.
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkAggregatedCommitter - commit xid: [XidInfo(xid=201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017, attempts=0)]
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - commit 1 transactions
2026-03-23 21:06:28,645 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl - committing 201:3361643061393462656564343466333639393934613065323361323737333635000000008193cd1a9d010000:74eef017 transaction
2026-03-23 21:06:28,921 WARN org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver - Ignoring statement for non-captured table ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null default 'yy'
2026-03-23 21:06:30,779 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271190779, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=2372, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=9657, server_id=1}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=shop}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188407,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9446,row=0},databaseName=shop,ddl=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1,tableChanges=[Struct{type=ALTER,id="shop"."products",table=Struct{defaultCharsetName=utf8mb4,primaryKeyColumnNames=[id],columns=[Struct{name=id,jdbcType=4,typeName=INT,typeExpression=INT,position=1,optional=false,autoIncremented=true,generated=true}, Struct{name=name,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=255,position=2,optional=false,autoIncremented=false,generated=false}, Struct{name=description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=512,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=weight,jdbcType=6,typeName=FLOAT,typeExpression=FLOAT,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=add_column1,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=utf8mb4,length=64,position=5,optional=false,autoIncremented=false,generated=false}, Struct{name=add_column2,jdbcType=4,typeName=INT,typeExpression=INT,position=6,optional=false,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
2026-03-23 21:06:30,779 INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2026-03-23 21:06:30,797 WARN io.debezium.connector.mysql.MySqlValueConverters - Column is missing a character set: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
2026-03-23 21:06:30,797 WARN io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by default for column without charset: add_column1 VARCHAR(64) NOT NULL DEFAULT VALUE yy
2026-03-23 21:06:30,801 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190801, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column1, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=yy, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=null), AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271190798, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column2, dataType=INT, columnLength=null, scale=null, nullable=false, defaultValue=1, comment=null, sourceType=INT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:30,919 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Starting schema change processing for table: .shop.products, job: b1fbcd22a40d4a881708ad26b7a86a1b, event time: 1774271190801
2026-03-23 21:06:30,920 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Broadcasting SchemaChangeEvent to all downstream sink subtasks for table: .shop.products
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - SchemaChangeEvent broadcast sent for table: .shop.products
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.SchemaOperator - Synchronously processing schema change for table .shop.products (epoch 1774271190801). Business data buffered.
2026-03-23 21:06:30,921 INFO org.apache.seatunnel.translation.flink.schema.coordinator.LocalSchemaCoordinator - Requesting schema change for table .shop.products (epoch 1774271190801). Waiting for all 1 sink subtasks to apply after checkpoint completion.
2026-03-23 21:06:31,026 INFO org.apache.seatunnel.translation.flink.schema.BroadcastSchemaSinkOperator - Subtask 0 applying schema change immediately for table .shop.products (epoch 1774271190801, change: AlterTableColumnsEvent). This prevents deadlock by allowing checkpoint barriers to propagate.
2026-03-23 21:06:31,035 INFO org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter - FlinkSinkWriter applying SchemaChangeEvent for table: .shop.products
2026-03-23 21:06:31,035 INFO org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter - Start apply schema change for table shop.products sub-writer 0
2026-03-23 21:06:31,055 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect - Executing add column SQL: ALTER TABLE `shop`.`mysql_cdc_e2e_sink_table_with_schema_change_exactly_once` ADD COLUMN `add_column1` VARCHAR(64) NOT NULL DEFAULT 'yy'
2026-03-23 21:06:31,808 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271191808, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=3808, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=9736, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, key=Struct{id=110}, keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, value=Struct{before=Struct{id=110,name=scooter,description=Small 2-wheel scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},after=Struct{id=110,name=dailai,description=Small 2-wheel scooter,weight=3.140000104904175,add_column1=yy,add_column2=1},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=9891,row=0,thread=29},op=u,ts_ms=1774271188444}, valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
2026-03-23 21:06:31,811 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column3 float not null default 1.1, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191811, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column3, dataType=FLOAT, columnLength=null, scale=null, nullable=false, defaultValue=1.1, comment=null, sourceType=FLOAT, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:31,824 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(), sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271191813, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column4, dataType=TIMESTAMP, columnLength=null, scale=0, nullable=false, defaultValue=current_timestamp(), comment=null, sourceType=TIMESTAMP, sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=0, longColumnLength=null)), first=false, afterColumn=null)])
2026-03-23 21:06:32,839 WARN io.debezium.connector.mysql.MySqlValueConverters - Column is missing a character set: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
2026-03-23 21:06:32,839 WARN io.debezium.connector.mysql.MySqlValueConverters - Using UTF-8 charset by default for column without charset: add_column6 VARCHAR(64) NOT NULL DEFAULT VALUE ff
2026-03-23 21:06:32,839 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: AlterTableColumnsEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839, tableIdentifier=.shop.products, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, statement=alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' after id, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()})), events=[AlterTableAddColumnEvent(super=AlterTableColumnEvent(super=AlterTableEvent(super=TableEvent(createdTime=1774271192839, tableIdentifier=.shop.products, jobId=null, statement=null, sourceDialectName=MySQL, changeAfter=CatalogTable{tableId=MySQL.shop.products, tableSchema=TableSchema(primaryKey=PrimaryKey(primaryKey=PRIMARY, columnNames=[id], enableAutoId=null), constraintKeys=[]), options={table-name=shop.products, connector=jdbc, url=jdbc:mysql://localhost:3306/shop}, partitionKeys=[], comment='', catalogName='MySQL', metadata=MetadataSchema()}))), column=PhysicalColumn(super=Column(name=add_column6, dataType=STRING, columnLength=256, scale=null, nullable=false, defaultValue=ff, comment=null, sourceType=VARCHAR(64), sinkType=null, options=null, isUnsigned=false, isZeroFill=false, bitLen=2048, longColumnLength=256)), first=false, afterColumn=id)])
2026-03-23 21:06:33,645 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1774271193644 for job b1fbcd22a40d4a881708ad26b7a86a1b.
2026-03-23 21:06:33,853 INFO org.apache.seatunnel.api.event.LoggingEventHandler - log event: MessageDelayedEvent(createdTime=1774271193853, jobId=b1fbcd22a40d4a881708ad26b7a86a1b, eventType=READER_MESSAGE_DELAYED, delayTime=5853, record=SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1774271188, file=DESKTOP-HNU0D1U-bin.000008, pos=13677, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.shop.products', kafkaPartition=null, key=Struct{id=115}, keySchema=Schema{mysql_binlog_source.shop.products.Key:STRUCT}, value=Struct{before=Struct{id=115,add_column6=ff,name=hammer,description=16oz carpenter's hammer,weight=1.0,add_column1=yy,add_column2=1,add_column3=1.100000023841858,add_column4=2026-03-23T13:06:28Z},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1774271188000,db=shop,table=products,server_id=1,file=DESKTOP-HNU0D1U-bin.000008,pos=13831,row=0,thread=29},op=d,ts_ms=1774271188491}, valueSchema=Schema{mysql_binlog_source.shop.products.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)})
# Job hangs here indefinitely, no further checkpoint progress
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct