Skip to content

Commit

Permalink
address context
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 15, 2024
1 parent 4de6106 commit 9991d28
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 151 deletions.
40 changes: 40 additions & 0 deletions pkg/core/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/tikv/pd/pkg/ratelimit"
)

// MetaProcessContext is a context for meta process.
type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
Limiter *ratelimit.ConcurrencyLimiter
}

// NewMetaProcessContext creates a new MetaProcessContext.
// used in tests.
func ContextTODO() *MetaProcessContext {
return &MetaProcessContext{
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
}
22 changes: 9 additions & 13 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/ctxutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -714,7 +713,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -729,14 +728,14 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
taskRunner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
taskRunner := ctx.TaskRunner
limiter := ctx.Limiter
// print log asynchronously
if ok {
if taskRunner != nil {
debug = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ctx.Context,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
Expand All @@ -748,7 +747,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
info = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ctx.Context,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
Expand Down Expand Up @@ -982,11 +981,8 @@ func convertItemsToRegions(items []*regionItem) []*RegionInfo {
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(RegionHeartbeatProcessTracer)
if !ok {
tracer = NewNoopHeartbeatProcessTracer()
}
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
Expand Down
7 changes: 3 additions & 4 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package core

import (
"context"
"crypto/rand"
"fmt"
"math"
Expand Down Expand Up @@ -364,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(context.TODO(), regionA, regionB)
_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down Expand Up @@ -460,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) {
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(context.TODO(), region)
regions.AtomicCheckAndPutRegion(ContextTODO(), region)
}()
regions.AtomicCheckAndPutRegion(context.TODO(), region)
regions.AtomicCheckAndPutRegion(ContextTODO(), region)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

Expand Down
44 changes: 20 additions & 24 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/ctxutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -558,13 +557,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
tracer.Begin()
ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer)
ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter)
var runner ratelimit.Runner
runner = syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner)
runner = c.taskRunner
}

ctx := &core.MetaProcessContext{
Context: c.ctx,
Limiter: c.hbConcurrencyLimiter,
Tracer: tracer,
TaskRunner: runner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
tracer.OnAllStageFinished()
return err
Expand All @@ -575,28 +579,20 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
}

// processRegionHeartbeat updates the region information.
func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error {
tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer)
if !ok {
tracer = core.NewNoopHeartbeatProcessTracer()
}
runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
if !ok {
runner = syncRunner
}
limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
tracer := ctx.Tracer
origin, _, err := c.PreCheckPutRegion(region)
tracer.OnPreCheckFinished()
if err != nil {
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

runner.RunTask(
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleStatsAsync",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
Expand All @@ -612,11 +608,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
runner.RunTask(
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "ObserveRegionStatsAsync",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -640,11 +636,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
tracer.OnSaveCacheFinished()
return err
}
runner.RunTask(
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleOverlaps",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
Expand All @@ -653,11 +649,11 @@ func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.Regio
}
tracer.OnSaveCacheFinished()
// handle region stats
runner.RunTask(
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "CollectRegionStatsAsync",
Limit: c.hbConcurrencyLimiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
Expand Down
7 changes: 7 additions & 0 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -205,6 +206,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
ctx := &core.MetaProcessContext{
Context: ctx,
TaskRunner: ratelimit.NewSyncRunner(),
Tracer: core.NewNoopHeartbeatProcessTracer(),
// no limit for followers.
}
saveKV, _, _ := regionGuide(ctx, region, origin)
overlaps := bc.PutRegion(region)

Expand Down
27 changes: 0 additions & 27 deletions pkg/utils/ctxutil/context.go

This file was deleted.

44 changes: 17 additions & 27 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/ctxutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/netutil"
Expand Down Expand Up @@ -1003,17 +1002,8 @@ var regionGuide = core.GenerateRegionGuideFunc(true)
var syncRunner = ratelimit.NewSyncRunner()

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error {
tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer)
if !ok {
tracer = core.NewNoopHeartbeatProcessTracer()
}
runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
if !ok {
runner = syncRunner
}
limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)

func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
tracer := ctx.Tracer
origin, _, err := c.core.PreCheckPutRegion(region)
tracer.OnPreCheckFinished()
if err != nil {
Expand All @@ -1023,11 +1013,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
runner.RunTask(
ctx,
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "HandleStatsAsync",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
Expand All @@ -1047,11 +1037,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
runner.RunTask(
ctx,
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "ObserveRegionStatsAsync",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
Expand Down Expand Up @@ -1080,11 +1070,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
return err
}
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
runner.RunTask(
ctx,
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "HandleOverlaps",
Limit: limiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
Expand All @@ -1096,11 +1086,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R

tracer.OnSaveCacheFinished()
// handle region stats
runner.RunTask(
ctx,
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "CollectRegionStatsAsync",
Limit: c.hbConcurrencyLimiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
Expand All @@ -1113,11 +1103,11 @@ func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.R
tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
if saveKV {
runner.RunTask(
ctx,
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.TaskOpts{
TaskName: "SaveRegionToKV",
Limit: c.hbConcurrencyLimiter,
Limit: ctx.Limiter,
},
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down
Loading

0 comments on commit 9991d28

Please sign in to comment.