Skip to content

Commit

Permalink
--ratelimit and --ratelimit-window flags (#295)
Browse files Browse the repository at this point in the history
* .gitignore: *.zst

* --rps-limit flag

when set, rate limit operations during the prepare and the main phase of
the bench. The ratelimit applies globally to all concurrent worker of
each warp instance.
  • Loading branch information
fatpat authored Feb 22, 2024
1 parent 5a70e16 commit bddb29b
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 1 deletion.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@
/.idea
warp
*~
dist
dist

# zst files
*.zst
16 changes: 16 additions & 0 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/minio/pkg/v2/console"
"github.com/minio/warp/pkg/bench"
"github.com/minio/warp/pkg/generator"

"golang.org/x/time/rate"
)

// Collection of warp flags currently supported
Expand Down Expand Up @@ -250,6 +252,11 @@ var ioFlags = []cli.Flag{
EnvVar: appNameUC + "_INFLUXDB_CONNECT",
Usage: "Send operations to InfluxDB. Specify as 'http://<token>@<hostname>:<port>/<bucket>/<org>'",
},
cli.Float64Flag{
Name: "rps-limit",
Value: 0,
Usage: "Rate limit each instance to this number of requests per second (0 to disable)",
},
}

func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
Expand All @@ -263,6 +270,14 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
extra = append(extra, in)
}
}

rpsLimit := ctx.Float64("rps-limit")
var rpsLimiter *rate.Limiter
if rpsLimit > 0 {
// set burst to 1 as limiter will always be called to wait for 1 token
rpsLimiter = rate.NewLimiter(rate.Limit(rpsLimit), 1)
}

return bench.Common{
Client: newClient(ctx),
Concurrency: ctx.Int("concurrent"),
Expand All @@ -272,5 +287,6 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
PutOpts: putOpts(ctx),
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
RpsLimiter: rpsLimiter,
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
13 changes: 13 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v2/console"
"github.com/minio/warp/pkg/generator"

"golang.org/x/time/rate"
)

type Benchmark interface {
Expand Down Expand Up @@ -93,6 +95,9 @@ type Common struct {

// Does destination support versioning?
Versioned bool

// ratelimiting
RpsLimiter *rate.Limiter
}

const (
Expand Down Expand Up @@ -250,3 +255,11 @@ func (c *Common) addCollector() {
}
c.Collector.extra = c.ExtraOut
}

func (c *Common) rpsLimit(ctx context.Context) error {
if c.RpsLimiter == nil {
return nil
}

return c.RpsLimiter.Wait(ctx)
}
10 changes: 10 additions & 0 deletions pkg/bench/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (d *Delete) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := d.Source()

for range obj {
opts := d.PutOpts
rcv := d.Collector.Receiver()
Expand All @@ -72,6 +73,11 @@ func (d *Delete) Prepare(ctx context.Context) error {
return
default:
}

if d.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
client, cldone := d.Client()
op := Operation{
Expand Down Expand Up @@ -157,6 +163,10 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
default:
}

if d.rpsLimit(ctx) != nil {
return
}

// Fetch d.BatchSize objects
mu.Lock()
if len(d.objects) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/bench/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (g *Get) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()

name := obj.Name
Expand Down Expand Up @@ -266,6 +271,11 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

fbr := firstByteRecorder{}
obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
Expand Down
9 changes: 9 additions & 0 deletions pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (d *List) Prepare(ctx context.Context) error {
return
default:
}

if d.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
// Assure we don't have duplicates
for {
Expand Down Expand Up @@ -197,6 +202,10 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
default:
}

if d.rpsLimit(ctx) != nil {
return
}

prefix := objs[0].Prefix
client, cldone := d.Client()
op := Operation{
Expand Down
11 changes: 11 additions & 0 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (g *Mixed) Prepare(ctx context.Context) error {
go func(i int) {

Check failure on line 173 in pkg/bench/mixed.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'i' seems to be unused, consider removing or renaming it as _ (revive)
defer wg.Done()
src := g.Source()

for range obj {
opts := g.PutOpts
done := ctx.Done()
Expand All @@ -182,6 +183,11 @@ func (g *Mixed) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
client, clDone := g.Client()
opts.ContentType = obj.ContentType
Expand Down Expand Up @@ -247,6 +253,11 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

operation := g.Dist.getOp()
switch operation {
case http.MethodGet:
Expand Down
10 changes: 10 additions & 0 deletions pkg/bench/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (g *Multipart) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

name := g.ObjName
// New input for each version
obj := src.Object()
Expand Down Expand Up @@ -213,6 +218,11 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

fbr := firstByteRecorder{}
part := rng.Intn(len(g.objects))
obj := g.objects[part]
Expand Down
5 changes: 5 additions & 0 deletions pkg/bench/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if u.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
opts.ContentType = obj.ContentType
client, cldone := u.Client()
Expand Down
11 changes: 11 additions & 0 deletions pkg/bench/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (g *Retention) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := g.Source()

for range obj {
opts := g.PutOpts
rcv := g.Collector.Receiver()
Expand All @@ -83,6 +84,11 @@ func (g *Retention) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
name := obj.Name
for ver := 0; ver < g.Versions; ver++ {
Expand Down Expand Up @@ -168,6 +174,11 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
op := Operation{
Expand Down
6 changes: 6 additions & 0 deletions pkg/bench/s3zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (g *S3Zip) Prepare(ctx context.Context) error {
return
default:
}

obj := src.Object()

opts.ContentType = obj.ContentType
Expand Down Expand Up @@ -150,6 +151,11 @@ func (g *S3Zip) Start(ctx context.Context, wait chan struct{}) (Operations, erro
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

fbr := firstByteRecorder{}
obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
Expand Down
10 changes: 10 additions & 0 deletions pkg/bench/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (g *Select) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
client, cldone := g.Client()
op := Operation{
Expand Down Expand Up @@ -153,6 +158,11 @@ func (g *Select) Start(ctx context.Context, wait chan struct{}) (Operations, err
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

fbr := firstByteRecorder{}
obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
Expand Down
5 changes: 5 additions & 0 deletions pkg/bench/snowball.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, e
return
default:
}

if s.rpsLimit(ctx) != nil {
return
}

buf.Reset()
w := io.Writer(&buf)
if s.Compress {
Expand Down
10 changes: 10 additions & 0 deletions pkg/bench/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (g *Stat) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()

name := obj.Name
Expand Down Expand Up @@ -173,6 +178,11 @@ func (g *Stat) Start(ctx context.Context, wait chan struct{}) (Operations, error
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
op := Operation{
Expand Down
11 changes: 11 additions & 0 deletions pkg/bench/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (g *Versioned) Prepare(ctx context.Context) error {
go func(i int) {

Check failure on line 78 in pkg/bench/versioned.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'i' seems to be unused, consider removing or renaming it as _ (revive)
defer wg.Done()
src := g.Source()

for range obj {
opts := g.PutOpts
done := ctx.Done()
Expand All @@ -87,6 +88,11 @@ func (g *Versioned) Prepare(ctx context.Context) error {
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

obj := src.Object()
client, clDone := g.Client()
opts.ContentType = obj.ContentType
Expand Down Expand Up @@ -151,6 +157,11 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if g.rpsLimit(ctx) != nil {
return
}

operation := g.Dist.getOp()
switch operation {
case http.MethodGet:
Expand Down

0 comments on commit bddb29b

Please sign in to comment.