From 25469b54ac6c4231323bf25fa31b6d0eaa38c7bc Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 12 Apr 2024 12:34:53 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #7852 close tikv/pd#7848 Signed-off-by: ti-chi-bot --- metrics/grafana/pd.json | 15 ++ pkg/schedule/schedulers/hot_region.go | 17 +- pkg/schedule/schedulers/hot_region_test.go | 204 ++++++++++++++++++++- pkg/utils/operatorutil/operator_check.go | 66 ++++--- 4 files changed, 265 insertions(+), 37 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 46cae84b0c5..143ad00dd84 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -5595,6 +5595,21 @@ "intervalFactor": 1, "legendFormat": "store-{{store}}-in", "refId": "B" + }, + { + "expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "store-{{store}}-out", + "refId": "C", + "step": 4 + }, + { + "expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "store-{{store}}-in", + "refId": "D" } ], "thresholds": [], diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index b562df305e7..3776d7daaf8 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1457,6 +1457,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { targetLabel := strconv.FormatUint(dstStoreID, 10) dim := bs.rankToDimString() +<<<<<<< HEAD var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) switch bs.rwTy { case statistics.Read: @@ -1466,11 +1467,14 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { } currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID) +======= + currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID) +>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) if err == nil { bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim) ops = []*operator.Operator{currentOp} if bs.cur.revertRegion != nil { - currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID) + currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID) if err == nil { bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim) ops = append(ops, currentOp) @@ -1545,11 +1549,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper return operators } -func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) { +func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) { if region.GetStorePeer(dstStoreID) != nil { typ = "transfer-leader" op, err = operator.CreateTransferLeaderOperator( - "transfer-hot-read-leader", + "transfer-hot-"+bs.rwTy.String()+"-leader", bs, region, srcStoreID, @@ -1562,7 +1566,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, if 
region.GetLeader().GetStoreId() == srcStoreID { typ = "move-leader" op, err = operator.CreateMoveLeaderOperator( - "move-hot-read-leader", + "move-hot-"+bs.rwTy.String()+"-leader", bs, region, operator.OpHotRegion, @@ -1571,7 +1575,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, } else { typ = "move-peer" op, err = operator.CreateMovePeerOperator( - "move-hot-read-peer", + "move-hot-"+bs.rwTy.String()+"-peer", bs, region, operator.OpHotRegion, @@ -1582,6 +1586,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, return } +<<<<<<< HEAD func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) { if region.GetStorePeer(dstStoreID) != nil { typ = "transfer-leader" @@ -1608,6 +1613,8 @@ func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID return } +======= +>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) { op.SetPriorityLevel(constant.High) op.FinishedCounters = append(op.FinishedCounters, diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 5fc1cffb8ba..fb4ea73713e 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -67,6 +67,11 @@ func clearPendingInfluence(h *hotScheduler) { h.regionPendings = make(map[uint64]*pendingInfluence) } +func newTestRegion(id uint64) *core.RegionInfo { + peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}} + return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0]) +} + func TestUpgrade(t *testing.T) { re := require.New(t) cancel, _, _, oc := prepareSchedulersTest() @@ -189,6 +194,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } } +<<<<<<< HEAD func newTestRegion(id uint64) *core.RegionInfo { peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}} return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0]) @@ -204,6 +210,9 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { } func TestSplitBuckets(t *testing.T) { +======= +func TestSplitIfRegionTooHot(t *testing.T) { +>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) re := require.New(t) statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() @@ -239,15 +248,184 @@ func TestSplitBuckets(t *testing.T) { expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")} expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys) re.NoError(err) +<<<<<<< HEAD expectOp.GetCreateTime() re.Equal(expectOp.Brief(), op.Brief()) re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo()) +======= + solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader) + solve.cur = &solution{} + region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f")) + + testdata := []struct { + hotBuckets [][]byte + splitKeys [][]byte + }{ + { + [][]byte{[]byte("a"), []byte("b"), []byte("f")}, + [][]byte{[]byte("b")}, + }, + { + [][]byte{[]byte(""), []byte("a"), []byte("")}, + nil, + }, + { + [][]byte{}, + nil, + }, + } + + for _, data := range testdata { + b := 
&metapb.Buckets{ + RegionId: 1, + PeriodInMs: 1000, + Keys: data.hotBuckets, + } + region.UpdateBuckets(b, region.GetBuckets()) + ops := solve.createSplitOperator([]*core.RegionInfo{region}, bySize) + if data.splitKeys == nil { + re.Empty(ops) + continue + } + re.Len(ops, 1) + op := ops[0] + re.Equal(splitHotReadBuckets, op.Desc()) + + expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys) + re.NoError(err) + re.Equal(expectOp.Brief(), op.Brief()) + } +} + +func TestSplitBucketsByLoad(t *testing.T) { + re := require.New(t) + statistics.Denoising = false + cancel, _, tc, oc := prepareSchedulersTest() + tc.SetHotRegionCacheHitsThreshold(1) + tc.SetRegionBucketEnabled(true) + defer cancel() + hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader) + solve.cur = &solution{} + region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f")) + testdata := []struct { + hotBuckets [][]byte + splitKeys [][]byte + }{ + { + [][]byte{[]byte(""), []byte("b"), []byte("")}, + [][]byte{[]byte("b")}, + }, + { + [][]byte{[]byte(""), []byte("a"), []byte("")}, + nil, + }, + { + [][]byte{[]byte("b"), []byte("c"), []byte("")}, + [][]byte{[]byte("c")}, + }, + } + for _, data := range testdata { + b := &metapb.Buckets{ + RegionId: 1, + PeriodInMs: 1000, + Keys: data.hotBuckets, + Stats: &metapb.BucketStats{ + ReadBytes: []uint64{10 * units.KiB, 10 * units.MiB}, + ReadKeys: []uint64{256, 256}, + ReadQps: []uint64{0, 0}, + WriteBytes: []uint64{0, 0}, + WriteQps: []uint64{0, 0}, + WriteKeys: []uint64{0, 0}, + }, + } + task := buckets.NewCheckPeerTask(b) + re.True(tc.HotBucketCache.CheckAsync(task)) + time.Sleep(time.Millisecond * 10) + ops := solve.createSplitOperator([]*core.RegionInfo{region}, byLoad) + if data.splitKeys == nil { + re.Empty(ops) + continue + } + re.Len(ops, 1) + op := ops[0] + re.Equal(splitHotReadBuckets, op.Desc()) + + expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys) + re.NoError(err) + re.Equal(expectOp.Brief(), op.Brief()) + } +} + +func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { + re := require.New(t) + statistics.Denoising = false + statisticsInterval = 0 + checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */) + checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) + checkHotWriteRegionPlacement(re, true) +} + +func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) { + cancel, _, tc, oc := prepareSchedulersTest() + defer cancel() + tc.SetEnableUseJointConsensus(true) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2)) + tc.SetEnablePlacementRules(enablePlacementRules) + labels := []string{"zone", "host"} + tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) 
+ hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + hb.(*hotScheduler).conf.SetHistorySampleDuration(0) + tc.SetHotRegionCacheHitsThreshold(0) + + tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) + tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"}) + tc.AddLabelsStore(3, 2, map[string]string{"zone": "z2", "host": "h3"}) + tc.AddLabelsStore(4, 2, map[string]string{"zone": "z2", "host": "h4"}) + tc.AddLabelsStore(5, 2, map[string]string{"zone": "z2", "host": "h5"}) + tc.AddLabelsStore(6, 2, map[string]string{"zone": "z2", "host": "h6"}) + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "leader", Role: placement.Leader, Count: 1, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z1"}}}, + }) + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}}, + }) + tc.RuleManager.DeleteRule("pd", "default") + + tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 0) + tc.UpdateStorageWrittenBytes(3, 6*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 3*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(5, 3*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(6, 6*units.MiB*utils.StoreHeartBeatReportInterval) + + // Region 1, 2 and 3 are hot regions. + addRegionInfo(tc, utils.Write, []testRegionInfo{ + {1, []uint64{1, 3, 5}, 512 * units.KiB, 0, 0}, + {2, []uint64{1, 4, 6}, 512 * units.KiB, 0, 0}, + {3, []uint64{1, 3, 6}, 512 * units.KiB, 0, 0}, + }) + ops, _ := hb.Schedule(tc, false) + re.NotEmpty(ops) + re.NotContains(ops[0].Step(1).String(), "transfer leader") + clearPendingInfluence(hb.(*hotScheduler)) + + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "voter", Role: placement.Voter, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}}, + }) + tc.RuleManager.DeleteRule("pd", "follower") + ops, _ = hb.Schedule(tc, false) + re.NotEmpty(ops) + re.NotContains(ops[0].Step(1).String(), "transfer leader") +>>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) } func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) { cancel, opt, tc, oc := prepareSchedulersTest() defer cancel() - tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) 
@@ -304,12 +482,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace switch op.Len() { case 1: // balance by leader selected + re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) - case 4: + case 5: // balance by peer selected + re.Equal("move-hot-write-leader", op.Desc()) if op.RegionID() == 2 { // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label - operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1) + operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0) } else { // peer in store 1 of the region 1,3 can only transfer to store 6 operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) @@ -329,10 +509,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace ops, _ := hb.Schedule(tc, false) op := ops[0] clearPendingInfluence(hb.(*hotScheduler)) - re.Equal(4, op.Len()) + re.Equal(5, op.Len()) if op.RegionID() == 2 { // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label - operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1) + operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0) } else { // peer in store 1 of the region 1,3 can only transfer to store 6 operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) @@ -429,7 +609,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2)) tc.SetHotRegionCacheHitsThreshold(0) re.NoError(tc.RuleManager.SetRules([]*placement.Rule{ { @@ -522,9 +702,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { switch op.Len() { case 1: // balance by leader selected + re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) case 2: // balance by peer selected + re.Equal("move-hot-write-leader", op.Desc()) operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10) default: re.FailNow("wrong op: " + op.String()) @@ -615,12 +797,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { switch op.Len() { case 1: // balance by leader selected + re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) - case 4: + case 5: // balance by peer selected + re.Equal("move-hot-write-leader", op.Desc()) if op.RegionID() == 2 { // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label - operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1) + operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0) } else { // peer in store 1 of the region 1,3 can only transfer to store 6 operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6) @@ -952,9 +1136,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim switch op.Len() { case 1: // balance by leader selected + re.Equal("transfer-hot-write-leader", op.Desc()) operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1) - case 4: + case 5: // balance by peer 
selected + re.Equal("move-hot-write-leader", op.Desc()) operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4) cnt++ if cnt == 3 { diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go index f6517be29d7..4579eb4437e 100644 --- a/pkg/utils/operatorutil/operator_check.go +++ b/pkg/utils/operatorutil/operator_check.go @@ -65,11 +65,27 @@ func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLe // CheckTransferPeer checks if the operator is to transfer peer between the specified source and target stores. func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) { re.NotNil(op) + var addLearnerTo, removePeerFrom uint64 steps, _ := trimTransferLeaders(op) - re.Len(steps, 3) - re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) - re.IsType(operator.PromoteLearner{}, steps[1]) - re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore) + switch len(steps) { + case 3: // without joint consensus + re.IsType(operator.AddLearner{}, steps[0]) + re.IsType(operator.PromoteLearner{}, steps[1]) + re.IsType(operator.RemovePeer{}, steps[2]) + addLearnerTo = steps[0].(operator.AddLearner).ToStore + removePeerFrom = steps[2].(operator.RemovePeer).FromStore + case 4: // with joint consensus + re.IsType(operator.AddLearner{}, steps[0]) + re.IsType(operator.ChangePeerV2Enter{}, steps[1]) + re.IsType(operator.ChangePeerV2Leave{}, steps[2]) + re.IsType(operator.RemovePeer{}, steps[3]) + addLearnerTo = steps[0].(operator.AddLearner).ToStore + removePeerFrom = steps[3].(operator.RemovePeer).FromStore + default: + re.FailNow("unexpected operator steps") + } + re.Equal(sourceID, removePeerFrom) + re.Equal(targetID, addLearnerTo) kind |= operator.OpRegion re.Equal(kind, op.Kind()&kind) } @@ -88,32 +104,36 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op // CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer // peer between the specified source and target stores and it meanwhile // transfers the leader out of source store. +// If targetID is 0, it means the operator is to transfer peer to any store. 
func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) { re.NotNil(op) + var addLearnerTo, removePeerFrom uint64 steps, lastLeader := trimTransferLeaders(op) - re.Len(steps, 3) - re.Equal(targetID, steps[0].(operator.AddLearner).ToStore) - re.IsType(operator.PromoteLearner{}, steps[1]) - re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore) + switch len(steps) { + case 3: // without joint consensus + re.IsType(operator.AddLearner{}, steps[0]) + re.IsType(operator.PromoteLearner{}, steps[1]) + re.IsType(operator.RemovePeer{}, steps[2]) + addLearnerTo = steps[0].(operator.AddLearner).ToStore + removePeerFrom = steps[2].(operator.RemovePeer).FromStore + case 4: // with joint consensus + re.IsType(operator.AddLearner{}, steps[0]) + re.IsType(operator.ChangePeerV2Enter{}, steps[1]) + re.IsType(operator.ChangePeerV2Leave{}, steps[2]) + re.IsType(operator.RemovePeer{}, steps[3]) + addLearnerTo = steps[0].(operator.AddLearner).ToStore + removePeerFrom = steps[3].(operator.RemovePeer).FromStore + default: + re.FailNow("unexpected operator steps") + } re.NotZero(lastLeader) re.NotEqual(sourceID, lastLeader) kind |= operator.OpRegion re.Equal(kind, op.Kind()&kind) -} - -// CheckTransferPeerWithLeaderTransferFrom checks if the operator is to transfer -// peer out of the specified store and it meanwhile transfers the leader out of -// the store. -func CheckTransferPeerWithLeaderTransferFrom(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64) { - re.NotNil(op) - steps, lastLeader := trimTransferLeaders(op) - re.IsType(operator.AddLearner{}, steps[0]) - re.IsType(operator.PromoteLearner{}, steps[1]) - re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore) - re.NotZero(lastLeader) - re.NotEqual(sourceID, lastLeader) - kind |= operator.OpRegion | operator.OpLeader - re.Equal(kind, op.Kind()&kind) + re.Equal(sourceID, removePeerFrom) + if targetID != 0 { + re.Equal(targetID, addLearnerTo) + } } // CheckAddPeer checks if the operator is to add peer on specified store. 
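The functional core of PATCH 1 is that `createReadOperator` and `createWriteOperator` collapse into a single `createOperator`, with the operator description built from the scheduler's read/write type (`"transfer-hot-"+bs.rwTy.String()+"-leader"` and so on); this is what introduces the `move-hot-write-leader` operator that the updated tests assert on. A minimal standalone sketch of that naming rule follows; the `rwType` enum and `operatorName` helper here are hypothetical stand-ins for PD's real `utils.RWType` and `balanceSolver.createOperator`, not the actual API.

```go
package main

import "fmt"

// rwType stands in for the read/write dimension the hot-region scheduler
// switches on (an assumption; PD uses its own RWType with a String method).
type rwType int

const (
	read rwType = iota
	write
)

func (t rwType) String() string {
	if t == read {
		return "read"
	}
	return "write"
}

// operatorName mirrors the branch structure of the unified createOperator in
// the diff above: transfer-leader when the target store already holds a peer,
// move-leader when the leader sits on the source store, move-peer otherwise.
// The read/write type is spliced into the name, so the same code path now
// produces both the read variants and move-hot-write-leader.
func operatorName(t rwType, hasPeerOnTarget, leaderOnSource bool) string {
	switch {
	case hasPeerOnTarget:
		return "transfer-hot-" + t.String() + "-leader"
	case leaderOnSource:
		return "move-hot-" + t.String() + "-leader"
	default:
		return "move-hot-" + t.String() + "-peer"
	}
}

func main() {
	fmt.Println(operatorName(write, false, true)) // move-hot-write-leader
	fmt.Println(operatorName(read, true, false))  // transfer-hot-read-leader
}
```

This is only a sketch of the naming scheme; the real operator construction in `hot_region.go` additionally builds the concrete `operator.Operator` via `CreateTransferLeaderOperator`, `CreateMoveLeaderOperator`, or `CreateMovePeerOperator`, as shown in the hunk above.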
From b42856d5cd13317055b8e2178ffe586befb90d23 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 12 Apr 2024 16:41:23 +0800 Subject: [PATCH 2/3] fix conflict Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region.go | 41 ----- pkg/schedule/schedulers/hot_region_test.go | 182 +-------------------- 2 files changed, 7 insertions(+), 216 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 3776d7daaf8..4201258527b 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -1457,19 +1457,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { targetLabel := strconv.FormatUint(dstStoreID, 10) dim := bs.rankToDimString() -<<<<<<< HEAD - var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) - switch bs.rwTy { - case statistics.Read: - createOperator = bs.createReadOperator - case statistics.Write: - createOperator = bs.createWriteOperator - } - - currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID) -======= currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID) ->>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) if err == nil { bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim) ops = []*operator.Operator{currentOp} @@ -1586,35 +1574,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dst return } -<<<<<<< HEAD -func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) { - if region.GetStorePeer(dstStoreID) != nil { - typ = "transfer-leader" - op, err = operator.CreateTransferLeaderOperator( - "transfer-hot-write-leader", - bs, - region, - srcStoreID, - dstStoreID, - []uint64{}, - operator.OpHotRegion) - } else { - srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers` - dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role} - typ = "move-peer" - op, err = operator.CreateMovePeerOperator( - "move-hot-write-peer", - bs, - region, - operator.OpHotRegion, - srcStoreID, - dstPeer) - } - return -} - -======= ->>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) { op.SetPriorityLevel(constant.High) op.FinishedCounters = append(op.FinishedCounters, diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index fb4ea73713e..e51d93c0c62 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -22,7 +22,6 @@ import ( "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -30,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -194,170 +192,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } } -<<<<<<< HEAD -func newTestRegion(id uint64) *core.RegionInfo { - peers := []*metapb.Peer{{Id: 
id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}} - return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0]) -} - -func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { - re := require.New(t) - statistics.Denoising = false - statistics.HistorySampleDuration = 0 - statisticsInterval = 0 - checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */) - checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) -} - -func TestSplitBuckets(t *testing.T) { -======= -func TestSplitIfRegionTooHot(t *testing.T) { ->>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) - re := require.New(t) - statistics.Denoising = false - cancel, _, tc, oc := prepareSchedulersTest() - tc.SetHotRegionCacheHitsThreshold(1) - defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) - re.NoError(err) - solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader) - region := core.NewTestRegionInfo(1, 1, []byte(""), []byte("")) - - // the hot range is [a,c],[e,f] - b := &metapb.Buckets{ - RegionId: 1, - PeriodInMs: 1000, - Keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}, - Stats: &metapb.BucketStats{ - ReadBytes: []uint64{10 * units.KiB, 10 * units.KiB, 0, 10 * units.KiB, 10 * units.KiB}, - ReadKeys: []uint64{256, 256, 0, 256, 256}, - ReadQps: []uint64{0, 0, 0, 0, 0}, - WriteBytes: []uint64{0, 0, 0, 0, 0}, - WriteQps: []uint64{0, 0, 0, 0, 0}, - WriteKeys: []uint64{0, 0, 0, 0, 0}, - }, - } - - task := buckets.NewCheckPeerTask(b) - re.True(tc.HotBucketCache.CheckAsync(task)) - time.Sleep(time.Millisecond * 10) - ops := solve.createSplitOperator([]*core.RegionInfo{region}) - re.Equal(1, len(ops)) - op := ops[0] - re.Equal(splitBucket, op.Desc()) - expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")} - expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys) - re.NoError(err) -<<<<<<< HEAD - expectOp.GetCreateTime() - re.Equal(expectOp.Brief(), op.Brief()) - re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo()) -======= - solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader) - solve.cur = &solution{} - region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f")) - - testdata := []struct { - hotBuckets [][]byte - splitKeys [][]byte - }{ - { - [][]byte{[]byte("a"), []byte("b"), []byte("f")}, - [][]byte{[]byte("b")}, - }, - { - [][]byte{[]byte(""), []byte("a"), []byte("")}, - nil, - }, - { - [][]byte{}, - nil, - }, - } - - for _, data := range testdata { - b := &metapb.Buckets{ - RegionId: 1, - PeriodInMs: 1000, - Keys: data.hotBuckets, - } - region.UpdateBuckets(b, region.GetBuckets()) - ops := solve.createSplitOperator([]*core.RegionInfo{region}, bySize) - if data.splitKeys == nil { - re.Empty(ops) - continue - } - re.Len(ops, 1) - op := ops[0] - re.Equal(splitHotReadBuckets, op.Desc()) - - expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys) - re.NoError(err) - re.Equal(expectOp.Brief(), op.Brief()) - } -} - -func TestSplitBucketsByLoad(t *testing.T) { - re := require.New(t) - statistics.Denoising = false - cancel, _, tc, oc := prepareSchedulersTest() - tc.SetHotRegionCacheHitsThreshold(1) - tc.SetRegionBucketEnabled(true) - defer cancel() - hb, err := 
CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) - re.NoError(err) - solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader) - solve.cur = &solution{} - region := core.NewTestRegionInfo(1, 1, []byte("a"), []byte("f")) - testdata := []struct { - hotBuckets [][]byte - splitKeys [][]byte - }{ - { - [][]byte{[]byte(""), []byte("b"), []byte("")}, - [][]byte{[]byte("b")}, - }, - { - [][]byte{[]byte(""), []byte("a"), []byte("")}, - nil, - }, - { - [][]byte{[]byte("b"), []byte("c"), []byte("")}, - [][]byte{[]byte("c")}, - }, - } - for _, data := range testdata { - b := &metapb.Buckets{ - RegionId: 1, - PeriodInMs: 1000, - Keys: data.hotBuckets, - Stats: &metapb.BucketStats{ - ReadBytes: []uint64{10 * units.KiB, 10 * units.MiB}, - ReadKeys: []uint64{256, 256}, - ReadQps: []uint64{0, 0}, - WriteBytes: []uint64{0, 0}, - WriteQps: []uint64{0, 0}, - WriteKeys: []uint64{0, 0}, - }, - } - task := buckets.NewCheckPeerTask(b) - re.True(tc.HotBucketCache.CheckAsync(task)) - time.Sleep(time.Millisecond * 10) - ops := solve.createSplitOperator([]*core.RegionInfo{region}, byLoad) - if data.splitKeys == nil { - re.Empty(ops) - continue - } - re.Len(ops, 1) - op := ops[0] - re.Equal(splitHotReadBuckets, op.Desc()) - - expectOp, err := operator.CreateSplitRegionOperator(splitHotReadBuckets, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, data.splitKeys) - re.NoError(err) - re.Equal(expectOp.Brief(), op.Brief()) - } -} - func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { re := require.New(t) statistics.Denoising = false @@ -375,9 +209,8 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) - hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) - hb.(*hotScheduler).conf.SetHistorySampleDuration(0) tc.SetHotRegionCacheHitsThreshold(0) tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) @@ -394,15 +227,15 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b }) tc.RuleManager.DeleteRule("pd", "default") - tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(1, 10*units.MiB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(2, 0) - tc.UpdateStorageWrittenBytes(3, 6*units.MiB*utils.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(4, 3*units.MiB*utils.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(5, 3*units.MiB*utils.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(6, 6*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(3, 6*units.MiB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 3*units.MiB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(5, 3*units.MiB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(6, 6*units.MiB*statistics.StoreHeartBeatReportInterval) // Region 1, 2 and 3 are hot regions. 
- addRegionInfo(tc, utils.Write, []testRegionInfo{ + addRegionInfo(tc, statistics.Write, []testRegionInfo{ {1, []uint64{1, 3, 5}, 512 * units.KiB, 0, 0}, {2, []uint64{1, 4, 6}, 512 * units.KiB, 0, 0}, {3, []uint64{1, 3, 6}, 512 * units.KiB, 0, 0}, @@ -419,7 +252,6 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b ops, _ = hb.Schedule(tc, false) re.NotEmpty(ops) re.NotContains(ops[0].Step(1).String(), "transfer leader") ->>>>>>> 33ae3b614 (scheduler: use move-hot-write-leader operator (#7852)) } func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) { From d6e19636a05b7a27779e8dc5b88f048bb1ab2c60 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 12 Apr 2024 16:42:39 +0800 Subject: [PATCH 3/3] fix conflict Signed-off-by: lhy1024 --- pkg/schedule/schedulers/hot_region_test.go | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index e51d93c0c62..09b8a6946f1 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -22,6 +22,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -29,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -192,6 +194,47 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } } +func TestSplitBuckets(t *testing.T) { + re := require.New(t) + statistics.Denoising = false + cancel, _, tc, oc := prepareSchedulersTest() + tc.SetHotRegionCacheHitsThreshold(1) + defer cancel() + hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader) + region := core.NewTestRegionInfo(1, 1, []byte(""), []byte("")) + + // the hot range is [a,c],[e,f] + b := &metapb.Buckets{ + RegionId: 1, + PeriodInMs: 1000, + Keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}, + Stats: &metapb.BucketStats{ + ReadBytes: []uint64{10 * units.KiB, 10 * units.KiB, 0, 10 * units.KiB, 10 * units.KiB}, + ReadKeys: []uint64{256, 256, 0, 256, 256}, + ReadQps: []uint64{0, 0, 0, 0, 0}, + WriteBytes: []uint64{0, 0, 0, 0, 0}, + WriteQps: []uint64{0, 0, 0, 0, 0}, + WriteKeys: []uint64{0, 0, 0, 0, 0}, + }, + } + + task := buckets.NewCheckPeerTask(b) + re.True(tc.HotBucketCache.CheckAsync(task)) + time.Sleep(time.Millisecond * 10) + ops := solve.createSplitOperator([]*core.RegionInfo{region}) + re.Equal(1, len(ops)) + op := ops[0] + re.Equal(splitBucket, op.Desc()) + expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")} + expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys) + re.NoError(err) + expectOp.GetCreateTime() + re.Equal(expectOp.Brief(), op.Brief()) + re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo()) +} + func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { re := require.New(t) statistics.Denoising = false