Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support txn insert when table sync #234

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion pkg/ccr/ingest_binlog_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,72 @@ type commitInfosCollector struct {
commitInfosLock sync.Mutex
}

type subTxnInfosCollector struct {
subTxnidToCommitInfos map[int64]([]*ttypes.TTabletCommitInfo)
subTxnInfosLock sync.Mutex
}

func newCommitInfosCollector() *commitInfosCollector {
return &commitInfosCollector{
commitInfos: make([]*ttypes.TTabletCommitInfo, 0),
}
}

func newSubTxnInfosCollector() *subTxnInfosCollector {
return &subTxnInfosCollector{
subTxnidToCommitInfos: make(map[int64]([]*ttypes.TTabletCommitInfo)),
}
}

func (cic *commitInfosCollector) appendCommitInfos(commitInfo ...*ttypes.TTabletCommitInfo) {
cic.commitInfosLock.Lock()
defer cic.commitInfosLock.Unlock()

cic.commitInfos = append(cic.commitInfos, commitInfo...)
}

func (stic *subTxnInfosCollector) appendSubTxnCommitInfos(stid int64, commitInfo ...*ttypes.TTabletCommitInfo) {
stic.subTxnInfosLock.Lock()
defer stic.subTxnInfosLock.Unlock()

if stic.subTxnidToCommitInfos == nil {
stic.subTxnidToCommitInfos = make(map[int64]([]*ttypes.TTabletCommitInfo))
}

tabletCommitInfos := stic.subTxnidToCommitInfos[stid]
if tabletCommitInfos == nil {
tabletCommitInfos = make([]*ttypes.TTabletCommitInfo, 0)
}

tabletCommitInfos = append(tabletCommitInfos, commitInfo...)
stic.subTxnidToCommitInfos[stid] = tabletCommitInfos
}

func (cic *commitInfosCollector) CommitInfos() []*ttypes.TTabletCommitInfo {
cic.commitInfosLock.Lock()
defer cic.commitInfosLock.Unlock()

return cic.commitInfos
}

func (stic *subTxnInfosCollector) SubTxnToCommitInfos() map[int64]([]*ttypes.TTabletCommitInfo) {
stic.subTxnInfosLock.Lock()
defer stic.subTxnInfosLock.Unlock()

return stic.subTxnidToCommitInfos
}

type tabletIngestBinlogHandler struct {
ingestJob *IngestBinlogJob
binlogVersion int64
stid int64
srcTablet *TabletMeta
destTablet *TabletMeta
destPartitionId int64
destTableId int64

*commitInfosCollector
*subTxnInfosCollector

cancel atomic.Bool
wg sync.WaitGroup
Expand All @@ -69,6 +107,7 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli
}

j := h.ingestJob
destStid := h.stid
binlogVersion := h.binlogVersion
srcTablet := h.srcTablet
destPartitionId := h.destPartitionId
Expand All @@ -94,8 +133,14 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli
loadId := ttypes.NewTUniqueId()
loadId.SetHi(-1)
loadId.SetLo(-1)

// for txn insert
txnId := j.txnId
if destStid != 0 {
txnId = destStid
}
req := &bestruct.TIngestBinlogRequest{
TxnId: utils.ThriftValueWrapper(j.txnId),
TxnId: utils.ThriftValueWrapper(txnId),
RemoteTabletId: utils.ThriftValueWrapper[int64](srcTablet.Id),
BinlogVersion: utils.ThriftValueWrapper(binlogVersion),
RemoteHost: utils.ThriftValueWrapper(srcBackend.Host),
Expand Down Expand Up @@ -138,6 +183,11 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli
return
} else {
h.appendCommitInfos(commitInfo)

// for txn insert
if destStid != 0 {
h.appendSubTxnCommitInfos(destStid, commitInfo)
}
}
}()

Expand Down Expand Up @@ -171,13 +221,19 @@ func (h *tabletIngestBinlogHandler) handle() {
h.wg.Wait()

h.ingestJob.appendCommitInfos(h.CommitInfos()...)
// for txn insert
if h.stid != 0 {
commitInfos := h.SubTxnToCommitInfos()[h.stid]
h.ingestJob.appendSubTxnCommitInfos(h.stid, commitInfos...)
}
}

type IngestContext struct {
context.Context
txnId int64
tableRecords []*record.TableRecord
tableMapping map[int64]int64
stidMapping map[int64]int64
}

func NewIngestContext(txnId int64, tableRecords []*record.TableRecord, tableMapping map[int64]int64) *IngestContext {
Expand All @@ -189,13 +245,25 @@ func NewIngestContext(txnId int64, tableRecords []*record.TableRecord, tableMapp
}
}

func NewIngestContextForTxnInsert(txnId int64, tableRecords []*record.TableRecord,
tableMapping map[int64]int64, stidMapping map[int64]int64) *IngestContext {
return &IngestContext{
Context: context.Background(),
txnId: txnId,
tableRecords: tableRecords,
tableMapping: tableMapping,
stidMapping: stidMapping,
}
}

type IngestBinlogJob struct {
ccrJob *Job // ccr job
factory *Factory

tableMapping map[int64]int64
srcMeta IngestBinlogMetaer
destMeta IngestBinlogMetaer
stidMap map[int64]int64

txnId int64
tableRecords []*record.TableRecord
Expand All @@ -206,6 +274,7 @@ type IngestBinlogJob struct {
tabletIngestJobs []*tabletIngestBinlogHandler

*commitInfosCollector
*subTxnInfosCollector

err error
errLock sync.RWMutex
Expand All @@ -227,8 +296,10 @@ func NewIngestBinlogJob(ctx context.Context, ccrJob *Job) (*IngestBinlogJob, err
tableMapping: ingestCtx.tableMapping,
txnId: ingestCtx.txnId,
tableRecords: ingestCtx.tableRecords,
stidMap: ingestCtx.stidMapping,

commitInfosCollector: newCommitInfosCollector(),
subTxnInfosCollector: newSubTxnInfosCollector(),
}, nil
}

Expand Down Expand Up @@ -269,6 +340,7 @@ func (j *IngestBinlogJob) Error() error {
type prepareIndexArg struct {
binlogVersion int64
srcTableId int64
stid int64
srcPartitionId int64
destTableId int64
destPartitionId int64
Expand Down Expand Up @@ -321,12 +393,15 @@ func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) {
destTablet := destIter.Value()
tabletIngestBinlogHandler := &tabletIngestBinlogHandler{
ingestJob: j,
stid: arg.stid,
binlogVersion: arg.binlogVersion,
srcTablet: srcTablet,
destTablet: destTablet,
destPartitionId: arg.destPartitionId,
destTableId: arg.destTableId,

commitInfosCollector: newCommitInfosCollector(),
subTxnInfosCollector: newSubTxnInfosCollector(),
}
j.tabletIngestJobs = append(j.tabletIngestJobs, tabletIngestBinlogHandler)

Expand Down Expand Up @@ -354,6 +429,8 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit

srcPartitionId := partitionRecord.Id
srcPartitionRange := partitionRecord.Range
sourceStid := partitionRecord.Stid
stidMap := j.stidMap
destPartitionId, err := j.destMeta.GetPartitionIdByRange(destTableId, srcPartitionRange)
if err != nil {
j.setError(err)
Expand Down Expand Up @@ -407,6 +484,7 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit
prepareIndexArg := prepareIndexArg{
binlogVersion: partitionRecord.Version,
srcTableId: srcTableId,
stid: stidMap[sourceStid],
srcPartitionId: srcPartitionId,
destTableId: destTableId,
destPartitionId: destPartitionId,
Expand Down
Loading
Loading