
Conversation

@hongyunyan
Collaborator

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note, fill it with `None`.

@ti-chi-bot bot added the do-not-merge/needs-linked-issue and release-note (denotes a PR that will be considered when it comes time to generate release notes) labels on Aug 22, 2025
@ti-chi-bot

ti-chi-bot bot commented Aug 22, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign hongyunyan for approval. For more information, see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) on Aug 22, 2025

@gemini-code-assist bot left a comment


Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural enhancement to the TiCDC sink component by implementing a new TxnSink for MySQL and TiDB. The core purpose of this change is to shift from row-level DML application to transaction-level processing, which offers substantial benefits in terms of performance and data consistency. The new sink collects DML events, groups them into complete transactions, and then generates optimized SQL statements that are executed in batches. A sophisticated conflict detection mechanism ensures that transactions can be processed concurrently where possible, without violating causal dependencies. This change lays the groundwork for more robust and efficient data replication to transactional databases.
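
To make the described flow concrete, here is a minimal, self-contained sketch of transaction grouping: DML events are buffered by (startTs, commitTs) and flushed as whole transactions once the resolved timestamp passes their commitTs. All names here (txnBuffer, flushUpTo, the simplified DMLEvent/TxnGroup types) are illustrative stand-ins for the PR's commonEvent.DMLEvent and TxnGroup, not its actual code.

// Sketch only: buffer DML events per upstream transaction and flush complete
// transactions once the resolved timestamp guarantees nothing more can arrive.
package txnsketch

type DMLEvent struct {
	StartTs  uint64
	CommitTs uint64
	// row payload omitted in this sketch
}

// txnKey identifies one upstream transaction.
type txnKey struct {
	startTs  uint64
	commitTs uint64
}

// TxnGroup is a complete upstream transaction ready for SQL generation.
type TxnGroup struct {
	StartTs  uint64
	CommitTs uint64
	Events   []*DMLEvent
}

// txnBuffer collects events until whole transactions can be emitted.
type txnBuffer struct {
	groups map[txnKey]*TxnGroup
}

func newTxnBuffer() *txnBuffer {
	return &txnBuffer{groups: make(map[txnKey]*TxnGroup)}
}

func (b *txnBuffer) add(e *DMLEvent) {
	k := txnKey{startTs: e.StartTs, commitTs: e.CommitTs}
	g, ok := b.groups[k]
	if !ok {
		g = &TxnGroup{StartTs: e.StartTs, CommitTs: e.CommitTs}
		b.groups[k] = g
	}
	g.Events = append(g.Events, e)
}

// flushUpTo returns all transactions with commitTs <= resolvedTs; they are
// complete and can be handed to the conflict detector and SQL generator.
func (b *txnBuffer) flushUpTo(resolvedTs uint64) []*TxnGroup {
	var out []*TxnGroup
	for k, g := range b.groups {
		if g.CommitTs <= resolvedTs {
			out = append(out, g)
			delete(b.groups, k)
		}
	}
	return out
}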

Highlights

  • New Transaction-Level Sink: This PR introduces a new TxnSink (Transaction Sink) specifically designed for MySQL and TiDB targets. Unlike row-by-row processing, this sink processes and applies DML events at the transaction level, aiming to improve performance and ensure transactional atomicity at the destination.
  • Optimized SQL Execution with Batching: A new DBExecutor component is added to handle the actual database execution. It intelligently batches SQL statements, combining multiple DMLs from a single transaction into a single BEGIN;...;COMMIT; block. This significantly reduces network round-trips and improves write efficiency.
  • Smart SQL Generation: The SQLGenerator is responsible for converting raw DML events into optimized SQL. For inserts and updates, it leverages INSERT ON DUPLICATE KEY UPDATE statements, which can handle both new rows and existing rows efficiently within a single SQL command (a sketch of such generated SQL follows this list).
  • Concurrency with Conflict Detection: A ConflictDetector is implemented to manage dependencies between transactions. This allows the TxnSink to process non-conflicting transactions concurrently across multiple workers, maximizing throughput while preserving the correct causal order of operations.
  • Transaction Event Buffering: The TxnStore acts as an in-memory buffer, collecting DML events and grouping them by their commit and start timestamps. This enables the system to form complete transactions before processing, which is crucial for the transaction-level sink's operation.
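
As referenced in the Smart SQL Generation highlight above, here is a hedged sketch of what the generated transaction SQL could look like: one BEGIN/COMMIT block containing INSERT ... ON DUPLICATE KEY UPDATE statements. Identifier quoting and value escaping are deliberately simplified; the actual SQLGenerator presumably uses proper escaping or placeholders, and the function name is illustrative.

// Sketch (assumption, not the PR's SQLGenerator): turn one upstream
// transaction's rows into a single BEGIN; INSERT ... ON DUPLICATE KEY UPDATE;
// COMMIT; string so the executor can send it in one round-trip.
package txnsketch

import (
	"fmt"
	"strings"
)

// buildUpsertTxn assumes the caller has already escaped/quoted the row values;
// a real implementation would use placeholders or driver-level escaping.
func buildUpsertTxn(table string, cols []string, rows [][]string) string {
	assignments := make([]string, 0, len(cols))
	for _, c := range cols {
		assignments = append(assignments, fmt.Sprintf("`%s`=VALUES(`%s`)", c, c))
	}
	var b strings.Builder
	b.WriteString("BEGIN;\n")
	for _, row := range rows {
		b.WriteString(fmt.Sprintf(
			"INSERT INTO `%s` (`%s`) VALUES (%s) ON DUPLICATE KEY UPDATE %s;\n",
			table,
			strings.Join(cols, "`, `"),
			strings.Join(row, ", "),
			strings.Join(assignments, ", "),
		))
	}
	b.WriteString("COMMIT;")
	return b.String()
}

For a table t(id, v), buildUpsertTxn("t", []string{"id", "v"}, [][]string{{"1", "'a'"}}) would yield BEGIN; INSERT INTO `t` (`id`, `v`) VALUES (1, 'a') ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `v`=VALUES(`v`); COMMIT;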
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in issue comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist bot left a comment


Code Review

This pull request introduces a new txnsink for transaction-level SQL output, which is a significant feature. The implementation uses a pipelined architecture with channels and goroutines for processing events, which is a solid approach. However, there are several critical issues in the database execution logic, including incorrect transaction batching and a buggy rollback mechanism, which could lead to data inconsistency or errors. Additionally, there are multiple places with hardcoded configuration values (such as timeouts, buffer sizes, and batch sizes) that should be made configurable for production use. There's also some duplicated code for conflict detection that could be refactored for better maintainability.

Comment on lines 28 to 118
	if len(batch) == 0 {
		return nil
	}

	log.Debug("txnSink: executing SQL batch",
		zap.Int("batchSize", len(batch)))

	// If batch size is 1, execute directly (SQL already contains BEGIN/COMMIT)
	if len(batch) == 1 {
		txnSQL := batch[0]
		for _, sql := range txnSQL.SQLs {
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			_, execErr := e.db.ExecContext(ctx, sql)
			cancel()

			if execErr != nil {
				log.Error("txnSink: failed to execute single SQL",
					zap.String("sql", sql),
					zap.Uint64("commitTs", txnSQL.TxnGroup.CommitTs),
					zap.Uint64("startTs", txnSQL.TxnGroup.StartTs),
					zap.Error(execErr))
				return errors.Trace(execErr)
			}
		}

		log.Debug("txnSink: successfully executed single transaction",
			zap.Uint64("commitTs", txnSQL.TxnGroup.CommitTs),
			zap.Uint64("startTs", txnSQL.TxnGroup.StartTs))
		return nil
	}

	// For multiple transactions, use explicit transaction and combine SQLs
	tx, err := e.db.Begin()
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	// Build combined SQL from all transactions
	var combinedSQL strings.Builder

	// Collect all SQL statements from all transactions
	for _, txnSQL := range batch {
		for _, sql := range txnSQL.SQLs {
			// Keep the original SQL with BEGIN/COMMIT
			cleanSQL := strings.TrimSpace(sql)
			if len(cleanSQL) > 0 {
				combinedSQL.WriteString(cleanSQL)
				if !strings.HasSuffix(cleanSQL, ";") {
					combinedSQL.WriteString(";")
				}
			}
		}
	}

	finalSQL := combinedSQL.String()

	log.Debug("txnSink: executing combined SQL batch with explicit transaction",
		zap.String("sql", finalSQL),
		zap.Int("batchSize", len(batch)))

	// Execute the combined SQL within transaction
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	_, execErr := tx.ExecContext(ctx, finalSQL)
	cancel()

	if execErr != nil {
		log.Error("txnSink: failed to execute SQL batch",
			zap.String("sql", finalSQL),
			zap.Int("batchSize", len(batch)),
			zap.Error(execErr))
		return errors.Trace(execErr)
	}

	// Commit the transaction
	if err = tx.Commit(); err != nil {
		log.Error("txnSink: failed to commit batch transaction",
			zap.Int("batchSize", len(batch)),
			zap.Error(err))
		return errors.Trace(err)
	}

	log.Debug("txnSink: successfully executed SQL batch",
		zap.Int("batchSize", len(batch)),
		zap.Int("sqlLength", len(finalSQL)))

	return nil


critical

The current implementation of ExecuteSQLBatch has two critical issues. First, the logic for batching multiple transactions (len(batch) > 1) is incorrect because it wraps already-transactional SQL strings (with BEGIN/COMMIT) inside another transaction, which will fail. Second, the error handling for len(batch) > 1 is buggy and does not correctly roll back the transaction on ExecContext failure.

The function should be simplified to iterate through the batch and execute each TxnSQL as a separate, self-contained transaction, as prepared by the sql_generator. This resolves both issues and also correctly handles the case where a single TxnSQL might contain multiple statements that need to be executed atomically.

	if len(batch) == 0 {
		return nil
	}

	log.Debug("txnSink: executing SQL batch",
		zap.Int("batchSize", len(batch)))

	for _, txnSQL := range batch {
		// Each TxnSQL is a self-contained transaction with BEGIN/COMMIT.
		if len(txnSQL.SQLs) == 0 {
			continue
		}
		// sql_generator is expected to generate a single SQL string.
		sql := txnSQL.SQLs[0]
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		_, execErr := e.db.ExecContext(ctx, sql)
		cancel()

		if execErr != nil {
			log.Error("txnSink: failed to execute transaction SQL",
				zap.String("sql", sql),
				zap.Uint64("commitTs", txnSQL.TxnGroup.CommitTs),
				zap.Uint64("startTs", txnSQL.TxnGroup.StartTs),
				zap.Error(execErr))
			return errors.Trace(execErr)
		}
		log.Debug("txnSink: successfully executed transaction",
			zap.Uint64("commitTs", txnSQL.TxnGroup.CommitTs),
			zap.Uint64("startTs", txnSQL.TxnGroup.StartTs))
	}

	return nil

Comment on lines 82 to 87
	txnConfig := &txnsink.TxnSinkConfig{
		MaxConcurrentTxns: 16,
		BatchSize:         16,
		FlushInterval:     100,
		MaxSQLBatchSize:   1024 * 16, // 1MB
	}


high

The configuration for txnSink is hardcoded. These values should be made configurable, for example, through the sink URI, to allow for tuning in different environments.

Also, the comment for MaxSQLBatchSize is incorrect. 1024 * 16 is 16KB, not 1MB. 1MB would be 1024 * 1024.

Suggested change
	txnConfig := &txnsink.TxnSinkConfig{
		MaxConcurrentTxns: 16,
		BatchSize:         16,
		FlushInterval:     100,
		MaxSQLBatchSize:   1024 * 16, // 1MB
	}
	txnConfig := &txnsink.TxnSinkConfig{
		MaxConcurrentTxns: 16,          // TODO: make this configurable
		BatchSize:         16,          // TODO: make this configurable
		FlushInterval:     100,         // TODO: make this configurable
		MaxSQLBatchSize:   1024 * 1024, // 1MB
	}

	if len(batch) == 1 {
		txnSQL := batch[0]
		for _, sql := range txnSQL.SQLs {
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)


medium

The 30-second timeout for ExecContext is hardcoded. This should be configurable to allow users to adjust it based on their downstream database performance and workload.
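
A minimal sketch of the configurable-timeout alternative; the struct, field, and method names below are assumptions for illustration, not the PR's actual API.

// Sketch: carry the per-statement timeout in the sink configuration instead of
// hardcoding 30s at each ExecContext call site.
package txnsketch

import (
	"context"
	"database/sql"
	"time"
)

type execConfig struct {
	ExecTimeout time.Duration // e.g. parsed from the sink URI, defaulting to 30s
}

type dbExecutor struct {
	db  *sql.DB
	cfg execConfig
}

// execWithTimeout runs one statement under the configured timeout.
func (e *dbExecutor) execWithTimeout(sqlStr string) error {
	ctx, cancel := context.WithTimeout(context.Background(), e.cfg.ExecTimeout)
	defer cancel()
	_, err := e.db.ExecContext(ctx, sqlStr)
	return err
}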

Comment on lines 84 to 87
		dmlEventChan:   make(chan *commonEvent.DMLEvent, 10000),
		checkpointChan: make(chan uint64, 100),
		txnChan:        make(chan *TxnGroup, 10000),
		sqlChan:        make(chan *TxnSQL, 10000),


medium

The channel buffer sizes are hardcoded. These values can have a significant impact on performance and memory usage. It would be better to make them configurable.

Comment on lines 278 to 283
	select {
	case s.sqlChan <- txnSQL:
		return nil
	default:
		return errors.New("SQL channel is full")
	}


medium

When the sqlChan is full, processTxnGroup returns an error, which will cause the entire sink to shut down. This seems too aggressive. A full channel might be a temporary condition due to a burst of data or a slow downstream. Consider blocking until there is space in the channel, or implementing a retry mechanism with backoff.
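
A sketch of the blocking alternative: send to the channel under a select that also watches the sink's context, so the goroutine blocks when the channel is full but still exits cleanly on shutdown. The type and field names are illustrative, and a retry loop with backoff would be another valid option.

// Sketch: blocking enqueue with context-aware shutdown instead of failing fast
// when the SQL channel is full.
package txnsketch

import "context"

type txnSQLSketch struct{ /* generated SQL omitted in this sketch */ }

type txnSinkSketch struct {
	sqlChan chan *txnSQLSketch
}

// enqueueSQL blocks until there is room in sqlChan, returning promptly only if
// the sink's context is cancelled.
func (s *txnSinkSketch) enqueueSQL(ctx context.Context, t *txnSQLSketch) error {
	select {
	case s.sqlChan <- t:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}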

Comment on lines 380 to 384
	opt := TxnCacheOption{
		Count:         10, // Default worker count
		Size:          1024,
		BlockStrategy: BlockStrategyWaitEmpty,
	}


medium

The TxnCacheOption for the ConflictDetector is hardcoded. These values, especially Count (number of workers) and Size (cache size), should be configurable to allow tuning for different workloads.

Comment on lines +234 to +343
func ConflictKeysForTxnGroup(txnGroup *TxnGroup) []uint64 {
	if len(txnGroup.Events) == 0 {
		return nil
	}

	hashRes := make(map[uint64]struct{})
	hasher := fnv.New32a()

	// Iterate through all events in the transaction group
	for _, event := range txnGroup.Events {
		// Iterate through all rows in the event
		event.Rewind()
		for {
			rowChange, ok := event.GetNextRow()
			if !ok {
				break
			}

			// Generate keys for each row
			keys := genRowKeysForTxnGroup(rowChange, event.TableInfo, event.DispatcherID)
			for _, key := range keys {
				if n, err := hasher.Write(key); n != len(key) || err != nil {
					log.Panic("transaction key hash fail")
				}
				hashRes[uint64(hasher.Sum32())] = struct{}{}
				hasher.Reset()
			}
		}
		event.Rewind()
	}

	keys := make([]uint64, 0, len(hashRes))
	for key := range hashRes {
		keys = append(keys, key)
	}
	return keys
}

// genRowKeysForTxnGroup generates row keys for a row change in a transaction group
func genRowKeysForTxnGroup(rowChange commonEvent.RowChange, tableInfo *common.TableInfo, dispatcherID common.DispatcherID) [][]byte {
	var keys [][]byte

	if !rowChange.Row.IsEmpty() {
		for iIdx, idxColID := range tableInfo.GetIndexColumns() {
			key := genKeyListForTxnGroup(&rowChange.Row, iIdx, idxColID, dispatcherID, tableInfo)
			if len(key) == 0 {
				continue
			}
			keys = append(keys, key)
		}
	}
	if !rowChange.PreRow.IsEmpty() {
		for iIdx, idxColID := range tableInfo.GetIndexColumns() {
			key := genKeyListForTxnGroup(&rowChange.PreRow, iIdx, idxColID, dispatcherID, tableInfo)
			if len(key) == 0 {
				continue
			}
			keys = append(keys, key)
		}
	}
	if len(keys) == 0 {
		// use dispatcherID as key if no key generated (no PK/UK),
		// no concurrency for rows in the same dispatcher.
		log.Debug("Use dispatcherID as the key", zap.Any("dispatcherID", dispatcherID))
		tableKey := make([]byte, 8)
		binary.BigEndian.PutUint64(tableKey, uint64(dispatcherID.GetLow()))
		keys = [][]byte{tableKey}
	}
	return keys
}

// genKeyListForTxnGroup generates a key list for a row in a transaction group
func genKeyListForTxnGroup(
	row *chunk.Row, iIdx int, idxColID []int64, dispatcherID common.DispatcherID, tableInfo *common.TableInfo,
) []byte {
	var key []byte
	for _, colID := range idxColID {
		info, ok := tableInfo.GetColumnInfo(colID)
		// If the index contains a generated column, we can't use this key to detect conflicts with other DMLs.
		if !ok || info == nil || info.IsGenerated() {
			return nil
		}
		offset, ok := tableInfo.GetRowColumnsOffset()[colID]
		if !ok {
			log.Warn("can't find column offset", zap.Int64("colID", colID), zap.String("colName", info.Name.O))
			return nil
		}
		value := common.ExtractColVal(row, info, offset)
		// if a column value is null, we can ignore this index
		if value == nil {
			return nil
		}

		val := common.ColumnValueString(value)
		if columnNeeds2LowerCase(info.GetType(), info.GetCollate()) {
			val = strings.ToLower(val)
		}

		key = append(key, []byte(val)...)
		key = append(key, 0)
	}
	if len(key) == 0 {
		return nil
	}
	tableKey := make([]byte, 16)
	binary.BigEndian.PutUint64(tableKey[:8], uint64(iIdx))
	binary.BigEndian.PutUint64(tableKey[8:], dispatcherID.GetLow())
	key = append(key, tableKey...)
	return key
}


medium

The functions ConflictKeysForTxnGroup, genRowKeysForTxnGroup, and genKeyListForTxnGroup appear to be duplicating logic that exists in pkg/sink/mysql/txn.go for conflict key generation. Duplicating code makes maintenance harder. It would be better to refactor the existing logic into a shared package so it can be reused by both the mysql sink and this new txn sink.
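
For illustration, the shared piece could be as small as a key-hashing helper that both sinks import; the package path pkg/sink/keyhash and the signature below are hypothetical, not something that exists in the repository today.

// Hypothetical shared package (e.g. pkg/sink/keyhash): one place that reduces
// raw per-row conflict keys to the uint64 hashes the conflict detector
// consumes, so the mysql sink and the txn sink share a single implementation.
package keyhash

import "hash/fnv"

// HashKeys hashes each raw key with FNV-32a and deduplicates identical hashes.
func HashKeys(rawKeys [][]byte) []uint64 {
	seen := make(map[uint64]struct{})
	hasher := fnv.New32a()
	for _, k := range rawKeys {
		hasher.Reset()
		_, _ = hasher.Write(k) // hash.Hash.Write never returns an error
		seen[uint64(hasher.Sum32())] = struct{}{}
	}
	out := make([]uint64, 0, len(seen))
	for h := range seen {
		out = append(out, h)
	}
	return out
}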

@ti-chi-bot

ti-chi-bot bot commented Aug 27, 2025

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@ti-chi-bot

ti-chi-bot bot commented Sep 16, 2025

@hongyunyan: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 353d480 link true /test pull-error-log-review
pull-cdc-kafka-integration-light 353d480 link true /test pull-cdc-kafka-integration-light
pull-cdc-storage-integration-light 353d480 link true /test pull-cdc-storage-integration-light
pull-cdc-mysql-integration-light 353d480 link true /test pull-cdc-mysql-integration-light
pull-cdc-kafka-integration-heavy 353d480 link true /test pull-cdc-kafka-integration-heavy
pull-cdc-storage-integration-heavy 353d480 link true /test pull-cdc-storage-integration-heavy
pull-cdc-mysql-integration-heavy 353d480 link true /test pull-cdc-mysql-integration-heavy
pull-cdc-pulsar-integration-light 353d480 link false /test pull-cdc-pulsar-integration-light
pull-check 353d480 link true /test pull-check
pull-build 353d480 link true /test pull-build

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

