Skip to content
Open

demo #1732

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func (c *Changefeed) ForceUpdateStatus(newStatus *heartbeatpb.MaintainerStatus)
}

func (c *Changefeed) NeedCheckpointTsMessage() bool {
return c.sinkType == common.KafkaSinkType || c.sinkType == common.CloudStorageSinkType
// return c.sinkType != common.MysqlSinkType
return true
}

func (c *Changefeed) SetIsNew(isNew bool) {
Expand Down
47 changes: 47 additions & 0 deletions downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package sink
import (
"context"
"net/url"
"strconv"

"github.com/pingcap/ticdc/downstreamadapter/sink/blackhole"
"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage"
"github.com/pingcap/ticdc/downstreamadapter/sink/kafka"
"github.com/pingcap/ticdc/downstreamadapter/sink/mysql"
"github.com/pingcap/ticdc/downstreamadapter/sink/pulsar"
"github.com/pingcap/ticdc/downstreamadapter/sink/txnsink"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
mysqlpkg "github.com/pingcap/ticdc/pkg/sink/mysql"
"github.com/pingcap/ticdc/pkg/sink/util"
)

Expand All @@ -50,6 +53,11 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.
scheme := config.GetScheme(sinkURI)
switch scheme {
case config.MySQLScheme, config.MySQLSSLScheme, config.TiDBScheme, config.TiDBSSLScheme:
// Check if enable-transaction-atomic is set to true
if isTransactionAtomicEnabled(sinkURI) {
return newTxnSinkAdapter(ctx, changefeedID, cfg, sinkURI)
}
// Use mysqlSink if enable-transaction-atomic is not set or set to false
return mysql.New(ctx, changefeedID, cfg, sinkURI)
case config.KafkaScheme, config.KafkaSSLScheme:
return kafka.New(ctx, changefeedID, sinkURI, cfg.SinkConfig)
Expand All @@ -63,6 +71,45 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.
return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
}

// isTransactionAtomicEnabled checks if enable-transaction-atomic parameter is set to true in sink URI
func isTransactionAtomicEnabled(sinkURI *url.URL) bool {
query := sinkURI.Query()
s := query.Get("enable-transaction-atomic")
if len(s) == 0 {
return false
}
enabled, err := strconv.ParseBool(s)
if err != nil {
// If the parameter value is invalid, default to false
return false
}
return enabled
}

// newTxnSinkAdapter creates a txnSink adapter that uses the same database connection as mysqlSink
func newTxnSinkAdapter(
ctx context.Context,
changefeedID common.ChangeFeedID,
config *config.ChangefeedConfig,
sinkURI *url.URL,
) (Sink, error) {
// Use the same database connection logic as mysqlSink
_, db, err := mysqlpkg.NewMysqlConfigAndDB(ctx, changefeedID, sinkURI, config)
if err != nil {
return nil, err
}

// Create txnSink configuration
txnConfig := &txnsink.TxnSinkConfig{
MaxConcurrentTxns: 128,
BatchSize: 256,
MaxSQLBatchSize: 1024 * 16, // 1MB
}

// Create and return txnSink
return txnsink.New(ctx, changefeedID, db, txnConfig), nil
}

func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) error {
sinkURI, err := url.Parse(cfg.SinkURI)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions downstreamadapter/sink/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"net/url"
"testing"

"github.com/stretchr/testify/require"
)

func TestIsTransactionAtomicEnabled(t *testing.T) {
tests := []struct {
name string
sinkURI string
expected bool
}{
{
name: "enable-transaction-atomic=true",
sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=true",
expected: true,
},
{
name: "enable-transaction-atomic=false",
sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=false",
expected: false,
},
{
name: "enable-transaction-atomic not set",
sinkURI: "mysql://user:pass@localhost:3306/test",
expected: false,
},
{
name: "enable-transaction-atomic with invalid value",
sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=invalid",
expected: false,
},
{
name: "enable-transaction-atomic=1",
sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=1",
expected: true,
},
{
name: "enable-transaction-atomic=0",
sinkURI: "mysql://user:pass@localhost:3306/test?enable-transaction-atomic=0",
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parsedURI, err := url.Parse(tt.sinkURI)
require.NoError(t, err)

result := isTransactionAtomicEnabled(parsedURI)
require.Equal(t, tt.expected, result)
})
}
}
133 changes: 133 additions & 0 deletions downstreamadapter/sink/txnsink/db_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package txnsink

import (
"context"
"database/sql"
"database/sql/driver"
"strings"
"time"

dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
)

const (
// BackoffBaseDelay indicates the base delay time for retrying.
BackoffBaseDelay = 500 * time.Millisecond
// BackoffMaxDelay indicates the max delay time for retrying.
BackoffMaxDelay = 60 * time.Second
// DefaultDMLMaxRetry is the default maximum number of retries for DML operations
DefaultDMLMaxRetry = 8
)

// DBExecutor handles database execution for transaction SQL
type DBExecutor struct {
db *sql.DB
}

// NewDBExecutor creates a new database executor
func NewDBExecutor(db *sql.DB) *DBExecutor {
return &DBExecutor{
db: db,
}
}

// ExecuteSQLBatch executes a batch of SQL transactions with retry mechanism
func (e *DBExecutor) ExecuteSQLBatch(batch []*TxnSQL) error {
if len(batch) == 0 {
return nil
}

log.Info("txnSink: executing SQL batch",
zap.Int("batchSize", len(batch)))

// Define the execution function that will be retried
tryExec := func() error {
// Filter out empty SQL statements
var validSQLs []string
var validArgs []interface{}

for _, txnSQL := range batch {
if txnSQL.SQL != "" {
validSQLs = append(validSQLs, txnSQL.SQL)
validArgs = append(validArgs, txnSQL.Args...)
}
}

if len(validSQLs) == 0 {
log.Debug("txnSink: no valid SQL to execute in batch")
return nil
}

// Combine SQL statements with semicolons
finalSQL := strings.Join(validSQLs, ";")

log.Info("executing transaction",
zap.String("sql", finalSQL),
zap.Any("args", validArgs))

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
_, execErr := e.db.ExecContext(ctx, finalSQL, validArgs...)
cancel()

if execErr != nil {
log.Error("txnSink: failed to execute SQL batch",
zap.String("sql", finalSQL),
zap.Int("batchSize", len(batch)),
zap.Error(execErr))
return errors.Trace(execErr)
}

log.Debug("txnSink: successfully executed SQL batch",
zap.String("sql", finalSQL),
zap.Int("batchSize", len(batch)))

return nil
}

// Use retry mechanism
return retry.Do(context.Background(), func() error {
err := tryExec()
if err != nil {
log.Warn("txnSink: SQL execution failed, will retry",
zap.Int("batchSize", len(batch)),
zap.Error(err))
}
return err
}, retry.WithBackoffBaseDelay(BackoffBaseDelay.Milliseconds()),
retry.WithBackoffMaxDelay(BackoffMaxDelay.Milliseconds()),
retry.WithMaxTries(DefaultDMLMaxRetry),
retry.WithIsRetryableErr(isRetryableDMLError))
}

// isRetryableDMLError determines if a DML error is retryable
func isRetryableDMLError(err error) bool {
// Check if it's a retryable error
if !errors.IsRetryableError(err) {
return false
}

// Check for specific MySQL error codes
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok {
switch mysqlErr.Number {
case uint16(mysql.ErrNoSuchTable), uint16(mysql.ErrBadDB), uint16(mysql.ErrDupEntry):
return false
}
}

// Check for driver errors
if err == driver.ErrBadConn {
return true
}

return true
}

// Close closes the database connection
func (e *DBExecutor) Close() error {
return e.db.Close()
}
Loading
Loading