vtorc: require topo for Healthy: true in /debug/health #17129

Open
wants to merge 2 commits into base: main
Changes from 1 commit
13 changes: 7 additions & 6 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
@@ -29,17 +29,16 @@ import (
)

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards() {
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
if err != nil {
log.Error(err)
return
return err
}
} else {
// Parse input and build list of keyspaces
@@ -55,14 +54,14 @@ }
}
if len(keyspaces) == 0 {
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
return
return nil
}
}

// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
@@ -83,6 +82,8 @@ }(keyspace)
}(keyspace)
}
wg.Wait()

return nil
}

// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
@@ -92,7 +92,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
RefreshAllKeyspacesAndShards()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
@@ -107,7 +107,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
clustersToWatch = nil
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", "semi_sync")
RefreshAllKeyspacesAndShards()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that all the keyspaces are correctly reloaded
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")
36 changes: 15 additions & 21 deletions go/vt/vtorc/logic/tablet_discovery.go
@@ -27,7 +27,6 @@ import (
"time"

"github.com/spf13/pflag"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

@@ -38,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -69,36 +67,31 @@ func OpenTabletDiscovery() <-chan time.Time {
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
}
// We refresh all information from the topo once before we start the ticks to do it on a timer.
populateAllInformation()
// We refresh all information from the topo once before we start the ticks to do
// it on a timer. We can wait forever (context.Background()) for this call.
Review thread on this comment:

Contributor (shlomi-noach):
Can you please explain what you mean by "We can wait forever (context.Background()) for this call."?

timvaillancourt (Contributor Author), Nov 13, 2024:
@shlomi-noach this comment is confusing, yeah.

What I intended with adding the context.Context is to prevent ticks from overlapping here (currently there is no timeout).

This area of the code is run once before the ticker starts at vtorc init time, so I figured waiting forever had no risk. Adding a timeout here as well wouldn't hurt either, I suppose.

timvaillancourt (Contributor Author):
> is to prevent ticks from overlapping here

Sorry, not "overlapping" per se, but one loop iteration stalling things.

Contributor (shlomi-noach):
I do prefer that we have some form of timeout -- let's say this errors and we cannot connect to topo -- is there going to be some retry? If yes, let's timeout. If not, let's consider our options.

Contributor Author (timvaillancourt):
@shlomi-noach makes sense, I've updated to use a timeout == topo.RemoteOperationTimeout 👍
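
A minimal sketch of the timeout-bounded startup refresh discussed in this thread (the helper name initialTopoRefresh is hypothetical; it assumes this package's existing context, topo, and log imports and the refreshAllInformation signature introduced in this PR):

// Sketch only: bound the one-time startup refresh with topo.RemoteOperationTimeout
// instead of waiting forever on a bare context.Background().
func initialTopoRefresh() {
	ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
	defer cancel()
	if err := refreshAllInformation(ctx); err != nil {
		log.Errorf("failed to initialize topo information: %+v", err)
	}
}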

if err := refreshAllInformation(context.Background()); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// populateAllInformation initializes all the information for VTOrc to function.
func populateAllInformation() {
refreshAllInformation()
// We have completed one full discovery cycle. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
func refreshAllTablets(ctx context.Context) error {
return refreshTabletsUsing(ctx, func(tabletAlias string) {
DiscoverInstance(tabletAlias, false /* forceDiscovery */)
}, false /* forceRefresh */)
}

func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
if len(clustersToWatch) == 0 { // all known clusters
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
log.Error(err)
return
return err
}

refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
@@ -119,7 +112,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
@@ -138,9 +131,9 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
if len(keyspaceShards) == 0 {
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
return nil
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, ks := range keyspaceShards {
@@ -152,6 +145,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
wg.Wait()
}
return nil
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
20 changes: 0 additions & 20 deletions go/vt/vtorc/logic/tablet_discovery_test.go
@@ -37,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
)

var (
@@ -369,25 +368,6 @@ func TestGetLockAction(t *testing.T) {
}
}

// TestProcessHealth tests that the health of the process reflects that we have run the first discovery once correctly.
func TestProcessHealth(t *testing.T) {
require.False(t, process.FirstDiscoveryCycleComplete.Load())
originalTs := ts
defer func() {
ts = originalTs
process.FirstDiscoveryCycleComplete.Store(false)
}()
// Verify in the beginning, we have the first DiscoveredOnce field false.
_, discoveredOnce := process.HealthTest()
require.False(t, discoveredOnce)
ts = memorytopo.NewServer(context.Background(), cell1)
populateAllInformation()
require.True(t, process.FirstDiscoveryCycleComplete.Load())
// Verify after we populate all information, we have the first DiscoveredOnce field true.
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)
}

func TestSetReadOnly(t *testing.T) {
tests := []struct {
name string
38 changes: 23 additions & 15 deletions go/vt/vtorc/logic/vtorc.go
@@ -17,6 +17,7 @@
package logic

import (
"context"
"os"
"os/signal"
"sync"
@@ -26,6 +27,7 @@

"github.com/patrickmn/go-cache"
"github.com/sjmudd/stopwatch"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
@@ -35,6 +37,7 @@
"vitess.io/vitess/go/vt/vtorc/discovery"
"vitess.io/vitess/go/vt/vtorc/inst"
ometrics "vitess.io/vitess/go/vt/vtorc/metrics"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vtorc/util"
)

@@ -328,30 +331,35 @@ func ContinuousDiscovery() {
go inst.SnapshotTopologies()
}()
case <-tabletTopoTick:
refreshAllInformation()
timeout := time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
cancel()
}
}
}

// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation() {
// Create a wait group
var wg sync.WaitGroup
func refreshAllInformation(ctx context.Context) error {
// Create an errgroup
eg, ctx := errgroup.WithContext(ctx)

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
eg.Go(func() error {
return RefreshAllKeyspacesAndShards(ctx)
})

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
eg.Go(func() error {
return refreshAllTablets(ctx)
})

// Wait for both the refreshes to complete
wg.Wait()
err := eg.Wait()
if err == nil {
process.FirstDiscoveryCycleComplete.Store(true)
}
return err
}
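
The switch from sync.WaitGroup to errgroup above means the first failing refresh cancels the shared context and its error is what Wait returns, so the sibling refresh is not left running unbounded. A standalone illustration of that errgroup behavior (not code from this PR):

// Illustration of errgroup.WithContext: the first error cancels the shared
// context and is the error returned by Wait.
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	eg.Go(func() error {
		return errors.New("keyspace refresh failed") // first failure wins
	})
	eg.Go(func() error {
		<-ctx.Done() // sibling observes cancellation instead of running on
		return ctx.Err()
	})

	fmt.Println(eg.Wait()) // prints: keyspace refresh failed
}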
44 changes: 44 additions & 0 deletions go/vt/vtorc/logic/vtorc_test.go
@@ -1,11 +1,17 @@
package logic

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/process"
)

func TestWaitForLocksRelease(t *testing.T) {
@@ -54,3 +60,41 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
waitForLocksRelease()
return time.Since(start)
}

func TestRefreshAllInformation(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
defer func() {
ts = oldTs
}()

// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

// Verify in the beginning, we have the first DiscoveredOnce field false.
_, discoveredOnce := process.HealthTest()
require.False(t, discoveredOnce)

// Create a memory topo-server and create the keyspace and shard records
ts = memorytopo.NewServer(context.Background(), cell1)
_, err := ts.GetOrCreateShard(context.Background(), keyspace, shard)
require.NoError(t, err)

// Test error
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel context to simulate timeout
require.Error(t, refreshAllInformation(ctx))
require.False(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.False(t, discoveredOnce)

// Test success
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
require.NoError(t, refreshAllInformation(ctx2))
require.True(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)
}
2 changes: 1 addition & 1 deletion go/vt/vtorc/process/health.go
@@ -57,7 +57,7 @@ func writeHealthToDatabase() bool {
func HealthTest() (health *NodeHealth, discoveredOnce bool) {
ThisNodeHealth.LastReported = time.Now()
discoveredOnce = FirstDiscoveryCycleComplete.Load()
ThisNodeHealth.Healthy = writeHealthToDatabase()
ThisNodeHealth.Healthy = discoveredOnce && writeHealthToDatabase()

return ThisNodeHealth, discoveredOnce
}
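
As a usage illustration of this behavior change, a readiness probe polling /debug/health would now report ready only after vtorc completes its first topo discovery cycle. The sketch below is hypothetical: the endpoint path and the Healthy flag come from this PR, while the response shape, port, and helper names are assumptions.

// Hypothetical readiness check against vtorc's /debug/health endpoint.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// debugHealth mirrors only the field this check cares about; the exact
// response shape is an assumption for illustration.
type debugHealth struct {
	Healthy bool `json:"Healthy"`
}

func vtorcReady(baseURL string) (bool, error) {
	resp, err := http.Get(baseURL + "/debug/health")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var h debugHealth
	if err := json.NewDecoder(resp.Body).Decode(&h); err != nil {
		return false, err
	}
	// With this PR, Healthy is true only once the first discovery cycle has
	// completed and the health record was written successfully.
	return h.Healthy, nil
}

func main() {
	ready, err := vtorcReady("http://localhost:15000") // port is an assumption
	fmt.Println(ready, err)
}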
46 changes: 46 additions & 0 deletions go/vt/vtorc/process/health_test.go
@@ -0,0 +1,46 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package process

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestHealthTest(t *testing.T) {
defer func() {
FirstDiscoveryCycleComplete.Store(false)
ThisNodeHealth = &NodeHealth{}
}()

require.Zero(t, ThisNodeHealth.LastReported)
require.False(t, ThisNodeHealth.Healthy)

ThisNodeHealth = &NodeHealth{}
health, discoveredOnce := HealthTest()
require.False(t, health.Healthy)
require.False(t, discoveredOnce)
require.NotZero(t, ThisNodeHealth.LastReported)

ThisNodeHealth = &NodeHealth{}
FirstDiscoveryCycleComplete.Store(true)
health, discoveredOnce = HealthTest()
require.True(t, health.Healthy)
require.True(t, discoveredOnce)
require.NotZero(t, ThisNodeHealth.LastReported)
}