[delete] bench object from listing the bucket
backport listing feature from get to delete

When doing a delete bench, during the prepare phase, instead of putting
objects in the bucket, use the objects already in there. This can be used
to bench a specific use case against an already filled bucket (see the
sketch below).

New options:
--list-existing: activate listing of the bucket instead of uploading
  objects (the current behaviour remains the default)
--list-flat: disable recursive listing
fatpat committed Dec 6, 2023
1 parent 72415ee commit d0258f9
Showing 3 changed files with 138 additions and 72 deletions.
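For orientation, here is a minimal, self-contained sketch of the listing-based prepare path this commit introduces, written against minio-go directly rather than warp's internal types; the endpoint, credentials, bucket name, and limit in `main` are placeholders, not values taken from the diff.

```
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

// listExisting mirrors the --list-existing prepare step: verify the bucket
// exists, then collect up to max objects (0 means no limit), optionally
// restricted to a prefix, and without recursion when flat is true
// (the equivalent of --list-flat).
func listExisting(ctx context.Context, cl *minio.Client, bucket, prefix string, flat bool, max int) ([]minio.ObjectInfo, error) {
	found, err := cl.BucketExists(ctx, bucket)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, fmt.Errorf("bucket %s does not exist", bucket)
	}

	// Cancel the listing as soon as enough objects have been collected.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var objs []minio.ObjectInfo
	for object := range cl.ListObjects(ctx, bucket, minio.ListObjectsOptions{
		Prefix:    prefix,
		Recursive: !flat,
	}) {
		if object.Err != nil {
			return nil, object.Err
		}
		objs = append(objs, object)
		if max > 0 && len(objs) >= max {
			break
		}
	}
	if len(objs) == 0 {
		return nil, fmt.Errorf("no objects found for bucket %s", bucket)
	}
	return objs, nil
}

func main() {
	// Placeholder endpoint and credentials.
	cl, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatal(err)
	}
	objs, err := listExisting(context.Background(), cl, "warp-benchmark-bucket", "", false, 100)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("objects available for the delete bench:", len(objs))
}
```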
9 changes: 7 additions & 2 deletions README.md
@@ -324,13 +324,18 @@ It is possible by forcing md5 checksums on data by using the `--md5` option.

## DELETE

Benchmarking delete operations will attempt to delete as many objects as it can within `--duration`.

By default, `--objects` objects of size `--obj.size` are uploaded before doing the actual benchmark.

The delete operations are performed in batches of `--batch` objects per request, with `--concurrent` requests running concurrently.

If there are no more objects left, the benchmark will end.

Using `--list-existing` will list at most `--objects` objects from the bucket and delete those instead of
uploading and deleting randomly generated objects (set it to 0 to use all objects from the listing).
Listing is restricted to `--prefix` if it is set, and recursive listing can be disabled with `--list-flat`.

The analysis will include the upload stats as `PUT` operations and the `DELETE` operations.
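The batched deletes themselves are outside this diff, and whether warp issues them exactly this way is not shown here; as a rough sketch of what `--batch` objects per request means, assuming the minio-go `RemoveObjects` multi-object delete API, one request could look like this (names are illustrative):

```
package sketch

import (
	"context"
	"fmt"

	"github.com/minio/minio-go/v7"
)

// deleteBatch issues one batched delete request for the given objects,
// i.e. roughly one DELETE operation covering --batch objects. A caller
// would split the prepared object list into such batches and run
// --concurrent of these calls in parallel.
func deleteBatch(ctx context.Context, cl *minio.Client, bucket string, batch []minio.ObjectInfo) error {
	objectsCh := make(chan minio.ObjectInfo, len(batch))
	for _, obj := range batch {
		objectsCh <- obj
	}
	close(objectsCh)

	// RemoveObjects streams per-object failures back; an empty error
	// channel means the whole batch was deleted.
	for rerr := range cl.RemoveObjects(ctx, bucket, objectsCh, minio.RemoveObjectsOptions{}) {
		if rerr.Err != nil {
			return fmt.Errorf("delete %q: %w", rerr.ObjectName, rerr.Err)
		}
	}
	return nil
}
```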

11 changes: 11 additions & 0 deletions cli/delete.go
@@ -39,6 +39,14 @@ var deleteFlags = []cli.Flag{
		Value: 100,
		Usage: "Number of DELETE operations per batch.",
	},
	cli.BoolFlag{
		Name:  "list-existing",
		Usage: "Instead of preparing the bench by PUTing some objects, only use objects already in the bucket",
	},
	cli.BoolFlag{
		Name:  "list-flat",
		Usage: "When using --list-existing, do not use recursive listing",
	},
}

var deleteCmd = cli.Command{
@@ -68,6 +76,9 @@ func mainDelete(ctx *cli.Context) error {
		Common:        getCommon(ctx, newGenSource(ctx, "obj.size")),
		CreateObjects: ctx.Int("objects"),
		BatchSize:     ctx.Int("batch"),
		ListExisting:  ctx.Bool("list-existing"),
		ListFlat:      ctx.Bool("list-flat"),
		ListPrefix:    ctx.String("prefix"),
	}
	return runBench(ctx, &b)
}
190 changes: 120 additions & 70 deletions pkg/bench/delete.go
@@ -37,89 +37,139 @@ type Delete struct {

	CreateObjects int
	BatchSize     int
	ListExisting  bool
	ListFlat      bool
	ListPrefix    string
}

// Prepare will create an empty bucket or delete any content already there
// and upload a number of objects.
func (d *Delete) Prepare(ctx context.Context) error {
	// prepare the bench by listing objects from the bucket
	if d.ListExisting {
		cl, done := d.Client()

		// ensure the bucket exists
		found, err := cl.BucketExists(ctx, d.Bucket)
		if err != nil {
			return err
		}
		if !found {
			return fmt.Errorf("bucket %s does not exist and --list-existing has been set", d.Bucket)
		}

		// list all objects
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		objectCh := cl.ListObjects(ctx, d.Bucket, minio.ListObjectsOptions{
			Prefix:    d.ListPrefix,
			Recursive: !d.ListFlat,
		})

		for object := range objectCh {
			if object.Err != nil {
				return object.Err
			}
			obj := generator.Object{
				Name: object.Key,
				Size: object.Size,
			}

			d.objects = append(d.objects, obj)

			// limit to CreateObjects (0 means use the full listing)
			if d.CreateObjects > 0 && len(d.objects) >= d.CreateObjects {
				break
			}
		}
		if len(d.objects) == 0 {
			return fmt.Errorf("no objects found for bucket %s", d.Bucket)
		}
		done()
		d.Collector = NewCollector()
	} else { // prepare the bench by creating the bucket and pushing some objects
		if err := d.createEmptyBucket(ctx); err != nil {
			return err
		}
		src := d.Source()
		console.Eraseline()
		console.Info("\rUploading ", d.CreateObjects, " objects of ", src.String())
		var wg sync.WaitGroup
		wg.Add(d.Concurrency)
		d.addCollector()
		obj := make(chan struct{}, d.CreateObjects)
		for i := 0; i < d.CreateObjects; i++ {
			obj <- struct{}{}
		}
		close(obj)
		var mu sync.Mutex
		var groupErr error
		for i := 0; i < d.Concurrency; i++ {
			go func(i int) {
				defer wg.Done()
				src := d.Source()
				for range obj {
					opts := d.PutOpts
					rcv := d.Collector.Receiver()
					done := ctx.Done()

					select {
					case <-done:
						return
					default:
					}
					obj := src.Object()
					client, cldone := d.Client()
					op := Operation{
						OpType:   http.MethodPut,
						Thread:   uint16(i),
						Size:     obj.Size,
						File:     obj.Name,
						ObjPerOp: 1,
						Endpoint: client.EndpointURL().String(),
					}

					opts.ContentType = obj.ContentType
					op.Start = time.Now()
					res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts)
					op.End = time.Now()
					if err != nil {
						err := fmt.Errorf("upload error: %w", err)
						d.Error(err)
						mu.Lock()
						if groupErr == nil {
							groupErr = err
						}
						mu.Unlock()
						return
					}
					obj.VersionID = res.VersionID

					if res.Size != obj.Size {
						err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
						d.Error(err)
						mu.Lock()
						if groupErr == nil {
							groupErr = err
						}
						mu.Unlock()
						return
					}
					cldone()
					mu.Lock()
					obj.Reader = nil
					d.objects = append(d.objects, *obj)
					d.prepareProgress(float64(len(d.objects)) / float64(d.CreateObjects))
					mu.Unlock()
					rcv <- op
				}
			}(i)
		}
		wg.Wait()
	}

// Shuffle objects.
// Benchmark will pick from slice in order.
@@ -217,7 +267,7 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err

// Cleanup deletes everything uploaded to the bucket.
func (d *Delete) Cleanup(ctx context.Context) {
	if len(d.objects) > 0 && !d.ListExisting {
		d.deleteAllInBucket(ctx, d.objects.Prefixes()...)
	}
}
