From 9991d2887fdcdbdc26085d123bee35077ef3e0f8 Mon Sep 17 00:00:00 2001
From: nolouch
Date: Mon, 15 Apr 2024 13:09:39 +0800
Subject: [PATCH] address context

Signed-off-by: nolouch
---
 pkg/core/context.go                  | 40 +++++++++++
 pkg/core/region.go                   | 22 +++---
 pkg/core/region_test.go              |  7 +-
 pkg/mcs/scheduling/server/cluster.go | 44 ++++++------
 pkg/syncer/client.go                 |  7 ++
 pkg/utils/ctxutil/context.go         | 27 --------
 server/cluster/cluster.go            | 44 +++++--------
 server/cluster/cluster_test.go       | 99 ++++++++++++++--------------
 server/cluster/cluster_worker.go     | 18 +++--
 9 files changed, 157 insertions(+), 151 deletions(-)
 create mode 100644 pkg/core/context.go
 delete mode 100644 pkg/utils/ctxutil/context.go

diff --git a/pkg/core/context.go b/pkg/core/context.go
new file mode 100644
index 00000000000..d355488e516
--- /dev/null
+++ b/pkg/core/context.go
@@ -0,0 +1,40 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+	"context"
+
+	"github.com/tikv/pd/pkg/ratelimit"
+)
+
+// MetaProcessContext is a context for meta process.
+type MetaProcessContext struct {
+	context.Context
+	Tracer     RegionHeartbeatProcessTracer
+	TaskRunner ratelimit.Runner
+	Limiter    *ratelimit.ConcurrencyLimiter
+}
+
+// ContextTODO creates a new MetaProcessContext with no-op defaults.
+// Used in tests.
+func ContextTODO() *MetaProcessContext {
+	return &MetaProcessContext{
+		Context:    context.TODO(),
+		Tracer:     NewNoopHeartbeatProcessTracer(),
+		TaskRunner: ratelimit.NewSyncRunner(),
+		// Limiter defaults to nil, i.e. no concurrency limit.
+	}
+}
diff --git a/pkg/core/region.go b/pkg/core/region.go
index 4f84d20d94f..469ea95157b 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -37,7 +37,6 @@ import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/ratelimit"
-	"github.com/tikv/pd/pkg/utils/ctxutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/syncutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
@@ -714,7 +713,7 @@ func (r *RegionInfo) isRegionRecreated() bool {
 
 // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
 // and new region information.
-type RegionGuideFunc func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
+type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
 
 // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
 // nil means do not print the log.
@@ -729,14 +728,14 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 	}
 	// Save to storage if meta is updated.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	return func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
-		taskRunner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
-		limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
+	return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
+		taskRunner := ctx.TaskRunner
+		limiter := ctx.Limiter
 		// print log asynchronously
-		if ok {
+		if taskRunner != nil {
 			debug = func(msg string, fields ...zap.Field) {
 				taskRunner.RunTask(
-					ctx,
+					ctx.Context,
 					ratelimit.TaskOpts{
 						TaskName: "Log",
 						Limit:    limiter,
@@ -748,7 +747,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 			}
 			info = func(msg string, fields ...zap.Field) {
 				taskRunner.RunTask(
-					ctx,
+					ctx.Context,
 					ratelimit.TaskOpts{
 						TaskName: "Log",
 						Limit:    limiter,
@@ -982,11 +981,8 @@ func convertItemsToRegions(items []*regionItem) []*RegionInfo {
 }
 
 // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
-func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
-	tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(RegionHeartbeatProcessTracer)
-	if !ok {
-		tracer = NewNoopHeartbeatProcessTracer()
-	}
+func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
+	tracer := ctx.Tracer
 	r.t.Lock()
 	var ols []*regionItem
 	origin := r.getRegionLocked(region.GetID())
diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go
index 5bb09eb52b0..ae91886369f 100644
--- a/pkg/core/region_test.go
+++ b/pkg/core/region_test.go
@@ -15,7 +15,6 @@
 package core
 
 import (
-	"context"
 	"crypto/rand"
 	"fmt"
 	"math"
@@ -364,7 +363,7 @@ func TestNeedSync(t *testing.T) {
 	for _, testCase := range testCases {
 		regionA := region.Clone(testCase.optionsA...)
 		regionB := region.Clone(testCase.optionsB...)
-		_, _, needSync := RegionGuide(context.TODO(), regionA, regionB)
+		_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
 		re.Equal(testCase.needSync, needSync)
 	}
 }
@@ -460,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) {
 	regions := NewRegionsInfo()
 	region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
 	go func() {
-		regions.AtomicCheckAndPutRegion(context.TODO(), region)
+		regions.AtomicCheckAndPutRegion(ContextTODO(), region)
 	}()
-	regions.AtomicCheckAndPutRegion(context.TODO(), region)
+	regions.AtomicCheckAndPutRegion(ContextTODO(), region)
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
 }
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 7c471d783a4..42e8c3a35cb 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -31,7 +31,6 @@ import (
 	"github.com/tikv/pd/pkg/statistics/buckets"
 	"github.com/tikv/pd/pkg/statistics/utils"
 	"github.com/tikv/pd/pkg/storage"
-	"github.com/tikv/pd/pkg/utils/ctxutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
 )
@@ -558,13 +557,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 	if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
 		tracer = core.NewHeartbeatProcessTracer()
 	}
-	tracer.Begin()
-	ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer)
-	ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter)
+	var runner ratelimit.Runner
+	runner = syncRunner
 	if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
-		ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner)
+		runner = c.taskRunner
 	}
-
+	ctx := &core.MetaProcessContext{
+		Context:    c.ctx,
+		Limiter:    c.hbConcurrencyLimiter,
+		Tracer:     tracer,
+		TaskRunner: runner,
+	}
+	tracer.Begin()
 	if err := c.processRegionHeartbeat(ctx, region); err != nil {
 		tracer.OnAllStageFinished()
 		return err
@@ -575,16 +579,8 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 }
 
 // processRegionHeartbeat updates the region information.
-func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error {
-	tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer)
-	if !ok {
-		tracer = core.NewNoopHeartbeatProcessTracer()
-	}
-	runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
-	if !ok {
-		runner = syncRunner
-	}
-	limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
+func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
+	tracer := ctx.Tracer
 	origin, _, err := c.PreCheckPutRegion(region)
 	tracer.OnPreCheckFinished()
 	if err != nil {
@@ -592,11 +588,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 	}
 
 	region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
-	runner.RunTask(
+	ctx.TaskRunner.RunTask(
 		ctx,
 		ratelimit.TaskOpts{
 			TaskName: "HandleStatsAsync",
-			Limit:    limiter,
+			Limit:    ctx.Limiter,
 		},
 		func(_ context.Context) {
 			cluster.HandleStatsAsync(c, region)
@@ -612,11 +608,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 	// Due to some config changes need to update the region stats as well,
 	// so we do some extra checks here.
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
-		runner.RunTask(
+		ctx.TaskRunner.RunTask(
 			ctx,
 			ratelimit.TaskOpts{
 				TaskName: "ObserveRegionStatsAsync",
-				Limit:    limiter,
+				Limit:    ctx.Limiter,
 			},
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -640,11 +636,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 			tracer.OnSaveCacheFinished()
 			return err
 		}
-		runner.RunTask(
+		ctx.TaskRunner.RunTask(
 			ctx,
 			ratelimit.TaskOpts{
 				TaskName: "HandleOverlaps",
-				Limit:    limiter,
+				Limit:    ctx.Limiter,
 			},
 			func(_ context.Context) {
 				cluster.HandleOverlaps(c, overlaps)
@@ -653,11 +649,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
 	}
 	tracer.OnSaveCacheFinished()
 	// handle region stats
-	runner.RunTask(
+	ctx.TaskRunner.RunTask(
 		ctx,
 		ratelimit.TaskOpts{
 			TaskName: "CollectRegionStatsAsync",
-			Limit:    c.hbConcurrencyLimiter,
+			Limit:    ctx.Limiter,
 		},
 		func(_ context.Context) {
 			cluster.Collect(c, region, hasRegionStats)
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index 558423722ff..8a2e757d5cd 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/ratelimit"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
@@ -205,6 +206,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 					log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
 					continue
 				}
+				ctx := &core.MetaProcessContext{
+					Context:    ctx,
+					TaskRunner: ratelimit.NewSyncRunner(),
+					Tracer:     core.NewNoopHeartbeatProcessTracer(),
+					// no limit for followers.
+				}
 				saveKV, _, _ := regionGuide(ctx, region, origin)
 				overlaps := bc.PutRegion(region)
diff --git a/pkg/utils/ctxutil/context.go b/pkg/utils/ctxutil/context.go
deleted file mode 100644
index 8a4c7321828..00000000000
--- a/pkg/utils/ctxutil/context.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2024 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ctxutil
-
-// CtxKey is a custom type used as a key for values stored in Context.
-type CtxKey string
-
-const (
-	// HeartbeatTracerKey is the key for the heartbeat tracer in the context.
-	HeartbeatTracerKey CtxKey = "h_tracer"
-	// TaskRunnerKey is the key for the task runner in the context.
-	TaskRunnerKey CtxKey = "task_runner"
-	// LimiterKey is the key for the concurrency limiter in the context.
-	LimiterKey CtxKey = "limiter"
-)
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 132633c8a1c..dd59b63240f 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -58,7 +58,6 @@ import (
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/syncer"
 	"github.com/tikv/pd/pkg/unsaferecovery"
-	"github.com/tikv/pd/pkg/utils/ctxutil"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/netutil"
@@ -1003,17 +1002,8 @@ var regionGuide = core.GenerateRegionGuideFunc(true)
 var syncRunner = ratelimit.NewSyncRunner()
 
 // processRegionHeartbeat updates the region information.
-func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error {
-	tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer)
-	if !ok {
-		tracer = core.NewNoopHeartbeatProcessTracer()
-	}
-	runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
-	if !ok {
-		runner = syncRunner
-	}
-	limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
-
+func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
+	tracer := ctx.Tracer
 	origin, _, err := c.core.PreCheckPutRegion(region)
 	tracer.OnPreCheckFinished()
 	if err != nil {
@@ -1023,11 +1013,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 
 	region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
-		runner.RunTask(
-			ctx,
+		ctx.TaskRunner.RunTask(
+			ctx.Context,
 			ratelimit.TaskOpts{
 				TaskName: "HandleStatsAsync",
-				Limit:    limiter,
+				Limit:    ctx.Limiter,
 			},
 			func(_ context.Context) {
 				cluster.HandleStatsAsync(c, region)
@@ -1047,11 +1037,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 	// region stats needs to be collected in API mode.
 	// We need to think of a better way to reduce this part of the cost in the future.
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
-		runner.RunTask(
-			ctx,
+		ctx.TaskRunner.RunTask(
+			ctx.Context,
 			ratelimit.TaskOpts{
 				TaskName: "ObserveRegionStatsAsync",
-				Limit:    limiter,
+				Limit:    ctx.Limiter,
 			},
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1080,11 +1070,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 		return err
 	}
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
-		runner.RunTask(
-			ctx,
+		ctx.TaskRunner.RunTask(
+			ctx.Context,
 			ratelimit.TaskOpts{
 				TaskName: "HandleOverlaps",
-				Limit:    limiter,
+				Limit:    ctx.Limiter,
 			},
 			func(_ context.Context) {
 				cluster.HandleOverlaps(c, overlaps)
@@ -1096,11 +1086,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 
 	tracer.OnSaveCacheFinished()
 	// handle region stats
-	runner.RunTask(
-		ctx,
+	ctx.TaskRunner.RunTask(
+		ctx.Context,
 		ratelimit.TaskOpts{
 			TaskName: "CollectRegionStatsAsync",
-			Limit:    c.hbConcurrencyLimiter,
+			Limit:    ctx.Limiter,
 		},
 		func(_ context.Context) {
 			// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
@@ -1113,11 +1103,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
 	tracer.OnCollectRegionStatsFinished()
 	if c.storage != nil {
 		if saveKV {
-			runner.RunTask(
-				ctx,
+			ctx.TaskRunner.RunTask(
+				ctx.Context,
 				ratelimit.TaskOpts{
 					TaskName: "SaveRegionToKV",
-					Limit:    c.hbConcurrencyLimiter,
+					Limit:    ctx.Limiter,
 				},
 				func(_ context.Context) {
 					// If there are concurrent heartbeats from the same region, the last write will win even if
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index bd921189e6f..d01446ba143 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -54,7 +54,6 @@ import (
 	"github.com/tikv/pd/pkg/statistics"
 	"github.com/tikv/pd/pkg/statistics/utils"
 	"github.com/tikv/pd/pkg/storage"
-	"github.com/tikv/pd/pkg/utils/ctxutil"
 	"github.com/tikv/pd/pkg/utils/operatorutil"
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
@@ -632,7 +631,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	region := core.NewRegionInfo(regionMeta, leader, core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: utils.RegionHeartBeatReportInterval}),
 		core.SetWrittenBytes(30000*10),
 		core.SetWrittenKeys(300000*10))
-	err = cluster.processRegionHeartbeat(context.TODO(), region)
+	err = cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
@@ -645,7 +644,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 		StoreId: 4,
 	}
 	region = region.Clone(core.WithRemoveStorePeer(2), core.WithAddPeer(newPeer))
-	err = cluster.processRegionHeartbeat(context.TODO(), region)
+	err = cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
@@ -682,8 +681,8 @@ func TestBucketHeartbeat(t *testing.T) {
 		re.NoError(cluster.putStoreLocked(store))
 	}
 
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), regions[0]))
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), regions[1]))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), regions[0]))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), regions[1]))
 	re.Nil(cluster.GetRegion(uint64(1)).GetBuckets())
 	re.NoError(cluster.processReportBuckets(buckets))
 	re.Equal(buckets, cluster.GetRegion(uint64(1)).GetBuckets())
@@ -702,13 +701,13 @@ func TestBucketHeartbeat(t *testing.T) {
 	// case5: region update should inherit buckets.
 	newRegion := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil))
 	opt.SetRegionBucketEnabled(true)
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), newRegion))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), newRegion))
 	re.Len(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), 2)
 
 	// case6: disable region bucket in
 	opt.SetRegionBucketEnabled(false)
 	newRegion2 := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil))
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), newRegion2))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), newRegion2))
 	re.Nil(cluster.GetRegion(uint64(1)).GetBuckets())
 	re.Empty(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys())
 }
@@ -734,25 +733,25 @@ func TestRegionHeartbeat(t *testing.T) {
 
 	for i, region := range regions {
 		// region does not exist.
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
 		// region is the same, not updated.
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 		origin := region
 		// region is updated.
 		region = origin.Clone(core.WithIncVersion())
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
 		// region is stale (Version).
 		stale := origin.Clone(core.WithIncConfVer())
-		re.Error(cluster.processRegionHeartbeat(context.TODO(), stale))
+		re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
@@ -762,13 +761,13 @@ func TestRegionHeartbeat(t *testing.T) {
 			core.WithIncConfVer(),
 		)
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
 		// region is stale (ConfVer).
 		stale = origin.Clone(core.WithIncConfVer())
-		re.Error(cluster.processRegionHeartbeat(context.TODO(), stale))
+		re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
@@ -780,38 +779,38 @@ func TestRegionHeartbeat(t *testing.T) {
 			},
 		}))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Add a pending peer.
 		region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Clear down peers.
 		region = region.Clone(core.WithDownPeers(nil))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Clear pending peers.
 		region = region.Clone(core.WithPendingPeers(nil))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Remove peers.
 		origin = region
 		region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 		// Add peers.
 		region = origin
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		checkRegionsKV(re, cluster.storage, regions[:i+1])
 
@@ -821,47 +820,47 @@ func TestRegionHeartbeat(t *testing.T) {
 			core.WithIncConfVer(),
 		)
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Change leader.
 		region = region.Clone(core.WithLeader(region.GetPeers()[1]))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Change ApproximateSize.
 		region = region.Clone(core.SetApproximateSize(144))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Change ApproximateKeys.
 		region = region.Clone(core.SetApproximateKeys(144000))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Change bytes written.
 		region = region.Clone(core.SetWrittenBytes(24000))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Change bytes read.
 		region = region.Clone(core.SetReadBytes(1080000))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 
 		// Flashback
 		region = region.Clone(core.WithFlashback(true, 1))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 		region = region.Clone(core.WithFlashback(false, 0))
 		regions[i] = region
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), region))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
 		checkRegions(re, cluster.core, regions[:i+1])
 	}
 
@@ -917,7 +916,7 @@ func TestRegionHeartbeat(t *testing.T) {
 			core.WithNewRegionID(10000),
 			core.WithDecVersion(),
 		)
-		re.Error(cluster.processRegionHeartbeat(context.TODO(), overlapRegion))
+		re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), overlapRegion))
 		region := &metapb.Region{}
 		ok, err := storage.LoadRegion(regions[n-1].GetID(), region)
 		re.True(ok)
@@ -943,8 +942,8 @@ func TestRegionHeartbeat(t *testing.T) {
 		)
 		tracer := core.NewHeartbeatProcessTracer()
 		tracer.Begin()
-		ctx := context.TODO()
-		ctx = context.WithValue(ctx, ctxutil.HeartbeatTracerKey, tracer)
+		ctx := core.ContextTODO()
+		ctx.Tracer = tracer
 		re.NoError(cluster.processRegionHeartbeat(ctx, overlapRegion))
 		tracer.OnAllStageFinished()
 		re.Condition(func() bool {
@@ -979,7 +978,9 @@ func TestRegionFlowChanged(t *testing.T) {
 	regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})}
 	processRegions := func(regions []*core.RegionInfo) {
 		for _, r := range regions {
-			cluster.processRegionHeartbeat(ctx, r)
+			mctx := core.ContextTODO()
+			mctx.Context = ctx
+			cluster.processRegionHeartbeat(mctx, r)
 		}
 	}
 	regions = core.SplitRegions(regions)
@@ -1015,7 +1016,7 @@ func TestRegionSizeChanged(t *testing.T) {
 		core.SetApproximateKeys(curMaxMergeKeys-1),
 		core.SetSource(core.Heartbeat),
 	)
-	cluster.processRegionHeartbeat(context.TODO(), region)
+	cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	regionID := region.GetID()
 	re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	// Test ApproximateSize and ApproximateKeys change.
@@ -1025,16 +1026,16 @@ func TestRegionSizeChanged(t *testing.T) {
 		core.SetApproximateKeys(curMaxMergeKeys+1),
 		core.SetSource(core.Heartbeat),
 	)
-	cluster.processRegionHeartbeat(context.TODO(), region)
+	cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	// Test MaxMergeRegionSize and MaxMergeRegionKeys change.
 	cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize + 2))
 	cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys + 2))
-	cluster.processRegionHeartbeat(context.TODO(), region)
+	cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 	cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize))
 	cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys))
-	cluster.processRegionHeartbeat(context.TODO(), region)
+	cluster.processRegionHeartbeat(core.ContextTODO(), region)
 	re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion))
 }
 
@@ -1097,11 +1098,11 @@ func TestConcurrentRegionHeartbeat(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat", "return(true)"))
 	go func() {
 		defer wg.Done()
-		cluster.processRegionHeartbeat(context.TODO(), source)
+		cluster.processRegionHeartbeat(core.ContextTODO(), source)
 	}()
 	time.Sleep(100 * time.Millisecond)
 	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat"))
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), target))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), target))
 	wg.Wait()
 	checkRegion(re, cluster.GetRegionByKey([]byte{}), target)
 }
@@ -1163,7 +1164,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
 func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) {
 	// Heartbeat and check region one by one.
 	for _, r := range regions {
-		re.NoError(cluster.processRegionHeartbeat(context.TODO(), r))
+		re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), r))
 		checkRegion(re, cluster.GetRegion(r.GetID()), r)
 		checkRegion(re, cluster.GetRegionByKey(r.GetStartKey()), r)
 
@@ -1200,7 +1201,7 @@ func TestHeartbeatSplit(t *testing.T) {
 
 	// 1: [nil, nil)
 	region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1))
 	checkRegion(re, cluster.GetRegionByKey([]byte("foo")), region1)
 
 	// split 1 to 2: [nil, m) 1: [m, nil), sync 2 first.
@@ -1209,12 +1210,12 @@ func TestHeartbeatSplit(t *testing.T) {
 		core.WithIncVersion(),
 	)
 	region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), region2))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region2))
 	checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2)
 	// [m, nil) is missing before r1's heartbeat.
 	re.Nil(cluster.GetRegionByKey([]byte("z")))
 
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1))
 	checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1)
 
 	// split 1 to 3: [m, q) 1: [q, nil), sync 1 first.
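
Note: every test call site in this file follows the same mechanical migration — context.TODO() becomes core.ContextTODO(), and a test that needs a non-default field assigns it after construction instead of threading a context key through. A minimal sketch of that pattern, assuming a cluster and region already set up as in the tests above:

	// ContextTODO() yields a no-op tracer, a synchronous runner, and a nil limiter.
	ctx := core.ContextTODO()
	// Opt in to heartbeat breakdown tracing for this one call, as the
	// overlap-tracing test in this file does.
	ctx.Tracer = core.NewHeartbeatProcessTracer()
	re.NoError(cluster.processRegionHeartbeat(ctx, region))
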
@@ -1223,12 +1224,12 @@ func TestHeartbeatSplit(t *testing.T) {
 		core.WithIncVersion(),
 	)
 	region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1))
 	checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1)
 	checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2)
 	// [m, q) is missing before r3's heartbeat.
 	re.Nil(cluster.GetRegionByKey([]byte("n")))
-	re.NoError(cluster.processRegionHeartbeat(context.TODO(), region3))
+	re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region3))
 	checkRegion(re, cluster.GetRegionByKey([]byte("n")), region3)
 }
 
@@ -1524,11 +1525,11 @@ func TestUpdateStorePendingPeerCount(t *testing.T) {
 		},
 	}
 	origin := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[:3]}, peers[0], core.WithPendingPeers(peers[1:3]))
-	re.NoError(tc.processRegionHeartbeat(context.TODO(), origin))
+	re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), origin))
 	time.Sleep(50 * time.Millisecond)
 	checkPendingPeerCount([]int{0, 1, 1, 0}, tc.RaftCluster, re)
 	newRegion := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[1:]}, peers[1], core.WithPendingPeers(peers[3:4]))
-	re.NoError(tc.processRegionHeartbeat(context.TODO(), newRegion))
+	re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
 	time.Sleep(50 * time.Millisecond)
 	checkPendingPeerCount([]int{0, 0, 0, 1}, tc.RaftCluster, re)
 }
@@ -2958,12 +2959,12 @@ func TestShouldRun(t *testing.T) {
 	for _, testCase := range testCases {
 		r := tc.GetRegion(testCase.regionID)
 		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(context.TODO(), nr))
+		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
 		re.Equal(testCase.ShouldRun, co.ShouldRun())
 	}
 	nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
 	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(context.TODO(), newRegion))
+	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
 	re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
 }
 
@@ -3001,12 +3002,12 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {
 	for _, testCase := range testCases {
 		r := tc.GetRegion(testCase.regionID)
 		nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
-		re.NoError(tc.processRegionHeartbeat(context.TODO(), nr))
+		re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr))
 		re.Equal(testCase.ShouldRun, co.ShouldRun())
 	}
 	nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
 	newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
-	re.Error(tc.processRegionHeartbeat(context.TODO(), newRegion))
+	re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
 	re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())
 
 	// Now, after server is prepared, there exist some regions with no leader.
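
Note: the cluster_worker.go hunk below completes the same migration already shown for pkg/mcs/scheduling/server/cluster.go — dependencies move from context.WithValue into typed struct fields. A before/after sketch of the consumer side, using only identifiers from this patch:

	// Before: each consumer recovered values with a type assertion and a fallback.
	tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer)
	if !ok {
		tracer = core.NewNoopHeartbeatProcessTracer()
	}

	// After: defaults are decided once at construction; consumers read a field.
	tracer := ctx.Tracer

Besides avoiding a map lookup and type assertion on every heartbeat, the struct makes the tracer, runner, and limiter part of the function's visible contract rather than hidden context keys.
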
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index 6cf5e0caff8..5c2bb950297 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -16,7 +16,6 @@ package cluster
 
 import (
 	"bytes"
-	"context"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -25,9 +24,9 @@ import (
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
 	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/ratelimit"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/statistics/buckets"
-	"github.com/tikv/pd/pkg/utils/ctxutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"github.com/tikv/pd/pkg/versioninfo"
@@ -40,13 +39,18 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 	if c.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
 		tracer = core.NewHeartbeatProcessTracer()
 	}
-	tracer.Begin()
-	ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer)
-	ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter)
+	var runner ratelimit.Runner
+	runner = syncRunner
 	if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
-		ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner)
+		runner = c.taskRunner
 	}
-
+	ctx := &core.MetaProcessContext{
+		Context:    c.ctx,
+		Limiter:    c.hbConcurrencyLimiter,
+		Tracer:     tracer,
+		TaskRunner: runner,
+	}
+	tracer.Begin()
 	if err := c.processRegionHeartbeat(ctx, region); err != nil {
 		tracer.OnAllStageFinished()
 		return err
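
Note: with the heartbeat entry points now building the MetaProcessContext up front, every asynchronous stage dispatches through one pattern. A condensed sketch based on the signatures visible in this patch; "ExampleStage" is an illustrative task name, not one the patch adds:

	ctx.TaskRunner.RunTask(
		ctx.Context,
		ratelimit.TaskOpts{
			TaskName: "ExampleStage",
			// A nil Limiter means no concurrency cap, which is why the
			// follower syncer deliberately leaves the field unset.
			Limit: ctx.Limiter,
		},
		func(_ context.Context) {
			// Stage work: runs inline under the SyncRunner, or is queued when
			// EnableHeartbeatConcurrentRunner selects the concurrent runner.
		},
	)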