
client/controller: wait for tokens to reduce the debt
Signed-off-by: nolouch <[email protected]>
nolouch committed Jul 31, 2024
1 parent 5d77447 commit ee638dc
Showing 4 changed files with 187 additions and 9 deletions.
130 changes: 125 additions & 5 deletions client/resource_group/controller/controller.go
@@ -46,6 +46,8 @@ const (
slowNotifyFilterDuration = 10 * time.Millisecond

watchRetryInterval = 30 * time.Second

bigRequestThreshold = 4 * 1024 * 1024 // 4MB -> 16 RRU
)

type selectType int
@@ -69,6 +71,9 @@ type ResourceGroupKVInterceptor interface {
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// OnResponseWait is used to consume tokens after receiving a response. If the response requires many tokens, we need to wait for the tokens.
// This is an optimized version of OnResponse for cases where the response requires many tokens, making the debt smaller and smoother.
OnResponseWait(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error)
// IsBackgroundRequest reports whether the request is a background job. If the resource group has background jobs, we should not record consumption or wait for tokens.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}
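
For orientation, here is a minimal sketch of how a caller could drive this interceptor around a single KV request. The doKVRequest and sendToStore names and the surrounding wiring are hypothetical; only the interceptor methods come from the interface above.

func doKVRequest(ctx context.Context, interceptor ResourceGroupKVInterceptor, group string, req RequestInfo) (ResponseInfo, error) {
	// Wait for tokens covering the (estimated) write cost before sending the request.
	if _, _, _, _, err := interceptor.OnRequestWait(ctx, group, req); err != nil {
		return nil, err
	}
	resp, err := sendToStore(ctx, req) // hypothetical transport call
	if err != nil {
		return nil, err
	}
	// Settle the read/CPU cost afterwards; with OnResponseWait a large response may
	// block here instead of pushing the limiter further into debt.
	if _, _, err := interceptor.OnResponseWait(ctx, group, req, resp); err != nil {
		return nil, err
	}
	return resp, nil
}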
@@ -642,7 +647,7 @@ func (c *ResourceGroupsController) OnRequestWait(
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
return gc.onRequestWait(ctx, info)
return gc.onRequestWaitImpl(ctx, info)
}

// OnResponse is used to consume tokens after receiving response
@@ -654,7 +659,19 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return gc.onResponse(req, resp)
return gc.onResponseImpl(req, resp)
}

// OnResponseWait is used to consume tokens after receiving a response, waiting for tokens when necessary.
func (c *ResourceGroupsController) OnResponseWait(
ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, time.Duration(0), nil
}
return gc.onResponseWaitImpl(ctx, req, resp)
}

// IsBackgroundRequest reports whether the request is a background job. If the resource group has background jobs, we should not record consumption or wait for tokens.
@@ -722,6 +739,8 @@ type groupCostController struct {

// fast path: skip token waiting when the bucket has an unlimited burst.
burstable *atomic.Bool
// isThrottled indicates whether the group is currently throttled, i.e. tokens are delivered gradually by fill rate rather than granted in a burst.
isThrottled *atomic.Bool

lowRUNotifyChan chan<- notifyMsg
tokenBucketUpdateChan chan<- *groupCostController
@@ -770,6 +789,8 @@ type groupMetricsCollection struct {
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer
}

func initMetrics(oldName, name string) *groupMetricsCollection {
@@ -784,6 +805,8 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: groupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: tokenConsumedHistogram.WithLabelValues(name),
}
}

@@ -841,6 +864,7 @@ func newGroupCostController(
tokenBucketUpdateChan: tokenBucketUpdateChan,
lowRUNotifyChan: lowRUNotifyChan,
burstable: &atomic.Bool{},
isThrottled: &atomic.Bool{},
}

switch gc.mode {
@@ -1179,6 +1203,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
if cfg.NewBurst < 0 {
cfg.NewTokens = float64(counter.getTokenBucketFunc().Settings.FillRate)
}
gc.isThrottled.Store(false)
} else {
// Otherwise the granted token is delivered to the client by fill rate.
cfg.NewTokens = 0
@@ -1199,6 +1224,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
counter.notify.setupNotificationThreshold = 1
counter.notify.mu.Unlock()
counter.lastDeadline = deadline
gc.isThrottled.Store(true)
select {
case gc.tokenBucketUpdateChan <- gc:
default:
@@ -1317,7 +1343,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
return value
}

func (gc *groupCostController) onRequestWait(
func (gc *groupCostController) onRequestWaitImpl(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
delta := &rmpb.Consumption{}
@@ -1332,11 +1358,13 @@ func (gc *groupCostController) onRequestWait(

if !gc.burstable.Load() {
var err error
now := time.Now()
var i int
var d time.Duration
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
retryLoop:
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
@@ -1352,6 +1380,10 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// Record the consumed tokens in the histogram if controller debug mode is enabled.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
@@ -1399,7 +1431,7 @@ func (gc *groupCostController) onRequestWait(
return delta, penalty, waitDuration, gc.getMeta().GetPriority(), nil
}
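
The wait-and-retry pattern above is shared with onResponseWaitImpl below. Distilled into a sketch for a single counter (the reserveAndWait helper is illustrative only, assembled from the Reserve, WaitReservations, and config fields already used in this file):

func reserveAndWait(ctx context.Context, gc *groupCostController, counter *tokenCounter, v float64) (time.Duration, error) {
	var (
		waited time.Duration
		d      time.Duration
		err    error
	)
	for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
		now := time.Now()
		res := []*Reservation{counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)}
		if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
			// Either the reservation was satisfied or the error is not a
			// throttling error that retrying could fix.
			break
		}
		// Still throttled: back off and retry.
		time.Sleep(gc.mainCfg.WaitRetryInterval)
		waited += gc.mainCfg.WaitRetryInterval
	}
	return waited + d, err
}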

func (gc *groupCostController) onResponse(
func (gc *groupCostController) onResponseImpl(
req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
@@ -1440,6 +1472,94 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

func (gc *groupCostController) onResponseWaitImpl(
ctx context.Context, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var (
waitDuration time.Duration
d time.Duration
err error
)
if !gc.burstable.Load() {
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
retryLoop:
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
// Allow debt for small requests, or when the group is not throttled.
if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() {
counter.limiter.RemoveTokens(time.Now(), v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// Record the consumed tokens in the histogram if controller debug mode is enabled.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
// Allow debt for small requests, or when the group is not throttled.
if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() {
counter.limiter.RemoveTokens(time.Now(), v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(context.Background(), now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// Since the penalty is only counted when the request is completed, the write cost added in `BeforeKVRequest` also needs to be calculated here.
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
add(gc.mu.storeCounter[req.StoreID()], count)
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

return delta, waitDuration, nil
}
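
The core of the change is the gate that decides between taking on debt and blocking. A distilled sketch of that decision (shouldWaitForResponse is an illustrative name, not a function in this file):

// A response only blocks on the limiter when it is both big (read+write bytes
// at or above bigRequestThreshold) and the group is already throttled;
// otherwise its cost is charged as debt via RemoveTokens.
func shouldWaitForResponse(gc *groupCostController, delta *rmpb.Consumption) bool {
	isBig := delta.ReadBytes+delta.WriteBytes >= bigRequestThreshold
	return isBig && gc.isThrottled.Load()
}

Small responses therefore keep the non-blocking behavior of OnResponse, while large responses under throttling are smoothed out rather than deepening the debt.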

// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
46 changes: 43 additions & 3 deletions client/resource_group/controller/controller_test.go
@@ -104,15 +104,15 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, _, priority, err := gc.onRequestWaitImpl(context.TODO(), testCase.req)
re.NoError(err, caseNum)
re.Equal(priority, gc.meta.Priority)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
kvCalculator.calculateWriteCost(expectedConsumption, testCase.req)
re.Equal(expectedConsumption.WRU, consumption.WRU)
}
consumption, err = gc.onResponse(testCase.req, testCase.resp)
consumption, err = gc.onResponseImpl(testCase.req, testCase.resp)
re.NoError(err, caseNum)
kvCalculator.calculateReadCost(expectedConsumption, testCase.resp)
kvCalculator.calculateCPUCost(expectedConsumption, testCase.resp)
@@ -121,6 +121,46 @@ func TestRequestAndResponseConsumption(t *testing.T) {
}
}

func TestOnResponseWaitConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)

req := &TestRequestInfo{
isWrite: false,
}
resp := &TestResponseInfo{
readBytes: 2000 * 64 * 1024, // 2000RU
succeed: true,
}

consumption, waitTime, err := gc.onResponseWaitImpl(context.TODO(), req, resp)
re.NoError(err)
re.Zero(waitTime)
verify := func() {
expectedConsumption := &rmpb.Consumption{}
kvCalculator := gc.getKVCalculator()
kvCalculator.calculateReadCost(expectedConsumption, resp)
re.Equal(expectedConsumption.RRU, consumption.RRU)
}
verify()

// Modify the counter so that the following response has to wait for tokens.
counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU]
gc.modifyTokenCounter(counter, &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 1000,
BurstLimit: 1000,
},
},
int64(5*time.Second/time.Millisecond),
)

consumption, waitTime, err = gc.onResponseWaitImpl(context.TODO(), req, resp)
re.NoError(err)
re.NotZero(waitTime)
verify()
}
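
A natural follow-up check, sketched here under the assumption that createTestGroupCostController initializes the run state exactly as in the tests above: a response below bigRequestThreshold should return zero wait time even while the group is throttled, because it takes the debt path.

func TestSmallResponseDoesNotWait(t *testing.T) { // sketch only
	re := require.New(t)
	gc := createTestGroupCostController(re)

	// Force the throttled (fill-rate) state, as in the test above.
	counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU]
	gc.modifyTokenCounter(counter, &rmpb.TokenBucket{
		Settings: &rmpb.TokenLimitSettings{
			FillRate:   1000,
			BurstLimit: 1000,
		},
	},
		int64(5*time.Second/time.Millisecond),
	)

	req := &TestRequestInfo{isWrite: false}
	resp := &TestResponseInfo{
		readBytes: 64 * 1024, // well below bigRequestThreshold, so the debt path is taken
		succeed:   true,
	}
	consumption, waitTime, err := gc.onResponseWaitImpl(context.TODO(), req, resp)
	re.NoError(err)
	re.Zero(waitTime)
	re.Positive(consumption.RRU)
}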

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
@@ -129,7 +169,7 @@ func TestResourceGroupThrottledError(t *testing.T) {
writeBytes: 10000000,
}
// The group is throttled
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
_, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter.go
@@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
18 changes: 18 additions & 0 deletions client/resource_group/controller/metrics.go
@@ -63,6 +63,14 @@ var (
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

groupRunningKVRequestCounter = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "running_kv_request",
Help: "Counter of running kv request.",
}, []string{newResourceGroupNameLabel})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -95,6 +103,14 @@ var (
Name: "low_token_notified",
Help: "Counter of low token request.",
}, []string{newResourceGroupNameLabel})
tokenConsumedHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "consume",
Buckets: []float64{.5, 1, 2, 5, 10, 15, 20, 40, 64, 128, 256, 512, 1024, 2048}, // 0 ~ 2048
Help: "Bucketed histogram of token consume.",
}, []string{newResourceGroupNameLabel})
)

var (
Expand All @@ -108,8 +124,10 @@ func init() {
prometheus.MustRegister(successfulRequestDuration)
prometheus.MustRegister(failedRequestCounter)
prometheus.MustRegister(failedLimitReserveDuration)
prometheus.MustRegister(groupRunningKVRequestCounter)
prometheus.MustRegister(requestRetryCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
prometheus.MustRegister(lowTokenRequestNotifyCounter)
prometheus.MustRegister(tokenConsumedHistogram)
}
