Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: use move-hot-write-leader operator #7852

Merged
merged 7 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -5881,6 +5881,21 @@
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "B"
},
{
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you paste a screenshot of the Grafana panel?

Suggested change
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be negative because its direction is "out".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}-out",
"refId": "C",
"step": 4
},
{
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "D"
}
],
"thresholds": [],
Expand Down
45 changes: 6 additions & 39 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,20 +1546,12 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
targetLabel := strconv.FormatUint(dstStoreID, 10)
dim := bs.rankToDimString()

var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
switch bs.rwTy {
case utils.Read:
createOperator = bs.createReadOperator
case utils.Write:
createOperator = bs.createWriteOperator
}

currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
if err == nil {
bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
ops = []*operator.Operator{currentOp}
if bs.cur.revertRegion != nil {
currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
if err == nil {
bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
ops = append(ops, currentOp)
Expand Down Expand Up @@ -1725,11 +1717,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, strateg
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-read-leader",
"transfer-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
dstStoreID,
Expand All @@ -1741,7 +1733,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
if region.GetLeader().GetStoreId() == srcStoreID {
typ = "move-leader"
op, err = operator.CreateMoveLeaderOperator(
"move-hot-read-leader",
"move-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1750,7 +1742,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
} else {
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-read-peer",
"move-hot-"+bs.rwTy.String()+"-peer",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1761,31 +1753,6 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
return
}

// createWriteOperator builds the operator used to relieve a hot write region
// by shifting load from srcStoreID towards dstStoreID.
//
// It returns the created operator, a short type label ("transfer-leader" or
// "move-peer") describing which kind of operator was chosen, and any error
// reported by the operator constructor.
func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
// The region already has a peer on the destination store, so only the
// leader is transferred there; no peer is added or removed.
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-write-leader",
bs,
region,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
// No peer on the destination store yet: move the source store's peer
// there, keeping the same role as the source peer.
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-write-peer",
bs,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
}
return
}

func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) {
op.SetPriorityLevel(constant.High)
op.FinishedCounters = append(op.FinishedCounters,
Expand Down
57 changes: 32 additions & 25 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func clearPendingInfluence(h *hotScheduler) {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

// newTestRegion builds a region with the given ID for use in tests. The region
// has three peers, one on each of stores 1, 2 and 3, with peer IDs id*100+1,
// id*100+2 and id*100+3. The peer on store 1 is passed as the second argument
// to core.NewRegionInfo (presumably the leader; confirm against core).
func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

func TestUpgrade(t *testing.T) {
re := require.New(t)
cancel, _, _, oc := prepareSchedulersTest()
Expand Down Expand Up @@ -193,20 +198,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
}

// newTestRegion builds a region with the given ID for use in tests. The region
// has three peers, one on each of stores 1, 2 and 3, with peer IDs id*100+1,
// id*100+2 and id*100+3. The peer on store 1 is passed as the second argument
// to core.NewRegionInfo (presumably the leader; confirm against core).
func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

// TestHotWriteRegionScheduleByteRateOnly runs the byte-rate-only hot write
// region scheduling check twice, first with placement rules disabled and then
// enabled, followed by the hot write region placement check with placement
// rules enabled.
func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
// Package-level knobs are overridden before the checks run; presumably this
// makes the hot statistics deterministic for the test (confirm in statistics).
statistics.Denoising = false
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func TestSplitIfRegionTooHot(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
Expand Down Expand Up @@ -395,6 +386,15 @@ func TestSplitBucketsByLoad(t *testing.T) {
}
}

// TestHotWriteRegionScheduleByteRateOnly runs the byte-rate-only hot write
// region scheduling check twice, first with placement rules disabled and then
// enabled, followed by the hot write region placement check with placement
// rules enabled.
func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
// Package-level knobs are overridden before the checks run; presumably this
// makes the hot statistics deterministic for the test (confirm in statistics).
statistics.Denoising = false
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
Expand Down Expand Up @@ -446,14 +446,13 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
tc.RuleManager.DeleteRule("pd", "follower")
ops, _ = hb.Schedule(tc, false)
re.NotEmpty(ops)
// TODO: fix the test
// re.NotContains(ops[0].Step(1).String(), "transfer leader")
re.NotContains(ops[0].Step(1).String(), "transfer leader")
}

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
Expand Down Expand Up @@ -511,12 +510,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand All @@ -536,10 +537,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
ops, _ := hb.Schedule(tc, false)
op := ops[0]
clearPendingInfluence(hb.(*hotScheduler))
re.Equal(4, op.Len())
re.Equal(5, op.Len())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -636,7 +637,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
statisticsInterval = 0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetHotRegionCacheHitsThreshold(0)
re.NoError(tc.RuleManager.SetRules([]*placement.Rule{
{
Expand Down Expand Up @@ -730,9 +731,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 2:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10)
default:
re.FailNow("wrong op: " + op.String())
Expand Down Expand Up @@ -823,12 +826,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -1164,9 +1169,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
cnt++
if cnt == 3 {
Expand Down
66 changes: 43 additions & 23 deletions pkg/utils/operatorutil/operator_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,27 @@ func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLe
// CheckTransferPeer checks if the operator is to transfer peer between the specified source and target stores.
func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, _ := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.Equal(sourceID, removePeerFrom)
re.Equal(targetID, addLearnerTo)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}
Expand All @@ -88,32 +104,36 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op
// CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer
// peer between the specified source and target stores and it meanwhile
// transfers the leader out of source store.
// If targetID is 0, it means the operator is to transfer peer to any store.
func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add comments indicating that these functions are only used in tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may refactor them in a separate PR.

re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, lastLeader := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}

// CheckTransferPeerWithLeaderTransferFrom checks if the operator is to transfer
// peer out of the specified store and it meanwhile transfers the leader out of
// the store.
func CheckTransferPeerWithLeaderTransferFrom(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64) {
re.NotNil(op)
steps, lastLeader := trimTransferLeaders(op)
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion | operator.OpLeader
re.Equal(kind, op.Kind()&kind)
re.Equal(sourceID, removePeerFrom)
if targetID != 0 {
re.Equal(targetID, addLearnerTo)
}
}

// CheckAddPeer checks if the operator is to add peer on specified store.
Expand Down
Loading