diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go
index 1149d8ccd5e..066cc390d96 100644
--- a/pkg/schedule/checker/replica_checker.go
+++ b/pkg/schedule/checker/replica_checker.go
@@ -16,6 +16,8 @@ package checker
 
 import (
 	"fmt"
+	"math/rand"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -64,6 +66,7 @@ type ReplicaChecker struct {
 	cluster           schedule.Cluster
 	conf              config.Config
 	regionWaitingList cache.Cache
+	r                 *rand.Rand
 }
 
 // NewReplicaChecker creates a replica checker.
@@ -72,49 +75,50 @@ func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaiti
 		cluster:           cluster,
 		conf:              conf,
 		regionWaitingList: regionWaitingList,
+		r:                 rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
 // GetType return ReplicaChecker's type
-func (r *ReplicaChecker) GetType() string {
+func (c *ReplicaChecker) GetType() string {
 	return replicaCheckerName
 }
 
 // Check verifies a region's replicas, creating an operator.Operator if need.
-func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
+func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
 	replicaCheckerCounter.Inc()
-	if r.IsPaused() {
+	if c.IsPaused() {
 		replicaCheckerPausedCounter.Inc()
 		return nil
 	}
-	if op := r.checkDownPeer(region); op != nil {
+	if op := c.checkDownPeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkOfflinePeer(region); op != nil {
+	if op := c.checkOfflinePeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkMakeUpReplica(region); op != nil {
+	if op := c.checkMakeUpReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkRemoveExtraReplica(region); op != nil {
+	if op := c.checkRemoveExtraReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
-	if op := r.checkLocationReplacement(region); op != nil {
+	if op := c.checkLocationReplacement(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveDownReplicaEnabled() {
+func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveDownReplicaEnabled() {
 		return nil
 	}
 
@@ -124,22 +128,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
 			continue
 		}
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
 		}
 		// Only consider the state of the Store, not `stats.DownSeconds`.
-		if store.DownTime() < r.conf.GetMaxStoreDownTime() {
+		if store.DownTime() < c.conf.GetMaxStoreDownTime() {
 			continue
 		}
-		return r.fixPeer(region, storeID, downStatus)
+		return c.fixPeer(region, storeID, downStatus)
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsReplaceOfflineReplicaEnabled() {
+func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsReplaceOfflineReplicaEnabled() {
 		return nil
 	}
 
@@ -150,7 +154,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 
 	for _, peer := range region.GetPeers() {
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
@@ -159,32 +163,32 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 			continue
 		}
 
-		return r.fixPeer(region, storeID, offlineStatus)
+		return c.fixPeer(region, storeID, offlineStatus)
 	}
 
 	return nil
 }
 
-func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsMakeUpReplicaEnabled() {
+func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsMakeUpReplicaEnabled() {
 		return nil
 	}
-	if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
+	if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
 	if target == 0 {
 		log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 		replicaCheckerNoTargetStoreCounter.Inc()
 		if filterByTempState {
-			r.regionWaitingList.Put(region.GetID(), nil)
+			c.regionWaitingList.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
-	op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
+	op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
 	if err != nil {
 		log.Debug("create make-up-replica operator fail", errs.ZapError(err))
 		return nil
@@ -192,24 +196,24 @@ func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O
 	return op
 }
 
-func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveExtraReplicaEnabled() {
+func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveExtraReplicaEnabled() {
 		return nil
 	}
 	// when add learner peer, the number of peer will exceed max replicas for a while,
 	// just comparing the the number of voters to avoid too many cancel add operator log.
-	if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	old := r.strategy(region).SelectStoreToRemove(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
 	if old == 0 {
 		replicaCheckerNoWorstPeerCounter.Inc()
-		r.regionWaitingList.Put(region.GetID(), nil)
+		c.regionWaitingList.Put(region.GetID(), nil)
 		return nil
 	}
-	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
+	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
@@ -217,13 +221,13 @@ func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera
 	return op
 }
 
-func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsLocationReplacementEnabled() {
+func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsLocationReplacementEnabled() {
 		return nil
 	}
 
-	strategy := r.strategy(region)
-	regionStores := r.cluster.GetRegionStores(region)
+	strategy := c.strategy(c.r, region)
+	regionStores := c.cluster.GetRegionStores(region)
 	oldStore := strategy.SelectStoreToRemove(regionStores)
 	if oldStore == 0 {
 		replicaCheckerAllRightCounter.Inc()
@@ -237,7 +241,7 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
 	}
 
 	newPeer := &metapb.Peer{StoreId: newStore}
-	op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
+	op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
@@ -245,11 +249,11 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
 	return op
 }
 
-func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
+func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
 	// Check the number of replicas first.
-	if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
 		removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
-		op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
+		op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
 		if err != nil {
 			if status == offlineStatus {
 				replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
@@ -261,8 +265,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		return op
 	}
 
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
 	if target == 0 {
 		if status == offlineStatus {
 			replicaCheckerNoStoreOfflineCounter.Inc()
@@ -271,13 +275,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		}
 		log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
 		if filterByTempState {
-			r.regionWaitingList.Put(region.GetID(), nil)
+			c.regionWaitingList.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 
 	newPeer := &metapb.Peer{StoreId: target}
 	replace := fmt.Sprintf("replace-%s-replica", status)
-	op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
+	op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
 	if err != nil {
 		if status == offlineStatus {
 			replicaCheckerReplaceOfflineFailedCounter.Inc()
@@ -289,12 +293,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 	return op
 }
 
-func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
+func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
 	return &ReplicaStrategy{
 		checkerName:    replicaCheckerName,
-		cluster:        r.cluster,
-		locationLabels: r.conf.GetLocationLabels(),
-		isolationLevel: r.conf.GetIsolationLevel(),
+		cluster:        c.cluster,
+		locationLabels: c.conf.GetLocationLabels(),
+		isolationLevel: c.conf.GetIsolationLevel(),
 		region:         region,
+		r:              r,
 	}
 }
diff --git a/pkg/schedule/checker/replica_strategy.go b/pkg/schedule/checker/replica_strategy.go
index b8037612531..8822c1dfd76 100644
--- a/pkg/schedule/checker/replica_strategy.go
+++ b/pkg/schedule/checker/replica_strategy.go
@@ -15,6 +15,8 @@
 package checker
 
 import (
+	"math/rand"
+
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/constant"
@@ -26,6 +28,7 @@ import (
 // ReplicaStrategy collects some utilities to manipulate region peers. It
 // exists to allow replica_checker and rule_checker to reuse common logics.
 type ReplicaStrategy struct {
+	r              *rand.Rand
 	checkerName    string // replica-checker / rule-checker
 	cluster        schedule.Cluster
 	locationLabels []string
@@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
 	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
-	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
+	targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
 		FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
 		KeepTheTopStores(isolationComparer, false) // greater isolation score is better
 	if targetCandidate.Len() == 0 {
@@ -137,7 +140,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
 	if s.fastFailover {
 		level = constant.Urgent
 	}
-	source := filter.NewCandidates(coLocationStores).
+	source := filter.NewCandidates(s.r, coLocationStores).
 		FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
 		KeepTheTopStores(isolationComparer, true).
 		PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go
index c7e27092346..eb109f6eb1e 100644
--- a/pkg/schedule/checker/rule_checker.go
+++ b/pkg/schedule/checker/rule_checker.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"math/rand"
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -91,6 +92,7 @@ type RuleChecker struct {
 	pendingList        cache.Cache
 	switchWitnessCache *cache.TTLUint64
 	record             *recorder
+	r                  *rand.Rand
 }
 
 // NewRuleChecker creates a checker instance.
@@ -103,6 +105,7 @@ func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *
 		pendingList:        cache.NewDefaultCache(maxPendingListLen),
 		switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetOpts().GetSwitchWitnessInterval()),
 		record:             newRecord(),
+		r:                  rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -232,7 +235,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
 	ruleStores := c.getRuleFitStores(rf)
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
 	if store == 0 {
 		ruleCheckerNoStoreAddCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -283,7 +286,7 @@ func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *place
 		fastFailover = false
 	}
 	ruleStores := c.getRuleFitStores(rf)
-	store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		ruleCheckerNoStoreReplaceCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -424,7 +427,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R
 
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	strategy := c.strategy(region, rf.Rule, isWitness)
+	strategy := c.strategy(c.r, region, rf.Rule, isWitness)
 	ruleStores := c.getRuleFitStores(rf)
 	oldStore := strategy.SelectStoreToRemove(ruleStores)
 	if oldStore == 0 {
@@ -645,7 +648,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
 	return nil, false
 }
 
-func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
+func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
 	return &ReplicaStrategy{
 		checkerName:    c.name,
 		cluster:        c.cluster,
@@ -654,6 +657,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
 		region:         region,
 		extraFilters:   []filter.Filter{filter.NewLabelConstraintFilter(c.name, rule.LabelConstraints)},
 		fastFailover:   fastFailover,
+		r:              r,
 	}
 }
 
diff --git a/pkg/schedule/filter/candidates.go b/pkg/schedule/filter/candidates.go
index b3524da47ec..15cce23eaa0 100644
--- a/pkg/schedule/filter/candidates.go
+++ b/pkg/schedule/filter/candidates.go
@@ -17,7 +17,6 @@ package filter
 import (
 	"math/rand"
 	"sort"
-	"time"
 
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/schedule/config"
@@ -32,8 +31,8 @@ type StoreCandidates struct {
 }
 
 // NewCandidates creates StoreCandidates with store list.
-func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
-	return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
+func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
+	return &StoreCandidates{r: r, Stores: stores}
 }
 
 // FilterSource keeps stores that can pass all source filters.
diff --git a/pkg/schedule/filter/candidates_test.go b/pkg/schedule/filter/candidates_test.go
index be30a874468..0fbb9027352 100644
--- a/pkg/schedule/filter/candidates_test.go
+++ b/pkg/schedule/filter/candidates_test.go
@@ -15,7 +15,9 @@
 package filter
 
 import (
+	"math/rand"
 	"testing"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/stretchr/testify/require"
@@ -112,7 +114,7 @@ func newTestCandidates(ids ...uint64) *StoreCandidates {
 	for _, id := range ids {
 		stores = append(stores, core.NewStoreInfo(&metapb.Store{Id: id}))
 	}
-	return NewCandidates(stores)
+	return NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores)
 }
 
 func check(re *require.Assertions, candidates *StoreCandidates, ids ...uint64) {
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index 73fef718bad..bb11d3771d6 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -512,7 +512,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
 	if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil {
 		finalFilters = append(l.filters, leaderFilter)
 	}
-	target := filter.NewCandidates([]*core.StoreInfo{solver.target}).
+	target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.target}).
 		FilterTarget(conf, nil, l.filterCounter, finalFilters...).
 		PickFirst()
 	if target == nil {
diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go
index 47cf3fa16ef..ed4a4375ba3 100644
--- a/pkg/schedule/schedulers/balance_region.go
+++ b/pkg/schedule/schedulers/balance_region.go
@@ -253,7 +253,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
 		filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, solver.fit),
 	}
 
-	candidates := filter.NewCandidates(dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...)
+	candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...)
 	if len(candidates.Stores) != 0 {
 		solver.step++
 	}
diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go
index 9da9b55b04e..f399498cefe 100644
--- a/pkg/schedule/schedulers/base_scheduler.go
+++ b/pkg/schedule/schedulers/base_scheduler.go
@@ -16,6 +16,7 @@ package schedulers
 
 import (
 	"fmt"
+	"math/rand"
 	"net/http"
 	"time"
 
@@ -60,11 +61,12 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth
 // BaseScheduler is a basic scheduler for all other complex scheduler
 type BaseScheduler struct {
 	OpController *schedule.OperatorController
+	R            *rand.Rand
 }
 
 // NewBaseScheduler returns a basic scheduler
 func NewBaseScheduler(opController *schedule.OperatorController) *BaseScheduler {
-	return &BaseScheduler{OpController: opController}
+	return &BaseScheduler{OpController: opController, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
 }
 
 func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index e4160725130..f4b0fee4993 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -15,6 +15,7 @@
 package schedulers
 
 import (
+	"math/rand"
 	"net/http"
 	"strconv"
 
@@ -233,7 +234,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool
 
 func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	evictLeaderCounter.Inc()
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
 }
 
 func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
@@ -256,10 +257,10 @@ type evictLeaderStoresConf interface {
 	getKeyRangesByID(id uint64) []core.KeyRange
 }
 
-func scheduleEvictLeaderBatch(name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
+func scheduleEvictLeaderBatch(r *rand.Rand, name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
 	var ops []*operator.Operator
 	for i := 0; i < batchSize; i++ {
-		once := scheduleEvictLeaderOnce(name, typ, cluster, conf)
+		once := scheduleEvictLeaderOnce(r, name, typ, cluster, conf)
 		// no more regions
 		if len(once) == 0 {
 			break
@@ -273,7 +274,7 @@ func scheduleEvictLeaderBatch(name, typ string, cluster schedule.Cluster, conf e
 	return ops
 }
 
-func scheduleEvictLeaderOnce(name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf) []*operator.Operator {
+func scheduleEvictLeaderOnce(r *rand.Rand, name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf) []*operator.Operator {
 	stores := conf.getStores()
 	ops := make([]*operator.Operator, 0, len(stores))
 	for _, storeID := range stores {
@@ -304,7 +305,7 @@ func scheduleEvictLeaderOnce(name, typ string, cluster schedule.Cluster, conf ev
 		}
 
 		filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
-		candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).
+		candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
 			FilterTarget(cluster.GetOpts(), nil, nil, filters...)
 		// Compatible with old TiKV transfer leader logic.
 		target := candidates.RandomPick()
diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go
index 70dde65a38c..f2172bf8add 100644
--- a/pkg/schedule/schedulers/evict_slow_store.go
+++ b/pkg/schedule/schedulers/evict_slow_store.go
@@ -143,7 +143,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster schedule.Cluster) {
 }
 
 func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster schedule.Cluster) []*operator.Operator {
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }
 
 func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
index e94aa38e420..df460ef1175 100644
--- a/pkg/schedule/schedulers/evict_slow_trend.go
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -177,7 +177,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster schedule.Cluster)
 		return nil
 	}
 	storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1)
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }
 
 func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go
index e150e6530fc..d7fd9934fde 100644
--- a/pkg/schedule/schedulers/label.go
+++ b/pkg/schedule/schedulers/label.go
@@ -109,7 +109,7 @@ func (s *labelScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*ope
 			}
 			f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores)
 
-			target := filter.NewCandidates(cluster.GetFollowerStores(region)).
+			target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
 				FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
 				RandomPick()
 			if target == nil {
diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go
index f324dbc1599..98e80d96f7b 100644
--- a/pkg/schedule/schedulers/random_merge.go
+++ b/pkg/schedule/schedulers/random_merge.go
@@ -88,7 +88,7 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool
 
 func (s *randomMergeScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	randomMergeCounter.Inc()
-	store := filter.NewCandidates(cluster.GetStores()).
+	store := filter.NewCandidates(s.R, cluster.GetStores()).
 		FilterSource(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
 		RandomPick()
 	if store == nil {
diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go
index 42b8dcd325d..ac4e30ec274 100644
--- a/pkg/schedule/schedulers/shuffle_leader.go
+++ b/pkg/schedule/schedulers/shuffle_leader.go
@@ -91,7 +91,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
 	// 1. random select a valid store.
 	// 2. transfer a leader to the store.
 	shuffleLeaderCounter.Inc()
-	targetStore := filter.NewCandidates(cluster.GetStores()).
+	targetStore := filter.NewCandidates(s.R, cluster.GetStores()).
 		FilterTarget(cluster.GetOpts(), nil, nil, s.filters...).
 		RandomPick()
 	if targetStore == nil {
diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go
index 3555dc15160..6e90498887c 100644
--- a/pkg/schedule/schedulers/shuffle_region.go
+++ b/pkg/schedule/schedulers/shuffle_region.go
@@ -113,7 +113,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
 }
 
 func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster schedule.Cluster) (*core.RegionInfo, *metapb.Peer) {
-	candidates := filter.NewCandidates(cluster.GetStores()).
+	candidates := filter.NewCandidates(s.R, cluster.GetStores()).
 		FilterSource(cluster.GetOpts(), nil, nil, s.filters...).
 		Shuffle()
 
@@ -152,7 +152,7 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster schedule.Cluster, regio
 	scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster.GetOpts(), cluster.GetBasicCluster(), cluster.GetRuleManager(), region, store, nil)
 	excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs())
 
-	target := filter.NewCandidates(cluster.GetStores()).
+	target := filter.NewCandidates(s.R, cluster.GetStores()).
 		FilterTarget(cluster.GetOpts(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...).
 		RandomPick()
 	if target == nil {
diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go
index ada7a026e1b..aaf4e8f22a7 100644
--- a/pkg/schedule/schedulers/transfer_witness_leader.go
+++ b/pkg/schedule/schedulers/transfer_witness_leader.go
@@ -15,6 +15,8 @@
 package schedulers
 
 import (
+	"math/rand"
+
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
@@ -81,7 +83,7 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name,
 	for i := 0; i < batchSize; i++ {
 		select {
 		case region := <-s.regions:
-			op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region)
+			op, err := s.scheduleTransferWitnessLeader(s.R, name, typ, cluster, region)
 			if err != nil {
 				log.Debug("fail to create transfer leader operator", errs.ZapError(err))
 				continue
@@ -98,7 +100,7 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name,
 	return ops
 }
 
-func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
+func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(r *rand.Rand, name, typ string, cluster schedule.Cluster, region *core.RegionInfo) (*operator.Operator, error) {
 	var filters []filter.Filter
 	unhealthyPeerStores := make(map[uint64]struct{})
 	for _, peer := range region.GetDownPeers() {
@@ -108,7 +110,7 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ
 		unhealthyPeerStores[peer.GetStoreId()] = struct{}{}
 	}
 	filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
-	candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetOpts(), nil, nil, filters...)
+	candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetOpts(), nil, nil, filters...)
 	// Compatible with old TiKV transfer leader logic.
 	target := candidates.RandomPick()
 	targets := candidates.PickAll()
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 348f8ca971f..6ea9b158d24 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -225,7 +225,7 @@ func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) (
 		if region == nil {
 			continue
 		}
-		target := filter.NewCandidates(cluster.GetFollowerStores(region)).
+		target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
 			FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true, OperatorLevel: constant.Urgent}).
 			RandomPick()
 		if target == nil {
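The patch above threads a single `*rand.Rand` from each caller (checkers keep it in an `r` field, schedulers inherit `R` from `BaseScheduler`) down into `filter.NewCandidates`, instead of seeding a fresh time-based generator on every call. A minimal sketch of the new call pattern follows; it mirrors the updated `newTestCandidates` helper in `candidates_test.go`, but the `main` wrapper, the store IDs, and the fixed seed are illustrative assumptions, not part of the change.

```go
package main

import (
	"fmt"
	"math/rand"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/schedule/filter"
)

func main() {
	// The caller now owns the random source and can reuse it across calls.
	// A fixed seed (illustrative) makes RandomPick deterministic, which is
	// convenient in tests.
	r := rand.New(rand.NewSource(42))

	stores := []*core.StoreInfo{
		core.NewStoreInfo(&metapb.Store{Id: 1}),
		core.NewStoreInfo(&metapb.Store{Id: 2}),
		core.NewStoreInfo(&metapb.Store{Id: 3}),
	}

	// New signature from this diff: the *rand.Rand is passed in rather than
	// constructed inside NewCandidates.
	picked := filter.NewCandidates(r, stores).RandomPick()
	fmt.Println(picked.GetID())
}
```

Reusing one generator per component avoids re-seeding with `time.Now().UnixNano()` on every candidate selection and lets callers inject a deterministic source, which is presumably the motivation for the refactor.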