Skip to content

Commit

Permalink
Add stats for shards watched by VTOrc, purge stale shards (vitessio#1…
Browse files Browse the repository at this point in the history
…7815)

Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Mar 1, 2025
1 parent a67bab1 commit 8eb8c75
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
7 changes: 7 additions & 0 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ func TestSaveReadAndDeleteShard(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// ReadAllShardNames
allShardNames, err := ReadAllShardNames()
require.NoError(t, err)
ksShards, found := allShardNames[tt.keyspaceName]
require.True(t, found)
require.Equal(t, []string{tt.shardName}, ksShards)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
Expand Down
32 changes: 28 additions & 4 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -108,6 +109,26 @@ func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error {
return refreshShard(keyspaceName, shardName)
}

// shouldWatchShard returns true if a shard is within the shardsToWatch
// ranges for it's keyspace.
func shouldWatchShard(shard *topo.ShardInfo) bool {
if len(shardsToWatch) == 0 {
return true
}

watchRanges, found := shardsToWatch[shard.Keyspace()]
if !found {
return false
}

for _, keyRange := range watchRanges {
if key.KeyRangeContainsKeyRange(keyRange, shard.GetKeyRange()) {
return true
}
}
return false
}

// refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo
func refreshKeyspace(keyspaceName string) error {
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -149,10 +170,14 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}

// save shards that should be watched.
savedShards := make(map[string]bool, len(shardInfos))
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
if !shouldWatchShard(shardInfo) {
continue
}
if err = inst.SaveShard(shardInfo); err != nil {
log.Error(err)
return err
}
Expand All @@ -171,8 +196,7 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
}
shardName := topoproto.KeyspaceShardString(keyspaceName, shard)
log.Infof("Forgetting shard: %s", shardName)
err = inst.DeleteShard(keyspaceName, shard)
if err != nil {
if err = inst.DeleteShard(keyspaceName, shard); err != nil {
log.Errorf("Failed to delete shard %s: %+v", shardName, err)
return err
}
Expand Down
41 changes: 41 additions & 0 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,44 @@ func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAli
require.NoError(t, err)
require.Equal(t, primaryAliasWanted, primaryAlias)
}

func TestRefreshAllShards(t *testing.T) {
// Store the old flags and restore on test completion
oldClustersToWatch := clustersToWatch
oldTs := ts
defer func() {
clustersToWatch = oldClustersToWatch
ts = oldTs
db.ClearVTOrcDatabase()
}()

ctx := context.Background()
ts = memorytopo.NewServer(ctx, "zone1")
require.NoError(t, initializeShardsToWatch())
require.NoError(t, ts.CreateKeyspace(ctx, "ks1", keyspaceDurabilityNone))
shards := []string{"-40", "40-80", "80-c0", "c0-"}
for _, shard := range shards {
require.NoError(t, ts.CreateShard(ctx, "ks1", shard))
}

// test shard refresh
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err := inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80", "80-c0", "c0-"}, shardNames)

// test topo shard delete propagates
require.NoError(t, ts.DeleteShard(ctx, "ks1", "c0-"))
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err = inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80", "80-c0"}, shardNames)

// test clustersToWatch filters what shards are saved
clustersToWatch = []string{"ks1/-80"}
require.NoError(t, initializeShardsToWatch())
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err = inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80"}, shardNames)
}

0 comments on commit 8eb8c75

Please sign in to comment.