diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 2bc732e..aba799a 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -3,9 +3,11 @@ package verifier import ( "bytes" "context" + "fmt" "time" "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" @@ -19,6 +21,7 @@ const readTimeout = 10 * time.Minute func (verifier *Verifier) FetchAndCompareDocuments( givenCtx context.Context, + workerNum int, task *VerificationTask, ) ( []VerificationResult, @@ -61,6 +64,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( var err error results, docCount, byteCount, err = verifier.compareDocsFromChannels( ctx, + workerNum, fi, task, srcChannel, @@ -77,6 +81,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( func (verifier *Verifier) compareDocsFromChannels( ctx context.Context, + workerNum int, fi *retry.FuncInfo, task *VerificationTask, srcChannel, dstChannel <-chan bson.Raw, @@ -196,6 +201,13 @@ func (verifier *Verifier) compareDocsFromChannels( srcDocCount++ srcByteCount += types.ByteCount(len(srcDoc)) + verifier.workerTracker.SetDetail( + workerNum, + fmt.Sprintf( + "read %s documents", + reportutils.FmtReal(srcDocCount), + ), + ) } return nil diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go index cd355e6..6dad6c0 100644 --- a/internal/verifier/compare_test.go +++ b/internal/verifier/compare_test.go @@ -58,6 +58,7 @@ func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { go func() { _, _, _, err := verifier.FetchAndCompareDocuments( cancelableCtx, + 0, &task, ) if err != nil { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index d48492d..72cc19b 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -640,6 +640,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, var curBytesCount types.ByteCount curProblems, curDocsCount, curBytesCount, err = verifier.FetchAndCompareDocuments( ctx, + workerNum, &miniTask, ) @@ -654,6 +655,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, } else { problems, docsCount, bytesCount, err = verifier.FetchAndCompareDocuments( ctx, + workerNum, task, ) } diff --git a/internal/verifier/migration_verifier_bench_test.go b/internal/verifier/migration_verifier_bench_test.go index 34b8b6e..6401751 100644 --- a/internal/verifier/migration_verifier_bench_test.go +++ b/internal/verifier/migration_verifier_bench_test.go @@ -83,7 +83,7 @@ func BenchmarkGeneric(t *testing.B) { qfilter := QueryFilter{Namespace: namespace} task := VerificationTask{QueryFilter: qfilter} // TODO: is this safe? - mismatchedIds, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(context.Background(), &task) + mismatchedIds, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(context.Background(), 0, &task) if err != nil { t.Fatal(err) } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 5b16162..571122f 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -100,7 +100,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { // Test fetchDocuments without global filter. verifier.globalFilter = nil - results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(2, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -112,7 +112,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { // Test fetchDocuments for ids with a global filter. verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}} - results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(1, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -129,7 +129,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { Ns: &partitions.Namespace{DB: "keyhole", Coll: "dealers"}, } verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}} - results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, task) + results, docCount, byteCount, err = verifier.FetchAndCompareDocuments(ctx, 0, task) suite.Require().NoError(err) suite.Assert().EqualValues(1, docCount, "should find source docs") suite.Assert().NotZero(byteCount, "should tally docs' size") @@ -655,6 +655,7 @@ func TestVerifierCompareDocs(t *testing.T) { results, docCount, byteCount, err = verifier.compareDocsFromChannels( ctx, + 0, fi, &fauxTask, srcChannel, diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index c578c35..440d413 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -483,7 +483,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder, n func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.Time) { table := tablewriter.NewWriter(builder) - table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"}) + table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed", "Detail"}) wsmap := verifier.workerTracker.Load() @@ -512,6 +512,7 @@ func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.T wsmap[w].Namespace, taskIdStr, reportutils.DurationToHMS(now.Sub(wsmap[w].StartTime)), + wsmap[w].Detail, }, ) } diff --git a/internal/verifier/worker_tracker.go b/internal/verifier/worker_tracker.go index 116f8d8..9b5f582 100644 --- a/internal/verifier/worker_tracker.go +++ b/internal/verifier/worker_tracker.go @@ -24,6 +24,7 @@ type WorkerStatus struct { TaskType verificationTaskType Namespace string StartTime time.Time + Detail string } // NewWorkerTracker creates and returns a WorkerTracker. @@ -51,6 +52,16 @@ func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) { }) } +func (wt *WorkerTracker) SetDetail(workerNum int, detail string) { + wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap { + status := m[workerNum] + status.Detail = detail + m[workerNum] = status + + return m + }) +} + // Unset tells the WorkerTracker that the worker is now inactive. func (wt *WorkerTracker) Unset(workerNum int) { wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {