Skip to content

Commit

Permalink
--ratelimit and --ratelimit-window flags
Browse files Browse the repository at this point in the history
when set, ratelimit operations during the prepare phase and the bench to
a global ratelimit threshold to each worker
  • Loading branch information
fatpat committed Jan 18, 2024
1 parent 1723cdb commit ddf3f50
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 8 deletions.
29 changes: 21 additions & 8 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/minio/cli"
"github.com/minio/mc/pkg/probe"
Expand Down Expand Up @@ -250,6 +251,16 @@ var ioFlags = []cli.Flag{
EnvVar: appNameUC + "_INFLUXDB_CONNECT",
Usage: "Send operations to InfluxDB. Specify as 'http://<token>@<hostname>:<port>/<bucket>/<org>'",
},
cli.Float64Flag{
Name: "ratelimit",
Value: 0,
Usage: "Ratelimit each worker to this number of actions per --ratelimit-window (set a positive value to activate, 0 by default)",
},
cli.DurationFlag{
Name: "ratelimit-window",
Value: 1 * time.Second,
Usage: "The time window to which the --ratelimit applies (1s by default)",
},
}

func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
Expand All @@ -264,13 +275,15 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
}
}
return bench.Common{
Client: newClient(ctx),
Concurrency: ctx.Int("concurrent"),
Source: src,
Bucket: ctx.String("bucket"),
Location: ctx.String("region"),
PutOpts: putOpts(ctx),
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
Client: newClient(ctx),
Concurrency: ctx.Int("concurrent"),
Source: src,
Bucket: ctx.String("bucket"),
Location: ctx.String("region"),
PutOpts: putOpts(ctx),
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
Ratelimit: ctx.Float64("ratelimit"),
RatelimitWindow: ctx.Duration("ratelimit-window"),
}
}
4 changes: 4 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type Common struct {

// Does destination support versioning?
Versioned bool

// ratelimiting
Ratelimit float64
RatelimitWindow time.Duration
}

const (
Expand Down
20 changes: 20 additions & 0 deletions pkg/bench/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (d *Delete) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := d.Source()
ratelimit := InitRatelimit(d.Ratelimit, d.RatelimitWindow)

for range obj {
opts := d.PutOpts
rcv := d.Collector.Receiver()
Expand All @@ -72,6 +74,15 @@ func (d *Delete) Prepare(ctx context.Context) error {
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

obj := src.Object()
client, cldone := d.Client()
op := Operation{
Expand Down Expand Up @@ -148,6 +159,7 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
rcv := c.Receiver()
defer wg.Done()
done := ctx.Done()
ratelimit := InitRatelimit(d.Ratelimit, d.RatelimitWindow)

<-wait
for {
Expand All @@ -157,6 +169,14 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

// Fetch d.BatchSize objects
mu.Lock()
if len(d.objects) == 0 {
Expand Down
20 changes: 20 additions & 0 deletions pkg/bench/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,23 @@ func (g *Get) Prepare(ctx context.Context) error {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

for range obj {
select {
case <-ctx.Done():
return
default:
}

if ratelimit.Limit() {
select {
case <-ctx.Done():
return
default:
}
}

obj := src.Object()

name := obj.Name
Expand Down Expand Up @@ -258,6 +268,7 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error)
defer wg.Done()
opts := g.GetOpts
done := ctx.Done()
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -266,6 +277,15 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

fbr := firstByteRecorder{}
obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
Expand Down
19 changes: 19 additions & 0 deletions pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,23 @@ func (d *List) Prepare(ctx context.Context) error {
rcv := d.Collector.Receiver()
done := ctx.Done()
exists := make(map[string]struct{}, objPerPrefix)
ratelimit := InitRatelimit(d.Ratelimit, d.RatelimitWindow)

for j := 0; j < objPerPrefix; j++ {
select {
case <-done:
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

obj := src.Object()
// Assure we don't have duplicates
for {
Expand Down Expand Up @@ -188,6 +198,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
if d.NoPrefix {
wantN *= d.Concurrency
}
ratelimit := InitRatelimit(d.Ratelimit, d.RatelimitWindow)

<-wait
for {
Expand All @@ -197,6 +208,14 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

prefix := objs[0].Prefix
client, cldone := d.Client()
op := Operation{
Expand Down
21 changes: 21 additions & 0 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (g *Mixed) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := g.Source()
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

for range obj {
opts := g.PutOpts
done := ctx.Done()
Expand All @@ -182,6 +184,15 @@ func (g *Mixed) Prepare(ctx context.Context) error {
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

obj := src.Object()
client, clDone := g.Client()
opts.ContentType = obj.ContentType
Expand Down Expand Up @@ -239,6 +250,7 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro
putOpts := g.PutOpts
statOpts := g.StatOpts
getOpts := g.GetOpts
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -247,6 +259,15 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

operation := g.Dist.getOp()
switch operation {
case http.MethodGet:
Expand Down
20 changes: 20 additions & 0 deletions pkg/bench/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,23 @@ func (g *Multipart) Prepare(ctx context.Context) error {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

for partN := range obj {
select {
case <-ctx.Done():
return
default:
}

if ratelimit.Limit() {
select {
case <-ctx.Done():
return
default:
}
}

name := g.ObjName
// New input for each version
obj := src.Object()
Expand Down Expand Up @@ -205,6 +215,7 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations,
defer wg.Done()
opts := g.GetOpts
done := ctx.Done()
ratelimit := InitRatelimit(g.Ratelimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -213,6 +224,15 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

fbr := firstByteRecorder{}
part := rng.Intn(len(g.objects))
obj := g.objects[part]
Expand Down
10 changes: 10 additions & 0 deletions pkg/bench/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
defer wg.Done()
opts := u.PutOpts
done := ctx.Done()
ratelimit := InitRatelimit(u.Ratelimit, u.RatelimitWindow)

<-wait
for {
Expand All @@ -67,6 +68,15 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if ratelimit.Limit() {
select {
case <-done:
return
default:
}
}

obj := src.Object()
opts.ContentType = obj.ContentType
client, cldone := u.Client()
Expand Down
50 changes: 50 additions & 0 deletions pkg/bench/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package bench

import (
"time"
)

type Ratelimit struct {
limit float64
window time.Duration
counter int64
start time.Time
}

func InitRatelimit(limit float64, window time.Duration) Ratelimit {
return Ratelimit{
limit: limit,
window: window,
counter: 0,
}
}

// Limit returns true if the ratelimit is effective, false otherwise
// Check there is room for a new request, if not, sleeping for the duration
// that will requests rate to stay whithin the ratelimit
func (r *Ratelimit) Limit() bool {
if r.limit <= 0 {
return false
}

// init the relative clock when first used
if r.start.IsZero() {
r.start = time.Now()
}

// calculate the time to sleep before next request to stay under the limit within the window
// sleepDuration <= 0: we have room for new requests, no need to ratelimit
// sleepDuration > 0: no more room for new requests, we have to wait before allowing new requests
elapsed := time.Since(r.start)
sleepDuration := time.Duration(float64(r.counter)/(r.limit/float64(r.window))) - elapsed

// increment the number of requests
r.counter++

if sleepDuration <= 0 {
return false
}

time.Sleep(sleepDuration)
return true
}
Loading

0 comments on commit ddf3f50

Please sign in to comment.