diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 8b096889851..2d35a63a6ce 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -109,6 +109,13 @@ func TestSaveReadAndDeleteShard(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{tt.shardName}, shardNames) + // ReadAllShardNames + allShardNames, err := ReadAllShardNames() + require.NoError(t, err) + ksShards, found := allShardNames[tt.keyspaceName] + require.True(t, found) + require.Equal(t, []string{tt.shardName}, ksShards) + // DeleteShard require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName)) _, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 77f3930be1e..38903124b6d 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -23,6 +23,7 @@ import ( "golang.org/x/exp/maps" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -108,6 +109,26 @@ func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error { return refreshShard(keyspaceName, shardName) } +// shouldWatchShard returns true if a shard is within the shardsToWatch +// ranges for it's keyspace. +func shouldWatchShard(shard *topo.ShardInfo) bool { + if len(shardsToWatch) == 0 { + return true + } + + watchRanges, found := shardsToWatch[shard.Keyspace()] + if !found { + return false + } + + for _, keyRange := range watchRanges { + if key.KeyRangeContainsKeyRange(keyRange, shard.GetKeyRange()) { + return true + } + } + return false +} + // refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo func refreshKeyspace(keyspaceName string) error { refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -149,10 +170,14 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { log.Error(err) return err } + + // save shards that should be watched. savedShards := make(map[string]bool, len(shardInfos)) for _, shardInfo := range shardInfos { - err = inst.SaveShard(shardInfo) - if err != nil { + if !shouldWatchShard(shardInfo) { + continue + } + if err = inst.SaveShard(shardInfo); err != nil { log.Error(err) return err } @@ -171,8 +196,7 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { } shardName := topoproto.KeyspaceShardString(keyspaceName, shard) log.Infof("Forgetting shard: %s", shardName) - err = inst.DeleteShard(keyspaceName, shard) - if err != nil { + if err = inst.DeleteShard(keyspaceName, shard); err != nil { log.Errorf("Failed to delete shard %s: %+v", shardName, err) return err } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 42d0cdebbdf..10b53bf03a2 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -311,3 +311,44 @@ func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAli require.NoError(t, err) require.Equal(t, primaryAliasWanted, primaryAlias) } + +func TestRefreshAllShards(t *testing.T) { + // Store the old flags and restore on test completion + oldClustersToWatch := clustersToWatch + oldTs := ts + defer func() { + clustersToWatch = oldClustersToWatch + ts = oldTs + db.ClearVTOrcDatabase() + }() + + ctx := context.Background() + ts = memorytopo.NewServer(ctx, "zone1") + require.NoError(t, initializeShardsToWatch()) + require.NoError(t, ts.CreateKeyspace(ctx, "ks1", keyspaceDurabilityNone)) + shards := []string{"-40", "40-80", "80-c0", "c0-"} + for _, shard := range shards { + require.NoError(t, ts.CreateShard(ctx, "ks1", shard)) + } + + // test shard refresh + require.NoError(t, refreshAllShards(ctx, "ks1")) + shardNames, err := inst.ReadShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-40", "40-80", "80-c0", "c0-"}, shardNames) + + // test topo shard delete propagates + require.NoError(t, ts.DeleteShard(ctx, "ks1", "c0-")) + require.NoError(t, refreshAllShards(ctx, "ks1")) + shardNames, err = inst.ReadShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-40", "40-80", "80-c0"}, shardNames) + + // test clustersToWatch filters what shards are saved + clustersToWatch = []string{"ks1/-80"} + require.NoError(t, initializeShardsToWatch()) + require.NoError(t, refreshAllShards(ctx, "ks1")) + shardNames, err = inst.ReadShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-40", "40-80"}, shardNames) +}