diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index dcfcb4764..da30e97ca 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -166,7 +166,8 @@ func (c *Changefeed) ForceUpdateStatus(newStatus *heartbeatpb.MaintainerStatus) } func (c *Changefeed) NeedCheckpointTsMessage() bool { - return c.sinkType == common.KafkaSinkType || c.sinkType == common.CloudStorageSinkType + // return c.sinkType != common.MysqlSinkType + return true } func (c *Changefeed) SetIsNew(isNew bool) { diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index da922477f..f8a878a56 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -16,16 +16,19 @@ package sink import ( "context" "net/url" + "strconv" "github.com/pingcap/ticdc/downstreamadapter/sink/blackhole" "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage" "github.com/pingcap/ticdc/downstreamadapter/sink/kafka" "github.com/pingcap/ticdc/downstreamadapter/sink/mysql" "github.com/pingcap/ticdc/downstreamadapter/sink/pulsar" + "github.com/pingcap/ticdc/downstreamadapter/sink/txnsink" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" + mysqlpkg "github.com/pingcap/ticdc/pkg/sink/mysql" "github.com/pingcap/ticdc/pkg/sink/util" ) @@ -50,6 +53,11 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common. scheme := config.GetScheme(sinkURI) switch scheme { case config.MySQLScheme, config.MySQLSSLScheme, config.TiDBScheme, config.TiDBSSLScheme: + // Check if enable-transaction-atomic is set to true + if isTransactionAtomicEnabled(sinkURI) { + return newTxnSinkAdapter(ctx, changefeedID, cfg, sinkURI) + } + // Use mysqlSink if enable-transaction-atomic is not set or set to false return mysql.New(ctx, changefeedID, cfg, sinkURI) case config.KafkaScheme, config.KafkaSSLScheme: return kafka.New(ctx, changefeedID, sinkURI, cfg.SinkConfig) @@ -63,6 +71,45 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common. return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) } +// isTransactionAtomicEnabled checks if enable-transaction-atomic parameter is set to true in sink URI +func isTransactionAtomicEnabled(sinkURI *url.URL) bool { + query := sinkURI.Query() + s := query.Get("enable-transaction-atomic") + if len(s) == 0 { + return false + } + enabled, err := strconv.ParseBool(s) + if err != nil { + // If the parameter value is invalid, default to false + return false + } + return enabled +} + +// newTxnSinkAdapter creates a txnSink adapter that uses the same database connection as mysqlSink +func newTxnSinkAdapter( + ctx context.Context, + changefeedID common.ChangeFeedID, + config *config.ChangefeedConfig, + sinkURI *url.URL, +) (Sink, error) { + // Use the same database connection logic as mysqlSink + _, db, err := mysqlpkg.NewMysqlConfigAndDB(ctx, changefeedID, sinkURI, config) + if err != nil { + return nil, err + } + + // Create txnSink configuration + txnConfig := &txnsink.TxnSinkConfig{ + MaxConcurrentTxns: 128, + BatchSize: 256, + MaxSQLBatchSize: 1024 * 16, // 1MB + } + + // Create and return txnSink + return txnsink.New(ctx, changefeedID, db, txnConfig), nil +} + func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) error { sinkURI, err := url.Parse(cfg.SinkURI) if err != nil { diff --git a/downstreamadapter/sink/sink_test.go b/downstreamadapter/sink/sink_test.go new file mode 100644 index 000000000..8a8ef39e3 --- /dev/null +++ b/downstreamadapter/sink/sink_test.go @@ -0,0 +1,70 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIsTransactionAtomicEnabled(t *testing.T) { + tests := []struct { + name string + sinkURI string + expected bool + }{ + { + name: "enable-transaction-atomic=true", + sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=true", + expected: true, + }, + { + name: "enable-transaction-atomic=false", + sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=false", + expected: false, + }, + { + name: "enable-transaction-atomic not set", + sinkURI: "mysql://user:pass@localhost:3306/test", + expected: false, + }, + { + name: "enable-transaction-atomic with invalid value", + sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=invalid", + expected: false, + }, + { + name: "enable-transaction-atomic=1", + sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=1", + expected: true, + }, + { + name: "enable-transaction-atomic=0", + sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=0", + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parsedURI, err := url.Parse(tt.sinkURI) + require.NoError(t, err) + + result := isTransactionAtomicEnabled(parsedURI) + require.Equal(t, tt.expected, result) + }) + } +} diff --git a/downstreamadapter/sink/txnsink/db_executor.go b/downstreamadapter/sink/txnsink/db_executor.go new file mode 100644 index 000000000..aa2549246 --- /dev/null +++ b/downstreamadapter/sink/txnsink/db_executor.go @@ -0,0 +1,133 @@ +package txnsink + +import ( + "context" + "database/sql" + "database/sql/driver" + "strings" + "time" + + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" + "github.com/pingcap/tidb/pkg/parser/mysql" + "go.uber.org/zap" +) + +const ( + // BackoffBaseDelay indicates the base delay time for retrying. + BackoffBaseDelay = 500 * time.Millisecond + // BackoffMaxDelay indicates the max delay time for retrying. + BackoffMaxDelay = 60 * time.Second + // DefaultDMLMaxRetry is the default maximum number of retries for DML operations + DefaultDMLMaxRetry = 8 +) + +// DBExecutor handles database execution for transaction SQL +type DBExecutor struct { + db *sql.DB +} + +// NewDBExecutor creates a new database executor +func NewDBExecutor(db *sql.DB) *DBExecutor { + return &DBExecutor{ + db: db, + } +} + +// ExecuteSQLBatch executes a batch of SQL transactions with retry mechanism +func (e *DBExecutor) ExecuteSQLBatch(batch []*TxnSQL) error { + if len(batch) == 0 { + return nil + } + + log.Info("txnSink: executing SQL batch", + zap.Int("batchSize", len(batch))) + + // Define the execution function that will be retried + tryExec := func() error { + // Filter out empty SQL statements + var validSQLs []string + var validArgs []interface{} + + for _, txnSQL := range batch { + if txnSQL.SQL != "" { + validSQLs = append(validSQLs, txnSQL.SQL) + validArgs = append(validArgs, txnSQL.Args...) + } + } + + if len(validSQLs) == 0 { + log.Debug("txnSink: no valid SQL to execute in batch") + return nil + } + + // Combine SQL statements with semicolons + finalSQL := strings.Join(validSQLs, ";") + + log.Info("executing transaction", + zap.String("sql", finalSQL), + zap.Any("args", validArgs)) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + _, execErr := e.db.ExecContext(ctx, finalSQL, validArgs...) + cancel() + + if execErr != nil { + log.Error("txnSink: failed to execute SQL batch", + zap.String("sql", finalSQL), + zap.Int("batchSize", len(batch)), + zap.Error(execErr)) + return errors.Trace(execErr) + } + + log.Debug("txnSink: successfully executed SQL batch", + zap.String("sql", finalSQL), + zap.Int("batchSize", len(batch))) + + return nil + } + + // Use retry mechanism + return retry.Do(context.Background(), func() error { + err := tryExec() + if err != nil { + log.Warn("txnSink: SQL execution failed, will retry", + zap.Int("batchSize", len(batch)), + zap.Error(err)) + } + return err + }, retry.WithBackoffBaseDelay(BackoffBaseDelay.Milliseconds()), + retry.WithBackoffMaxDelay(BackoffMaxDelay.Milliseconds()), + retry.WithMaxTries(DefaultDMLMaxRetry), + retry.WithIsRetryableErr(isRetryableDMLError)) +} + +// isRetryableDMLError determines if a DML error is retryable +func isRetryableDMLError(err error) bool { + // Check if it's a retryable error + if !errors.IsRetryableError(err) { + return false + } + + // Check for specific MySQL error codes + if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok { + switch mysqlErr.Number { + case uint16(mysql.ErrNoSuchTable), uint16(mysql.ErrBadDB), uint16(mysql.ErrDupEntry): + return false + } + } + + // Check for driver errors + if err == driver.ErrBadConn { + return true + } + + return true +} + +// Close closes the database connection +func (e *DBExecutor) Close() error { + return e.db.Close() +} diff --git a/downstreamadapter/sink/txnsink/db_executor_test.go b/downstreamadapter/sink/txnsink/db_executor_test.go new file mode 100644 index 000000000..e596d5314 --- /dev/null +++ b/downstreamadapter/sink/txnsink/db_executor_test.go @@ -0,0 +1,365 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "database/sql" + "errors" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestDBExecutor_ExecuteTransaction(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO test VALUES (1, 'test');UPDATE test SET name = 'updated' WHERE id = 1;COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"key1": {}}, + } + + // Set up mock expectations - expect the full SQL with BEGIN/COMMIT + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.NoError(t, err) + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_EmptySQL(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL with empty SQL list + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "", + Args: []interface{}{}, + Keys: map[string]struct{}{}, + } + + // Execute transaction - should succeed without any database operations + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.NoError(t, err) + + // Verify no database operations were performed + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_BeginError(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO test VALUES (1, 'test');COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"key1": {}}, + } + + // Set up mock to fail on SQL execution + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WillReturnError(errors.New("connection failed")) + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.Error(t, err) + require.Contains(t, err.Error(), "connection failed") + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_ExecError(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO test VALUES (1, 'test');UPDATE test SET name = 'updated' WHERE id = 1;COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"key1": {}}, + } + + // Set up mock expectations - SQL execution fails + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WillReturnError(errors.New("update failed")) + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.Error(t, err) + require.Contains(t, err.Error(), "update failed") + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_CommitError(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO test VALUES (1, 'test');COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"key1": {}}, + } + + // Set up mock expectations - SQL execution fails + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WillReturnError(errors.New("commit failed")) + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.Error(t, err) + require.Contains(t, err.Error(), "commit failed") + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_Timeout(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO test VALUES (1, 'test');COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"key1": {}}, + } + + // Set up mock expectations with delay + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WillDelayFor(35 * time.Second).WillReturnResult(sqlmock.NewResult(1, 1)) + // Should timeout after 30 seconds + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.Error(t, err) + require.Contains(t, err.Error(), "canceling query due to user request") + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_ExecuteTransaction_MultipleSQL(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + executor := NewDBExecutor(db) + + // Create test transaction SQL with multiple statements + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + txnSQL := &TxnSQL{ + TxnGroup: txnGroup, + SQL: "BEGIN;INSERT INTO users VALUES (1, 'alice');INSERT INTO users VALUES (2, 'bob');UPDATE users SET name = 'alice_updated' WHERE id = 1;DELETE FROM users WHERE id = 2;COMMIT;", + Args: []interface{}{}, + Keys: map[string]struct{}{"user1": {}, "user2": {}}, + } + + // Set up mock expectations for the combined SQL + mock.ExpectExec("BEGIN;INSERT INTO users VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + // Execute transaction + err = executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.NoError(t, err) + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDBExecutor_Close(t *testing.T) { + t.Parallel() + + // Create a mock database + db, mock, err := sqlmock.New() + require.NoError(t, err) + + executor := NewDBExecutor(db) + + // Set up mock expectation for Close + mock.ExpectClose() + + // Close the executor + err = executor.Close() + require.NoError(t, err) + + // Verify all expectations were met + require.NoError(t, mock.ExpectationsWereMet()) +} + +// Test helper function to create a mock database with specific behavior +func createMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock, func()) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + + cleanup := func() { + db.Close() + } + + return db, mock, cleanup +} + +// Test helper function to create a test TxnSQL +func createTestTxnSQL(sql string) *TxnSQL { + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + return &TxnSQL{ + TxnGroup: txnGroup, + SQL: sql, + Args: []interface{}{}, + Keys: map[string]struct{}{"test_key": {}}, + } +} + +// Benchmark tests +func BenchmarkDBExecutor_ExecuteTransaction(b *testing.B) { + db, mock, cleanup := createMockDB(&testing.T{}) + defer cleanup() + + executor := NewDBExecutor(db) + + // Create test transaction SQL + txnSQL := createTestTxnSQL("BEGIN;INSERT INTO test VALUES (1, 'test');UPDATE test SET name = 'updated' WHERE id = 1;COMMIT;") + + // Set up mock expectations + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Reset mock expectations for each iteration + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.NoError(b, err) + } +} + +func BenchmarkDBExecutor_ExecuteTransaction_SingleSQL(b *testing.B) { + db, mock, cleanup := createMockDB(&testing.T{}) + defer cleanup() + + executor := NewDBExecutor(db) + + // Create test transaction SQL with single statement + txnSQL := createTestTxnSQL("BEGIN;INSERT INTO test VALUES (1, 'test');COMMIT;") + + // Set up mock expectations + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Reset mock expectations for each iteration + mock.ExpectExec("BEGIN;INSERT INTO test VALUES").WithArgs().WillReturnResult(sqlmock.NewResult(1, 1)) + + err := executor.ExecuteSQLBatch([]*TxnSQL{txnSQL}) + require.NoError(b, err) + } +} diff --git a/downstreamadapter/sink/txnsink/event_processor.go b/downstreamadapter/sink/txnsink/event_processor.go new file mode 100644 index 000000000..e76a34f64 --- /dev/null +++ b/downstreamadapter/sink/txnsink/event_processor.go @@ -0,0 +1,91 @@ +package txnsink + +import ( + "context" + + "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "go.uber.org/zap" +) + +// EventProcessor handles DML events and checkpoint processing +type EventProcessor struct { + txnStore *TxnStore + progressTracker *ProgressTracker +} + +// NewEventProcessor creates a new event processor +func NewEventProcessor(txnStore *TxnStore, progressTracker *ProgressTracker) *EventProcessor { + return &EventProcessor{ + txnStore: txnStore, + progressTracker: progressTracker, + } +} + +// ProcessDMLEvents processes DML events from the input channel +func (p *EventProcessor) ProcessDMLEvents(ctx context.Context, dmlEventChan <-chan *commonEvent.DMLEvent) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event, ok := <-dmlEventChan: + if !ok { + return nil + } + p.processDMLEvent(event) + } + } +} + +// ProcessCheckpoints processes checkpoint timestamps from the input channel +func (p *EventProcessor) ProcessCheckpoints(ctx context.Context, checkpointChan <-chan uint64, txnChan chan<- *TxnGroup) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case checkpointTs, ok := <-checkpointChan: + if !ok { + return nil + } + if err := p.processCheckpoint(checkpointTs, txnChan); err != nil { + return err + } + } + } +} + +// processDMLEvent processes a single DML event +func (p *EventProcessor) processDMLEvent(event *commonEvent.DMLEvent) { + // Add event to the transaction store + p.txnStore.AddEvent(event) +} + +// processCheckpoint processes a checkpoint timestamp +func (p *EventProcessor) processCheckpoint(checkpointTs uint64, txnChan chan<- *TxnGroup) error { + // Get all events with commitTs <= checkpointTs (already sorted by commitTs) + txnGroups := p.txnStore.GetEventsByCheckpointTs(checkpointTs) + if len(txnGroups) == 0 { + log.Info("txnSink: no events to process for checkpoint", + zap.Uint64("checkpointTs", checkpointTs)) + p.progressTracker.UpdateCheckpointTs(checkpointTs) + return nil + } + + // Send transaction groups to the output channel + for _, txnGroup := range txnGroups { + p.progressTracker.AddPendingTxn(txnGroup.CommitTs, txnGroup.StartTs) + txnChan <- txnGroup + } + + // Update checkpoint progress + p.progressTracker.UpdateCheckpointTs(checkpointTs) + + // Remove processed events from the store + p.txnStore.RemoveEventsByCheckpointTs(checkpointTs) + + log.Info("txnSink: processed checkpoint", + zap.Uint64("checkpointTs", checkpointTs), + zap.Int("txnGroupCount", len(txnGroups))) + + return nil +} diff --git a/downstreamadapter/sink/txnsink/progress_tracker.go b/downstreamadapter/sink/txnsink/progress_tracker.go new file mode 100644 index 000000000..07926aa06 --- /dev/null +++ b/downstreamadapter/sink/txnsink/progress_tracker.go @@ -0,0 +1,198 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "container/list" + "context" + "sync" + "time" + + "github.com/pingcap/log" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" +) + +// TxnKey represents a unique transaction identifier +type TxnKey struct { + commitTs uint64 + startTs uint64 +} + +// ProgressTracker tracks the progress of data processing and flushing +type ProgressTracker struct { + // Current progress state + checkpointTs uint64 + + // Track pending transactions using list + map pattern (like tableProgress) + list *list.List // Maintains order (FIFO) + elemMap map[TxnKey]*list.Element // TxnKey -> list.Element for O(1) removal + + // Thread safety + mu sync.RWMutex + + // Monitoring + pdClock pdutil.Clock + cancelMonitor context.CancelFunc + namespace string + changefeedName string +} + +// NewProgressTracker creates a new progress tracker +func NewProgressTracker() *ProgressTracker { + return &ProgressTracker{ + checkpointTs: 0, + list: list.New(), + elemMap: make(map[TxnKey]*list.Element), + } +} + +// NewProgressTrackerWithMonitor creates a new progress tracker with monitoring enabled +func NewProgressTrackerWithMonitor(namespace, changefeedName string) *ProgressTracker { + pt := &ProgressTracker{ + checkpointTs: 0, + list: list.New(), + elemMap: make(map[TxnKey]*list.Element), + namespace: namespace, + changefeedName: changefeedName, + } + + pt.pdClock = appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) + + // Start monitoring goroutine + ctx, cancel := context.WithCancel(context.Background()) + pt.cancelMonitor = cancel + go pt.runMonitor(ctx) + + return pt +} + +// AddPendingTxn adds a pending transaction to track +func (pt *ProgressTracker) AddPendingTxn(commitTs, startTs uint64) { + pt.mu.Lock() + defer pt.mu.Unlock() + + key := TxnKey{commitTs: commitTs, startTs: startTs} + // Add to list (maintains order) + elem := pt.list.PushBack(key) + pt.elemMap[key] = elem +} + +// RemoveCompletedTxn removes a completed transaction from pending list +func (pt *ProgressTracker) RemoveCompletedTxn(commitTs, startTs uint64) { + pt.mu.Lock() + defer pt.mu.Unlock() + + key := TxnKey{commitTs: commitTs, startTs: startTs} + if elem, ok := pt.elemMap[key]; ok { + pt.list.Remove(elem) + delete(pt.elemMap, key) + } +} + +// UpdateCheckpointTs updates the latest checkpoint TS received +func (pt *ProgressTracker) UpdateCheckpointTs(ts uint64) { + pt.mu.Lock() + defer pt.mu.Unlock() + + if ts > pt.checkpointTs { + pt.checkpointTs = ts + } +} + +// calculateEffectiveTs calculates the effective progress TS +// If there are pending transactions: effectiveTs = min(pendingTxns) - 1 +// If no pending transactions: effectiveTs = checkpointTs +func (pt *ProgressTracker) calculateEffectiveTs() uint64 { + if pt.list.Len() > 0 { + // Return min(pendingTxns) - 1 (first element in list) + key := pt.list.Front().Value.(TxnKey) + return key.commitTs - 1 + } + + // No pending transactions, use checkpointTs + return pt.checkpointTs +} + +// Reset resets the progress tracker to initial state +func (pt *ProgressTracker) Reset() { + pt.mu.Lock() + defer pt.mu.Unlock() + + pt.checkpointTs = 0 + pt.list.Init() // Clear list + pt.elemMap = make(map[TxnKey]*list.Element) // Clear map +} + +// Close stops the monitoring goroutine +func (pt *ProgressTracker) Close() { + if pt.cancelMonitor != nil { + pt.cancelMonitor() + } +} + +// runMonitor runs the monitoring goroutine that prints progress and lag every second +func (pt *ProgressTracker) runMonitor(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + log.Info("txnSink: ProgressTracker monitor started", zap.String("changefeed", pt.changefeedName)) + + for { + select { + case <-ctx.Done(): + log.Info("txnSink: ProgressTracker monitor stopped", zap.String("changefeed", pt.changefeedName)) + return + case <-ticker.C: + pt.printProgress() + } + } +} + +// printProgress prints current progress and lag information +func (pt *ProgressTracker) printProgress() { + pt.mu.RLock() + effectiveTs := pt.calculateEffectiveTs() + checkpointTs := pt.checkpointTs + pendingCount := pt.list.Len() + pt.mu.RUnlock() + + if pt.pdClock == nil { + log.Info("txnSink: Progress status", + zap.String("changefeed", pt.changefeedName), + zap.Uint64("effectiveTs", effectiveTs), + zap.Uint64("checkpointTs", checkpointTs), + zap.Int("pendingCount", pendingCount)) + return + } + + // Calculate lag using the same logic as maintainer + pdTime := pt.pdClock.CurrentTime() + phyEffectiveTs := oracle.ExtractPhysical(effectiveTs) + lag := float64(oracle.GetPhysical(pdTime)-phyEffectiveTs) / 1e3 // Convert to seconds + + // Update monitoring metrics + metrics.TxnSinkProgressLagGauge.WithLabelValues(pt.namespace, pt.changefeedName).Set(lag) + + log.Info("txnSink: Progress status", + zap.String("changefeed", pt.changefeedName), + zap.Uint64("effectiveTs", effectiveTs), + zap.Int64("phyEffectiveTs", phyEffectiveTs), + zap.Uint64("checkpointTs", checkpointTs), + zap.Int("pendingCount", pendingCount), + zap.Float64("lagSeconds", lag)) +} diff --git a/downstreamadapter/sink/txnsink/progress_tracker_test.go b/downstreamadapter/sink/txnsink/progress_tracker_test.go new file mode 100644 index 000000000..022985a3d --- /dev/null +++ b/downstreamadapter/sink/txnsink/progress_tracker_test.go @@ -0,0 +1,51 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestProgressTrackerMetrics(t *testing.T) { + // Create a new registry for testing + registry := prometheus.NewRegistry() + metrics.InitSinkMetrics(registry) + + // Create progress tracker with monitoring + pt := NewProgressTrackerWithMonitor("test_namespace", "test_changefeed") + defer pt.Close() + + // Add some pending transactions + pt.AddPendingTxn(1000, 900) + pt.AddPendingTxn(2000, 1900) + + // Update checkpoint ts + pt.UpdateCheckpointTs(500) + + // Wait a bit for the monitor to run + time.Sleep(2 * time.Second) + + // Check if the metric is registered and has a value + metricValue := testutil.ToFloat64(metrics.TxnSinkProgressLagGauge.WithLabelValues("test_namespace", "test_changefeed")) + require.Greater(t, metricValue, float64(0), "Progress lag metric should be greater than 0") + + // Verify the metric is working correctly + require.NotZero(t, metricValue, "Progress lag metric should not be zero") +} diff --git a/downstreamadapter/sink/txnsink/sink.go b/downstreamadapter/sink/txnsink/sink.go new file mode 100644 index 000000000..0e5b7670f --- /dev/null +++ b/downstreamadapter/sink/txnsink/sink.go @@ -0,0 +1,235 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "context" + "database/sql" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/sink/util" + "go.uber.org/atomic" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// Sink implements the txnSink interface for transaction-level SQL output +type Sink struct { + changefeedID common.ChangeFeedID + + // Core components + txnStore *TxnStore + conflictDetector *ConflictDetector + eventProcessor *EventProcessor + progressTracker *ProgressTracker + + // Workers + workers []*Worker + + // Configuration + config *TxnSinkConfig + + // Channels for coordination + dmlEventChan chan *commonEvent.DMLEvent + checkpointChan chan uint64 + txnChan chan *TxnGroup + + // State management + isNormal *atomic.Bool + ctx context.Context + + // Statistics and metrics + statistics *metrics.Statistics +} + +// New creates a new txnSink instance +func New(ctx context.Context, changefeedID common.ChangeFeedID, db *sql.DB, config *TxnSinkConfig) *Sink { + if config == nil { + config = &TxnSinkConfig{ + MaxConcurrentTxns: 32, + BatchSize: 256, + MaxSQLBatchSize: 1024 * 16, + } + } + + txnStore := NewTxnStore() + conflictDetector := NewConflictDetector(changefeedID, config.MaxConcurrentTxns) + progressTracker := NewProgressTrackerWithMonitor(changefeedID.Namespace(), changefeedID.Name()) + eventProcessor := NewEventProcessor(txnStore, progressTracker) + + // Create workers + workers := make([]*Worker, config.MaxConcurrentTxns) + for i := 0; i < config.MaxConcurrentTxns; i++ { + inputCh := conflictDetector.GetOutChByCacheID(i) + if inputCh == nil { + log.Error("txnSink: failed to get output channel from conflict detector", + zap.String("namespace", changefeedID.Namespace()), + zap.String("changefeed", changefeedID.Name()), + zap.Int("workerID", i)) + continue + } + workers[i] = NewWorker(i, changefeedID, config, db, inputCh, progressTracker, metrics.NewStatistics(changefeedID, "txnsink")) + } + + return &Sink{ + changefeedID: changefeedID, + txnStore: txnStore, + conflictDetector: conflictDetector, + eventProcessor: eventProcessor, + progressTracker: progressTracker, + workers: workers, + config: config, + dmlEventChan: make(chan *commonEvent.DMLEvent, 10000), + checkpointChan: make(chan uint64, 100), + txnChan: make(chan *TxnGroup, 10000), + isNormal: atomic.NewBool(true), + ctx: ctx, + statistics: metrics.NewStatistics(changefeedID, "txnsink"), + } +} + +// SinkType returns the sink type +func (s *Sink) SinkType() common.SinkType { + return common.TxnSinkType +} + +// IsNormal returns whether the sink is in normal state +func (s *Sink) IsNormal() bool { + return s.isNormal.Load() +} + +// AddDMLEvent adds a DML event to the sink +func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { + // Note: We don't add to pending here, as we need to wait for txnGroup formation + s.dmlEventChan <- event + event.PostFlush() +} + +// AddCheckpointTs adds a checkpoint timestamp to trigger transaction processing +func (s *Sink) AddCheckpointTs(ts uint64) { + s.checkpointChan <- ts +} + +// WriteBlockEvent writes a block event (not supported in txnSink) +func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { + return errors.New("txnSink does not support block events") +} + +// SetTableSchemaStore sets the table schema store (not used in txnSink) +func (s *Sink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { + // Not used in txnSink +} + +// Close closes the sink and releases resources +func (s *Sink) Close(removeChangefeed bool) { + s.isNormal.Store(false) + + // Close conflict detector + s.conflictDetector.Close() + + // Close all workers + for _, worker := range s.workers { + if worker != nil { + worker.Close() + } + } + + // Close progress tracker + s.progressTracker.Close() + + // Close channels + close(s.dmlEventChan) + close(s.checkpointChan) + close(s.txnChan) + + log.Info("txnSink: closed", + zap.String("namespace", s.changefeedID.Namespace()), + zap.String("changefeed", s.changefeedID.Name())) +} + +// Run starts the sink processing +func (s *Sink) Run(ctx context.Context) error { + namespace := s.changefeedID.Namespace() + changefeed := s.changefeedID.Name() + + log.Info("txnSink: starting", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed)) + + // Start conflict detector + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return s.conflictDetector.Run(ctx) + }) + + // Start event processor for DML events + eg.Go(func() error { + return s.eventProcessor.ProcessDMLEvents(ctx, s.dmlEventChan) + }) + + // Start event processor for checkpoints + eg.Go(func() error { + return s.eventProcessor.ProcessCheckpoints(ctx, s.checkpointChan, s.txnChan) + }) + + // Start transaction processor + eg.Go(func() error { + return s.processTransactions(ctx) + }) + + // Start all workers + for _, worker := range s.workers { + if worker != nil { + worker := worker + eg.Go(func() error { + return worker.Run(ctx) + }) + } + } + + err := eg.Wait() + if err != nil { + log.Error("txnSink: stopped with error", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Error(err)) + return err + } + + log.Info("txnSink: stopped normally", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed)) + + return nil +} + +// processTransactions processes transactions from the transaction channel +func (s *Sink) processTransactions(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case txnGroup, ok := <-s.txnChan: + if !ok { + return nil + } + // Add transaction group to conflict detector + s.conflictDetector.AddTxnGroup(txnGroup) + } + } +} diff --git a/downstreamadapter/sink/txnsink/sql_generator.go b/downstreamadapter/sink/txnsink/sql_generator.go new file mode 100644 index 000000000..a9a783fb4 --- /dev/null +++ b/downstreamadapter/sink/txnsink/sql_generator.go @@ -0,0 +1,216 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "strings" + + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/sink/sqlmodel" + "github.com/pingcap/tidb/pkg/util/chunk" +) + +// SQLGenerator handles SQL generation for transaction groups +type SQLGenerator struct{} + +// NewSQLGenerator creates a new SQL generator +func NewSQLGenerator() *SQLGenerator { + return &SQLGenerator{} +} + +// ConvertTxnGroupToSQL converts a transaction group to SQL statements +func (g *SQLGenerator) ConvertTxnGroupToSQL(txnGroup *TxnGroup) (*TxnSQL, error) { + // Group events by table for batch processing + tableEvents := make(map[int64][]*commonEvent.DMLEvent) + for _, event := range txnGroup.Events { + tableID := event.GetTableID() + tableEvents[tableID] = append(tableEvents[tableID], event) + } + + var allSQLs []string + var allArgs [][]interface{} + + // Process each table's events + for _, events := range tableEvents { + if len(events) == 0 { + continue + } + + // Generate SQL for this table's events + sqls, args, err := g.generateTableSQL(events) + if err != nil { + return nil, err + } + + allSQLs = append(allSQLs, sqls...) + allArgs = append(allArgs, args...) + } + // Wrap in transaction + var transactionSQL string + if len(allSQLs) == 0 { + transactionSQL = "" + } else { + transactionSQL = "BEGIN;" + strings.Join(allSQLs, ";") + ";COMMIT;" + } + transactionArgs := make([]interface{}, 0) + for _, arg := range allArgs { + transactionArgs = append(transactionArgs, arg...) + } + + return &TxnSQL{ + TxnGroup: txnGroup, + SQL: transactionSQL, + Args: transactionArgs, + Keys: txnGroup.ExtractKeys(), + }, nil + +} + +// generateTableSQL generates SQL statements for events of the same table +func (g *SQLGenerator) generateTableSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, error) { + if len(events) == 0 { + return []string{}, [][]interface{}{}, nil + } + + tableInfo := events[0].TableInfo + startTs := events[0].StartTs + commitTs := events[0].CommitTs + + // Group rows by type (insert, update, delete) + insertRows, updateRows, deleteRows := g.groupRowsByType(events, tableInfo) + + var sqls []string + var args [][]interface{} + + // Handle delete operations + if len(deleteRows) > 0 { + for _, rows := range deleteRows { + sql, value := g.genDeleteSQL(rows...) + sqls = append(sqls, sql) + args = append(args, value) + } + } + + // Handle update operations - use INSERT ON DUPLICATE KEY UPDATE + if len(updateRows) > 0 { + for _, rows := range updateRows { + sql, value := g.genInsertOnDuplicateUpdateSQL(startTs, commitTs, rows...) + sqls = append(sqls, sql) + args = append(args, value) + } + } + + // Handle insert operations - use INSERT ON DUPLICATE KEY UPDATE + if len(insertRows) > 0 { + for _, rows := range insertRows { + sql, value := g.genInsertOnDuplicateUpdateSQL(startTs, commitTs, rows...) + sqls = append(sqls, sql) + args = append(args, value) + } + } + + return sqls, args, nil +} + +// groupRowsByType groups rows by their type (insert, update, delete) +func (g *SQLGenerator) groupRowsByType(events []*commonEvent.DMLEvent, tableInfo *common.TableInfo) ( + insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange, +) { + insertRow := make([]*sqlmodel.RowChange, 0) + updateRow := make([]*sqlmodel.RowChange, 0) + deleteRow := make([]*sqlmodel.RowChange, 0) + + for _, event := range events { + event.Rewind() + for { + row, ok := event.GetNextRow() + if !ok { + break + } + + switch row.RowType { + case commonEvent.RowTypeInsert: + args := g.getArgsWithGeneratedColumn(&row.Row, tableInfo) + newInsertRow := sqlmodel.NewRowChange( + &tableInfo.TableName, + nil, + nil, + args, + tableInfo, + nil, nil) + insertRow = append(insertRow, newInsertRow) + + case commonEvent.RowTypeUpdate: + args := g.getArgsWithGeneratedColumn(&row.Row, tableInfo) + preArgs := g.getArgsWithGeneratedColumn(&row.PreRow, tableInfo) + newUpdateRow := sqlmodel.NewRowChange( + &tableInfo.TableName, + nil, + preArgs, + args, + tableInfo, + nil, nil) + updateRow = append(updateRow, newUpdateRow) + + case commonEvent.RowTypeDelete: + preArgs := g.getArgsWithGeneratedColumn(&row.PreRow, tableInfo) + newDeleteRow := sqlmodel.NewRowChange( + &tableInfo.TableName, + nil, + preArgs, + nil, + tableInfo, + nil, nil) + deleteRow = append(deleteRow, newDeleteRow) + } + } + } + + // Group rows into batches + if len(insertRow) > 0 { + insertRows = append(insertRows, insertRow) + } + if len(updateRow) > 0 { + updateRows = append(updateRows, updateRow) + } + if len(deleteRow) > 0 { + deleteRows = append(deleteRows, deleteRow) + } + + return +} + +// genDeleteSQL generates DELETE SQL for multiple rows +func (g *SQLGenerator) genDeleteSQL(rows ...*sqlmodel.RowChange) (string, []interface{}) { + return sqlmodel.GenDeleteSQL(rows...) +} + +// genInsertOnDuplicateUpdateSQL generates INSERT ON DUPLICATE KEY UPDATE SQL +func (g *SQLGenerator) genInsertOnDuplicateUpdateSQL(startTs uint64, commitTs uint64, rows ...*sqlmodel.RowChange) (string, []interface{}) { + return sqlmodel.GenInsertSQLWithCommitTs(sqlmodel.DMLInsertOnDuplicateUpdate, startTs, commitTs, rows...) +} + +// getArgsWithGeneratedColumn extracts column values including generated columns +func (g *SQLGenerator) getArgsWithGeneratedColumn(row *chunk.Row, tableInfo *common.TableInfo) []interface{} { + args := make([]interface{}, 0, len(tableInfo.GetColumns())) + for i, col := range tableInfo.GetColumns() { + if col == nil { + continue + } + v := common.ExtractColVal(row, col, i) + args = append(args, v) + } + return args +} diff --git a/downstreamadapter/sink/txnsink/sql_generator_test.go b/downstreamadapter/sink/txnsink/sql_generator_test.go new file mode 100644 index 000000000..5da68aa9a --- /dev/null +++ b/downstreamadapter/sink/txnsink/sql_generator_test.go @@ -0,0 +1,378 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "strings" + "testing" + + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestSQLGenerator_ConvertTxnGroupToSQL(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + // Create test DML events + event1 := helper.DML2Event("test", "t", "insert into t values (1, 'test1')") + event1.CommitTs = 100 + event1.StartTs = 50 + + event2 := helper.DML2Event("test", "t", "insert into t values (2, 'test2')") + event2.CommitTs = 100 + event2.StartTs = 50 + + // Create transaction group + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{event1, event2}, + } + + // Convert to SQL + txnSQL, err := generator.ConvertTxnGroupToSQL(txnGroup) + require.NoError(t, err) + require.NotNil(t, txnSQL) + + // Verify transaction structure + require.Equal(t, txnGroup, txnSQL.TxnGroup) + require.NotEmpty(t, txnSQL.SQL) + + // Verify SQL format: should start with BEGIN and end with COMMIT + sql := txnSQL.SQL + require.True(t, strings.HasPrefix(sql, "BEGIN;")) + require.True(t, strings.HasSuffix(sql, ";COMMIT;")) + + // Should contain INSERT ON DUPLICATE KEY UPDATE + require.Contains(t, sql, "INSERT INTO") + require.Contains(t, sql, "ON DUPLICATE KEY UPDATE") +} + +func TestSQLGenerator_ConvertTxnGroupToSQL_EmptyGroup(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + + // Create empty transaction group + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{}, + } + + // Convert to SQL + txnSQL, err := generator.ConvertTxnGroupToSQL(txnGroup) + require.NoError(t, err) + require.NotNil(t, txnSQL) + + // Should have empty SQL + require.Empty(t, txnSQL.SQL) +} + +func TestSQLGenerator_ConvertTxnGroupToSQL_MultiTable(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test tables + helper.Tk().MustExec("use test") + createTableSQL1 := "create table t1 (id int primary key, name varchar(32));" + job1 := helper.DDL2Job(createTableSQL1) + require.NotNil(t, job1) + + createTableSQL2 := "create table t2 (id int primary key, age int);" + job2 := helper.DDL2Job(createTableSQL2) + require.NotNil(t, job2) + + // Create events for different tables + event1 := helper.DML2Event("test", "t1", "insert into t1 values (1, 'test1')") + event1.CommitTs = 100 + event1.StartTs = 50 + + event2 := helper.DML2Event("test", "t2", "insert into t2 values (1, 25)") + event2.CommitTs = 100 + event2.StartTs = 50 + + // Create transaction group + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{event1, event2}, + } + + // Convert to SQL + txnSQL, err := generator.ConvertTxnGroupToSQL(txnGroup) + require.NoError(t, err) + require.NotNil(t, txnSQL) + + // Should have one transaction SQL + require.NotEmpty(t, txnSQL.SQL) + sql := txnSQL.SQL + + // Should contain both tables + require.Contains(t, sql, "`test`.`t1`") + require.Contains(t, sql, "`test`.`t2`") + require.True(t, strings.HasPrefix(sql, "BEGIN;")) + require.True(t, strings.HasSuffix(sql, ";COMMIT;")) +} + +func TestSQLGenerator_groupRowsByType(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + // Create different types of events + insertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test1')") + updateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (2, 'test2')", "update t set name = 'updated' where id = 2") + deleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test3')", "delete from t where id = 3") + + events := []*commonEvent.DMLEvent{insertEvent, updateEvent, deleteEvent} + tableInfo := insertEvent.TableInfo + + // Group rows by type + insertRows, updateRows, deleteRows := generator.groupRowsByType(events, tableInfo) + + // Verify grouping + require.Len(t, insertRows, 1) // One batch of insert rows + require.Len(t, updateRows, 1) // One batch of update rows + require.Len(t, deleteRows, 1) // One batch of delete rows + + // Verify row counts in each batch + require.Len(t, insertRows[0], 1) // One insert row + require.Len(t, updateRows[0], 1) // One update row + require.Len(t, deleteRows[0], 1) // One delete row +} + +func TestSQLGenerator_generateTableSQL(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + // Create test events + insertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test1')") + deleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (2, 'test2')", "delete from t where id = 2") + + events := []*commonEvent.DMLEvent{insertEvent, deleteEvent} + + // Generate table SQL + sqls, args, err := generator.generateTableSQL(events) + require.NoError(t, err) + + // Should generate 2 SQL statements (one DELETE, one INSERT ON DUPLICATE KEY UPDATE) + require.Len(t, sqls, 2) + require.Len(t, args, 2) + + // Verify SQL types + foundDelete := false + foundInsert := false + + for _, sql := range sqls { + if strings.Contains(sql, "DELETE FROM") { + foundDelete = true + } + if strings.Contains(sql, "INSERT INTO") && strings.Contains(sql, "ON DUPLICATE KEY UPDATE") { + foundInsert = true + } + } + + require.True(t, foundDelete, "Should generate DELETE SQL") + require.True(t, foundInsert, "Should generate INSERT ON DUPLICATE KEY UPDATE SQL") +} + +func TestSQLGenerator_generateTableSQL_EmptyEvents(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + + // Test with empty events + sqls, args, err := generator.generateTableSQL([]*commonEvent.DMLEvent{}) + require.NoError(t, err) + require.Len(t, sqls, 0) + require.Len(t, args, 0) +} + +func TestSQLGenerator_getArgsWithGeneratedColumn(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + // Create test event + event := helper.DML2Event("test", "t", "insert into t values (1, 'test')") + tableInfo := event.TableInfo + + // Get first row + event.Rewind() + row, ok := event.GetNextRow() + require.True(t, ok) + + // Extract arguments + args := generator.getArgsWithGeneratedColumn(&row.Row, tableInfo) + + // Should extract arguments for all columns + require.Equal(t, len(tableInfo.GetColumns()), len(args)) +} + +// Test mixed operations in the same transaction +func TestSQLGenerator_MixedOperations(t *testing.T) { + t.Parallel() + + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + + // Create mixed events: insert, update, delete + insertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test1')") + updateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (2, 'test2')", "update t set name = 'updated' where id = 2") + deleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test3')", "delete from t where id = 3") + + // Set same transaction timestamps + insertEvent.CommitTs = 100 + insertEvent.StartTs = 50 + updateEvent.CommitTs = 100 + updateEvent.StartTs = 50 + deleteEvent.CommitTs = 100 + deleteEvent.StartTs = 50 + + // Create transaction group + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: []*commonEvent.DMLEvent{insertEvent, updateEvent, deleteEvent}, + } + + // Convert to SQL + txnSQL, err := generator.ConvertTxnGroupToSQL(txnGroup) + require.NoError(t, err) + require.NotNil(t, txnSQL) + + // Should have one transaction SQL + require.NotEmpty(t, txnSQL.SQL) + sql := txnSQL.SQL + + // Verify transaction format + require.True(t, strings.HasPrefix(sql, "BEGIN;")) + require.True(t, strings.HasSuffix(sql, ";COMMIT;")) + + // Should contain all operation types + require.Contains(t, sql, "DELETE FROM") + require.Contains(t, sql, "INSERT INTO") + require.Contains(t, sql, "ON DUPLICATE KEY UPDATE") +} + +// Benchmark tests for SQL generation +func BenchmarkSQLGenerator_ConvertTxnGroupToSQL(b *testing.B) { + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(b) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(b, job) + + // Create multiple events + events := make([]*commonEvent.DMLEvent, 100) + for i := 0; i < 100; i++ { + events[i] = helper.DML2Event("test", "t", "insert into t values (?, 'test')") + events[i].CommitTs = 100 + events[i].StartTs = 50 + } + + txnGroup := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: events, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := generator.ConvertTxnGroupToSQL(txnGroup) + require.NoError(b, err) + } +} + +func BenchmarkSQLGenerator_groupRowsByType(b *testing.B) { + generator := NewSQLGenerator() + helper := commonEvent.NewEventTestHelper(b) + defer helper.Close() + + // Setup test table + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(b, job) + + // Create mixed events + events := make([]*commonEvent.DMLEvent, 1000) + for i := 0; i < 1000; i++ { + switch i % 3 { + case 0: + events[i] = helper.DML2Event("test", "t", "insert into t values (?, 'test')") + case 1: + events[i], _ = helper.DML2UpdateEvent("test", "t", "insert into t values (?, 'test')", "update t set name = 'updated' where id = ?") + case 2: + events[i] = helper.DML2DeleteEvent("test", "t", "insert into t values (?, 'test')", "delete from t where id = ?") + } + } + + tableInfo := events[0].TableInfo + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, _ = generator.groupRowsByType(events, tableInfo) + } +} diff --git a/downstreamadapter/sink/txnsink/types.go b/downstreamadapter/sink/txnsink/types.go new file mode 100644 index 000000000..10f54cf92 --- /dev/null +++ b/downstreamadapter/sink/txnsink/types.go @@ -0,0 +1,491 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "context" + "encoding/binary" + "hash/fnv" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/downstreamadapter/sink/mysql/causality" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/utils/chann" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +// TxnStore represents the in-memory store for DML events +// Structure: map[commitTs]map[startTs][]DMLEvent +type TxnStore struct { + store map[uint64]map[uint64][]*commonEvent.DMLEvent + mu sync.RWMutex +} + +// NewTxnStore creates a new TxnStore instance +func NewTxnStore() *TxnStore { + return &TxnStore{ + store: make(map[uint64]map[uint64][]*commonEvent.DMLEvent), + } +} + +// AddEvent adds a DML event to the store +func (ts *TxnStore) AddEvent(event *commonEvent.DMLEvent) { + ts.mu.Lock() + defer ts.mu.Unlock() + + commitTs := event.CommitTs + startTs := event.StartTs + + if ts.store[commitTs] == nil { + ts.store[commitTs] = make(map[uint64][]*commonEvent.DMLEvent) + } + ts.store[commitTs][startTs] = append(ts.store[commitTs][startTs], event) +} + +// GetEventsByCheckpointTs retrieves all events with commitTs <= checkpointTs +// Returns txnGroups sorted by commitTs in ascending order +func (ts *TxnStore) GetEventsByCheckpointTs(checkpointTs uint64) []*TxnGroup { + ts.mu.Lock() + defer ts.mu.Unlock() + + var groups []*TxnGroup + for commitTs, startTsMap := range ts.store { + if commitTs <= checkpointTs { + for startTs, events := range startTsMap { + groups = append(groups, &TxnGroup{ + CommitTs: commitTs, + StartTs: startTs, + Events: events, + }) + } + } + } + + // Sort groups by commitTs in ascending order + sort.Slice(groups, func(i, j int) bool { + return groups[i].CommitTs < groups[j].CommitTs + }) + + return groups +} + +// RemoveEventsByCheckpointTs removes all events with commitTs <= checkpointTs +func (ts *TxnStore) RemoveEventsByCheckpointTs(checkpointTs uint64) { + ts.mu.Lock() + defer ts.mu.Unlock() + + for commitTs := range ts.store { + if commitTs <= checkpointTs { + delete(ts.store, commitTs) + } + } +} + +// TxnGroup represents a complete transaction +type TxnGroup struct { + CommitTs uint64 + StartTs uint64 + Events []*commonEvent.DMLEvent + + // PostFlushFuncs are functions to be executed after the transaction is flushed + PostFlushFuncs []func() +} + +// GetTxnKey returns a unique key for the transaction +func (tg *TxnGroup) GetTxnKey() string { + return strconv.FormatUint(tg.CommitTs, 10) + "_" + strconv.FormatUint(tg.StartTs, 10) +} + +// ExtractKeys extracts all affected keys from the transaction +func (tg *TxnGroup) ExtractKeys() map[string]struct{} { + keys := make(map[string]struct{}) + for _, event := range tg.Events { + for _, rowKey := range event.RowKeys { + keys[string(rowKey)] = struct{}{} + } + } + return keys +} + +// AddPostFlushFunc adds a function to be executed after the transaction is flushed +func (tg *TxnGroup) AddPostFlushFunc(f func()) { + tg.PostFlushFuncs = append(tg.PostFlushFuncs, f) +} + +// PostFlush executes all post-flush functions +func (tg *TxnGroup) PostFlush() { + for _, f := range tg.PostFlushFuncs { + f() + } +} + +// TxnSQL represents the SQL statements for a transaction +type TxnSQL struct { + TxnGroup *TxnGroup + SQL string + Args []interface{} + Keys map[string]struct{} +} + +// BlockStrategy is the strategy to handle the situation when the cache is full. +type BlockStrategy string + +const ( + // BlockStrategyWaitAvailable means the cache will block until there is an available slot. + BlockStrategyWaitAvailable BlockStrategy = "waitAvailable" + // BlockStrategyWaitEmpty means the cache will block until all cached txns are consumed. + BlockStrategyWaitEmpty = "waitEmpty" +) + +// TxnCacheOption is the option for creating a cache for resolved txns. +type TxnCacheOption struct { + // Count controls the number of caches, txns in different caches could be executed concurrently. + Count int + // Size controls the max number of txns a cache can hold. + Size int + // BlockStrategy controls the strategy when the cache is full. + BlockStrategy BlockStrategy +} + +// txnCache interface for TxnGroup +type txnCache interface { + // addTxnGroup adds a txn group to the Cache. + addTxnGroup(txnGroup *TxnGroup) bool + // out returns a unlimited channel to receive txn groups which are ready to be executed. + out() *chann.UnlimitedChannel[*TxnGroup, any] +} + +// boundedTxnCache is a cache which has a limit on the number of txn groups it can hold. +type boundedTxnCache struct { + ch *chann.UnlimitedChannel[*TxnGroup, any] + upperSize int +} + +func (w *boundedTxnCache) addTxnGroup(txnGroup *TxnGroup) bool { + if w.ch.Len() > w.upperSize { + return false + } + w.ch.Push(txnGroup) + return true +} + +func (w *boundedTxnCache) out() *chann.UnlimitedChannel[*TxnGroup, any] { + return w.ch +} + +// boundedTxnCacheWithBlock is a special bounded cache. Once the cache +// is full, it will block until all cached txn groups are consumed. +type boundedTxnCacheWithBlock struct { + ch *chann.UnlimitedChannel[*TxnGroup, any] + isBlocked atomic.Bool + upperSize int +} + +func (w *boundedTxnCacheWithBlock) addTxnGroup(txnGroup *TxnGroup) bool { + if w.isBlocked.Load() && w.ch.Len() <= 0 { + w.isBlocked.Store(false) + } + + if !w.isBlocked.Load() { + if w.ch.Len() > w.upperSize { + w.isBlocked.CompareAndSwap(false, true) + return false + } + w.ch.Push(txnGroup) + return true + } + return false +} + +func (w *boundedTxnCacheWithBlock) out() *chann.UnlimitedChannel[*TxnGroup, any] { + return w.ch +} + +func newTxnCache(opt TxnCacheOption) txnCache { + if opt.Size <= 0 { + log.Panic("TxnCacheOption.Size should be greater than 0, please report a bug") + } + + switch opt.BlockStrategy { + case BlockStrategyWaitAvailable: + return &boundedTxnCache{ch: chann.NewUnlimitedChannel[*TxnGroup, any](nil, nil), upperSize: opt.Size} + case BlockStrategyWaitEmpty: + return &boundedTxnCacheWithBlock{ch: chann.NewUnlimitedChannel[*TxnGroup, any](nil, nil), upperSize: opt.Size} + default: + return nil + } +} + +// ConflictKeysForTxnGroup generates conflict keys for a transaction group +func ConflictKeysForTxnGroup(txnGroup *TxnGroup) []uint64 { + if len(txnGroup.Events) == 0 { + return nil + } + + hashRes := make(map[uint64]struct{}) + hasher := fnv.New32a() + + // Iterate through all events in the transaction group + for _, event := range txnGroup.Events { + // Iterate through all rows in the event + event.Rewind() + for { + rowChange, ok := event.GetNextRow() + if !ok { + break + } + + // Generate keys for each row + keys := genRowKeysForTxnGroup(rowChange, event.TableInfo, event.DispatcherID) + for _, key := range keys { + if n, err := hasher.Write(key); n != len(key) || err != nil { + log.Panic("transaction key hash fail") + } + hashRes[uint64(hasher.Sum32())] = struct{}{} + hasher.Reset() + } + } + event.Rewind() + } + + keys := make([]uint64, 0, len(hashRes)) + for key := range hashRes { + keys = append(keys, key) + } + return keys +} + +// genRowKeysForTxnGroup generates row keys for a row change in transaction group +func genRowKeysForTxnGroup(rowChange commonEvent.RowChange, tableInfo *common.TableInfo, dispatcherID common.DispatcherID) [][]byte { + var keys [][]byte + + if !rowChange.Row.IsEmpty() { + for iIdx, idxColID := range tableInfo.GetIndexColumns() { + key := genKeyListForTxnGroup(&rowChange.Row, iIdx, idxColID, dispatcherID, tableInfo) + if len(key) == 0 { + continue + } + keys = append(keys, key) + } + } + if !rowChange.PreRow.IsEmpty() { + for iIdx, idxColID := range tableInfo.GetIndexColumns() { + key := genKeyListForTxnGroup(&rowChange.PreRow, iIdx, idxColID, dispatcherID, tableInfo) + if len(key) == 0 { + continue + } + keys = append(keys, key) + } + } + if len(keys) == 0 { + // use dispatcherID as key if no key generated (no PK/UK), + // no concurrence for rows in the same dispatcher. + log.Debug("Use dispatcherID as the key", zap.Any("dispatcherID", dispatcherID)) + tableKey := make([]byte, 8) + binary.BigEndian.PutUint64(tableKey, uint64(dispatcherID.GetLow())) + keys = [][]byte{tableKey} + } + return keys +} + +// genKeyListForTxnGroup generates a key list for a row in transaction group +func genKeyListForTxnGroup( + row *chunk.Row, iIdx int, idxColID []int64, dispatcherID common.DispatcherID, tableInfo *common.TableInfo, +) []byte { + var key []byte + for _, colID := range idxColID { + info, ok := tableInfo.GetColumnInfo(colID) + // If the index contain generated column, we can't use this key to detect conflict with other DML, + if !ok || info == nil || info.IsGenerated() { + return nil + } + offset, ok := tableInfo.GetRowColumnsOffset()[colID] + if !ok { + log.Warn("can't find column offset", zap.Int64("colID", colID), zap.String("colName", info.Name.O)) + return nil + } + value := common.ExtractColVal(row, info, offset) + // if a column value is null, we can ignore this index + if value == nil { + return nil + } + + val := common.ColumnValueString(value) + if columnNeeds2LowerCase(info.GetType(), info.GetCollate()) { + val = strings.ToLower(val) + } + + key = append(key, []byte(val)...) + key = append(key, 0) + } + if len(key) == 0 { + return nil + } + tableKey := make([]byte, 16) + binary.BigEndian.PutUint64(tableKey[:8], uint64(iIdx)) + binary.BigEndian.PutUint64(tableKey[8:], dispatcherID.GetLow()) + key = append(key, tableKey...) + return key +} + +// columnNeeds2LowerCase checks if a column needs to be converted to lowercase +func columnNeeds2LowerCase(mysqlType byte, collation string) bool { + switch mysqlType { + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, + mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob: + return collationNeeds2LowerCase(collation) + } + return false +} + +// collationNeeds2LowerCase checks if a collation needs to be converted to lowercase +func collationNeeds2LowerCase(collation string) bool { + return strings.HasSuffix(collation, "_ci") +} + +// ConflictDetector manages transaction conflicts for parallel processing +type ConflictDetector struct { + // resolvedTxnCaches are used to cache resolved transactions. + resolvedTxnCaches []txnCache + + // slots are used to find all unfinished transactions + // conflicting with an incoming transactions. + slots *causality.Slots + + // nextCacheID is used to dispatch transactions round-robin. + nextCacheID atomic.Int64 + + notifiedNodes *chann.DrainableChann[func()] + + changefeedID common.ChangeFeedID + metricConflictDetectDuration prometheus.Observer +} + +// NewConflictDetector creates a new ConflictDetector instance +func NewConflictDetector(changefeedID common.ChangeFeedID, maxConcurrentTxns int) *ConflictDetector { + opt := TxnCacheOption{ + Count: maxConcurrentTxns, // Default worker count + Size: 1024, + BlockStrategy: BlockStrategyWaitEmpty, + } + + ret := &ConflictDetector{ + resolvedTxnCaches: make([]txnCache, opt.Count), + slots: causality.NewSlots(16 * 1024), // Default slot count + notifiedNodes: chann.NewAutoDrainChann[func()](), + metricConflictDetectDuration: metrics.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()), + changefeedID: changefeedID, + } + for i := 0; i < opt.Count; i++ { + ret.resolvedTxnCaches[i] = newTxnCache(opt) + } + log.Info("txn conflict detector initialized", zap.Int("cacheCount", opt.Count), + zap.Int("cacheSize", opt.Size), zap.String("BlockStrategy", string(opt.BlockStrategy))) + return ret +} + +// AddTxnGroup adds a transaction group to the conflict detector +func (cd *ConflictDetector) AddTxnGroup(txnGroup *TxnGroup) { + start := time.Now() + hashes := ConflictKeysForTxnGroup(txnGroup) + node := cd.slots.AllocNode(hashes) + + txnGroup.AddPostFlushFunc(func() { + cd.slots.Remove(node) + }) + + node.TrySendToTxnCache = func(cacheID int64) bool { + // Try sending this txn group to related cache as soon as all dependencies are resolved. + ok := cd.sendTxnGroupToCache(txnGroup, cacheID) + if ok { + cd.metricConflictDetectDuration.Observe(time.Since(start).Seconds()) + } + return ok + } + node.RandCacheID = func() int64 { + return cd.nextCacheID.Add(1) % int64(len(cd.resolvedTxnCaches)) + } + node.OnNotified = func(callback func()) { + // TODO:find a better way to handle the panic + defer func() { + if r := recover(); r != nil { + log.Warn("failed to send notification, channel might be closed", zap.Any("error", r)) + } + }() + cd.notifiedNodes.In() <- callback + } + cd.slots.Add(node) +} + +// GetOutChByCacheID returns the output channel by cacheID +func (cd *ConflictDetector) GetOutChByCacheID(id int) *chann.UnlimitedChannel[*TxnGroup, any] { + return cd.resolvedTxnCaches[id].out() +} + +// Run starts the conflict detector +func (cd *ConflictDetector) Run(ctx context.Context) error { + defer func() { + metrics.ConflictDetectDuration.DeleteLabelValues(cd.changefeedID.Namespace(), cd.changefeedID.Name()) + cd.closeCache() + }() + + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case notifyCallback := <-cd.notifiedNodes.Out(): + if notifyCallback != nil { + notifyCallback() + } + } + } +} + +// Close closes the conflict detector +func (cd *ConflictDetector) Close() { + cd.notifiedNodes.CloseAndDrain() +} + +// sendTxnGroupToCache should not call txn.Callback if it returns an error. +func (cd *ConflictDetector) sendTxnGroupToCache(txnGroup *TxnGroup, id int64) bool { + cache := cd.resolvedTxnCaches[id] + ok := cache.addTxnGroup(txnGroup) + return ok +} + +func (cd *ConflictDetector) closeCache() { + // the unlimited channel should be closed when quit wait group, otherwise txnWorker will be blocked + for _, cache := range cd.resolvedTxnCaches { + cache.out().Close() + } +} + +// TxnSinkConfig represents the configuration for txnSink +type TxnSinkConfig struct { + MaxConcurrentTxns int + BatchSize int + MaxSQLBatchSize int // maximum size of SQL batch in bytes +} diff --git a/downstreamadapter/sink/txnsink/types_test.go b/downstreamadapter/sink/txnsink/types_test.go new file mode 100644 index 000000000..e707bc6a7 --- /dev/null +++ b/downstreamadapter/sink/txnsink/types_test.go @@ -0,0 +1,416 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "sync" + "testing" + + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestTxnStore_AddEvent(t *testing.T) { + t.Parallel() + + store := NewTxnStore() + + // Create test event + event := &commonEvent.DMLEvent{ + CommitTs: 100, + StartTs: 50, + Length: 1, + } + + // Add event to store + store.AddEvent(event) + + // Verify event is stored correctly + store.mu.Lock() + defer store.mu.Unlock() + + require.Contains(t, store.store, uint64(100)) + require.Contains(t, store.store[100], uint64(50)) + require.Len(t, store.store[100][50], 1) + require.Equal(t, event, store.store[100][50][0]) +} + +func TestTxnStore_AddMultipleEvents(t *testing.T) { + t.Parallel() + + store := NewTxnStore() + + // Create multiple events with same commitTs but different startTs + event1 := &commonEvent.DMLEvent{CommitTs: 100, StartTs: 50} + event2 := &commonEvent.DMLEvent{CommitTs: 100, StartTs: 60} + event3 := &commonEvent.DMLEvent{CommitTs: 100, StartTs: 50} // Same as event1 + + store.AddEvent(event1) + store.AddEvent(event2) + store.AddEvent(event3) + + store.mu.Lock() + defer store.mu.Unlock() + + // Should have one commitTs entry + require.Len(t, store.store, 1) + require.Contains(t, store.store, uint64(100)) + + // Should have two startTs entries under commitTs 100 + require.Len(t, store.store[100], 2) + require.Contains(t, store.store[100], uint64(50)) + require.Contains(t, store.store[100], uint64(60)) + + // Should have two events under startTs 50 + require.Len(t, store.store[100][50], 2) + require.Equal(t, event1, store.store[100][50][0]) + require.Equal(t, event3, store.store[100][50][1]) + + // Should have one event under startTs 60 + require.Len(t, store.store[100][60], 1) + require.Equal(t, event2, store.store[100][60][0]) +} + +func TestTxnStore_GetEventsByCheckpointTs(t *testing.T) { + t.Parallel() + + store := NewTxnStore() + + // Add events with different commitTs + events := []*commonEvent.DMLEvent{ + {CommitTs: 50, StartTs: 10}, + {CommitTs: 100, StartTs: 20}, + {CommitTs: 150, StartTs: 30}, + {CommitTs: 200, StartTs: 40}, + } + + for _, event := range events { + store.AddEvent(event) + } + + // Test getting events with checkpoint 100 + groups := store.GetEventsByCheckpointTs(100) + + // Should return 2 groups (commitTs 50 and 100) + require.Len(t, groups, 2) + + // Verify the groups + commitTsSet := make(map[uint64]bool) + for _, group := range groups { + commitTsSet[group.CommitTs] = true + require.True(t, group.CommitTs <= 100) + require.Len(t, group.Events, 1) + } + + require.True(t, commitTsSet[50]) + require.True(t, commitTsSet[100]) +} + +func TestTxnStore_RemoveEventsByCheckpointTs(t *testing.T) { + t.Parallel() + + store := NewTxnStore() + + // Add events with different commitTs + events := []*commonEvent.DMLEvent{ + {CommitTs: 50, StartTs: 10}, + {CommitTs: 100, StartTs: 20}, + {CommitTs: 150, StartTs: 30}, + {CommitTs: 200, StartTs: 40}, + } + + for _, event := range events { + store.AddEvent(event) + } + + // Remove events with checkpoint 100 + store.RemoveEventsByCheckpointTs(100) + + store.mu.Lock() + defer store.mu.Unlock() + + // Should only have commitTs 150 and 200 remaining + require.Len(t, store.store, 2) + require.Contains(t, store.store, uint64(150)) + require.Contains(t, store.store, uint64(200)) + require.NotContains(t, store.store, uint64(50)) + require.NotContains(t, store.store, uint64(100)) +} + +func TestTxnStore_ConcurrentAccess(t *testing.T) { + t.Parallel() + + store := NewTxnStore() + var wg sync.WaitGroup + + // Concurrent writes + numWriters := 10 + eventsPerWriter := 100 + + for i := 0; i < numWriters; i++ { + wg.Add(1) + go func(writerID int) { + defer wg.Done() + for j := 0; j < eventsPerWriter; j++ { + event := &commonEvent.DMLEvent{ + CommitTs: uint64(writerID*eventsPerWriter + j), + StartTs: uint64(j), + } + store.AddEvent(event) + } + }(i) + } + + // Concurrent reads + numReaders := 5 + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 50; j++ { + _ = store.GetEventsByCheckpointTs(uint64(j * 10)) + } + }() + } + + wg.Wait() + + // Verify final state + store.mu.Lock() + defer store.mu.Unlock() + require.Len(t, store.store, numWriters*eventsPerWriter) +} + +func TestTxnGroup_GetTxnKey(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + commitTs uint64 + startTs uint64 + expected string + }{ + { + name: "normal case", + commitTs: 123, + startTs: 456, + expected: "123_456", + }, + { + name: "zero values", + commitTs: 0, + startTs: 0, + expected: "0_0", + }, + { + name: "large values", + commitTs: 18446744073709551615, // max uint64 + startTs: 18446744073709551614, + expected: "18446744073709551615_18446744073709551614", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + group := &TxnGroup{ + CommitTs: tt.commitTs, + StartTs: tt.startTs, + } + + result := group.GetTxnKey() + require.Equal(t, tt.expected, result) + }) + } +} + +func TestTxnGroup_ExtractKeys(t *testing.T) { + t.Parallel() + + // Create test events with row keys + events := []*commonEvent.DMLEvent{ + { + RowKeys: [][]byte{ + []byte("key1"), + []byte("key2"), + }, + }, + { + RowKeys: [][]byte{ + []byte("key2"), // Duplicate key + []byte("key3"), + }, + }, + } + + group := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: events, + } + + keys := group.ExtractKeys() + + // Should have 3 unique keys + require.Len(t, keys, 3) + require.Contains(t, keys, "key1") + require.Contains(t, keys, "key2") + require.Contains(t, keys, "key3") +} + +func TestTxnGroup_PostFlushFuncs(t *testing.T) { + t.Parallel() + + group := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + } + + var executed []int + + // Add post-flush functions + group.AddPostFlushFunc(func() { + executed = append(executed, 1) + }) + group.AddPostFlushFunc(func() { + executed = append(executed, 2) + }) + group.AddPostFlushFunc(func() { + executed = append(executed, 3) + }) + + // Execute post-flush functions + group.PostFlush() + + // Verify all functions were executed in order + require.Equal(t, []int{1, 2, 3}, executed) +} + +func TestTxnSinkConfig_Defaults(t *testing.T) { + t.Parallel() + + config := &TxnSinkConfig{} + + // Test default values are reasonable + require.Equal(t, 0, config.MaxConcurrentTxns) + require.Equal(t, 0, config.BatchSize) + require.Equal(t, 0, config.FlushInterval) +} + +func TestConflictDetector_Creation(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangefeedID4Test("test", "test") + detector := NewConflictDetector(changefeedID, 16) + + require.NotNil(t, detector) + require.NotNil(t, detector.resolvedTxnCaches) + require.NotNil(t, detector.slots) + require.NotNil(t, detector.notifiedNodes) + require.NotNil(t, detector.metricConflictDetectDuration) + require.Equal(t, changefeedID, detector.changefeedID) +} + +func TestConflictDetector_GetOutChByCacheID(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangefeedID4Test("test", "test") + detector := NewConflictDetector(changefeedID, 16) + + // Test valid cache ID + ch := detector.GetOutChByCacheID(0) + require.NotNil(t, ch) + + // Test another valid cache ID + ch2 := detector.GetOutChByCacheID(1) + require.NotNil(t, ch2) + + // Channels should be different + require.NotEqual(t, ch, ch2) + + // Test invalid cache ID (should still return a channel but may be nil depending on implementation) + ch3 := detector.GetOutChByCacheID(999) + // The behavior here depends on the underlying causality implementation + // We just verify it doesn't panic + _ = ch3 +} + +// Helper function to create a test DML event +func createTestDMLEvent(commitTs, startTs uint64, tableID int64) *commonEvent.DMLEvent { + return &commonEvent.DMLEvent{ + CommitTs: commitTs, + StartTs: startTs, + Length: 1, + RowKeys: [][]byte{[]byte("test_key")}, + } +} + +// Benchmark tests +func BenchmarkTxnStore_AddEvent(b *testing.B) { + store := NewTxnStore() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + event := &commonEvent.DMLEvent{ + CommitTs: uint64(i % 1000), // Reuse some commitTs values + StartTs: uint64(i), + } + store.AddEvent(event) + } +} + +func BenchmarkTxnStore_GetEventsByCheckpointTs(b *testing.B) { + store := NewTxnStore() + + // Pre-populate store + for i := 0; i < 10000; i++ { + event := &commonEvent.DMLEvent{ + CommitTs: uint64(i), + StartTs: uint64(i), + } + store.AddEvent(event) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = store.GetEventsByCheckpointTs(uint64(i % 5000)) + } +} + +func BenchmarkTxnGroup_ExtractKeys(b *testing.B) { + // Create a transaction group with many events + events := make([]*commonEvent.DMLEvent, 100) + for i := 0; i < 100; i++ { + events[i] = &commonEvent.DMLEvent{ + RowKeys: [][]byte{ + []byte("key_" + string(rune(i%10))), // Some duplicate keys + []byte("unique_key_" + string(rune(i))), + }, + } + } + + group := &TxnGroup{ + CommitTs: 100, + StartTs: 50, + Events: events, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = group.ExtractKeys() + } +} diff --git a/downstreamadapter/sink/txnsink/worker.go b/downstreamadapter/sink/txnsink/worker.go new file mode 100644 index 000000000..2c543a1fc --- /dev/null +++ b/downstreamadapter/sink/txnsink/worker.go @@ -0,0 +1,371 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsink + +import ( + "context" + "database/sql" + "strconv" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/utils/chann" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// Worker represents a worker that processes transaction groups and executes SQL +type Worker struct { + workerID int + changefeedID common.ChangeFeedID + config *TxnSinkConfig + + // Core components + sqlGenerator *SQLGenerator + dbExecutor *DBExecutor + progressTracker *ProgressTracker + + // Channels + inputCh *chann.UnlimitedChannel[*TxnGroup, any] + sqlChan *chann.UnlimitedChannel[*TxnSQL, any] // Simple FIFO channel for SQL batching + + // Statistics + statistics *metrics.Statistics + + // Monitoring metrics + workerFlushDuration prometheus.Observer + workerTotalDuration prometheus.Observer + workerHandledRows prometheus.Counter +} + +// NewWorker creates a new worker instance +func NewWorker( + workerID int, + changefeedID common.ChangeFeedID, + config *TxnSinkConfig, + db *sql.DB, + inputCh *chann.UnlimitedChannel[*TxnGroup, any], + progressTracker *ProgressTracker, + statistics *metrics.Statistics, +) *Worker { + // Create unlimited channel for SQL batching + sqlChan := chann.NewUnlimitedChannel[*TxnSQL, any]( + nil, // No grouping function needed + func(txnSQL *TxnSQL) int { + // Calculate SQL size for batching + return len(txnSQL.SQL) + }, + ) + + // Initialize monitoring metrics + namespace := changefeedID.Namespace() + changefeed := changefeedID.Name() + workerIDStr := strconv.Itoa(workerID) + + return &Worker{ + workerID: workerID, + changefeedID: changefeedID, + config: config, + sqlGenerator: NewSQLGenerator(), + dbExecutor: NewDBExecutor(db), + progressTracker: progressTracker, + inputCh: inputCh, + sqlChan: sqlChan, + statistics: statistics, + workerFlushDuration: metrics.WorkerFlushDuration.WithLabelValues(namespace, changefeed, workerIDStr), + workerTotalDuration: metrics.WorkerTotalDuration.WithLabelValues(namespace, changefeed, workerIDStr), + workerHandledRows: metrics.WorkerHandledRows.WithLabelValues(namespace, changefeed, workerIDStr), + } +} + +// Run starts the worker processing +func (w *Worker) Run(ctx context.Context) error { + namespace := w.changefeedID.Namespace() + changefeed := w.changefeedID.Name() + + log.Info("txnSink: starting worker", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID)) + + // Start multiple goroutines for different responsibilities + eg, ctx := errgroup.WithContext(ctx) + + // Start transaction processor (converts TxnGroup to SQL) + eg.Go(func() error { + return w.processTransactions(ctx) + }) + + // Start SQL executor (executes SQL batches) + eg.Go(func() error { + return w.executeSQLBatches(ctx) + }) + + err := eg.Wait() + if err != nil { + log.Error("txnSink: worker stopped with error", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Error(err)) + return err + } + + log.Info("txnSink: worker stopped normally", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID)) + + return nil +} + +// processTransactions processes transaction groups from input channel and converts them to SQL +func (w *Worker) processTransactions(ctx context.Context) error { + namespace := w.changefeedID.Namespace() + changefeed := w.changefeedID.Name() + + log.Info("hyy processTransactions") + + buffer := make([]*TxnGroup, 0, w.config.BatchSize) + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + default: + // Get multiple txn groups from the channel + txnGroups, ok := w.inputCh.GetMultipleNoGroup(buffer) + if !ok { + return errors.Trace(ctx.Err()) + } + log.Info("hyy get txn group", zap.Int("txnGroupSize", len(txnGroups))) + + if len(txnGroups) == 0 { + buffer = buffer[:0] + continue + } + + // Process each txn group + for _, txnGroup := range txnGroups { + if len(txnGroup.Events) == 0 { + continue + } + if err := w.processTxnGroup(txnGroup); err != nil { + log.Error("txnSink: failed to process transaction group", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Uint64("commitTs", txnGroup.CommitTs), + zap.Uint64("startTs", txnGroup.StartTs), + zap.Error(err)) + return err + } + } + + buffer = buffer[:0] + } + } +} + +// processTxnGroup converts a transaction group to SQL and pushes to sqlChan +func (w *Worker) processTxnGroup(txnGroup *TxnGroup) error { + // Convert to SQL + txnSQL, err := w.sqlGenerator.ConvertTxnGroupToSQL(txnGroup) + if err != nil { + return err + } + + // Push to worker's own sqlChan + w.sqlChan.Push(txnSQL) + + return nil +} + +// executeSQLBatches processes SQL batches from the SQL channel and executes them +func (w *Worker) executeSQLBatches(ctx context.Context) error { + namespace := w.changefeedID.Namespace() + changefeed := w.changefeedID.Name() + + log.Info("txnSink: starting SQL batch executor", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID)) + + buffer := make([]*TxnSQL, 0, w.config.BatchSize) + totalStart := time.Now() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Get multiple SQLs from the channel (no grouping needed) + sqlBatch, ok := w.sqlChan.GetMultipleNoGroup(buffer, w.config.MaxSQLBatchSize) + if !ok { + return nil + } + + if len(sqlBatch) == 0 { + buffer = buffer[:0] + continue + } + + batch := make([]*TxnSQL, 0, w.config.BatchSize) + currentBatchSize := 0 + + log.Info("txnSink: got sql batch", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Int("sqlBatchSize", len(sqlBatch)), + zap.Int("batchSize", len(batch)), + zap.Int("currentBatchSize", currentBatchSize)) + + // Process each SQL in the batch, respecting size and count limits + for _, txnSQL := range sqlBatch { + // Calculate SQL size for this transaction + sqlSize := w.calculateSQLSize(txnSQL) + + // Check if adding this SQL would exceed batch size limit + if len(batch) > 0 && (currentBatchSize+sqlSize > w.config.MaxSQLBatchSize || len(batch) >= w.config.BatchSize) { + // Execute current batch before adding new SQL + if err := w.executeSQLBatch(batch); err != nil { + log.Error("txnSink: failed to execute SQL batch", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Error(err)) + return err + } + + // Reset batch + batch = batch[:0] + currentBatchSize = 0 + } + + // Add SQL to batch + log.Debug("txnSink: add sql to batch", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Int("batchSize", len(batch)), + zap.Int("currentBatchSize", currentBatchSize)) + batch = append(batch, txnSQL) + currentBatchSize += sqlSize + } + + if len(batch) > 0 { + if err := w.executeSQLBatch(batch); err != nil { + log.Error("txnSink: failed to execute SQL batch", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Error(err)) + return err + } + } + + // Record total duration for worker busy ratio calculation + w.workerTotalDuration.Observe(time.Since(totalStart).Seconds()) + totalStart = time.Now() + buffer = buffer[:0] + } + } +} + +// calculateSQLSize calculates the total size of SQL statements in a transaction +func (w *Worker) calculateSQLSize(txnSQL *TxnSQL) int { + return len(txnSQL.SQL) +} + +// executeSQLBatch executes a batch of SQL transactions with monitoring metrics +func (w *Worker) executeSQLBatch(batch []*TxnSQL) error { + namespace := w.changefeedID.Namespace() + changefeed := w.changefeedID.Name() + + log.Debug("txnSink: execute sql batch", + zap.String("namespace", namespace), + zap.String("changefeed", changefeed), + zap.Int("workerID", w.workerID), + zap.Int("batchSize", len(batch))) + + if len(batch) == 0 { + return nil + } + + // Calculate total row count for this batch + totalRowCount := 0 + for _, txnSQL := range batch { + for _, event := range txnSQL.TxnGroup.Events { + totalRowCount += int(event.Len()) + } + } + + // Record batch size metric + metrics.TxnSinkBatchSize.WithLabelValues(namespace, changefeed).Observe(float64(len(batch))) + + // Record batch execution with monitoring + start := time.Now() + err := w.statistics.RecordBatchExecution(func() (int, int64, error) { + execErr := w.dbExecutor.ExecuteSQLBatch(batch) + if execErr != nil { + return 0, 0, execErr + } + // Return row count and approximate size (using SQL length as approximation) + approximateSize := int64(0) + for _, txnSQL := range batch { + for _, event := range txnSQL.TxnGroup.Events { + approximateSize += int64(event.GetSize()) + } + } + return totalRowCount, approximateSize, nil + }) + + if err != nil { + return err + } + + // Record flush duration and handled rows + w.workerFlushDuration.Observe(time.Since(start).Seconds()) + w.workerHandledRows.Add(float64(totalRowCount)) + + // Record batch duration + metrics.TxnSinkBatchDuration.WithLabelValues(namespace, changefeed).Observe(time.Since(start).Seconds()) + + // Update flushed progress for all transactions in the batch + for _, txnSQL := range batch { + txnSQL.TxnGroup.PostFlush() + w.progressTracker.RemoveCompletedTxn(txnSQL.TxnGroup.CommitTs, txnSQL.TxnGroup.StartTs) + } + + return nil +} + +// Close closes the worker and releases resources +func (w *Worker) Close() { + w.sqlChan.Close() + w.dbExecutor.Close() + + // Clean up monitoring metrics + namespace := w.changefeedID.Namespace() + changefeed := w.changefeedID.Name() + workerIDStr := strconv.Itoa(w.workerID) + + metrics.WorkerFlushDuration.DeleteLabelValues(namespace, changefeed, workerIDStr) + metrics.WorkerTotalDuration.DeleteLabelValues(namespace, changefeed, workerIDStr) + metrics.WorkerHandledRows.DeleteLabelValues(namespace, changefeed, workerIDStr) +} diff --git a/pkg/common/types.go b/pkg/common/types.go index 10ec3f946..bfcdb5c19 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -301,4 +301,5 @@ const ( CloudStorageSinkType BlackHoleSinkType RedoSinkType + TxnSinkType ) diff --git a/pkg/metrics/sink.go b/pkg/metrics/sink.go index a915c6147..a35b81821 100644 --- a/pkg/metrics/sink.go +++ b/pkg/metrics/sink.go @@ -124,6 +124,53 @@ var ( Help: "Busy ratio (X ms in 1s) for all workers.", }, []string{"namespace", "changefeed", "id"}) + // TxnSinkBatchSize records the batch size of SQL transactions + TxnSinkBatchSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "txn_sink_batch_size", + Help: "Batch size of SQL transactions in txnSink.", + Buckets: prometheus.ExponentialBuckets(1, 2, 16), // 1~65536 + }, []string{"namespace", "changefeed"}) + + // TxnSinkBatchDuration records the duration of processing a batch of transactions + TxnSinkBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "txn_sink_batch_duration", + Help: "Duration of processing a batch of transactions in txnSink.", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s + }, []string{"namespace", "changefeed"}) + + // TxnSinkPendingTxns records the number of pending transactions + TxnSinkPendingTxns = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "txn_sink_pending_txns", + Help: "Number of pending transactions in txnSink.", + }, []string{"namespace", "changefeed"}) + + // TxnSinkTxnStoreSize records the size of transaction store + TxnSinkTxnStoreSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "txn_sink_txn_store_size", + Help: "Size of transaction store in txnSink.", + }, []string{"namespace", "changefeed"}) + + // TxnSinkProgressLagGauge records the lag of progress tracker in txnSink + TxnSinkProgressLagGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "txn_sink_progress_lag", + Help: "Progress lag of txnSink in seconds (based on effectiveTs).", + }, []string{"namespace", "changefeed"}) + SinkDMLBatchCommit = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -215,6 +262,11 @@ func InitSinkMetrics(registry *prometheus.Registry) { registry.MustRegister(WorkerFlushDuration) registry.MustRegister(WorkerTotalDuration) registry.MustRegister(WorkerHandledRows) + registry.MustRegister(TxnSinkBatchSize) + registry.MustRegister(TxnSinkBatchDuration) + registry.MustRegister(TxnSinkPendingTxns) + registry.MustRegister(TxnSinkTxnStoreSize) + registry.MustRegister(TxnSinkProgressLagGauge) registry.MustRegister(SinkDMLBatchCommit) registry.MustRegister(SinkDMLBatchCallback) registry.MustRegister(PrepareStatementErrors) diff --git a/pkg/sink/sqlmodel/multi_row.go b/pkg/sink/sqlmodel/multi_row.go index 9291b2c76..93483c310 100644 --- a/pkg/sink/sqlmodel/multi_row.go +++ b/pkg/sink/sqlmodel/multi_row.go @@ -260,6 +260,7 @@ func GenInsertSQL(tp DMLType, changes ...*RowChange) (string, []interface{}) { columnNum++ buf.WriteString(common.QuoteName(col.Name.O)) } + buf.WriteString(") VALUES ") holder := valuesHolder(columnNum) for i := range changes { @@ -306,3 +307,143 @@ func GenInsertSQL(tp DMLType, changes ...*RowChange) (string, []interface{}) { } return buf.String(), args } + +// GenInsertSQLWithCommitTs generates the INSERT SQL and its arguments with commitTs logic for txnSink. +// This function is specifically designed for txnSink to handle _tidb_origin_ts column. +// Input `changes` should have same target table and same modifiable columns, +// otherwise the behaviour is undefined. +// If commitTs is greater than 0, the _tidb_origin_ts column will be handled specially. +func GenInsertSQLWithCommitTs(tp DMLType, startTs uint64, commitTs uint64, changes ...*RowChange) (string, []interface{}) { + if len(changes) == 0 { + log.L().DPanic("row changes is empty") + return "", nil + } + + first := changes[0] + + var buf strings.Builder + buf.Grow(1024) + if tp == DMLReplace { + buf.WriteString("REPLACE INTO ") + } else { + buf.WriteString("INSERT INTO ") + } + buf.WriteString(first.targetTable.QuoteString()) + buf.WriteString(" (") + columnNum := 0 + var skipColIdx []int + + // build generated columns lower name set to accelerate the following check + generatedColumns := generatedColumnsNameSet(first.targetTableInfo.GetColumns()) + for i, col := range first.sourceTableInfo.GetColumns() { + if _, ok := generatedColumns[col.Name.L]; ok { + skipColIdx = append(skipColIdx, i) + continue + } + + if columnNum != 0 { + buf.WriteByte(',') + } + columnNum++ + buf.WriteString(common.QuoteName(col.Name.O)) + } + + // Add _tidb_origin_ts column if it doesn't exist and commitTs is provided + hasOriginTsColumn := false + for _, col := range first.sourceTableInfo.GetColumns() { + if col.Name.L == "_tidb_origin_ts" { + hasOriginTsColumn = true + break + } + } + + if commitTs > 0 && !hasOriginTsColumn { + if columnNum != 0 { + buf.WriteByte(',') + } + columnNum++ + buf.WriteString(common.QuoteName("_tidb_origin_ts")) + } + + buf.WriteString(") VALUES ") + holder := valuesHolder(columnNum) + for i := range changes { + if i > 0 { + buf.WriteString(",") + } + buf.WriteString(holder) + } + if tp == DMLInsertOnDuplicateUpdate { + buf.WriteString(" ON DUPLICATE KEY UPDATE ") + i := 0 // used as index of skipColIdx + writtenFirstCol := false + + for j, col := range first.sourceTableInfo.GetColumns() { + if i < len(skipColIdx) && skipColIdx[i] == j { + i++ + continue + } + + if writtenFirstCol { + buf.WriteByte(',') + } + writtenFirstCol = true + + colName := common.QuoteName(col.Name.O) + tableName := first.targetTable.QuoteString() + + // For all columns, use _tidb_origin_ts as comparison basis + buf.WriteString(colName + "=IF(((IFNULL(" + tableName + "._tidb_origin_ts, " + tableName + "._tidb_commit_ts) <= VALUES(" + common.QuoteName("_tidb_origin_ts") + "))),VALUES(" + colName + "), " + tableName + "." + colName + ")") + } + + // Add _tidb_origin_ts to ON DUPLICATE KEY UPDATE if it doesn't exist in source but we're adding it + if commitTs > 0 && !hasOriginTsColumn { + if writtenFirstCol { + buf.WriteByte(',') + } + tableName := first.targetTable.QuoteString() + buf.WriteString(common.QuoteName("_tidb_origin_ts") + "=IF(((IFNULL(" + tableName + "._tidb_origin_ts, " + tableName + "._tidb_commit_ts) <= VALUES(" + common.QuoteName("_tidb_origin_ts") + "))),VALUES(" + common.QuoteName("_tidb_origin_ts") + "), " + tableName + "._tidb_origin_ts)") + } + } + + args := make([]interface{}, 0, len(changes)*(len(first.sourceTableInfo.GetColumns())-len(skipColIdx)+1)) // +1 for potential _tidb_origin_ts column + + // Find the origin_ts column index + originTsColIndex := -1 + for i, col := range first.sourceTableInfo.GetColumns() { + if col.Name.L == "_tidb_origin_ts" { + originTsColIndex = i + break + } + } + + for _, change := range changes { + i := 0 // used as index of skipColIdx + colIndex := 0 // used to track the actual column index + for j, val := range change.postValues { + if i < len(skipColIdx) && skipColIdx[i] == j { + i++ + continue + } + + // If this is the origin_ts column and commitTs is provided, replace NULL with commitTs + if commitTs > 0 && j == originTsColIndex { + // This is the origin_ts column, replace NULL with commitTs + if val == nil { + args = append(args, commitTs) + } else { + args = append(args, val) + } + } else { + args = append(args, val) + } + colIndex++ + } + + // Add _tidb_origin_ts value if the column doesn't exist in source but we're adding it + if commitTs > 0 && originTsColIndex == -1 { + args = append(args, commitTs) + } + } + return buf.String(), args +}