2 changes: 1 addition & 1 deletion coordinator/changefeed/changefeed.go
@@ -142,7 +142,7 @@ func (c *Changefeed) ShouldRun() bool {
func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool, config.FeedState, *heartbeatpb.RunningError) {
old := c.status.Load()
failpoint.Inject("CoordinatorDontUpdateChangefeedCheckpoint", func() {
newStatus = old
newStatus.CheckpointTs = old.CheckpointTs
})

if newStatus != nil && newStatus.CheckpointTs >= old.CheckpointTs {
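For context, this failpoint is normally toggled from tests through the failpoint package. A minimal sketch, assuming the failpoint path follows the coordinator/changefeed package's import path (the exact prefix and test name are assumptions, not taken from this PR):

package changefeed_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestFreezeCheckpointUnderFailpoint sketches how the failpoint could be
// enabled. With the new injection body, only CheckpointTs is pinned to the
// old value; the rest of the incoming MaintainerStatus is still applied.
func TestFreezeCheckpointUnderFailpoint(t *testing.T) {
	// Assumed failpoint path; adjust to the actual package import path.
	fp := "github.com/pingcap/ticdc/coordinator/changefeed/CoordinatorDontUpdateChangefeedCheckpoint"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// ... drive Changefeed.UpdateStatus and assert the checkpoint stays put ...
}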
6 changes: 2 additions & 4 deletions logservice/coordinator/coordinator.go
@@ -144,7 +144,7 @@ func (c *logCoordinator) Run(ctx context.Context) error {
log.Warn("send reusable event service response failed", zap.Error(err))
}
case <-metricTick.C:
c.updateChangefeedMetrics()
c.reportChangefeedMetrics()
}
}
}
@@ -171,8 +171,6 @@ func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messagi
}

func (c *logCoordinator) sendResolvedTsToCoordinator(id node.ID, changefeedID common.ChangeFeedID) {
c.nodes.Lock()
defer c.nodes.Unlock()
resolvedTs := c.getMinLogServiceResolvedTs(changefeedID)
msg := messaging.NewSingleTargetMessage(
id,
@@ -280,7 +278,7 @@ func (c *logCoordinator) updateChangefeedStates(from node.ID, states *logservice
}
}

func (c *logCoordinator) updateChangefeedMetrics() {
func (c *logCoordinator) reportChangefeedMetrics() {
pdTime := c.pdClock.CurrentTime()
pdPhyTs := oracle.GetPhysical(pdTime)

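Note that reportChangefeedMetrics works with PD's physical time: oracle.GetPhysical returns milliseconds, so a resolved-ts lag in seconds is conventionally derived as in the following minimal sketch (the helper name and signature are illustrative, not the function's actual code):

package metricsketch

import "github.com/tikv/client-go/v2/oracle"

// resolvedTsLagSeconds computes a resolved-ts lag in seconds. pdPhyTs is PD's
// current physical time in milliseconds (oracle.GetPhysical(pdClock.CurrentTime())),
// and resolvedTs is a TSO whose physical part is also in milliseconds.
func resolvedTsLagSeconds(pdPhyTs int64, resolvedTs uint64) float64 {
	return float64(pdPhyTs-oracle.ExtractPhysical(resolvedTs)) / 1e3
}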
2 changes: 1 addition & 1 deletion logservice/coordinator/coordinator_test.go
@@ -327,7 +327,7 @@ func TestReportMetricsForAffectedChangefeeds(t *testing.T) {
pdPhyTs := oracle.GetPhysical(c.pdClock.CurrentTime())

// Call update metrics
c.updateChangefeedMetrics()
c.reportChangefeedMetrics()

// Verify metrics for cf1
cf1State := c.changefeedStates.m[cfID1.ID()]
119 changes: 6 additions & 113 deletions logservice/eventstore/event_store.go
@@ -84,6 +84,8 @@ type EventStore interface {

// GetIterator returns an iterator which scans the data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd]
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator

GetLogCoordinatorNodeID() node.ID
}

type DMLEventState struct {
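The new GetLogCoordinatorNodeID accessor is what lets the per-changefeed reporting move out of the event store (see the removal of collectAndReportChangefeedMetrics below). A minimal sketch of how a caller might use it, reusing the messaging pattern from the removed code; the package name, MessageCenter parameter type, and internal import paths are assumptions for illustration:

package reporter

import (
	"github.com/pingcap/log"
	"go.uber.org/zap"

	// Assumed internal import paths, inferred from package names in this diff.
	"github.com/pingcap/ticdc/logservice/eventstore"
	"github.com/pingcap/ticdc/logservice/logservicepb"
	"github.com/pingcap/ticdc/pkg/messaging"
)

// reportStates forwards per-changefeed state to the log coordinator, mirroring
// the send path of the removed collectAndReportChangefeedMetrics.
func reportStates(es eventstore.EventStore, mc messaging.MessageCenter, states *logservicepb.ChangefeedStates) {
	coordinatorID := es.GetLogCoordinatorNodeID()
	if coordinatorID == "" {
		// No log coordinator known yet; skip this reporting round.
		return
	}
	msg := messaging.NewSingleTargetMessage(coordinatorID, messaging.LogCoordinatorTopic, states)
	if err := mc.SendEvent(msg); err != nil {
		log.Warn("send changefeed states to log coordinator failed", zap.Error(err))
	}
}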
@@ -171,11 +173,6 @@ type subscriptionStat struct {

type subscriptionStats map[logpuller.SubscriptionID]*subscriptionStat

type changefeedStat struct {
mutex sync.Mutex
dispatchers map[common.DispatcherID]*dispatcherStat
}

type eventWithCallback struct {
subID logpuller.SubscriptionID
tableID int64
@@ -225,9 +222,6 @@ type eventStore struct {

decoderPool *sync.Pool

// changefeed id -> changefeedStat
changefeedMeta sync.Map

// closed is used to indicate the event store is closed.
closed atomic.Bool

@@ -433,18 +427,6 @@ func (e *eventStore) RegisterDispatcher(
return false
}

// Defer a cleanup function that will run if registration fails.
// The success flag is set to true only at the end of successful registration paths.
success := false
defer func() {
if !success {
log.Info("register dispatcher failed, cleaning up from changefeed stat",
zap.Stringer("changefeedID", changefeedID),
zap.Stringer("dispatcherID", dispatcherID))
e.removeDispatcherFromChangefeedStat(changefeedID, dispatcherID)
}
}()

lag := time.Since(oracle.GetTimeFromTS(startTs))
metrics.EventStoreRegisterDispatcherStartTsLagHist.Observe(lag.Seconds())
if lag >= 10*time.Second {
@@ -476,31 +458,6 @@ func (e *eventStore) RegisterDispatcher(
}
stat.resolvedTs.Store(startTs)

// Loop to handle the race condition where a cfStat might be deleted
// after being loaded but before being locked.
for {
var cfStat *changefeedStat
if actual, ok := e.changefeedMeta.Load(changefeedID); ok {
cfStat = actual.(*changefeedStat)
} else {
newCfStat := &changefeedStat{dispatchers: make(map[common.DispatcherID]*dispatcherStat)}
actual, _ := e.changefeedMeta.LoadOrStore(changefeedID, newCfStat)
cfStat = actual.(*changefeedStat)
}

cfStat.mutex.Lock()
// After acquiring the lock, we must re-check if this cfStat is still the one
// in the map. If it has been removed and replaced, we must retry with the new one.
if current, ok := e.changefeedMeta.Load(changefeedID); !ok || current != cfStat {
cfStat.mutex.Unlock()
continue // Retry the loop
}

cfStat.dispatchers[dispatcherID] = stat
cfStat.mutex.Unlock()
break // Success
}

wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) {
util.CompareAndMonotonicIncrease(&stat.resolvedTs, resolvedTs)
notifier(resolvedTs, latestCommitTs)
@@ -532,7 +489,6 @@ func (e *eventStore) RegisterDispatcher(
zap.Uint64("subscriptionID", uint64(subStat.subID)),
zap.String("subSpan", common.FormatTableSpan(subStat.tableSpan)),
zap.Uint64("checkpointTs", subStat.checkpointTs.Load()))
success = true
return true
}

@@ -564,7 +520,6 @@ func (e *eventStore) RegisterDispatcher(
zap.Bool("exactMatch", bestMatch.tableSpan.Equal(dispatcherSpan)))
// when onlyReuse is true, we don't need an exact span match
if onlyReuse {
success = true
return true
}
} else {
@@ -665,7 +620,6 @@ func (e *eventStore) RegisterDispatcher(
ResolvedTs: startTs,
}
metrics.EventStoreSubscriptionGauge.Inc()
success = true
return true
}

@@ -686,25 +640,6 @@ func (e *eventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID, disp
delete(e.dispatcherMeta.dispatcherStats, dispatcherID)
}
e.dispatcherMeta.Unlock()

e.removeDispatcherFromChangefeedStat(changefeedID, dispatcherID)
}

// removeDispatcherFromChangefeedStat removes a dispatcher from its changefeed's statistics.
// If the changefeed becomes empty after the removal, the changefeed statistic itself is deleted.
func (e *eventStore) removeDispatcherFromChangefeedStat(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID) {
if v, ok := e.changefeedMeta.Load(changefeedID); ok {
cfStat := v.(*changefeedStat)
cfStat.mutex.Lock()
defer cfStat.mutex.Unlock()
delete(cfStat.dispatchers, dispatcherID)

// If the changefeed has no more dispatchers, remove the changefeed stat.
if len(cfStat.dispatchers) == 0 {
e.changefeedMeta.Delete(changefeedID)
log.Info("changefeed stat is empty, removed it", zap.Stringer("changefeedID", changefeedID))
}
}
}

func (e *eventStore) UpdateDispatcherCheckpointTs(
@@ -941,6 +876,10 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
}
}

func (e *eventStore) GetLogCoordinatorNodeID() node.ID {
return e.getCoordinatorInfo()
}

func (e *eventStore) detachFromSubStat(dispatcherID common.DispatcherID, subStat *subscriptionStat) {
if subStat == nil {
return
@@ -1065,15 +1004,12 @@ func (e *eventStore) cleanObsoleteSubscriptions(ctx context.Context) error {

func (e *eventStore) runMetricsCollector(ctx context.Context) error {
storeMetricsTicker := time.NewTicker(10 * time.Second)
changefeedMetricsTicker := time.NewTicker(1 * time.Second)
for {
select {
case <-ctx.Done():
return nil
case <-storeMetricsTicker.C:
e.collectAndReportStoreMetrics()
case <-changefeedMetricsTicker.C:
e.collectAndReportChangefeedMetrics()
}
}
}
@@ -1146,49 +1082,6 @@ func (e *eventStore) collectAndReportStoreMetrics() {
metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLagInSec)
}

func (e *eventStore) collectAndReportChangefeedMetrics() {
// Collect resolved ts for each changefeed and send to log coordinator.
changefeedStates := &logservicepb.ChangefeedStates{
States: make([]*logservicepb.ChangefeedStateEntry, 0),
}
e.changefeedMeta.Range(func(key, value interface{}) bool {
changefeedID := key.(common.ChangeFeedID)
cfStat := value.(*changefeedStat)

// By taking the lock here, we ensure that the set of dispatchers for this
// changefeed does not change while we calculate the minimum resolved ts.
// The `advanceResolvedTs` function updates an individual dispatcher's resolvedTs
// atomically, so it does not conflict with this lock.
cfStat.mutex.Lock()
cfMinResolvedTs := uint64(math.MaxUint64)
found := false
for _, dispatcherStat := range cfStat.dispatchers {
dispatcherResolvedTs := dispatcherStat.resolvedTs.Load()
if dispatcherResolvedTs < cfMinResolvedTs {
cfMinResolvedTs = dispatcherResolvedTs
}
found = true
}
cfStat.mutex.Unlock()

if found {
changefeedStates.States = append(changefeedStates.States, &logservicepb.ChangefeedStateEntry{
ChangefeedID: changefeedID.ToPB(),
ResolvedTs: cfMinResolvedTs,
})
}
return true
})

coordinatorID := e.getCoordinatorInfo()
if coordinatorID != "" {
msg := messaging.NewSingleTargetMessage(coordinatorID, messaging.LogCoordinatorTopic, changefeedStates)
if err := e.messageCenter.SendEvent(msg); err != nil {
log.Warn("send changefeed metrics to coordinator failed", zap.Error(err))
}
}
}

func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback, encoder *zstd.Encoder) error {
metrics.EventStoreWriteRequestsCount.Inc()
batch := db.NewBatch()