From 7fb7f413140bc9f0236ab49e540f55aa191b0f46 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 24 Apr 2025 15:03:59 -0400 Subject: [PATCH 1/3] =?UTF-8?q?This=20adds=20a=20=E2=80=9CDetail=E2=80=9D?= =?UTF-8?q?=20column=20to=20the=20worker-thread=20table.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This will make it easier to confirm, at a glance, that a given worker thread is still doing useful work. --- internal/verifier/compare.go | 12 ++++++++++++ internal/verifier/migration_verifier.go | 2 ++ internal/verifier/summary.go | 3 ++- internal/verifier/worker_tracker.go | 11 +++++++++++ 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 2bc732e7..aba799a6 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -3,9 +3,11 @@ package verifier import ( "bytes" "context" + "fmt" "time" "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" @@ -19,6 +21,7 @@ const readTimeout = 10 * time.Minute func (verifier *Verifier) FetchAndCompareDocuments( givenCtx context.Context, + workerNum int, task *VerificationTask, ) ( []VerificationResult, @@ -61,6 +64,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( var err error results, docCount, byteCount, err = verifier.compareDocsFromChannels( ctx, + workerNum, fi, task, srcChannel, @@ -77,6 +81,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( func (verifier *Verifier) compareDocsFromChannels( ctx context.Context, + workerNum int, fi *retry.FuncInfo, task *VerificationTask, srcChannel, dstChannel <-chan bson.Raw, @@ -196,6 +201,13 @@ func (verifier *Verifier) compareDocsFromChannels( srcDocCount++ srcByteCount += types.ByteCount(len(srcDoc)) + verifier.workerTracker.SetDetail( + workerNum, + fmt.Sprintf( + "read %s documents", + reportutils.FmtReal(srcDocCount), + ), + ) } return nil diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index d48492d9..72cc19bd 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -640,6 +640,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, var curBytesCount types.ByteCount curProblems, curDocsCount, curBytesCount, err = verifier.FetchAndCompareDocuments( ctx, + workerNum, &miniTask, ) @@ -654,6 +655,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, } else { problems, docsCount, bytesCount, err = verifier.FetchAndCompareDocuments( ctx, + workerNum, task, ) } diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index c578c35b..440d4133 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -483,7 +483,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder, n func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.Time) { table := tablewriter.NewWriter(builder) - table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"}) + table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed", "Detail"}) wsmap := verifier.workerTracker.Load() @@ -512,6 +512,7 @@ func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.T wsmap[w].Namespace, taskIdStr, reportutils.DurationToHMS(now.Sub(wsmap[w].StartTime)), + wsmap[w].Detail, }, ) } diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go index 116f8d82..9b5f582c 100644 --- a/internal/verifier/worker_tracker.go +++ b/internal/verifier/worker_tracker.go @@ -24,6 +24,7 @@ type WorkerStatus struct { TaskType verificationTaskType Namespace string StartTime time.Time + Detail string } // NewWorkerTracker creates and returns a WorkerTracker. @@ -51,6 +52,16 @@ func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { }) } +func (wt *WorkerTracker) SetDetail(workerNum int, detail string) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + status := m[workerNum] + status.Detail = detail + m[workerNum] = status + + return m + }) +} + // Unset tells the WorkerTracker that the worker is now inactive. func (wt *WorkerTracker) Unset(workerNum int) { wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { From c588b5ab58a42e26ac324d15cdbb97d1530af9b0 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 24 Apr 2025 15:08:04 -0400 Subject: [PATCH 2/3] fix tests --- internal/verifier/migration_verifier_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 5b16162a..571122ff 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -100,7 +100,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { // Test fetchDocuments without global filter. verifier.globalFilter = nil - results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(2, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -112,7 +112,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { // Test fetchDocuments for ids with a global filter. verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}} - results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(1, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -129,7 +129,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { Ns: &partitions.Namespace{DB: "keyhole", Coll: "dealers"}, } verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}} - results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(1, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -655,6 +655,7 @@ func TestVerifierCompareDocs(t *testing.T) { results, docCount, byteCount, err = verifier.compareDocsFromChannels( ctx, + 0, fi, &fauxTask, srcChannel, From a83bd12155a41b72982d0e29ca19b017c6b66952 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 24 Apr 2025 15:14:39 -0400 Subject: [PATCH 3/3] fix more --- internal/verifier/compare_test.go | 1 + internal/verifier/migration_verifier_bench_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go index cd355e67..6dad6c0a 100644 --- a/internal/verifier/compare_test.go +++ b/internal/verifier/compare_test.go @@ -58,6 +58,7 @@ func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { go func() { _, _, _, err := verifier.FetchAndCompareDocuments( cancelableCtx, + 0, &task, ) if err != nil { diff --git a/internal/verifier/migration_verifier_bench_test.go b/internal/verifier/migration_verifier_bench_test.go index 34b8b6ef..6401751a 100644 --- a/internal/verifier/migration_verifier_bench_test.go +++ b/internal/verifier/migration_verifier_bench_test.go @@ -83,7 +83,7 @@ func BenchmarkGeneric(t *testing.B) { qfilter := QueryFilter{Namespace: namespace} task := VerificationTask{QueryFilter: qfilter} // TODO: is this safe? - mismatchedIds, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(context.Background(), &task) + mismatchedIds, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(context.Background(), 0, &task) if err != nil { t.Fatal(err) }