From 512ca521b55e3001ba118880033ed82397a58a85 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Sun, 29 Sep 2024 17:41:39 +0800
Subject: [PATCH] This is an automated cherry-pick of #8675

close tikv/pd#8674

Signed-off-by: ti-chi-bot
---
 pkg/schedule/checker/replica_checker.go       | 116 ++++++++++++------
 pkg/schedule/checker/replica_strategy.go      |   7 +-
 pkg/schedule/checker/rule_checker.go          |  30 ++++-
 pkg/schedule/filter/candidates.go             |   5 +-
 pkg/schedule/filter/candidates_test.go        |   4 +-
 pkg/schedule/schedulers/balance_leader.go     |   2 +-
 pkg/schedule/schedulers/balance_region.go     |   2 +-
 pkg/schedule/schedulers/base_scheduler.go     |  18 +++
 pkg/schedule/schedulers/evict_leader.go       |  19 ++-
 pkg/schedule/schedulers/evict_slow_store.go   |   4 +
 pkg/schedule/schedulers/evict_slow_trend.go   |   4 +
 pkg/schedule/schedulers/label.go              |   5 +
 pkg/schedule/schedulers/random_merge.go       |   5 +
 pkg/schedule/schedulers/shuffle_leader.go     |   2 +-
 pkg/schedule/schedulers/shuffle_region.go     |   4 +-
 .../schedulers/transfer_witness_leader.go     |  16 +++
 plugin/scheduler_example/evict_leader.go      |   2 +-
 17 files changed, 188 insertions(+), 57 deletions(-)

diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go
index 3e23f3bdcac..1b537cb6f4f 100644
--- a/pkg/schedule/checker/replica_checker.go
+++ b/pkg/schedule/checker/replica_checker.go
@@ -16,6 +16,8 @@ package checker
 
 import (
     "fmt"
+    "math/rand"
+    "time"
 
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/log"
@@ -61,17 +63,31 @@ var (
 // Location management, mainly used for cross data center deployment.
 type ReplicaChecker struct {
     PauseController
+<<<<<<< HEAD
     cluster           sche.CheckerCluster
     conf              config.CheckerConfigProvider
     regionWaitingList cache.Cache
+=======
+    cluster                 sche.CheckerCluster
+    conf                    config.CheckerConfigProvider
+    pendingProcessedRegions *cache.TTLUint64
+    r                       *rand.Rand
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 // NewReplicaChecker creates a replica checker.
 func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
     return &ReplicaChecker{
+<<<<<<< HEAD
         cluster:           cluster,
         conf:              conf,
         regionWaitingList: regionWaitingList,
+=======
+        cluster:                 cluster,
+        conf:                    conf,
+        pendingProcessedRegions: pendingProcessedRegions,
+        r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     }
 }
 
@@ -81,40 +97,40 @@ func (r *ReplicaChecker) GetType() string {
 }
 
 // Check verifies a region's replicas, creating an operator.Operator if need.
-func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
+func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
     replicaCheckerCounter.Inc()
-    if r.IsPaused() {
+    if c.IsPaused() {
         replicaCheckerPausedCounter.Inc()
         return nil
     }
-    if op := r.checkDownPeer(region); op != nil {
+    if op := c.checkDownPeer(region); op != nil {
         replicaCheckerNewOpCounter.Inc()
         op.SetPriorityLevel(constant.High)
         return op
     }
-    if op := r.checkOfflinePeer(region); op != nil {
+    if op := c.checkOfflinePeer(region); op != nil {
         replicaCheckerNewOpCounter.Inc()
         op.SetPriorityLevel(constant.High)
         return op
     }
-    if op := r.checkMakeUpReplica(region); op != nil {
+    if op := c.checkMakeUpReplica(region); op != nil {
         replicaCheckerNewOpCounter.Inc()
         op.SetPriorityLevel(constant.High)
         return op
     }
-    if op := r.checkRemoveExtraReplica(region); op != nil {
+    if op := c.checkRemoveExtraReplica(region); op != nil {
         replicaCheckerNewOpCounter.Inc()
         return op
     }
-    if op := r.checkLocationReplacement(region); op != nil {
+    if op := c.checkLocationReplacement(region); op != nil {
         replicaCheckerNewOpCounter.Inc()
         return op
     }
     return nil
 }
 
-func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
-    if !r.conf.IsRemoveDownReplicaEnabled() {
+func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
+    if !c.conf.IsRemoveDownReplicaEnabled() {
         return nil
     }
 
@@ -124,22 +140,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
             continue
         }
         storeID := peer.GetStoreId()
-        store := r.cluster.GetStore(storeID)
+        store := c.cluster.GetStore(storeID)
         if store == nil {
             log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
             return nil
         }
         // Only consider the state of the Store, not `stats.DownSeconds`.
-        if store.DownTime() < r.conf.GetMaxStoreDownTime() {
+        if store.DownTime() < c.conf.GetMaxStoreDownTime() {
             continue
         }
-        return r.fixPeer(region, storeID, downStatus)
+        return c.fixPeer(region, storeID, downStatus)
     }
     return nil
 }
 
-func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
-    if !r.conf.IsReplaceOfflineReplicaEnabled() {
+func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
+    if !c.conf.IsReplaceOfflineReplicaEnabled() {
         return nil
     }
 
@@ -150,7 +166,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 
     for _, peer := range region.GetPeers() {
         storeID := peer.GetStoreId()
-        store := r.cluster.GetStore(storeID)
+        store := c.cluster.GetStore(storeID)
         if store == nil {
             log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
             return nil
@@ -159,32 +175,36 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
             continue
         }
 
-        return r.fixPeer(region, storeID, offlineStatus)
+        return c.fixPeer(region, storeID, offlineStatus)
     }
 
     return nil
 }
 
-func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
-    if !r.conf.IsMakeUpReplicaEnabled() {
+func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
+    if !c.conf.IsMakeUpReplicaEnabled() {
         return nil
     }
-    if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
+    if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
         return nil
     }
     log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-    regionStores := r.cluster.GetRegionStores(region)
-    target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
+    regionStores := c.cluster.GetRegionStores(region)
+    target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
     if target == 0 {
         log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
         replicaCheckerNoTargetStoreCounter.Inc()
         if filterByTempState {
+<<<<<<< HEAD
             r.regionWaitingList.Put(region.GetID(), nil)
+=======
+            c.pendingProcessedRegions.Put(region.GetID(), nil)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         }
         return nil
     }
     newPeer := &metapb.Peer{StoreId: target}
-    op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
+    op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
     if err != nil {
         log.Debug("create make-up-replica operator fail", errs.ZapError(err))
         return nil
@@ -192,24 +212,28 @@ func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O
     return op
 }
 
-func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
-    if !r.conf.IsRemoveExtraReplicaEnabled() {
+func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
+    if !c.conf.IsRemoveExtraReplicaEnabled() {
         return nil
     }
     // when add learner peer, the number of peer will exceed max replicas for a while,
     // just comparing the the number of voters to avoid too many cancel add operator log.
-    if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
+    if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
         return nil
     }
     log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-    regionStores := r.cluster.GetRegionStores(region)
-    old := r.strategy(region).SelectStoreToRemove(regionStores)
+    regionStores := c.cluster.GetRegionStores(region)
+    old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
     if old == 0 {
         replicaCheckerNoWorstPeerCounter.Inc()
+<<<<<<< HEAD
         r.regionWaitingList.Put(region.GetID(), nil)
+=======
+        c.pendingProcessedRegions.Put(region.GetID(), nil)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         return nil
     }
-    op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
+    op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
     if err != nil {
         replicaCheckerCreateOpFailedCounter.Inc()
         return nil
@@ -217,13 +241,13 @@ func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera
     return op
 }
 
-func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
-    if !r.conf.IsLocationReplacementEnabled() {
+func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
+    if !c.conf.IsLocationReplacementEnabled() {
         return nil
     }
 
-    strategy := r.strategy(region)
-    regionStores := r.cluster.GetRegionStores(region)
+    strategy := c.strategy(c.r, region)
+    regionStores := c.cluster.GetRegionStores(region)
     oldStore := strategy.SelectStoreToRemove(regionStores)
     if oldStore == 0 {
         replicaCheckerAllRightCounter.Inc()
@@ -237,7 +261,7 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
     }
     newPeer := &metapb.Peer{StoreId: newStore}
-    op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
+    op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
     if err != nil {
         replicaCheckerCreateOpFailedCounter.Inc()
         return nil
@@ -245,11 +269,11 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
     return op
 }
 
-func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
+func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
     // Check the number of replicas first.
-    if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
+    if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
         removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
-        op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
+        op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
         if err != nil {
             if status == offlineStatus {
                 replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
@@ -261,8 +285,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
         return op
     }
 
-    regionStores := r.cluster.GetRegionStores(region)
-    target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
+    regionStores := c.cluster.GetRegionStores(region)
+    target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
     if target == 0 {
         if status == offlineStatus {
             replicaCheckerNoStoreOfflineCounter.Inc()
@@ -271,13 +295,17 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
         }
         log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
         if filterByTempState {
+<<<<<<< HEAD
             r.regionWaitingList.Put(region.GetID(), nil)
+=======
+            c.pendingProcessedRegions.Put(region.GetID(), nil)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         }
         return nil
     }
     newPeer := &metapb.Peer{StoreId: target}
     replace := fmt.Sprintf("replace-%s-replica", status)
-    op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
+    op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
     if err != nil {
         if status == offlineStatus {
             replicaCheckerReplaceOfflineFailedCounter.Inc()
@@ -289,12 +317,20 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
     return op
 }
 
-func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
+func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
     return &ReplicaStrategy{
+<<<<<<< HEAD
         checkerName:    replicaCheckerName,
         cluster:        r.cluster,
         locationLabels: r.conf.GetLocationLabels(),
         isolationLevel: r.conf.GetIsolationLevel(),
+=======
+        checkerName:    c.Name(),
+        cluster:        c.cluster,
+        locationLabels: c.conf.GetLocationLabels(),
+        isolationLevel: c.conf.GetIsolationLevel(),
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         region:         region,
+        r:              r,
     }
 }
diff --git a/pkg/schedule/checker/replica_strategy.go b/pkg/schedule/checker/replica_strategy.go
index ad85e307bbe..02371a4a08e 100644
--- a/pkg/schedule/checker/replica_strategy.go
+++ b/pkg/schedule/checker/replica_strategy.go
@@ -15,6 +15,8 @@ package checker
 
 import (
+    "math/rand"
+
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/core"
     "github.com/tikv/pd/pkg/core/constant"
@@ -26,6 +28,7 @@ import (
 // ReplicaStrategy collects some utilities to manipulate region peers. It
 // exists to allow replica_checker and rule_checker to reuse common logics.
 type ReplicaStrategy struct {
+    r              *rand.Rand
     checkerName    string // replica-checker / rule-checker
     cluster        sche.CheckerCluster
     locationLabels []string
@@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
     isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
     strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
-    targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
+    targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
         FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
         KeepTheTopStores(isolationComparer, false) // greater isolation score is better
     if targetCandidate.Len() == 0 {
@@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
     if s.fastFailover {
         level = constant.Urgent
     }
-    source := filter.NewCandidates(coLocationStores).
+    source := filter.NewCandidates(s.r, coLocationStores).
         FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
         KeepTheTopStores(isolationComparer, true).
         PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go
index 5ae164d51d0..5154b2099a0 100644
--- a/pkg/schedule/checker/rule_checker.go
+++ b/pkg/schedule/checker/rule_checker.go
@@ -18,6 +18,7 @@ import (
     "context"
     "errors"
     "math"
+    "math/rand"
     "time"
 
     "github.com/pingcap/kvproto/pkg/metapb"
@@ -84,6 +85,7 @@ var (
 // RuleChecker fix/improve region by placement rules.
 type RuleChecker struct {
     PauseController
+<<<<<<< HEAD
     cluster            sche.CheckerCluster
     ruleManager        *placement.RuleManager
     name               string
     regionWaitingList  cache.Cache
     pendingList        cache.Cache
     switchWitnessCache *cache.TTLUint64
     record             *recorder
+=======
+    cluster                 sche.CheckerCluster
+    ruleManager             *placement.RuleManager
+    pendingProcessedRegions *cache.TTLUint64
+    pendingList             cache.Cache
+    switchWitnessCache      *cache.TTLUint64
+    record                  *recorder
+    r                       *rand.Rand
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 // NewRuleChecker creates a checker instance.
 func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
     return &RuleChecker{
+<<<<<<< HEAD
         cluster:            cluster,
         ruleManager:        ruleManager,
         name:               ruleCheckerName,
@@ -103,6 +115,15 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage
         pendingList:        cache.NewDefaultCache(maxPendingListLen),
         switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
         record:             newRecord(),
+=======
+        cluster:                 cluster,
+        ruleManager:             ruleManager,
+        pendingProcessedRegions: pendingProcessedRegions,
+        pendingList:             cache.NewDefaultCache(maxPendingListLen),
+        switchWitnessCache:      cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
+        record:                  newRecord(),
+        r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     }
 }
 
@@ -232,7 +253,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
     ruleStores := c.getRuleFitStores(rf)
     isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
     // If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-    store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
+    store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
     if store == 0 {
         ruleCheckerNoStoreAddCounter.Inc()
         c.handleFilterState(region, filterByTempState)
@@ -283,7 +304,7 @@ func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *place
         fastFailover = false
     }
     ruleStores := c.getRuleFitStores(rf)
-    store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
+    store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
     if store == 0 {
         ruleCheckerNoStoreReplaceCounter.Inc()
         c.handleFilterState(region, filterByTempState)
@@ -424,7 +445,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R
 
     isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
     // If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-    strategy := c.strategy(region, rf.Rule, isWitness)
+    strategy := c.strategy(c.r, region, rf.Rule, isWitness)
     ruleStores := c.getRuleFitStores(rf)
     oldStore := strategy.SelectStoreToRemove(ruleStores)
     if oldStore == 0 {
@@ -645,7 +666,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
     return nil, false
 }
 
-func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
+func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
     return &ReplicaStrategy{
         checkerName:    c.name,
         cluster:        c.cluster,
@@ -654,6 +675,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
         region:         region,
         extraFilters:   []filter.Filter{filter.NewLabelConstraintFilter(c.name, rule.LabelConstraints)},
         fastFailover:   fastFailover,
+        r:              r,
     }
 }
 
diff --git a/pkg/schedule/filter/candidates.go b/pkg/schedule/filter/candidates.go
index 35ca4a2e284..9877bbaa0ac 100644
--- a/pkg/schedule/filter/candidates.go
+++ b/pkg/schedule/filter/candidates.go
@@ -17,7 +17,6 @@ package filter
 import (
     "math/rand"
     "sort"
-    "time"
 
     "github.com/tikv/pd/pkg/core"
     "github.com/tikv/pd/pkg/schedule/config"
@@ -32,8 +31,8 @@ type StoreCandidates struct {
 }
 
 // NewCandidates creates StoreCandidates with store list.
-func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
-    return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
+func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
+    return &StoreCandidates{r: r, Stores: stores}
 }
 
 // FilterSource keeps stores that can pass all source filters.
diff --git a/pkg/schedule/filter/candidates_test.go b/pkg/schedule/filter/candidates_test.go
index 13e8ed661cc..d421048aee3 100644
--- a/pkg/schedule/filter/candidates_test.go
+++ b/pkg/schedule/filter/candidates_test.go
@@ -15,7 +15,9 @@ package filter
 import (
+    "math/rand"
     "testing"
+    "time"
 
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/stretchr/testify/require"
@@ -112,7 +114,7 @@ func newTestCandidates(ids ...uint64) *StoreCandidates {
     for _, id := range ids {
         stores = append(stores, core.NewStoreInfo(&metapb.Store{Id: id}))
     }
-    return NewCandidates(stores)
+    return NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores)
 }
 
 func check(re *require.Assertions, candidates *StoreCandidates, ids ...uint64) {
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index ba26b03e8a9..9c8fd05881e 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -520,7 +520,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
     if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
         finalFilters = append(l.filters, leaderFilter)
     }
-    target := filter.NewCandidates([]*core.StoreInfo{solver.Target}).
+    target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}).
         FilterTarget(conf, nil, l.filterCounter, finalFilters...).
         PickFirst()
     if target == nil {
diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go
index 378f01c6ff9..f2c5a43310d 100644
--- a/pkg/schedule/schedulers/balance_region.go
+++ b/pkg/schedule/schedulers/balance_region.go
@@ -242,7 +242,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
         filter.NewPlacementSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit),
     }
 
-    candidates := filter.NewCandidates(dstStores).FilterTarget(conf, collector, s.filterCounter, filters...)
+    candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, collector, s.filterCounter, filters...)
     if len(candidates.Stores) != 0 {
         solver.Step++
     }
diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go
index 6e712c18fe3..2f4c6b61a4e 100644
--- a/pkg/schedule/schedulers/base_scheduler.go
+++ b/pkg/schedule/schedulers/base_scheduler.go
@@ -16,6 +16,7 @@ package schedulers
 
 import (
     "fmt"
+    "math/rand"
     "net/http"
     "time"
 
@@ -61,11 +62,28 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth
 // BaseScheduler is a basic scheduler for all other complex scheduler
 type BaseScheduler struct {
     OpController *operator.Controller
+<<<<<<< HEAD
 }
 
 // NewBaseScheduler returns a basic scheduler
 func NewBaseScheduler(opController *operator.Controller) *BaseScheduler {
     return &BaseScheduler{OpController: opController}
+=======
+    R *rand.Rand
+
+    name string
+    tp   types.CheckerSchedulerType
+    conf schedulerConfig
+}
+
+// NewBaseScheduler returns a basic scheduler
+func NewBaseScheduler(
+    opController *operator.Controller,
+    tp types.CheckerSchedulerType,
+    conf schedulerConfig,
+) *BaseScheduler {
+    return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index 187eea5d2c1..2a767ed4479 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -15,6 +15,7 @@ package schedulers
 
 import (
+    "math/rand"
     "net/http"
     "strconv"
 
@@ -272,7 +273,11 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
 
 func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
     evictLeaderCounter.Inc()
+<<<<<<< HEAD
     return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
+=======
+    return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
@@ -295,10 +300,18 @@ type evictLeaderStoresConf interface {
     getKeyRangesByID(id uint64) []core.KeyRange
 }
 
+<<<<<<< HEAD
 func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
+=======
+func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     var ops []*operator.Operator
     for i := 0; i < batchSize; i++ {
+<<<<<<< HEAD
         once := scheduleEvictLeaderOnce(name, typ, cluster, conf)
+=======
+        once := scheduleEvictLeaderOnce(r, name, cluster, conf)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         // no more regions
         if len(once) == 0 {
             break
@@ -312,7 +325,11 @@ func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, c
     return ops
 }
 
+<<<<<<< HEAD
 func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
+=======
+func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     stores := conf.getStores()
     ops := make([]*operator.Operator, 0, len(stores))
     for _, storeID := range stores {
@@ -343,7 +360,7 @@ func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, co
         }
         filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
 
-        candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).
+        candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
             FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
         // Compatible with old TiKV transfer leader logic.
         target := candidates.RandomPick()
diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go
index a6665c3e5e7..dd215cef15b 100644
--- a/pkg/schedule/schedulers/evict_slow_store.go
+++ b/pkg/schedule/schedulers/evict_slow_store.go
@@ -143,7 +143,11 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust
 }
 
 func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
+<<<<<<< HEAD
     return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+=======
+    return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
index c7eb0f61a56..65606241225 100644
--- a/pkg/schedule/schedulers/evict_slow_trend.go
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -292,7 +292,11 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus
         return nil
     }
     storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1)
+<<<<<<< HEAD
     return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+=======
+    return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 }
 
 func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go
index 62a1100d16b..3ae3d8a603f 100644
--- a/pkg/schedule/schedulers/label.go
+++ b/pkg/schedule/schedulers/label.go
@@ -109,8 +109,13 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([
         }
 
         f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores)
+<<<<<<< HEAD
         target := filter.NewCandidates(cluster.GetFollowerStores(region)).
             FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
+=======
+        target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
+            FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f).
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
             RandomPick()
         if target == nil {
             log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID()))
diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go
index a621b595198..034cf463a45 100644
--- a/pkg/schedule/schedulers/random_merge.go
+++ b/pkg/schedule/schedulers/random_merge.go
@@ -88,8 +88,13 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
 
 func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
     randomMergeCounter.Inc()
+<<<<<<< HEAD
     store := filter.NewCandidates(cluster.GetStores()).
         FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
+=======
+    store := filter.NewCandidates(s.R, cluster.GetStores()).
+        FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}).
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
         RandomPick()
     if store == nil {
         randomMergeNoSourceStoreCounter.Inc()
diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go
index 0e33fa802db..26447b8f0c8 100644
--- a/pkg/schedule/schedulers/shuffle_leader.go
+++ b/pkg/schedule/schedulers/shuffle_leader.go
@@ -91,7 +91,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
     // 1. random select a valid store.
     // 2. transfer a leader to the store.
     shuffleLeaderCounter.Inc()
-    targetStore := filter.NewCandidates(cluster.GetStores()).
+    targetStore := filter.NewCandidates(s.R, cluster.GetStores()).
         FilterTarget(cluster.GetSchedulerConfig(), nil, nil, s.filters...).
         RandomPick()
     if targetStore == nil {
diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go
index f1d35e80925..d08094f8a39 100644
--- a/pkg/schedule/schedulers/shuffle_region.go
+++ b/pkg/schedule/schedulers/shuffle_region.go
@@ -132,7 +132,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
 }
 
 func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluster) (*core.RegionInfo, *metapb.Peer) {
-    candidates := filter.NewCandidates(cluster.GetStores()).
+    candidates := filter.NewCandidates(s.R, cluster.GetStores()).
         FilterSource(cluster.GetSchedulerConfig(), nil, nil, s.filters...).
         Shuffle()
 
@@ -171,7 +171,7 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster sche.SchedulerCluster,
     scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster.GetSchedulerConfig(), cluster.GetBasicCluster(), cluster.GetRuleManager(), region, store, nil)
     excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs())
 
-    target := filter.NewCandidates(cluster.GetStores()).
+    target := filter.NewCandidates(s.R, cluster.GetStores()).
         FilterTarget(cluster.GetSchedulerConfig(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...).
         RandomPick()
     if target == nil {
diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go
index 2586065ea80..0baaf6ded74 100644
--- a/pkg/schedule/schedulers/transfer_witness_leader.go
+++ b/pkg/schedule/schedulers/transfer_witness_leader.go
@@ -15,6 +15,8 @@ package schedulers
 
 import (
+    "math/rand"
+
     "github.com/pingcap/errors"
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/core"
@@ -82,7 +84,11 @@ batchLoop:
     for i := 0; i < batchSize; i++ {
         select {
         case region := <-s.regions:
+<<<<<<< HEAD
             op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region)
+=======
+            op, err := scheduleTransferWitnessLeader(s.R, name, cluster, region)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
             if err != nil {
                 log.Debug("fail to create transfer leader operator", errs.ZapError(err))
                 continue
@@ -99,7 +105,11 @@ batchLoop:
     return ops
 }
 
+<<<<<<< HEAD
 func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) {
+=======
+func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) {
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     var filters []filter.Filter
     unhealthyPeerStores := make(map[uint64]struct{})
     for _, peer := range region.GetDownPeers() {
@@ -108,8 +118,14 @@ func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ
     for _, peer := range region.GetPendingPeers() {
         unhealthyPeerStores[peer.GetStoreId()] = struct{}{}
     }
+<<<<<<< HEAD
     filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
     candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
+=======
+    filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores),
+        &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
+    candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
+>>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
     // Compatible with old TiKV transfer leader logic.
     target := candidates.RandomPick()
     targets := candidates.PickAll()
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 0748ca200d0..9c8053c1334 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -225,7 +225,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bo
             if region == nil {
                 continue
             }
-            target := filter.NewCandidates(cluster.GetFollowerStores(region)).
+            target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
                 FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true, OperatorLevel: constant.Urgent}).
                 RandomPick()
             if target == nil {
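
For reviewers, a minimal standalone sketch of the pattern this backport applies: instead of calling rand.New(rand.NewSource(time.Now().UnixNano())) on every NewCandidates construction, each component owns one seeded *rand.Rand and threads it into its candidate selection. The sketch is illustrative only and is not code from this patch; the types candidates, checker and the functions newCandidates, randomPick, newChecker, pick are invented names for the example.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// candidates mirrors the idea behind filter.StoreCandidates in this patch:
// it borrows a *rand.Rand from the caller instead of seeding its own source.
type candidates struct {
    r      *rand.Rand
    stores []uint64
}

func newCandidates(r *rand.Rand, stores []uint64) *candidates {
    return &candidates{r: r, stores: stores}
}

// randomPick picks one store ID using the shared generator.
func (c *candidates) randomPick() uint64 {
    if len(c.stores) == 0 {
        return 0
    }
    return c.stores[c.r.Intn(len(c.stores))]
}

// checker owns a single seeded generator for its whole lifetime, the way the
// patch adds an r *rand.Rand field to the checkers and an R field to BaseScheduler.
type checker struct {
    r *rand.Rand
}

func newChecker() *checker {
    // Seed once at construction time rather than once per selection.
    return &checker{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

func (c *checker) pick(stores []uint64) uint64 {
    return newCandidates(c.r, stores).randomPick()
}

func main() {
    c := newChecker()
    fmt.Println(c.pick([]uint64{1, 2, 3}))
}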