diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 916200bfa3e..8bd2616f41f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -28,6 +28,7 @@ type Cluster interface { GetLabelStats() *statistics.LabelStatistics GetCoordinator() *schedule.Coordinator GetRuleManager() *placement.RuleManager + GetBasicCluster() *core.BasicCluster } // HandleStatsAsync handles the flow asynchronously. @@ -55,8 +56,17 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { } // Collect collects the cluster information. -func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) { +func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) { if hasRegionStats { - c.GetRegionStats().Observe(region, stores) + // get region again from root tree. make sure the observed region is the latest. + bc := c.GetBasicCluster() + if bc == nil { + return + } + region = bc.GetRegion(region.GetID()) + if region == nil { + return + } + c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region)) } } diff --git a/pkg/core/context.go b/pkg/core/context.go new file mode 100644 index 00000000000..ab149378b1d --- /dev/null +++ b/pkg/core/context.go @@ -0,0 +1,40 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "context" + + "github.com/tikv/pd/pkg/ratelimit" +) + +// MetaProcessContext is a context for meta process. +type MetaProcessContext struct { + context.Context + Tracer RegionHeartbeatProcessTracer + TaskRunner ratelimit.Runner + Limiter *ratelimit.ConcurrencyLimiter +} + +// NewMetaProcessContext creates a new MetaProcessContext. +// used in tests, can be changed if no need to test concurrency. +func ContextTODO() *MetaProcessContext { + return &MetaProcessContext{ + Context: context.TODO(), + Tracer: NewNoopHeartbeatProcessTracer(), + TaskRunner: ratelimit.NewSyncRunner(), + // Limit default is nil + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index baabafa1fa9..41bfb4d31ad 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -16,6 +16,7 @@ package core import ( "bytes" + "context" "encoding/hex" "fmt" "math" @@ -35,6 +36,7 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -711,20 +713,50 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { noLog := func(string, ...zap.Field) {} - debug, info := noLog, noLog + d, i := noLog, noLog if enableLog { - debug = log.Debug - info = log.Info + d = log.Debug + i = log.Info } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + taskRunner := ctx.TaskRunner + limiter := ctx.Limiter + // print log asynchronously + debug, info := d, i + if taskRunner != nil { + debug = func(msg string, fields ...zap.Field) { + taskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "Log", + Limit: limiter, + }, + func(_ context.Context) { + d(msg, fields...) + }, + ) + } + info = func(msg string, fields ...zap.Field) { + taskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "Log", + Limit: limiter, + }, + func(_ context.Context) { + i(msg, fields...) + }, + ) + } + } if origin == nil { if log.GetLevel() <= zap.DebugLevel { debug("insert new region", @@ -789,7 +821,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { if log.GetLevel() <= zap.DebugLevel { - debug("down-peers changed", zap.Uint64("region-id", region.GetID())) + debug("down-peers changed", zap.Uint64("region-id", region.GetID()), zap.Reflect("before", origin.GetDownPeers()), zap.Reflect("after", region.GetDownPeers())) } saveCache, needSync = true, true return @@ -912,7 +944,7 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } - err := check(region, origin, ols) + err := check(region, origin, convertItemsToRegions(ols)) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) // return the state region to delete. @@ -933,48 +965,59 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo { } // PreCheckPutRegion checks if the region is valid to put. -func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) { - origin, overlaps := r.GetRelevantRegions(region, trace) +func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) { + origin, overlaps := r.GetRelevantRegions(region) err := check(region, origin, overlaps) return origin, overlaps, err } +func convertItemsToRegions(items []*regionItem) []*RegionInfo { + regions := make([]*RegionInfo, 0, len(items)) + for _, item := range items { + regions = append(regions, item.RegionInfo) + } + return regions +} + // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. -func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) { +func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) { + tracer := ctx.Tracer r.t.Lock() var ols []*regionItem origin := r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } - trace.OnCheckOverlapsFinished() - err := check(region, origin, ols) + tracer.OnCheckOverlapsFinished() + err := check(region, origin, convertItemsToRegions(ols)) if err != nil { r.t.Unlock() - trace.OnValidateRegionFinished() + tracer.OnValidateRegionFinished() return nil, err } - trace.OnValidateRegionFinished() + tracer.OnValidateRegionFinished() origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) r.t.Unlock() - trace.OnSetRegionFinished() + tracer.OnSetRegionFinished() r.UpdateSubTree(region, origin, overlaps, rangeChanged) - trace.OnUpdateSubTreeFinished() + tracer.OnUpdateSubTreeFinished() return overlaps, nil } // GetRelevantRegions returns the relevant regions for a given region. -func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, _ RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) { +func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { r.t.RLock() defer r.t.RUnlock() origin = r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - overlaps = r.tree.overlaps(®ionItem{RegionInfo: region}) + for _, item := range r.tree.overlaps(®ionItem{RegionInfo: region}) { + overlaps = append(overlaps, item.RegionInfo) + } } return } -func check(region, origin *RegionInfo, overlaps []*regionItem) error { +func check(region, origin *RegionInfo, overlaps []*RegionInfo) error { for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { @@ -1043,7 +1086,6 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol item = ®ionItem{RegionInfo: region} r.regions[region.GetID()] = item } - var overlaps []*RegionInfo if rangeChanged { overlaps = r.tree.update(item, withOverlaps, ol...) diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 3c6536a6a77..ae91886369f 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - _, _, needSync := RegionGuide(regionA, regionB) + _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -459,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) { regions := NewRegionsInfo() region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b")) go func() { - regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) + regions.AtomicCheckAndPutRegion(ContextTODO(), region) }() - regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) + regions.AtomicCheckAndPutRegion(ContextTODO(), region) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree")) } diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 333e1730ec8..8c928f391eb 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -35,6 +35,11 @@ func (r *regionItem) GetStartKey() []byte { return r.meta.StartKey } +// GetID returns the ID of the region. +func (r *regionItem) GetID() uint64 { + return r.meta.GetId() +} + // GetEndKey returns the end key of the region. func (r *regionItem) GetEndKey() []byte { return r.meta.EndKey diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 4e002fb8157..f4ef6cb67b3 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -158,6 +158,8 @@ func TestRegionTree(t *testing.T) { updateNewItem(tree, regionA) updateNewItem(tree, regionC) + re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c")))) + re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo) re.Nil(tree.search([]byte{})) re.Equal(regionA, tree.search([]byte("a"))) re.Nil(tree.search([]byte("b"))) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 7ee7ae88cd1..42e8c3a35cb 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -2,6 +2,7 @@ package server import ( "context" + "runtime" "sync" "sync/atomic" "time" @@ -15,6 +16,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" @@ -51,14 +53,22 @@ type Cluster struct { apiServerLeader atomic.Value clusterID uint64 running atomic.Bool + + taskRunner ratelimit.Runner + hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter } const ( regionLabelGCInterval = time.Hour requestTimeout = 3 * time.Second collectWaitTime = time.Minute + + // heartbeat relative const + hbConcurrentRunner = "heartbeat-concurrent-task-runner" ) +var syncRunner = ratelimit.NewSyncRunner() + // NewCluster creates a new cluster. func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { ctx, cancel := context.WithCancel(parentCtx) @@ -81,6 +91,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage: storage, clusterID: clusterID, checkMembershipCh: checkMembershipCh, + + taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute), + hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -517,6 +530,7 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() + c.taskRunner.Start() c.running.Store(true) } @@ -527,6 +541,7 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() + c.taskRunner.Stop() c.cancel() c.wg.Wait() } @@ -542,8 +557,19 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } + var runner ratelimit.Runner + runner = syncRunner + if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { + runner = c.taskRunner + } + ctx := &core.MetaProcessContext{ + Context: c.ctx, + Limiter: c.hbConcurrencyLimiter, + Tracer: tracer, + TaskRunner: runner, + } tracer.Begin() - if err := c.processRegionHeartbeat(region, tracer); err != nil { + if err := c.processRegionHeartbeat(ctx, region); err != nil { tracer.OnAllStageFinished() return err } @@ -553,26 +579,47 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { } // processRegionHeartbeat updates the region information. -func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { - origin, _, err := c.PreCheckPutRegion(region, tracer) +func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error { + tracer := ctx.Tracer + origin, _, err := c.PreCheckPutRegion(region) tracer.OnPreCheckFinished() if err != nil { return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - cluster.HandleStatsAsync(c, region) + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + cluster.HandleStatsAsync(c, region) + }, + ) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.GetRegionStores(region)) + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "ObserveRegionStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + if c.regionStats.RegionStatsNeedUpdate(region) { + cluster.Collect(c, region, hasRegionStats) + } + }, + ) } return nil } @@ -583,15 +630,35 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.Re // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.AtomicCheckAndPutRegion(region, tracer); err != nil { + + // Async task in next PR. + if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } - - cluster.HandleOverlaps(c, overlaps) + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleOverlaps", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + cluster.HandleOverlaps(c, overlaps) + }, + ) } tracer.OnSaveCacheFinished() - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + // handle region stats + ctx.TaskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "CollectRegionStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + cluster.Collect(c, region, hasRegionStats) + }, + ) tracer.OnCollectRegionStatsFinished() return nil } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index dd92a10179d..7f0ef21f791 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -29,6 +29,8 @@ const initialCapacity = 100 // Runner is the interface for running tasks. type Runner interface { RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error + Start() + Stop() } // Task is a task to be run. @@ -42,8 +44,8 @@ type Task struct { // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") -// AsyncRunner is a simple task runner that limits the number of concurrent tasks. -type AsyncRunner struct { +// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. +type ConcurrentRunner struct { name string maxPendingDuration time.Duration taskChan chan *Task @@ -53,16 +55,14 @@ type AsyncRunner struct { wg sync.WaitGroup } -// NewAsyncRunner creates a new AsyncRunner. -func NewAsyncRunner(name string, maxPendingDuration time.Duration) *AsyncRunner { - s := &AsyncRunner{ +// NewConcurrentRunner creates a new ConcurrentRunner. +func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *ConcurrentRunner { + s := &ConcurrentRunner{ name: name, maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - stopChan: make(chan struct{}), } - s.Start() return s } @@ -74,7 +74,8 @@ type TaskOpts struct { } // Start starts the runner. -func (s *AsyncRunner) Start() { +func (s *ConcurrentRunner) Start() { + s.stopChan = make(chan struct{}) s.wg.Add(1) go func() { defer s.wg.Done() @@ -84,7 +85,6 @@ func (s *AsyncRunner) Start() { if task.Opts.Limit != nil { token, err := task.Opts.Limit.Acquire(context.Background()) if err != nil { - log.Error("failed to acquire semaphore", zap.String("task-name", task.Opts.TaskName), zap.Error(err)) continue } go s.run(task.Ctx, task.f, token) @@ -99,7 +99,7 @@ func (s *AsyncRunner) Start() { }() } -func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { +func (s *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { task(ctx) if token != nil { token.Release() @@ -107,7 +107,7 @@ func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token } } -func (s *AsyncRunner) processPendingTasks() { +func (s *ConcurrentRunner) processPendingTasks() { s.pendingMu.Lock() defer s.pendingMu.Unlock() for len(s.pendingTasks) > 0 { @@ -123,13 +123,13 @@ func (s *AsyncRunner) processPendingTasks() { } // Stop stops the runner. -func (s *AsyncRunner) Stop() { +func (s *ConcurrentRunner) Stop() { close(s.stopChan) s.wg.Wait() } // RunTask runs the task asynchronously. -func (s *AsyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error { +func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error { task := &Task{ Ctx: ctx, Opts: opt, @@ -166,3 +166,9 @@ func (*SyncRunner) RunTask(ctx context.Context, _ TaskOpts, f func(context.Conte f(ctx) return nil } + +// Start starts the runner. +func (*SyncRunner) Start() {} + +// Stop stops the runner. +func (*SyncRunner) Stop() {} diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go index 9b8dca231d1..507f8cf4ee8 100644 --- a/pkg/ratelimit/runner_test.go +++ b/pkg/ratelimit/runner_test.go @@ -23,10 +23,11 @@ import ( "github.com/stretchr/testify/require" ) -func TestAsyncRunner(t *testing.T) { +func TestConcurrentRunner(t *testing.T) { t.Run("RunTask", func(t *testing.T) { limiter := NewConcurrencyLimiter(1) - runner := NewAsyncRunner("test", time.Second) + runner := NewConcurrentRunner("test", time.Second) + runner.Start() defer runner.Stop() var wg sync.WaitGroup @@ -47,7 +48,8 @@ func TestAsyncRunner(t *testing.T) { t.Run("MaxPendingDuration", func(t *testing.T) { limiter := NewConcurrencyLimiter(1) - runner := NewAsyncRunner("test", 2*time.Millisecond) + runner := NewConcurrentRunner("test", 2*time.Millisecond) + runner.Start() defer runner.Stop() var wg sync.WaitGroup for i := 0; i < 10; i++ { diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index abf4c776f8a..1f370176383 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -52,6 +52,7 @@ const ( defaultEnableJointConsensus = true defaultEnableTiKVSplitRegion = true defaultEnableHeartbeatBreakdownMetrics = true + defaultEnableHeartbeatConcurrentRunner = false defaultEnableCrossTableMerge = true defaultEnableDiagnostic = true defaultStrictlyMatchLabel = false @@ -267,6 +268,9 @@ type ScheduleConfig struct { // EnableHeartbeatBreakdownMetrics is the option to enable heartbeat stats metrics. EnableHeartbeatBreakdownMetrics bool `toml:"enable-heartbeat-breakdown-metrics" json:"enable-heartbeat-breakdown-metrics,string"` + // EnableHeartbeatConcurrentRunner is the option to enable heartbeat concurrent runner. + EnableHeartbeatConcurrentRunner bool `toml:"enable-heartbeat-concurrent-runner" json:"enable-heartbeat-concurrent-runner,string"` + // Schedulers support for loading customized schedulers Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade @@ -382,6 +386,10 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) c.EnableHeartbeatBreakdownMetrics = defaultEnableHeartbeatBreakdownMetrics } + if !meta.IsDefined("enable-heartbeat-concurrent-runner") { + c.EnableHeartbeatConcurrentRunner = defaultEnableHeartbeatConcurrentRunner + } + if !meta.IsDefined("enable-cross-table-merge") { c.EnableCrossTableMerge = defaultEnableCrossTableMerge } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 52655b093a2..488763142e1 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -157,6 +157,18 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) { return true } + // expected to be zero for below type + if r.IsRegionStatsType(regionID, PendingPeer) && len(region.GetPendingPeers()) == 0 { + return true + } + if r.IsRegionStatsType(regionID, DownPeer) && len(region.GetDownPeers()) == 0 { + return true + } + if r.IsRegionStatsType(regionID, LearnerPeer) && len(region.GetLearners()) == 0 { + return true + } + + // merge return r.IsRegionStatsType(regionID, UndersizedRegion) != region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys())) } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index ffbd71d2f1e..8a2e757d5cd 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" @@ -200,13 +201,18 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync)) } - tracer := core.NewNoopHeartbeatProcessTracer() - origin, _, err := bc.PreCheckPutRegion(region, tracer) + origin, _, err := bc.PreCheckPutRegion(region) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - saveKV, _, _ := regionGuide(region, origin) + ctx := &core.MetaProcessContext{ + Context: ctx, + TaskRunner: ratelimit.NewSyncRunner(), + Tracer: core.NewNoopHeartbeatProcessTracer(), + // no limit for followers. + } + saveKV, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a8116a5e78f..dd59b63240f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -21,6 +21,7 @@ import ( "io" "math" "net/http" + "runtime" "strconv" "strings" "sync" @@ -44,6 +45,7 @@ import ( mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/replication" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" @@ -103,6 +105,9 @@ const ( // minSnapshotDurationSec is the minimum duration that a store can tolerate. // It should enlarge the limiter if the snapshot's duration is less than this value. minSnapshotDurationSec = 5 + + // heartbeat relative const + hbConcurrentRunner = "heartbeat-async-task-runner" ) // Server is the interface for cluster. @@ -166,6 +171,9 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + + taskRunner ratelimit.Runner + hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter } // Status saves some state information. @@ -182,13 +190,15 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + taskRunner: ratelimit.NewConcurrentRunner(hbConcurrentRunner, time.Minute), + hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } } @@ -291,7 +301,6 @@ func (c *RaftCluster) Start(s Server) error { log.Warn("raft cluster has already been started") return nil } - c.isAPIServiceMode = s.IsAPIServiceMode() err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { @@ -347,6 +356,7 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true + c.taskRunner.Start() return nil } @@ -740,6 +750,7 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } + c.taskRunner.Stop() c.Unlock() c.wg.Wait() @@ -988,10 +999,12 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { } var regionGuide = core.GenerateRegionGuideFunc(true) +var syncRunner = ratelimit.NewSyncRunner() // processRegionHeartbeat updates the region information. -func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { - origin, _, err := c.core.PreCheckPutRegion(region, tracer) +func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error { + tracer := ctx.Tracer + origin, _, err := c.core.PreCheckPutRegion(region) tracer.OnPreCheckFinished() if err != nil { return err @@ -1000,13 +1013,22 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.HandleStatsAsync(c, region) + ctx.TaskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "HandleStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + cluster.HandleStatsAsync(c, region) + }, + ) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(region, origin) + saveKV, saveCache, needSync := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1015,7 +1037,18 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + ctx.TaskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "ObserveRegionStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + if c.regionStats.RegionStatsNeedUpdate(region) { + cluster.Collect(c, region, hasRegionStats) + } + }, + ) } return nil } @@ -1032,43 +1065,72 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.core.AtomicCheckAndPutRegion(region, tracer); err != nil { + if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.HandleOverlaps(c, overlaps) + ctx.TaskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "HandleOverlaps", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + cluster.HandleOverlaps(c, overlaps) + }, + ) } regionUpdateCacheEventCounter.Inc() } tracer.OnSaveCacheFinished() - // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", - // region stats needs to be collected in API mode. - // We need to think of a better way to reduce this part of the cost in the future. - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + // handle region stats + ctx.TaskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "CollectRegionStatsAsync", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", + // region stats needs to be collected in API mode. + // We need to think of a better way to reduce this part of the cost in the future. + cluster.Collect(c, region, hasRegionStats) + }, + ) + tracer.OnCollectRegionStatsFinished() if c.storage != nil { - // If there are concurrent heartbeats from the same region, the last write will win even if - // writes to storage in the critical area. So don't use mutex to protect it. - // Not successfully saved to storage is not fatal, it only leads to longer warm-up - // after restart. Here we only log the error then go on updating cache. - for _, item := range overlaps { - if err := c.storage.DeleteRegion(item.GetMeta()); err != nil { - log.Error("failed to delete region from storage", - zap.Uint64("region-id", item.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), - errs.ZapError(err)) - } - } if saveKV { - if err := c.storage.SaveRegion(region.GetMeta()); err != nil { - log.Error("failed to save region to storage", - zap.Uint64("region-id", region.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), - errs.ZapError(err)) - } - regionUpdateKVEventCounter.Inc() + ctx.TaskRunner.RunTask( + ctx.Context, + ratelimit.TaskOpts{ + TaskName: "SaveRegionToKV", + Limit: ctx.Limiter, + }, + func(_ context.Context) { + // If there are concurrent heartbeats from the same region, the last write will win even if + // writes to storage in the critical area. So don't use mutex to protect it. + // Not successfully saved to storage is not fatal, it only leads to longer warm-up + // after restart. Here we only log the error then go on updating cache. + for _, item := range overlaps { + if err := c.storage.DeleteRegion(item.GetMeta()); err != nil { + log.Error("failed to delete region from storage", + zap.Uint64("region-id", item.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), + errs.ZapError(err)) + } + } + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { + log.Error("failed to save region to storage", + zap.Uint64("region-id", region.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), + errs.ZapError(err)) + } + regionUpdateKVEventCounter.Inc() + }, + ) } } @@ -2039,7 +2101,7 @@ func (c *RaftCluster) collectMetrics() { } func (c *RaftCluster) resetMetrics() { - resetHealthStatus() + c.resetHealthStatus() c.resetProgressIndicator() } @@ -2058,7 +2120,7 @@ func (c *RaftCluster) collectHealthStatus() { } } -func resetHealthStatus() { +func (*RaftCluster) resetHealthStatus() { healthStatusGauge.Reset() } @@ -2069,16 +2131,6 @@ func (c *RaftCluster) resetProgressIndicator() { storesETAGauge.Reset() } -func (c *RaftCluster) getRegionStoresLocked(region *core.RegionInfo) []*core.StoreInfo { - stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) - for _, p := range region.GetPeers() { - if store := c.core.GetStore(p.StoreId); store != nil { - stores = append(stores, store) - } - } - return stores -} - // OnStoreVersionChange changes the version of the cluster when needed. func (c *RaftCluster) OnStoreVersionChange() { c.RLock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index d3da21ab3dd..d01446ba143 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -631,7 +631,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { region := core.NewRegionInfo(regionMeta, leader, core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: utils.RegionHeartBeatReportInterval}), core.SetWrittenBytes(30000*10), core.SetWrittenKeys(300000*10)) - err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + err = cluster.processRegionHeartbeat(core.ContextTODO(), region) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -644,7 +644,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { StoreId: 4, } region = region.Clone(core.WithRemoveStorePeer(2), core.WithAddPeer(newPeer)) - err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + err = cluster.processRegionHeartbeat(core.ContextTODO(), region) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -681,8 +681,8 @@ func TestBucketHeartbeat(t *testing.T) { re.NoError(cluster.putStoreLocked(store)) } - re.NoError(cluster.processRegionHeartbeat(regions[0], core.NewNoopHeartbeatProcessTracer())) - re.NoError(cluster.processRegionHeartbeat(regions[1], core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), regions[0])) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), regions[1])) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.NoError(cluster.processReportBuckets(buckets)) re.Equal(buckets, cluster.GetRegion(uint64(1)).GetBuckets()) @@ -701,13 +701,13 @@ func TestBucketHeartbeat(t *testing.T) { // case5: region update should inherit buckets. newRegion := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) opt.SetRegionBucketEnabled(true) - re.NoError(cluster.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), newRegion)) re.Len(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), 2) // case6: disable region bucket in opt.SetRegionBucketEnabled(false) newRegion2 := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) - re.NoError(cluster.processRegionHeartbeat(newRegion2, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), newRegion2)) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.Empty(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys()) } @@ -733,25 +733,25 @@ func TestRegionHeartbeat(t *testing.T) { for i, region := range regions { // region does not exist. - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is the same, not updated. - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) + re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -761,13 +761,13 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) + re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -779,38 +779,38 @@ func TestRegionHeartbeat(t *testing.T) { }, })) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Add a pending peer. region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]})) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Clear down peers. region = region.Clone(core.WithDownPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Clear pending peers. region = region.Clone(core.WithPendingPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Remove peers. origin = region region = origin.Clone(core.SetPeers(region.GetPeers()[:1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -820,47 +820,47 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change leader. region = region.Clone(core.WithLeader(region.GetPeers()[1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateSize. region = region.Clone(core.SetApproximateSize(144)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateKeys. region = region.Clone(core.SetApproximateKeys(144000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes written. region = region.Clone(core.SetWrittenBytes(24000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes read. region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Flashback region = region.Clone(core.WithFlashback(true, 1)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) region = region.Clone(core.WithFlashback(false, 0)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) } @@ -916,8 +916,7 @@ func TestRegionHeartbeat(t *testing.T) { core.WithNewRegionID(10000), core.WithDecVersion(), ) - tracer := core.NewHeartbeatProcessTracer() - re.Error(cluster.processRegionHeartbeat(overlapRegion, tracer)) + re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), overlapRegion)) region := &metapb.Region{} ok, err := storage.LoadRegion(regions[n-1].GetID(), region) re.True(ok) @@ -941,9 +940,11 @@ func TestRegionHeartbeat(t *testing.T) { core.WithStartKey(regions[n-2].GetStartKey()), core.WithNewRegionID(regions[n-1].GetID()+1), ) - tracer = core.NewHeartbeatProcessTracer() + tracer := core.NewHeartbeatProcessTracer() tracer.Begin() - re.NoError(cluster.processRegionHeartbeat(overlapRegion, tracer)) + ctx := core.ContextTODO() + ctx.Tracer = tracer + re.NoError(cluster.processRegionHeartbeat(ctx, overlapRegion)) tracer.OnAllStageFinished() re.Condition(func() bool { fileds := tracer.LogFields() @@ -977,7 +978,9 @@ func TestRegionFlowChanged(t *testing.T) { regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})} processRegions := func(regions []*core.RegionInfo) { for _, r := range regions { - cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer()) + mctx := core.ContextTODO() + mctx.Context = ctx + cluster.processRegionHeartbeat(mctx, r) } } regions = core.SplitRegions(regions) @@ -1013,7 +1016,7 @@ func TestRegionSizeChanged(t *testing.T) { core.SetApproximateKeys(curMaxMergeKeys-1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(core.ContextTODO(), region) regionID := region.GetID() re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test ApproximateSize and ApproximateKeys change. @@ -1023,16 +1026,16 @@ func TestRegionSizeChanged(t *testing.T) { core.SetApproximateKeys(curMaxMergeKeys+1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(core.ContextTODO(), region) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test MaxMergeRegionSize and MaxMergeRegionKeys change. cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize + 2)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys + 2)) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(core.ContextTODO(), region) re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys)) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(core.ContextTODO(), region) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) } @@ -1095,11 +1098,11 @@ func TestConcurrentRegionHeartbeat(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat", "return(true)")) go func() { defer wg.Done() - cluster.processRegionHeartbeat(source, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(core.ContextTODO(), source) }() time.Sleep(100 * time.Millisecond) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat")) - re.NoError(cluster.processRegionHeartbeat(target, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), target)) wg.Wait() checkRegion(re, cluster.GetRegionByKey([]byte{}), target) } @@ -1161,7 +1164,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) { // Heartbeat and check region one by one. for _, r := range regions { - re.NoError(cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), r)) checkRegion(re, cluster.GetRegion(r.GetID()), r) checkRegion(re, cluster.GetRegionByKey(r.GetStartKey()), r) @@ -1198,7 +1201,7 @@ func TestHeartbeatSplit(t *testing.T) { // 1: [nil, nil) region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("foo")), region1) // split 1 to 2: [nil, m) 1: [m, nil), sync 2 first. @@ -1207,12 +1210,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region2, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region2)) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, nil) is missing before r1's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("z"))) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) // split 1 to 3: [m, q) 1: [q, nil), sync 1 first. @@ -1221,12 +1224,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, q) is missing before r3's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("n"))) - re.NoError(cluster.processRegionHeartbeat(region3, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region3)) checkRegion(re, cluster.GetRegionByKey([]byte("n")), region3) } @@ -1522,11 +1525,11 @@ func TestUpdateStorePendingPeerCount(t *testing.T) { }, } origin := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[:3]}, peers[0], core.WithPendingPeers(peers[1:3])) - re.NoError(tc.processRegionHeartbeat(origin, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), origin)) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 1, 1, 0}, tc.RaftCluster, re) newRegion := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[1:]}, peers[1], core.WithPendingPeers(peers[3:4])) - re.NoError(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), newRegion)) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 0, 0, 1}, tc.RaftCluster, re) } @@ -2137,6 +2140,7 @@ func newTestRaftCluster( opt *config.PersistOptions, s storage.Storage, ) *RaftCluster { + opt.GetScheduleConfig().EnableHeartbeatConcurrentRunner = false rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s} rc.InitCluster(id, opt, nil, nil) rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt) @@ -2955,12 +2959,12 @@ func TestShouldRun(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr)) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion)) re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) } @@ -2998,12 +3002,12 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr)) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion)) re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) // Now, after server is prepared, there exist some regions with no leader. diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index e23ae2100bd..5c2bb950297 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -24,6 +24,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/utils/logutil" @@ -38,8 +39,19 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } + var runner ratelimit.Runner + runner = syncRunner + if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { + runner = c.taskRunner + } + ctx := &core.MetaProcessContext{ + Context: c.ctx, + Limiter: c.hbConcurrencyLimiter, + Tracer: tracer, + TaskRunner: runner, + } tracer.Begin() - if err := c.processRegionHeartbeat(region, tracer); err != nil { + if err := c.processRegionHeartbeat(ctx, region); err != nil { tracer.OnAllStageFinished() return err } diff --git a/tests/cluster.go b/tests/cluster.go index cd90a6f2b45..c7368fe3c3a 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -88,6 +88,8 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err } func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { + // disable the heartbeat async runner in test + cfg.Schedule.EnableHeartbeatConcurrentRunner = false err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err != nil { return nil, err