diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 438ecec1fa..aca940d5e7 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -142,7 +142,7 @@ func (c *Changefeed) ShouldRun() bool { func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool, config.FeedState, *heartbeatpb.RunningError) { old := c.status.Load() failpoint.Inject("CoordinatorDontUpdateChangefeedCheckpoint", func() { - newStatus = old + newStatus.CheckpointTs = old.CheckpointTs }) if newStatus != nil && newStatus.CheckpointTs >= old.CheckpointTs { diff --git a/logservice/coordinator/coordinator.go b/logservice/coordinator/coordinator.go index d965e0a583..2bd7895090 100644 --- a/logservice/coordinator/coordinator.go +++ b/logservice/coordinator/coordinator.go @@ -144,7 +144,7 @@ func (c *logCoordinator) Run(ctx context.Context) error { log.Warn("send reusable event service response failed", zap.Error(err)) } case <-metricTick.C: - c.updateChangefeedMetrics() + c.reportChangefeedMetrics() } } } @@ -171,8 +171,6 @@ func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messagi } func (c *logCoordinator) sendResolvedTsToCoordinator(id node.ID, changefeedID common.ChangeFeedID) { - c.nodes.Lock() - defer c.nodes.Unlock() resolvedTs := c.getMinLogServiceResolvedTs(changefeedID) msg := messaging.NewSingleTargetMessage( id, @@ -280,7 +278,7 @@ func (c *logCoordinator) updateChangefeedStates(from node.ID, states *logservice } } -func (c *logCoordinator) updateChangefeedMetrics() { +func (c *logCoordinator) reportChangefeedMetrics() { pdTime := c.pdClock.CurrentTime() pdPhyTs := oracle.GetPhysical(pdTime) diff --git a/logservice/coordinator/coordinator_test.go b/logservice/coordinator/coordinator_test.go index a91e7e9f87..445e057c49 100644 --- a/logservice/coordinator/coordinator_test.go +++ b/logservice/coordinator/coordinator_test.go @@ -327,7 +327,7 @@ func TestReportMetricsForAffectedChangefeeds(t *testing.T) { pdPhyTs := oracle.GetPhysical(c.pdClock.CurrentTime()) // Call update metrics - c.updateChangefeedMetrics() + c.reportChangefeedMetrics() // Verify metrics for cf1 cf1State := c.changefeedStates.m[cfID1.ID()] diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index dec5d34e75..bd97d442ba 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -84,6 +84,8 @@ type EventStore interface { // GetIterator return an iterator which scan the data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd] GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator + + GetLogCoordinatorNodeID() node.ID } type DMLEventState struct { @@ -171,11 +173,6 @@ type subscriptionStat struct { type subscriptionStats map[logpuller.SubscriptionID]*subscriptionStat -type changefeedStat struct { - mutex sync.Mutex - dispatchers map[common.DispatcherID]*dispatcherStat -} - type eventWithCallback struct { subID logpuller.SubscriptionID tableID int64 @@ -225,9 +222,6 @@ type eventStore struct { decoderPool *sync.Pool - // changefeed id -> changefeedStat - changefeedMeta sync.Map - // closed is used to indicate the event store is closed. closed atomic.Bool @@ -433,18 +427,6 @@ func (e *eventStore) RegisterDispatcher( return false } - // Defer a cleanup function that will run if registration fails. - // The success flag is set to true only at the end of successful registration paths. 
- success := false - defer func() { - if !success { - log.Info("register dispatcher failed, cleaning up from changefeed stat", - zap.Stringer("changefeedID", changefeedID), - zap.Stringer("dispatcherID", dispatcherID)) - e.removeDispatcherFromChangefeedStat(changefeedID, dispatcherID) - } - }() - lag := time.Since(oracle.GetTimeFromTS(startTs)) metrics.EventStoreRegisterDispatcherStartTsLagHist.Observe(lag.Seconds()) if lag >= 10*time.Second { @@ -476,31 +458,6 @@ func (e *eventStore) RegisterDispatcher( } stat.resolvedTs.Store(startTs) - // Loop to handle the race condition where a cfStat might be deleted - // after being loaded but before being locked. - for { - var cfStat *changefeedStat - if actual, ok := e.changefeedMeta.Load(changefeedID); ok { - cfStat = actual.(*changefeedStat) - } else { - newCfStat := &changefeedStat{dispatchers: make(map[common.DispatcherID]*dispatcherStat)} - actual, _ := e.changefeedMeta.LoadOrStore(changefeedID, newCfStat) - cfStat = actual.(*changefeedStat) - } - - cfStat.mutex.Lock() - // After acquiring the lock, we must re-check if this cfStat is still the one - // in the map. If it has been removed and replaced, we must retry with the new one. - if current, ok := e.changefeedMeta.Load(changefeedID); !ok || current != cfStat { - cfStat.mutex.Unlock() - continue // Retry the loop - } - - cfStat.dispatchers[dispatcherID] = stat - cfStat.mutex.Unlock() - break // Success - } - wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) { util.CompareAndMonotonicIncrease(&stat.resolvedTs, resolvedTs) notifier(resolvedTs, latestCommitTs) @@ -532,7 +489,6 @@ func (e *eventStore) RegisterDispatcher( zap.Uint64("subscriptionID", uint64(subStat.subID)), zap.String("subSpan", common.FormatTableSpan(subStat.tableSpan)), zap.Uint64("checkpointTs", subStat.checkpointTs.Load())) - success = true return true } @@ -564,7 +520,6 @@ func (e *eventStore) RegisterDispatcher( zap.Bool("exactMatch", bestMatch.tableSpan.Equal(dispatcherSpan))) // when onlyReuse is true, we don't need a exact span match if onlyReuse { - success = true return true } } else { @@ -665,7 +620,6 @@ func (e *eventStore) RegisterDispatcher( ResolvedTs: startTs, } metrics.EventStoreSubscriptionGauge.Inc() - success = true return true } @@ -686,25 +640,6 @@ func (e *eventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID, disp delete(e.dispatcherMeta.dispatcherStats, dispatcherID) } e.dispatcherMeta.Unlock() - - e.removeDispatcherFromChangefeedStat(changefeedID, dispatcherID) -} - -// removeDispatcherFromChangefeedStat removes a dispatcher from its changefeed's statistics. -// If the changefeed becomes empty after the removal, the changefeed statistic itself is deleted. -func (e *eventStore) removeDispatcherFromChangefeedStat(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID) { - if v, ok := e.changefeedMeta.Load(changefeedID); ok { - cfStat := v.(*changefeedStat) - cfStat.mutex.Lock() - defer cfStat.mutex.Unlock() - delete(cfStat.dispatchers, dispatcherID) - - // If the changefeed has no more dispatchers, remove the changefeed stat. 
- if len(cfStat.dispatchers) == 0 { - e.changefeedMeta.Delete(changefeedID) - log.Info("changefeed stat is empty, removed it", zap.Stringer("changefeedID", changefeedID)) - } - } } func (e *eventStore) UpdateDispatcherCheckpointTs( @@ -941,6 +876,10 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com } } +func (e *eventStore) GetLogCoordinatorNodeID() node.ID { + return e.getCoordinatorInfo() +} + func (e *eventStore) detachFromSubStat(dispatcherID common.DispatcherID, subStat *subscriptionStat) { if subStat == nil { return @@ -1065,15 +1004,12 @@ func (e *eventStore) cleanObsoleteSubscriptions(ctx context.Context) error { func (e *eventStore) runMetricsCollector(ctx context.Context) error { storeMetricsTicker := time.NewTicker(10 * time.Second) - changefeedMetricsTicker := time.NewTicker(1 * time.Second) for { select { case <-ctx.Done(): return nil case <-storeMetricsTicker.C: e.collectAndReportStoreMetrics() - case <-changefeedMetricsTicker.C: - e.collectAndReportChangefeedMetrics() } } } @@ -1146,49 +1082,6 @@ func (e *eventStore) collectAndReportStoreMetrics() { metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLagInSec) } -func (e *eventStore) collectAndReportChangefeedMetrics() { - // Collect resolved ts for each changefeed and send to log coordinator. - changefeedStates := &logservicepb.ChangefeedStates{ - States: make([]*logservicepb.ChangefeedStateEntry, 0), - } - e.changefeedMeta.Range(func(key, value interface{}) bool { - changefeedID := key.(common.ChangeFeedID) - cfStat := value.(*changefeedStat) - - // By taking the lock here, we ensure that the set of dispatchers for this - // changefeed does not change while we calculate the minimum resolved ts. - // The `advanceResolvedTs` function updates an individual dispatcher's resolvedTs - // atomically, so it does not conflict with this lock. - cfStat.mutex.Lock() - cfMinResolvedTs := uint64(math.MaxUint64) - found := false - for _, dispatcherStat := range cfStat.dispatchers { - dispatcherResolvedTs := dispatcherStat.resolvedTs.Load() - if dispatcherResolvedTs < cfMinResolvedTs { - cfMinResolvedTs = dispatcherResolvedTs - } - found = true - } - cfStat.mutex.Unlock() - - if found { - changefeedStates.States = append(changefeedStates.States, &logservicepb.ChangefeedStateEntry{ - ChangefeedID: changefeedID.ToPB(), - ResolvedTs: cfMinResolvedTs, - }) - } - return true - }) - - coordinatorID := e.getCoordinatorInfo() - if coordinatorID != "" { - msg := messaging.NewSingleTargetMessage(coordinatorID, messaging.LogCoordinatorTopic, changefeedStates) - if err := e.messageCenter.SendEvent(msg); err != nil { - log.Warn("send changefeed metrics to coordinator failed", zap.Error(err)) - } - } -} - func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback, encoder *zstd.Encoder) error { metrics.EventStoreWriteRequestsCount.Inc() batch := db.NewBatch() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index ca29849c32..41f032c51b 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -250,19 +250,6 @@ func TestEventStoreOnlyReuseDispatcherSuccess(t *testing.T) { ok := es.RegisterDispatcher(cfID, dispatcherID3, span, 100, func(watermark uint64, latestCommitTs uint64) {}, true, false) require.True(t, ok) } - - // 4. Verify that dispatcherID2 and dispatcherID3 are included in the changefeedStat, confirming - // that the defer cleanup logic was not incorrectly triggered. 
- v, ok := es.changefeedMeta.Load(cfID) - require.True(t, ok) - cfStat := v.(*changefeedStat) - cfStat.mutex.Lock() - defer cfStat.mutex.Unlock() - require.Len(t, cfStat.dispatchers, 3, "dispatcher2 and dispatcher3 should be registered successfully") - _, dispatcher2Exists := cfStat.dispatchers[dispatcherID2] - require.True(t, dispatcher2Exists, "dispatcher2 should exist in changefeedStat") - _, dispatcher3Exists := cfStat.dispatchers[dispatcherID3] - require.True(t, dispatcher3Exists, "dispatcher3 should exist in changefeedStat") } func TestEventStoreNonOnlyReuseDispatcher(t *testing.T) { @@ -596,129 +583,6 @@ func TestEventStoreSwitchSubStat(t *testing.T) { } } -func TestChangefeedStatManagement(t *testing.T) { - _, store := newEventStoreForTest(fmt.Sprintf("/tmp/%s", t.Name())) - es := store.(*eventStore) - - cfID1 := common.NewChangefeedID4Test("default", "test-cf-1") - cfID2 := common.NewChangefeedID4Test("default", "test-cf-2") - - dispatcherID1 := common.NewDispatcherID() - dispatcherID2 := common.NewDispatcherID() - dispatcherID3 := common.NewDispatcherID() - - span1 := &heartbeatpb.TableSpan{TableID: 1} - span2 := &heartbeatpb.TableSpan{TableID: 2} - span3 := &heartbeatpb.TableSpan{TableID: 3} - - // 1. Register dispatcher1 for cfID1. - ok := es.RegisterDispatcher(cfID1, dispatcherID1, span1, 100, func(u uint64, u2 uint64) {}, false, false) - require.True(t, ok) - - // Check if changefeedStat for cfID1 is created. - v, ok := es.changefeedMeta.Load(cfID1) - require.True(t, ok) - cfStat1 := v.(*changefeedStat) - require.NotNil(t, cfStat1) - cfStat1.mutex.Lock() - require.Len(t, cfStat1.dispatchers, 1) - _, ok = cfStat1.dispatchers[dispatcherID1] - cfStat1.mutex.Unlock() - require.True(t, ok) - - // 2. Register dispatcher2 for cfID1. - ok = es.RegisterDispatcher(cfID1, dispatcherID2, span2, 110, func(u uint64, u2 uint64) {}, false, false) - require.True(t, ok) - - // Check if dispatcher2 is added to the same changefeedStat. - v, ok = es.changefeedMeta.Load(cfID1) - require.True(t, ok) - cfStat1 = v.(*changefeedStat) - cfStat1.mutex.Lock() - require.Len(t, cfStat1.dispatchers, 2) - _, ok = cfStat1.dispatchers[dispatcherID2] - cfStat1.mutex.Unlock() - require.True(t, ok) - - // 3. Register dispatcher3 for cfID2. - ok = es.RegisterDispatcher(cfID2, dispatcherID3, span3, 120, func(u uint64, u2 uint64) {}, false, false) - require.True(t, ok) - - // Check if changefeedStat for cfID2 is created. - v, ok = es.changefeedMeta.Load(cfID2) - require.True(t, ok) - cfStat2 := v.(*changefeedStat) - require.NotNil(t, cfStat2) - cfStat2.mutex.Lock() - require.Len(t, cfStat2.dispatchers, 1) - cfStat2.mutex.Unlock() - - // 4. Unregister dispatcher1 from cfID1. - es.UnregisterDispatcher(cfID1, dispatcherID1) - v, ok = es.changefeedMeta.Load(cfID1) - require.True(t, ok) - cfStat1 = v.(*changefeedStat) - cfStat1.mutex.Lock() - require.Len(t, cfStat1.dispatchers, 1) - _, ok = cfStat1.dispatchers[dispatcherID1] - cfStat1.mutex.Unlock() - require.False(t, ok) - - // 5. Unregister dispatcher2 from cfID1 (the last one). - es.UnregisterDispatcher(cfID1, dispatcherID2) - // Check if changefeedStat for cfID1 is removed. - _, ok = es.changefeedMeta.Load(cfID1) - require.False(t, ok) - - // 6. Check cfID2 is not affected. 
- v, ok = es.changefeedMeta.Load(cfID2) - require.True(t, ok) - cfStat2 = v.(*changefeedStat) - cfStat2.mutex.Lock() - require.Len(t, cfStat2.dispatchers, 1) - cfStat2.mutex.Unlock() -} - -func TestChangefeedStatManagementConcurrent(t *testing.T) { - _, store := newEventStoreForTest(fmt.Sprintf("/tmp/%s", t.Name())) - es := store.(*eventStore) - - cfID1 := common.NewChangefeedID4Test("default", "test-cf-1") - cfID2 := common.NewChangefeedID4Test("default", "test-cf-2") - cfIDs := []common.ChangeFeedID{cfID1, cfID2} - - concurrency := 100 - dispatchersPerRoutine := 20 - var wg sync.WaitGroup - wg.Add(concurrency) - - for i := 0; i < concurrency; i++ { - go func(routineID int) { - defer wg.Done() - for j := 0; j < dispatchersPerRoutine; j++ { - dispatcherID := common.NewDispatcherID() - cfID := cfIDs[(routineID*dispatchersPerRoutine+j)%len(cfIDs)] - span := &heartbeatpb.TableSpan{TableID: int64(j)} - - ok := es.RegisterDispatcher(cfID, dispatcherID, span, 100, func(u uint64, u2 uint64) {}, false, false) - require.True(t, ok) - - es.UnregisterDispatcher(cfID, dispatcherID) - } - }(i) - } - - wg.Wait() - - // After all operations, the changefeedMeta should be empty because all dispatchers are unregistered. - isEmpty := true - es.changefeedMeta.Range(func(key, value interface{}) bool { - isEmpty = false - return false // stop iteration - }) - require.True(t, isEmpty, "changefeedMeta should be empty after all dispatchers are unregistered") -} - func TestWriteToEventStore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 2b6d2c41dd..f227e5eedc 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -151,8 +151,6 @@ func newDispatcherStat( // A small value to avoid too many scan tasks at the first place. dispStat.lastScanBytes.Store(1024) - changefeedStatus.addDispatcher() - if info.SyncPointEnabled() { dispStat.enableSyncPoint = true dispStat.nextSyncPoint.Store(info.GetSyncPointTs()) @@ -416,24 +414,30 @@ func (c *resolvedTsCache) reset() { type changefeedStatus struct { changefeedID common.ChangeFeedID - // dispatcherCount is the number of the dispatchers that belong to this changefeed. 
- dispatcherCount atomic.Uint64 + dispatchers sync.Map // common.DispatcherID -> *atomic.Pointer[dispatcherStat] - dispatcherStatMap sync.Map // nodeID -> dispatcherID -> dispatcherStat availableMemoryQuota sync.Map // nodeID -> atomic.Uint64 (memory quota in bytes) } func newChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus { - stat := &changefeedStatus{ + return &changefeedStatus{ changefeedID: changefeedID, } - return stat } -func (c *changefeedStatus) addDispatcher() { - c.dispatcherCount.Inc() +func (c *changefeedStatus) addDispatcher(id common.DispatcherID, dispatcher *atomic.Pointer[dispatcherStat]) { + c.dispatchers.Store(id, dispatcher) +} + +func (c *changefeedStatus) removeDispatcher(id common.DispatcherID) { + c.dispatchers.Delete(id) } -func (c *changefeedStatus) removeDispatcher() { - c.dispatcherCount.Dec() +func (c *changefeedStatus) isEmpty() bool { + empty := true + c.dispatchers.Range(func(key, value any) bool { + empty = false + return false // stop iteration + }) + return empty } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 7a11b4f54e..250aed5313 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -895,6 +895,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { dispatcher := newDispatcherStat(info, uint64(len(c.taskChan)), uint64(len(c.messageCh)), nil, status) dispatcherPtr := &atomic.Pointer[dispatcherStat]{} dispatcherPtr.Store(dispatcher) + status.addDispatcher(id, dispatcherPtr) if span.Equal(common.KeyspaceDDLSpan(span.KeyspaceID)) { c.tableTriggerDispatchers.Store(id, dispatcherPtr) log.Info("table trigger dispatcher register dispatcher", @@ -971,10 +972,10 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) { } stat := statPtr.(*atomic.Pointer[dispatcherStat]).Load() - stat.changefeedStat.removeDispatcher() + stat.changefeedStat.removeDispatcher(id) changefeedID := dispatcherInfo.GetChangefeedID() - if stat.changefeedStat.dispatcherCount.Load() == 0 { + if stat.changefeedStat.isEmpty() { log.Info("All dispatchers for the changefeed are removed, remove the changefeed status", zap.Stringer("changefeedID", changefeedID), ) @@ -1046,6 +1047,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { for { if statPtr.CompareAndSwap(oldStat, newStat) { + status.addDispatcher(dispatcherID, statPtr) break } log.Warn("reset dispatcher failed since the dispatcher is changed concurrently", diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 1a363e0f8d..cdd8aad615 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -178,6 +178,11 @@ func TestCURDDispatcher(t *testing.T) { require.Nil(t, err) disp := broker.getDispatcher(dispInfo.GetID()).Load() require.NotNil(t, disp) + // Check changefeedStatus after adding a dispatcher + cfStatus, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID()) + require.True(t, ok, "changefeedStatus should exist after adding a dispatcher") + require.False(t, cfStatus.(*changefeedStatus).isEmpty(), "changefeedStatus should not be empty") + require.Equal(t, disp.id, dispInfo.GetID()) // Case 2: Reset a dispatcher. @@ -189,12 +194,19 @@ func TestCURDDispatcher(t *testing.T) { require.NotNil(t, disp) require.Equal(t, disp.id, dispInfo.GetID()) // Check the resetTs is updated. 
+ // Check changefeedStatus after resetting a dispatcher + cfStatus, ok = broker.changefeedMap.Load(dispInfo.GetChangefeedID()) + require.True(t, ok, "changefeedStatus should still exist after resetting") + require.False(t, cfStatus.(*changefeedStatus).isEmpty(), "changefeedStatus should not be empty after resetting") require.Equal(t, disp.startTs, dispInfo.GetStartTs()) // Case 3: Remove a dispatcher. broker.removeDispatcher(dispInfo) dispPtr := broker.getDispatcher(dispInfo.GetID()) require.Nil(t, dispPtr) + // Check changefeedStatus after removing the only dispatcher + _, ok = broker.changefeedMap.Load(dispInfo.GetChangefeedID()) + require.False(t, ok, "changefeedStatus should be removed after the last dispatcher is removed") } func TestResetDispatcher(t *testing.T) { diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index 8ba2acc2dc..c7ef0fdf6b 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -322,6 +322,10 @@ func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange return iter } +func (m *mockEventStore) GetLogCoordinatorNodeID() node.ID { + return "" +} + func (m *mockEventStore) RegisterDispatcher( changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID, diff --git a/pkg/eventservice/metrics_collector.go b/pkg/eventservice/metrics_collector.go index be9f402760..709af0b1e2 100644 --- a/pkg/eventservice/metrics_collector.go +++ b/pkg/eventservice/metrics_collector.go @@ -20,7 +20,9 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/logservice/logservicepb" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" @@ -124,8 +126,14 @@ func newMetricsCollector(broker *eventBroker) *metricsCollector { // Run starts the metrics collection loop func (mc *metricsCollector) Run(ctx context.Context) error { + // note: this ticker cannot be frequent, + // otherwise it may influence the performance of data sync ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + // need a more frequent ticker to report changefeed metrics to log coordinator for accurate metrics + // and the frequency is ok because the reporting doesn't hold any lock which may influence data sync. + reportTicker := time.NewTicker(1 * time.Second) + defer reportTicker.Stop() log.Info("metrics collector started") for { @@ -137,6 +145,8 @@ func (mc *metricsCollector) Run(ctx context.Context) error { snapshot := mc.collectMetrics() mc.updateMetricsFromSnapshot(snapshot) mc.logSlowDispatchers(snapshot) + case <-reportTicker.C: + mc.reportChangefeedStatesToLogCoordinator() } } } @@ -247,3 +257,32 @@ func (mc *metricsCollector) logSlowDispatchers(snapshot *metricsSnapshot) { zap.Bool("isTaskScanning", snapshot.slowestDispatcher.isTaskScanning.Load()), ) } + +// reportChangefeedStatesToLogCoordinator collects and reports the state of all changefeeds to the log coordinator. 
+func (mc *metricsCollector) reportChangefeedStatesToLogCoordinator() { + var states []*logservicepb.ChangefeedStateEntry + mc.broker.changefeedMap.Range(func(key, value any) bool { + cfStatus := value.(*changefeedStatus) + minResolvedTs := uint64(math.MaxUint64) + cfStatus.dispatchers.Range(func(key, value any) bool { + dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() + resolvedTs := dispatcher.receivedResolvedTs.Load() + if resolvedTs < minResolvedTs { + minResolvedTs = resolvedTs + } + return true + }) + states = append(states, &logservicepb.ChangefeedStateEntry{ + ChangefeedID: cfStatus.changefeedID.ToPB(), + ResolvedTs: minResolvedTs, + }) + return true + }) + coordinatorID := mc.broker.eventStore.GetLogCoordinatorNodeID() + if coordinatorID != "" { + msg := messaging.NewSingleTargetMessage(coordinatorID, messaging.LogCoordinatorTopic, &logservicepb.ChangefeedStates{States: states}) + if err := mc.broker.msgSender.SendEvent(msg); err != nil { + log.Warn("send changefeed metrics to coordinator failed", zap.Error(err)) + } + } +} diff --git a/tests/integration_tests/synced_status/run.sh b/tests/integration_tests/synced_status/run.sh index 03a0f2f26c..a490a9c0ad 100755 --- a/tests/integration_tests/synced_status/run.sh +++ b/tests/integration_tests/synced_status/run.sh @@ -263,10 +263,7 @@ function run_case_with_failpoint() { exit 1 fi info=$(echo $synced_status | jq -r '.info') - target_message="Please check whether PD is online and TiKV Regions are all available. \ -If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \ -If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \ -Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait" + target_message="The data syncing is not finished, please wait" if [ "$info" != "$target_message" ]; then echo "synced status info is not correct" exit 1 diff --git a/tests/integration_tests/synced_status_with_redo/conf/changefeed-redo.toml b/tests/integration_tests/synced_status_with_redo/conf/changefeed-redo.toml index 8f67ecb9d8..30eef94651 100644 --- a/tests/integration_tests/synced_status_with_redo/conf/changefeed-redo.toml +++ b/tests/integration_tests/synced_status_with_redo/conf/changefeed-redo.toml @@ -4,4 +4,4 @@ checkpoint-interval = 20 [consistent] level="eventual" -storage = "file:///tmp/tidb_cdc_test/synced_status/redo" \ No newline at end of file +storage = "file:///tmp/tidb_cdc_test/synced_status_with_redo/redo" \ No newline at end of file diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh index 966ca41dc1..030c7b5357 100755 --- a/tests/integration_tests/synced_status_with_redo/run.sh +++ b/tests/integration_tests/synced_status_with_redo/run.sh @@ -266,10 +266,7 @@ function run_case_with_failpoint() { exit 1 fi info=$(echo $synced_status | jq -r '.info') - target_message="Please check whether PD is online and TiKV Regions are all available. \ -If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \ -If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \ -Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait" + target_message="The data syncing is not finished, please wait" if [ "$info" != "$target_message" ]; then echo "synced status info is not correct" exit 1
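Note on the pattern introduced above: the per-changefeed mutex and changefeedStat map in the event store are replaced by a lock-free sync.Map of dispatchers on changefeedStatus, and the event service's metrics collector now derives each changefeed's resolved ts as the minimum across its dispatchers before sending it to the log coordinator. The following standalone sketch illustrates that aggregation pattern with simplified stand-in types — dispatcher, changefeed, and minResolvedTs are illustrative names, not the actual ticdc types.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// dispatcher is a simplified stand-in for the event service's dispatcherStat:
// only the atomically updated resolved ts matters for reporting.
type dispatcher struct {
	resolvedTs atomic.Uint64
}

// changefeed mirrors the new changefeedStatus shape: dispatchers live in a
// sync.Map, so the reporter never needs a per-changefeed mutex.
type changefeed struct {
	dispatchers sync.Map // dispatcher id (string) -> *dispatcher
}

// minResolvedTs returns the smallest resolved ts across all dispatchers and
// whether any dispatcher was present (an empty changefeed reports nothing).
func (c *changefeed) minResolvedTs() (uint64, bool) {
	minTs := uint64(math.MaxUint64)
	found := false
	c.dispatchers.Range(func(_, v any) bool {
		ts := v.(*dispatcher).resolvedTs.Load()
		if ts < minTs {
			minTs = ts
		}
		found = true
		return true
	})
	return minTs, found
}

func main() {
	cf := &changefeed{}
	for i, ts := range []uint64{105, 102, 110} {
		d := &dispatcher{}
		d.resolvedTs.Store(ts)
		cf.dispatchers.Store(fmt.Sprintf("dispatcher-%d", i), d)
	}
	if ts, ok := cf.minResolvedTs(); ok {
		fmt.Println("report resolved ts:", ts) // report resolved ts: 102
	}
}
```

The min-over-Range loop matches what reportChangefeedStatesToLogCoordinator does above; the skip-when-empty guard is carried over from the removed collectAndReportChangefeedMetrics for illustration and is an addition of this sketch rather than part of the new reporter.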