*: reduce rand NewSource (#8675)
close #8674

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Sep 29, 2024
1 parent f8a0d80 commit 25dedab
Showing 17 changed files with 94 additions and 76 deletions.
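Why this change: before this commit, filter.NewCandidates seeded a brand-new rand.Source with time.Now().UnixNano() on every call. The commit hoists the RNG into each checker (ReplicaChecker, RuleChecker), which now creates one seeded *rand.Rand at construction and threads it down through ReplicaStrategy into NewCandidates. Below is a minimal, self-contained sketch of that pattern; the checker and pick names are illustrative, not PD code:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Before: every call seeds a fresh source. This allocates on each call, and
// two calls landing in the same nanosecond draw identical sequences.
func pickBefore(n int) int {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Intn(n)
}

// After: the owner seeds one RNG at construction and reuses it for all picks.
type checker struct {
	r *rand.Rand
}

func newChecker() *checker {
	return &checker{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

func (c *checker) pick(n int) int {
	return c.r.Intn(n)
}

func main() {
	c := newChecker()
	fmt.Println(pickBefore(10), c.pick(10))
}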
99 changes: 52 additions & 47 deletions pkg/schedule/checker/replica_checker.go
@@ -16,6 +16,8 @@ package checker
 import (
 	"fmt"
+	"math/rand"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -45,6 +47,7 @@ type ReplicaChecker struct {
 	cluster                 sche.CheckerCluster
 	conf                    config.CheckerConfigProvider
 	pendingProcessedRegions *cache.TTLUint64
+	r                       *rand.Rand
 }
 
 // NewReplicaChecker creates a replica checker.
@@ -53,6 +56,7 @@ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigPro
 		cluster:                 cluster,
 		conf:                    conf,
 		pendingProcessedRegions: pendingProcessedRegions,
+		r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -67,40 +71,40 @@ func (*ReplicaChecker) GetType() types.CheckerSchedulerType {
 }
 
 // Check verifies a region's replicas, creating an operator.Operator if need.
-func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
+func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
 	replicaCheckerCounter.Inc()
-	if r.IsPaused() {
+	if c.IsPaused() {
 		replicaCheckerPausedCounter.Inc()
 		return nil
 	}
-	if op := r.checkDownPeer(region); op != nil {
+	if op := c.checkDownPeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkOfflinePeer(region); op != nil {
+	if op := c.checkOfflinePeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkMakeUpReplica(region); op != nil {
+	if op := c.checkMakeUpReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkRemoveExtraReplica(region); op != nil {
+	if op := c.checkRemoveExtraReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
-	if op := r.checkLocationReplacement(region); op != nil {
+	if op := c.checkLocationReplacement(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveDownReplicaEnabled() {
+func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveDownReplicaEnabled() {
 		return nil
 	}
 
@@ -110,22 +114,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
 			continue
 		}
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
 		}
 		// Only consider the state of the Store, not `stats.DownSeconds`.
-		if store.DownTime() < r.conf.GetMaxStoreDownTime() {
+		if store.DownTime() < c.conf.GetMaxStoreDownTime() {
 			continue
 		}
-		return r.fixPeer(region, storeID, downStatus)
+		return c.fixPeer(region, storeID, downStatus)
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsReplaceOfflineReplicaEnabled() {
+func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsReplaceOfflineReplicaEnabled() {
 		return nil
 	}
 
@@ -136,7 +140,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 
 	for _, peer := range region.GetPeers() {
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
@@ -145,71 +149,71 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 			continue
 		}
 
-		return r.fixPeer(region, storeID, offlineStatus)
+		return c.fixPeer(region, storeID, offlineStatus)
 	}
 
 	return nil
 }
 
-func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsMakeUpReplicaEnabled() {
+func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsMakeUpReplicaEnabled() {
 		return nil
 	}
-	if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
+	if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
 	if target == 0 {
 		log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 		replicaCheckerNoTargetStoreCounter.Inc()
 		if filterByTempState {
-			r.pendingProcessedRegions.Put(region.GetID(), nil)
+			c.pendingProcessedRegions.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
-	op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
+	op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
 	if err != nil {
 		log.Debug("create make-up-replica operator fail", errs.ZapError(err))
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveExtraReplicaEnabled() {
+func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveExtraReplicaEnabled() {
 		return nil
 	}
 	// when add learner peer, the number of peer will exceed max replicas for a while,
 	// just comparing the the number of voters to avoid too many cancel add operator log.
-	if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	old := r.strategy(region).SelectStoreToRemove(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
 	if old == 0 {
 		replicaCheckerNoWorstPeerCounter.Inc()
-		r.pendingProcessedRegions.Put(region.GetID(), nil)
+		c.pendingProcessedRegions.Put(region.GetID(), nil)
 		return nil
 	}
-	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
+	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsLocationReplacementEnabled() {
+func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsLocationReplacementEnabled() {
 		return nil
 	}
 
-	strategy := r.strategy(region)
-	regionStores := r.cluster.GetRegionStores(region)
+	strategy := c.strategy(c.r, region)
+	regionStores := c.cluster.GetRegionStores(region)
 	oldStore := strategy.SelectStoreToRemove(regionStores)
 	if oldStore == 0 {
 		replicaCheckerAllRightCounter.Inc()
@@ -223,19 +227,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
 	}
 
 	newPeer := &metapb.Peer{StoreId: newStore}
-	op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
+	op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
+func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
 	// Check the number of replicas first.
-	if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
 		removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
-		op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
+		op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
 		if err != nil {
 			if status == offlineStatus {
 				replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
@@ -247,8 +251,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		return op
 	}
 
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
 	if target == 0 {
 		if status == offlineStatus {
 			replicaCheckerNoStoreOfflineCounter.Inc()
@@ -257,13 +261,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		}
 		log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
 		if filterByTempState {
-			r.pendingProcessedRegions.Put(region.GetID(), nil)
+			c.pendingProcessedRegions.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
 	replace := fmt.Sprintf("replace-%s-replica", status)
-	op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
+	op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
 	if err != nil {
 		if status == offlineStatus {
 			replicaCheckerReplaceOfflineFailedCounter.Inc()
@@ -275,12 +279,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 	return op
 }
 
-func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
+func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
 	return &ReplicaStrategy{
-		checkerName:    r.Name(),
-		cluster:        r.cluster,
-		locationLabels: r.conf.GetLocationLabels(),
-		isolationLevel: r.conf.GetIsolationLevel(),
+		checkerName:    c.Name(),
+		cluster:        c.cluster,
+		locationLabels: c.conf.GetLocationLabels(),
+		isolationLevel: c.conf.GetIsolationLevel(),
 		region:         region,
+		r:              r,
 	}
 }
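Note that the receiver of ReplicaChecker's methods is renamed from r to c so that r can name the shared *rand.Rand. A side benefit of passing the RNG in rather than constructing it inside the helper is testability: a caller can pin the seed and get reproducible selections. A self-contained sketch of that idea (pick and the store names are illustrative, not PD code):

package main

import (
	"fmt"
	"math/rand"
)

// pick chooses a random candidate using a caller-supplied RNG.
func pick(r *rand.Rand, candidates []string) string {
	return candidates[r.Intn(len(candidates))]
}

func main() {
	r := rand.New(rand.NewSource(42)) // fixed seed: the same pick on every run
	fmt.Println(pick(r, []string{"store-1", "store-2", "store-3"}))
}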
7 changes: 5 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
@@ -15,6 +15,8 @@
 package checker
 
 import (
+	"math/rand"
+
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/constant"
@@ -26,6 +28,7 @@ import (
 // ReplicaStrategy collects some utilities to manipulate region peers. It
 // exists to allow replica_checker and rule_checker to reuse common logics.
 type ReplicaStrategy struct {
+	r              *rand.Rand
 	checkerName    string // replica-checker / rule-checker
 	cluster        sche.CheckerCluster
 	locationLabels []string
@@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
 	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
-	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
+	targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
 		FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
 		KeepTheTopStores(isolationComparer, false) // greater isolation score is better
 	if targetCandidate.Len() == 0 {
@@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
 	if s.fastFailover {
 		level = constant.Urgent
 	}
-	source := filter.NewCandidates(coLocationStores).
+	source := filter.NewCandidates(s.r, coLocationStores).
 		FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
 		KeepTheTopStores(isolationComparer, true).
 		PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
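For context, the candidate set carries an RNG so that ties among equally good stores can be broken randomly rather than always favoring the first candidate. A hedged illustration of that idea (not the actual PD implementation; StoreCandidates has its own selection methods):

package main

import (
	"fmt"
	"math/rand"
)

// pickRandomBest returns one of the best-scoring items, chosen uniformly at
// random, so repeated calls do not always favor the earliest candidate.
func pickRandomBest(r *rand.Rand, scores map[string]int) string {
	best, bestScore := []string{}, -1
	for name, s := range scores {
		switch {
		case s > bestScore:
			best, bestScore = []string{name}, s
		case s == bestScore:
			best = append(best, name)
		}
	}
	return best[r.Intn(len(best))]
}

func main() {
	r := rand.New(rand.NewSource(7))
	fmt.Println(pickRandomBest(r, map[string]int{"store-1": 3, "store-2": 3, "store-3": 1}))
}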
12 changes: 8 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"math/rand"
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -56,6 +57,7 @@ type RuleChecker struct {
 	pendingList        cache.Cache
 	switchWitnessCache *cache.TTLUint64
 	record             *recorder
+	r                  *rand.Rand
 }
 
 // NewRuleChecker creates a checker instance.
@@ -67,6 +69,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage
 		pendingList:        cache.NewDefaultCache(maxPendingListLen),
 		switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
 		record:             newRecord(),
+		r:                  rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -201,7 +204,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
 	ruleStores := c.getRuleFitStores(rf)
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
 	if store == 0 {
 		ruleCheckerNoStoreAddCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -252,7 +255,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
 		fastFailover = false
 	}
 	ruleStores := c.getRuleFitStores(rf)
-	store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		ruleCheckerNoStoreReplaceCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -393,7 +396,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R
 
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	strategy := c.strategy(region, rf.Rule, isWitness)
+	strategy := c.strategy(c.r, region, rf.Rule, isWitness)
 	ruleStores := c.getRuleFitStores(rf)
 	oldStore := strategy.SelectStoreToRemove(ruleStores)
 	if oldStore == 0 {
@@ -618,7 +621,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
 	return nil, false
 }
 
-func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
+func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
 	return &ReplicaStrategy{
 		checkerName: c.Name(),
 		cluster:     c.cluster,
@@ -627,6 +630,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
 		region:       region,
 		extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
 		fastFailover: fastFailover,
+		r:            r,
 	}
 }
 
5 changes: 2 additions & 3 deletions pkg/schedule/filter/candidates.go
@@ -17,7 +17,6 @@ package filter
 import (
 	"math/rand"
 	"sort"
-	"time"
 
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/schedule/config"
@@ -32,8 +31,8 @@ type StoreCandidates struct {
 }
 
 // NewCandidates creates StoreCandidates with store list.
-func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
-	return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
+func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
+	return &StoreCandidates{r: r, Stores: stores}
 }
 
 // FilterSource keeps stores that can pass all source filters.
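One caveat worth noting: unlike the top-level functions in math/rand, a *rand.Rand is not safe for concurrent use, so sharing one per checker assumes each checker is driven from a single goroutine (an assumption based on the diff, which does not show the call sites). If a shared RNG ever had to cross goroutines, it would need external synchronization, sketched below with a hypothetical wrapper:

package main

import (
	"math/rand"
	"sync"
)

// lockedRand is a hypothetical wrapper, not PD code: it guards a *rand.Rand
// with a mutex so multiple goroutines can draw from it safely.
type lockedRand struct {
	mu sync.Mutex
	r  *rand.Rand
}

func (l *lockedRand) Intn(n int) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.r.Intn(n)
}

func main() {
	lr := &lockedRand{r: rand.New(rand.NewSource(1))}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = lr.Intn(100) // safe: serialized by the mutex
		}()
	}
	wg.Wait()
}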
(The remaining changed files of this commit are not shown here.)