diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index cc18817d9c5..7efef3c2b96 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -46,6 +46,8 @@ const ( slowNotifyFilterDuration = 10 * time.Millisecond watchRetryInterval = 30 * time.Second + + bigRequestThreshold = 4 * 1024 * 1024 // 4MB -> 16 RRU ) type selectType int @@ -69,6 +71,9 @@ type ResourceGroupKVInterceptor interface { OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) // OnResponse is used to consume tokens after receiving response. OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) + // OnResponseWait is used to consume tokens after receiving a response. If the response requires many tokens, we need to wait for the tokens. + // This is an optimized version of OnResponse for cases where the response requires many tokens, making the debt smaller and smoother. + OnResponseWait(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error) // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool } @@ -642,7 +647,7 @@ func (c *ResourceGroupsController) OnRequestWait( if err != nil { return nil, nil, time.Duration(0), 0, err } - return gc.onRequestWait(ctx, info) + return gc.onRequestWaitImpl(ctx, info) } // OnResponse is used to consume tokens after receiving response @@ -654,7 +659,19 @@ func (c *ResourceGroupsController) OnResponse( log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - return gc.onResponse(req, resp) + return gc.onResponseImpl(req, resp) +} + +// OnResponseWait is used to consume tokens after receiving response +func (c *ResourceGroupsController) OnResponseWait( + ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo, +) (*rmpb.Consumption, time.Duration, error) { + gc, ok := c.loadGroupController(resourceGroupName) + if !ok { + log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) + return &rmpb.Consumption{}, time.Duration(0), nil + } + return gc.onResponseWaitImpl(ctx, req, resp) } // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -722,6 +739,8 @@ type groupCostController struct { // fast path to make once token limit with un-limit burst. burstable *atomic.Bool + // is throttled + isThrottled *atomic.Bool lowRUNotifyChan chan<- notifyMsg tokenBucketUpdateChan chan<- *groupCostController @@ -770,6 +789,8 @@ type groupMetricsCollection struct { failedRequestCounterWithOthers prometheus.Counter failedRequestCounterWithThrottled prometheus.Counter tokenRequestCounter prometheus.Counter + runningKVRequestCounter prometheus.Gauge + consumeTokenHistogram prometheus.Observer } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -784,6 +805,8 @@ func initMetrics(oldName, name string) *groupMetricsCollection { failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType), requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name), tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name), + runningKVRequestCounter: groupRunningKVRequestCounter.WithLabelValues(name), + consumeTokenHistogram: tokenConsumedHistogram.WithLabelValues(name), } } @@ -841,6 +864,7 @@ func newGroupCostController( tokenBucketUpdateChan: tokenBucketUpdateChan, lowRUNotifyChan: lowRUNotifyChan, burstable: &atomic.Bool{}, + isThrottled: &atomic.Bool{}, } switch gc.mode { @@ -1179,6 +1203,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket if cfg.NewBurst < 0 { cfg.NewTokens = float64(counter.getTokenBucketFunc().Settings.FillRate) } + gc.isThrottled.Store(false) } else { // Otherwise the granted token is delivered to the client by fill rate. cfg.NewTokens = 0 @@ -1199,6 +1224,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket counter.notify.setupNotificationThreshold = 1 counter.notify.mu.Unlock() counter.lastDeadline = deadline + gc.isThrottled.Store(true) select { case gc.tokenBucketUpdateChan <- gc: default: @@ -1317,7 +1343,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { return value } -func (gc *groupCostController) onRequestWait( +func (gc *groupCostController) onRequestWaitImpl( ctx context.Context, info RequestInfo, ) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { delta := &rmpb.Consumption{} @@ -1332,11 +1358,13 @@ func (gc *groupCostController) onRequestWait( if !gc.burstable.Load() { var err error - now := time.Now() var i int var d time.Duration + gc.metrics.runningKVRequestCounter.Inc() + defer gc.metrics.runningKVRequestCounter.Dec() retryLoop: for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ { + now := time.Now() switch gc.mode { case rmpb.GroupMode_RawMode: res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) @@ -1352,6 +1380,10 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { + // record the consume token histogram if enable controller debug mode. + if enableControllerTraceLog.Load() { + gc.metrics.consumeTokenHistogram.Observe(v) + } res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } @@ -1399,7 +1431,7 @@ func (gc *groupCostController) onRequestWait( return delta, penalty, waitDuration, gc.getMeta().GetPriority(), nil } -func (gc *groupCostController) onResponse( +func (gc *groupCostController) onResponseImpl( req RequestInfo, resp ResponseInfo, ) (*rmpb.Consumption, error) { delta := &rmpb.Consumption{} @@ -1440,6 +1472,94 @@ func (gc *groupCostController) onResponse( return delta, nil } +func (gc *groupCostController) onResponseWaitImpl( + ctx context.Context, req RequestInfo, resp ResponseInfo, +) (*rmpb.Consumption, time.Duration, error) { + delta := &rmpb.Consumption{} + for _, calc := range gc.calculators { + calc.AfterKVRequest(delta, req, resp) + } + var ( + waitDuration time.Duration + d time.Duration + err error + ) + if !gc.burstable.Load() { + gc.metrics.runningKVRequestCounter.Inc() + defer gc.metrics.runningKVRequestCounter.Dec() + retryLoop: + for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ { + now := time.Now() + switch gc.mode { + case rmpb.GroupMode_RawMode: + res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) + for typ, counter := range gc.run.resourceTokens { + if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { + // allow debt for small request or not in throttled. + if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() { + counter.limiter.RemoveTokens(time.Now(), v) + break retryLoop + } + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) + } + } + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { + break retryLoop + } + case rmpb.GroupMode_RUMode: + res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) + for typ, counter := range gc.run.requestUnitTokens { + if v := getRUValueFromConsumption(delta, typ); v > 0 { + // record the consume token histogram if enable controller debug mode. + if enableControllerTraceLog.Load() { + gc.metrics.consumeTokenHistogram.Observe(v) + } + // allow debt for small request or not in throttled. + if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() { + counter.limiter.RemoveTokens(time.Now(), v) + break retryLoop + } + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) + } + } + if d, err = WaitReservations(context.Background(), now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { + break retryLoop + } + } + gc.metrics.requestRetryCounter.Inc() + time.Sleep(gc.mainCfg.WaitRetryInterval) + waitDuration += gc.mainCfg.WaitRetryInterval + } + if err != nil { + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() + } + return nil, waitDuration, err + } + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d + } + + gc.mu.Lock() + // Record the consumption of the request + add(gc.mu.consumption, delta) + // Record the consumption of the request by store + count := &rmpb.Consumption{} + *count = *delta + // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` + for _, calc := range gc.calculators { + calc.BeforeKVRequest(count, req) + } + add(gc.mu.storeCounter[req.StoreID()], count) + add(gc.mu.globalCounter, count) + gc.mu.Unlock() + + return delta, waitDuration, nil +} + // GetActiveResourceGroup is used to get active resource group. // This is used for test only. func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 821364c292f..a59be4d5a2d 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -104,7 +104,7 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator := gc.getKVCalculator() for idx, testCase := range testCases { caseNum := fmt.Sprintf("case %d", idx) - consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) + consumption, _, _, priority, err := gc.onRequestWaitImpl(context.TODO(), testCase.req) re.NoError(err, caseNum) re.Equal(priority, gc.meta.Priority) expectedConsumption := &rmpb.Consumption{} @@ -112,7 +112,7 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator.calculateWriteCost(expectedConsumption, testCase.req) re.Equal(expectedConsumption.WRU, consumption.WRU) } - consumption, err = gc.onResponse(testCase.req, testCase.resp) + consumption, err = gc.onResponseImpl(testCase.req, testCase.resp) re.NoError(err, caseNum) kvCalculator.calculateReadCost(expectedConsumption, testCase.resp) kvCalculator.calculateCPUCost(expectedConsumption, testCase.resp) @@ -121,6 +121,46 @@ func TestRequestAndResponseConsumption(t *testing.T) { } } +func TestOnResponseWaitConsumption(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + req := &TestRequestInfo{ + isWrite: false, + } + resp := &TestResponseInfo{ + readBytes: 2000 * 64 * 1024, // 2000RU + succeed: true, + } + + consumption, waitTIme, err := gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + re.Zero(waitTIme) + verify := func() { + expectedConsumption := &rmpb.Consumption{} + kvCalculator := gc.getKVCalculator() + kvCalculator.calculateReadCost(expectedConsumption, resp) + re.Equal(expectedConsumption.RRU, consumption.RRU) + } + verify() + + // modify the counter, then on response should has wait time. + counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU] + gc.modifyTokenCounter(counter, &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 1000, + BurstLimit: 1000, + }, + }, + int64(5*time.Second/time.Millisecond), + ) + + consumption, waitTIme, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + re.NotZero(waitTIme) + verify() +} + func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) @@ -129,7 +169,7 @@ func TestResourceGroupThrottledError(t *testing.T) { writeBytes: 10000000, } // The group is throttled - _, _, _, _, err := gc.onRequestWait(context.TODO(), req) + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) re.Error(err) re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) } diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index faa2bad927e..48cdb082e77 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) + logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst)) if args.NewBurst < 0 { lim.last = now lim.tokens = args.NewTokens diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go index 30a0b850c7d..0706210207f 100644 --- a/client/resource_group/controller/metrics.go +++ b/client/resource_group/controller/metrics.go @@ -63,6 +63,14 @@ var ( Help: "Counter of failed request.", }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType}) + groupRunningKVRequestCounter = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "running_kv_request", + Help: "Counter of running kv request.", + }, []string{newResourceGroupNameLabel}) + requestRetryCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -95,6 +103,14 @@ var ( Name: "low_token_notified", Help: "Counter of low token request.", }, []string{newResourceGroupNameLabel}) + tokenConsumedHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: tokenRequestSubsystem, + Name: "consume", + Buckets: []float64{.5, 1, 2, 5, 10, 15, 20, 40, 64, 128, 256, 512, 1024, 2048}, // 0 ~ 2048 + Help: "Bucketed histogram of token consume.", + }, []string{newResourceGroupNameLabel}) ) var ( @@ -108,8 +124,10 @@ func init() { prometheus.MustRegister(successfulRequestDuration) prometheus.MustRegister(failedRequestCounter) prometheus.MustRegister(failedLimitReserveDuration) + prometheus.MustRegister(groupRunningKVRequestCounter) prometheus.MustRegister(requestRetryCounter) prometheus.MustRegister(tokenRequestDuration) prometheus.MustRegister(resourceGroupTokenRequestCounter) prometheus.MustRegister(lowTokenRequestNotifyCounter) + prometheus.MustRegister(tokenConsumedHistogram) }