From d0258f9731b5cab45cf24a104ad0d901ba6a954f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9r=C3=B4me=20LOYET?= <822436+fatpat@users.noreply.github.com>
Date: Wed, 6 Dec 2023 06:49:08 +0100
Subject: [PATCH] [delete] bench objects from listing the bucket

Backport the listing feature from get to delete.

When doing a delete bench, during the prepare phase, instead of putting
objects into the bucket, use the objects already in it. This can be used to
bench a specific use case against an already filled bucket.

New options:
--list-existing: activate listing the bucket (the current behaviour remains
the default)
--list-flat: disable recursive listing
---
 README.md           |   9 ++-
 cli/delete.go       |  11 +++
 pkg/bench/delete.go | 190 ++++++++++++++++++++++++++++----------------
 3 files changed, 138 insertions(+), 72 deletions(-)

diff --git a/README.md b/README.md
index 05152df3..9dcabc27 100644
--- a/README.md
+++ b/README.md
@@ -324,13 +324,18 @@ It is possible by forcing md5 checksums on data by using the `--md5` option.
 
 ## DELETE
 
-Benchmarking delete operations will upload `--objects` objects of size `--obj.size` and attempt to
-delete as many it can within `--duration`.
+Benchmarking delete operations will attempt to delete as many objects as it can within `--duration`.
+
+By default, `--objects` objects of size `--obj.size` are uploaded before doing the actual bench.
 
 The delete operations are done in `--batch` objects per request in `--concurrent` concurrently
 running requests.
 
 If there are no more objects left the benchmark will end.
 
