Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: reduce rand NewSource (#8675) #8729

Open
wants to merge 2 commits into
base: release-7.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 52 additions & 47 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package checker

import (
"fmt"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -64,6 +66,7 @@ type ReplicaChecker struct {
cluster schedule.Cluster
conf config.Config
regionWaitingList cache.Cache
r *rand.Rand
}

// NewReplicaChecker creates a replica checker.
Expand All @@ -72,49 +75,50 @@ func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaiti
cluster: cluster,
conf: conf,
regionWaitingList: regionWaitingList,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

// GetType return ReplicaChecker's type
func (r *ReplicaChecker) GetType() string {
func (c *ReplicaChecker) GetType() string {
return replicaCheckerName
}

// Check verifies a region's replicas, creating an operator.Operator if need.
func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
replicaCheckerCounter.Inc()
if r.IsPaused() {
if c.IsPaused() {
replicaCheckerPausedCounter.Inc()
return nil
}
if op := r.checkDownPeer(region); op != nil {
if op := c.checkDownPeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkOfflinePeer(region); op != nil {
if op := c.checkOfflinePeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkMakeUpReplica(region); op != nil {
if op := c.checkMakeUpReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkRemoveExtraReplica(region); op != nil {
if op := c.checkRemoveExtraReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
if op := r.checkLocationReplacement(region); op != nil {
if op := c.checkLocationReplacement(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
return nil
}

func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveDownReplicaEnabled() {
func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveDownReplicaEnabled() {
return nil
}

Expand All @@ -124,22 +128,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
continue
}
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.conf.GetMaxStoreDownTime() {
if store.DownTime() < c.conf.GetMaxStoreDownTime() {
continue
}
return r.fixPeer(region, storeID, downStatus)
return c.fixPeer(region, storeID, downStatus)
}
return nil
}

func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsReplaceOfflineReplicaEnabled() {
func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsReplaceOfflineReplicaEnabled() {
return nil
}

Expand All @@ -150,7 +154,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope

for _, peer := range region.GetPeers() {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
Expand All @@ -159,71 +163,71 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
continue
}

return r.fixPeer(region, storeID, offlineStatus)
return c.fixPeer(region, storeID, offlineStatus)
}

return nil
}

func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsMakeUpReplicaEnabled() {
func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsMakeUpReplicaEnabled() {
return nil
}
if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
if target == 0 {
log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
replicaCheckerNoTargetStoreCounter.Inc()
if filterByTempState {
r.regionWaitingList.Put(region.GetID(), nil)
c.regionWaitingList.Put(region.GetID(), nil)
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
if err != nil {
log.Debug("create make-up-replica operator fail", errs.ZapError(err))
return nil
}
return op
}

func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveExtraReplicaEnabled() {
func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveExtraReplicaEnabled() {
return nil
}
// when add learner peer, the number of peer will exceed max replicas for a while,
// just comparing the the number of voters to avoid too many cancel add operator log.
if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
old := r.strategy(region).SelectStoreToRemove(regionStores)
regionStores := c.cluster.GetRegionStores(region)
old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
if old == 0 {
replicaCheckerNoWorstPeerCounter.Inc()
r.regionWaitingList.Put(region.GetID(), nil)
c.regionWaitingList.Put(region.GetID(), nil)
return nil
}
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsLocationReplacementEnabled() {
func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsLocationReplacementEnabled() {
return nil
}

strategy := r.strategy(region)
regionStores := r.cluster.GetRegionStores(region)
strategy := c.strategy(c.r, region)
regionStores := c.cluster.GetRegionStores(region)
oldStore := strategy.SelectStoreToRemove(regionStores)
if oldStore == 0 {
replicaCheckerAllRightCounter.Inc()
Expand All @@ -237,19 +241,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
}

newPeer := &metapb.Peer{StoreId: newStore}
op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
// Check the number of replicas first.
if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
if err != nil {
if status == offlineStatus {
replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
Expand All @@ -261,8 +265,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
if target == 0 {
if status == offlineStatus {
replicaCheckerNoStoreOfflineCounter.Inc()
Expand All @@ -271,13 +275,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
}
log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
if filterByTempState {
r.regionWaitingList.Put(region.GetID(), nil)
c.regionWaitingList.Put(region.GetID(), nil)
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
replace := fmt.Sprintf("replace-%s-replica", status)
op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
if err != nil {
if status == offlineStatus {
replicaCheckerReplaceOfflineFailedCounter.Inc()
Expand All @@ -289,12 +293,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: replicaCheckerName,
cluster: r.cluster,
locationLabels: r.conf.GetLocationLabels(),
isolationLevel: r.conf.GetIsolationLevel(),
cluster: c.cluster,
locationLabels: c.conf.GetLocationLabels(),
isolationLevel: c.conf.GetIsolationLevel(),
region: region,
r: r,
}
}
7 changes: 5 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package checker

import (
"math/rand"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -26,6 +28,7 @@ import (
// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
r *rand.Rand
checkerName string // replica-checker / rule-checker
cluster schedule.Cluster
locationLabels []string
Expand Down Expand Up @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e

isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
KeepTheTopStores(isolationComparer, false) // greater isolation score is better
if targetCandidate.Len() == 0 {
Expand Down Expand Up @@ -137,7 +140,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
if s.fastFailover {
level = constant.Urgent
}
source := filter.NewCandidates(coLocationStores).
source := filter.NewCandidates(s.r, coLocationStores).
FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
KeepTheTopStores(isolationComparer, true).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
Expand Down
12 changes: 8 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"math"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -91,6 +92,7 @@ type RuleChecker struct {
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
r *rand.Rand
}

// NewRuleChecker creates a checker instance.
Expand All @@ -103,6 +105,7 @@ func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *
pendingList: cache.NewDefaultCache(maxPendingListLen),
switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetOpts().GetSwitchWitnessInterval()),
record: newRecord(),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand Down Expand Up @@ -232,7 +235,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
ruleStores := c.getRuleFitStores(rf)
isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
if store == 0 {
ruleCheckerNoStoreAddCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -283,7 +286,7 @@ func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *place
fastFailover = false
}
ruleStores := c.getRuleFitStores(rf)
store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
if store == 0 {
ruleCheckerNoStoreReplaceCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -424,7 +427,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R

isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
strategy := c.strategy(region, rf.Rule, isWitness)
strategy := c.strategy(c.r, region, rf.Rule, isWitness)
ruleStores := c.getRuleFitStores(rf)
oldStore := strategy.SelectStoreToRemove(ruleStores)
if oldStore == 0 {
Expand Down Expand Up @@ -645,7 +648,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
return nil, false
}

func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: c.name,
cluster: c.cluster,
Expand All @@ -654,6 +657,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
region: region,
extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.name, rule.LabelConstraints)},
fastFailover: fastFailover,
r: r,
}
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/schedule/filter/candidates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filter
import (
"math/rand"
"sort"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/config"
Expand All @@ -32,8 +31,8 @@ type StoreCandidates struct {
}

// NewCandidates creates StoreCandidates with store list.
func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: r, Stores: stores}
}

// FilterSource keeps stores that can pass all source filters.
Expand Down
Loading
Loading