Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: use move-hot-write-leader operator (#7852) #8144

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -5877,6 +5877,21 @@
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "B"
},
{
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}-out",
"refId": "C",
"step": 4
},
{
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "D"
}
],
"thresholds": [],
Expand Down
46 changes: 6 additions & 40 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,20 +1500,12 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
targetLabel := strconv.FormatUint(dstStoreID, 10)
dim := bs.rankToDimString()

var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
switch bs.rwTy {
case utils.Read:
createOperator = bs.createReadOperator
case utils.Write:
createOperator = bs.createWriteOperator
}

currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
if err == nil {
bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
ops = []*operator.Operator{currentOp}
if bs.cur.revertRegion != nil {
currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
if err == nil {
bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
ops = append(ops, currentOp)
Expand Down Expand Up @@ -1679,11 +1671,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, strateg
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-read-leader",
"transfer-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
srcStoreID,
Expand All @@ -1696,7 +1688,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
if region.GetLeader().GetStoreId() == srcStoreID {
typ = "move-leader"
op, err = operator.CreateMoveLeaderOperator(
"move-hot-read-leader",
"move-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1705,7 +1697,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
} else {
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-read-peer",
"move-hot-"+bs.rwTy.String()+"-peer",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1716,32 +1708,6 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
return
}

func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-write-leader",
bs,
region,
srcStoreID,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-write-peer",
bs,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
}
return
}

func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) {
op.SetPriorityLevel(constant.High)
op.FinishedCounters = append(op.FinishedCounters,
Expand Down
108 changes: 85 additions & 23 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func clearPendingInfluence(h *hotScheduler) {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

func TestUpgrade(t *testing.T) {
re := require.New(t)
cancel, _, _, oc := prepareSchedulersTest()
Expand Down Expand Up @@ -191,20 +196,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
}

func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statistics.HistorySampleDuration = 0
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
}

func TestSplitIfRegionTooHot(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
Expand Down Expand Up @@ -393,10 +384,73 @@ func TestSplitBucketsByLoad(t *testing.T) {
}
}

func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0
statistics.HistorySampleDuration = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetEnableUseJointConsensus(true)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
tc.SetHotRegionCacheHitsThreshold(0)

tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"})
tc.AddLabelsStore(3, 2, map[string]string{"zone": "z2", "host": "h3"})
tc.AddLabelsStore(4, 2, map[string]string{"zone": "z2", "host": "h4"})
tc.AddLabelsStore(5, 2, map[string]string{"zone": "z2", "host": "h5"})
tc.AddLabelsStore(6, 2, map[string]string{"zone": "z2", "host": "h6"})
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "leader", Role: placement.Leader, Count: 1, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z1"}}},
})
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
})
tc.RuleManager.DeleteRule("pd", "default")

tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(2, 0)
tc.UpdateStorageWrittenBytes(3, 6*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 3*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(5, 3*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(6, 6*units.MiB*utils.StoreHeartBeatReportInterval)

// Region 1, 2 and 3 are hot regions.
addRegionInfo(tc, utils.Write, []testRegionInfo{
{1, []uint64{1, 3, 5}, 512 * units.KiB, 0, 0},
{2, []uint64{1, 4, 6}, 512 * units.KiB, 0, 0},
{3, []uint64{1, 3, 6}, 512 * units.KiB, 0, 0},
})
ops, _ := hb.Schedule(tc, false)
re.NotEmpty(ops)
re.NotContains(ops[0].Step(1).String(), "transfer leader")
clearPendingInfluence(hb.(*hotScheduler))

tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd", ID: "voter", Role: placement.Voter, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
})
tc.RuleManager.DeleteRule("pd", "follower")
ops, _ = hb.Schedule(tc, false)
re.NotEmpty(ops)
re.NotContains(ops[0].Step(1).String(), "transfer leader")
}

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
Expand Down Expand Up @@ -453,12 +507,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand All @@ -478,10 +534,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
ops, _ := hb.Schedule(tc, false)
op := ops[0]
clearPendingInfluence(hb.(*hotScheduler))
re.Equal(4, op.Len())
re.Equal(5, op.Len())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -578,7 +634,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
statisticsInterval = 0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetHotRegionCacheHitsThreshold(0)
re.NoError(tc.RuleManager.SetRules([]*placement.Rule{
{
Expand Down Expand Up @@ -671,9 +727,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 2:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10)
default:
re.FailNow("wrong op: " + op.String())
Expand Down Expand Up @@ -764,12 +822,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -1101,9 +1161,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
cnt++
if cnt == 3 {
Expand Down
66 changes: 43 additions & 23 deletions pkg/utils/operatorutil/operator_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,27 @@ func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLe
// CheckTransferPeer checks if the operator is to transfer peer between the specified source and target stores.
func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, _ := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.Equal(sourceID, removePeerFrom)
re.Equal(targetID, addLearnerTo)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}
Expand All @@ -88,32 +104,36 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op
// CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer
// peer between the specified source and target stores and it meanwhile
// transfers the leader out of source store.
// If targetID is 0, it means the operator is to transfer peer to any store.
func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, lastLeader := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}

// CheckTransferPeerWithLeaderTransferFrom checks if the operator is to transfer
// peer out of the specified store and it meanwhile transfers the leader out of
// the store.
func CheckTransferPeerWithLeaderTransferFrom(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64) {
re.NotNil(op)
steps, lastLeader := trimTransferLeaders(op)
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion | operator.OpLeader
re.Equal(kind, op.Kind()&kind)
re.Equal(sourceID, removePeerFrom)
if targetID != 0 {
re.Equal(targetID, addLearnerTo)
}
}

// CheckAddPeer checks if the operator is to add peer on specified store.
Expand Down
Loading