+Using `--list-existing` will list at most `--objects` objects from the bucket and delete them instead of
+uploading and deleting randomly generated objects (set `--objects` to 0 to use all objects from the listing).
+Listing is restricted to `--prefix` if it is set, and recursive listing can be disabled with `--list-flat`.
+
 The analysis will include the upload stats as `PUT` operations and the `DELETE` operations.
 
 ```
diff --git a/cli/delete.go b/cli/delete.go
index bc6f42f9..f5d486b8 100644
--- a/cli/delete.go
+++ b/cli/delete.go
@@ -39,6 +39,14 @@ var deleteFlags = []cli.Flag{
 		Value: 100,
 		Usage: "Number of DELETE operations per batch.",
 	},
+	cli.BoolFlag{
+		Name:  "list-existing",
+		Usage: "Instead of preparing the bench by PUTing some objects, only use objects already in the bucket",
+	},
+	cli.BoolFlag{
+		Name:  "list-flat",
+		Usage: "When using --list-existing, do not use recursive listing",
+	},
 }
 
 var deleteCmd = cli.Command{
@@ -68,6 +76,9 @@ func mainDelete(ctx *cli.Context) error {
 		Common:        getCommon(ctx, newGenSource(ctx, "obj.size")),
 		CreateObjects: ctx.Int("objects"),
 		BatchSize:     ctx.Int("batch"),
+		ListExisting:  ctx.Bool("list-existing"),
+		ListFlat:      ctx.Bool("list-flat"),
+		ListPrefix:    ctx.String("prefix"),
 	}
 	return runBench(ctx, &b)
 }
diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go
index 529658fd..40081feb 100644
--- a/pkg/bench/delete.go
+++ b/pkg/bench/delete.go
@@ -37,89 +37,139 @@ type Delete struct {
 	CreateObjects int
 	BatchSize     int
+	ListExisting  bool
+	ListFlat      bool
+	ListPrefix    string
 }
 
 // Prepare will create an empty bucket or delete any content already there
 // and upload a number of objects.
 func (d *Delete) Prepare(ctx context.Context) error {
-	if err := d.createEmptyBucket(ctx); err != nil {
-		return err
-	}
-	src := d.Source()
-	console.Eraseline()
-	console.Info("\rUploading ", d.CreateObjects, " objects of ", src.String())
-	var wg sync.WaitGroup
-	wg.Add(d.Concurrency)
-	d.addCollector()
-	obj := make(chan struct{}, d.CreateObjects)
-	for i := 0; i < d.CreateObjects; i++ {
-		obj <- struct{}{}
-	}
-	close(obj)
-	var mu sync.Mutex
 	var groupErr error
-	for i := 0; i < d.Concurrency; i++ {
-		go func(i int) {
-			defer wg.Done()
-			src := d.Source()
-			for range obj {
-				opts := d.PutOpts
-				rcv := d.Collector.Receiver()
-				done := ctx.Done()
-				select {
-				case <-done:
-					return
-				default:
-				}
-				obj := src.Object()
-				client, cldone := d.Client()
-				op := Operation{
-					OpType:   http.MethodPut,
-					Thread:   uint16(i),
-					Size:     obj.Size,
-					File:     obj.Name,
-					ObjPerOp: 1,
-					Endpoint: client.EndpointURL().String(),
-				}
+	// prepare the bench by listing objects from the bucket
+	if d.ListExisting {
+		cl, done := d.Client()
 
-				opts.ContentType = obj.ContentType
-				op.Start = time.Now()
-				res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts)
-				op.End = time.Now()
-				if err != nil {
-					err := fmt.Errorf("upload error: %w", err)
-					d.Error(err)
-					mu.Lock()
-					if groupErr == nil {
-						groupErr = err
+		// ensure the bucket exists
+		found, err := cl.BucketExists(ctx, d.Bucket)
+		if err != nil {
+			return err
+		}
+		if !found {
+			return fmt.Errorf("bucket %s does not exist and --list-existing has been set", d.Bucket)
+		}
+
+		// list all objects
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		objectCh := cl.ListObjects(ctx, d.Bucket, minio.ListObjectsOptions{
+			Prefix:    d.ListPrefix,
+			Recursive: !d.ListFlat,
+		})
+
+		for object := range objectCh {
+			if object.Err != nil {
+				return object.Err
+			}
+			obj := generator.Object{
+				Name: object.Key,
+				Size: object.Size,
+			}
+
+			d.objects = append(d.objects, obj)
+
+			// limit listing to CreateObjects objects
+			if d.CreateObjects > 0 && len(d.objects) >= d.CreateObjects {
+				break
+			}
+		}
+		if len(d.objects) == 0 {
+			return fmt.Errorf("no objects found for bucket %s", d.Bucket)
+		}
+		done()
+		d.Collector = NewCollector()
+
+	} else { // prepare the bench by creating the bucket and pushing some objects
+
+		if err := d.createEmptyBucket(ctx); err != nil {
+			return err
+		}
+		src := d.Source()
+		console.Eraseline()
+		console.Info("\rUploading ", d.CreateObjects, " objects of ", src.String())
+		var wg sync.WaitGroup
+		wg.Add(d.Concurrency)
+		d.addCollector()
+		obj := make(chan struct{}, d.CreateObjects)
+		for i := 0; i < d.CreateObjects; i++ {
+			obj <- struct{}{}
+		}
+		close(obj)
+		var mu sync.Mutex
+		for i := 0; i < d.Concurrency; i++ {
+			go func(i int) {
+				defer wg.Done()
+				src := d.Source()
+				for range obj {
+					opts := d.PutOpts
+					rcv := d.Collector.Receiver()
+					done := ctx.Done()
+
+					select {
+					case <-done:
+						return
+					default:
+					}
+					obj := src.Object()
+					client, cldone := d.Client()
+					op := Operation{
+						OpType:   http.MethodPut,
+						Thread:   uint16(i),
+						Size:     obj.Size,
+						File:     obj.Name,
+						ObjPerOp: 1,
+						Endpoint: client.EndpointURL().String(),
 					}
-					mu.Unlock()
-					return
-				}
-				obj.VersionID = res.VersionID
-				if res.Size != obj.Size {
-					err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
-					d.Error(err)
-					mu.Lock()
-					if groupErr == nil {
-						groupErr = err
-					}
+
+					opts.ContentType = obj.ContentType
+					op.Start = time.Now()
+					res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts)
+					op.End = time.Now()
+					if err != nil {
+						err := fmt.Errorf("upload error: %w", err)
+						d.Error(err)
+						mu.Lock()
+						if groupErr == nil {
+							groupErr = err
+						}
+						mu.Unlock()
+						return
+					}
+					obj.VersionID = res.VersionID
+
+					if res.Size != obj.Size {
+						err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
+						d.Error(err)
+						mu.Lock()
+						if groupErr == nil {
+							groupErr = err
+						}
+						mu.Unlock()
+						return
+					}
+					cldone()
+					mu.Lock()
+					obj.Reader = nil
+					d.objects = append(d.objects, *obj)
+					d.prepareProgress(float64(len(d.objects)) / float64(d.CreateObjects))
 					mu.Unlock()
-					return
+					rcv <- op
 				}
-				cldone()
-				mu.Lock()
-				obj.Reader = nil
-				d.objects = append(d.objects, *obj)
-				d.prepareProgress(float64(len(d.objects)) / float64(d.CreateObjects))
-				mu.Unlock()
-				rcv <- op
-			}
-		}(i)
+			}(i)
+		}
+		wg.Wait()
 	}
-	wg.Wait()
 
 	// Shuffle objects.
 	// Benchmark will pick from slice in order.
@@ -217,7 +267,7 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
 
 // Cleanup deletes everything uploaded to the bucket.
 func (d *Delete) Cleanup(ctx context.Context) {
-	if len(d.objects) > 0 {
+	if len(d.objects) > 0 && !d.ListExisting {
 		d.deleteAllInBucket(ctx, d.objects.Prefixes()...)
 	}
 }