Skip to content

Commit

Permalink
adding multiupload support in minio client (#1)
Browse files Browse the repository at this point in the history
* adding multiupload support in minio client

* add TTFB as part of 'mc admin trace' in pretty form (minio#4648)

* update deps to latest packages and update CREDITS (minio#4649)

---------

Co-authored-by: Harshavardhana <[email protected]>
  • Loading branch information
sunilmhta and harshavardhana authored Aug 15, 2023
1 parent 0a529d5 commit c4a4e75
Show file tree
Hide file tree
Showing 8 changed files with 975 additions and 35 deletions.
8 changes: 6 additions & 2 deletions cmd/client-fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func preserveAttributes(fd *os.File, attr map[string]string) *probe.Error {
return nil
}

/// Object operations.
// / Object operations.

func (f *fsClient) put(_ context.Context, reader io.Reader, size int64, progress io.Reader, opts PutOptions) (int64, *probe.Error) {
// ContentType is not handled on purpose.
Expand Down Expand Up @@ -392,6 +392,10 @@ func (f *fsClient) Put(ctx context.Context, reader io.Reader, size int64, progre
return f.put(ctx, reader, size, progress, opts)
}

func (f *fsClient) PutMultiple(ctx context.Context, readers []io.Reader, sizes []int64, progress io.Reader, opts []PutOptions) (n int64, err *probe.Error) {
panic("fsClient does not implement PutMultiple method. Try Put method.")
}

func (f *fsClient) putN(_ context.Context, reader io.Reader, size int64, progress io.Reader, opts PutOptions) (int64, *probe.Error) {
// ContentType is not handled on purpose.
// For filesystem this is a redundant information.
Expand Down Expand Up @@ -1030,7 +1034,7 @@ func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent) {
return nil
}

/// In following situations we need to handle listing properly.
// / In following situations we need to handle listing properly.
// - When filepath is '/usr' and prefix is '/usr/bi'
// - When filepath is '/usr/bin/subdir' and prefix is '/usr/bi'
// - Do not check filePrefix if its '.'
Expand Down
115 changes: 93 additions & 22 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type S3Client struct {
targetURL *ClientURL
api *minio.Client
virtualStyle bool
zcnClient *ZcnClient
}

const (
Expand Down Expand Up @@ -157,14 +158,8 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
defer mutex.Unlock()
var api *minio.Client
var found bool
if api, found = clientCache[confSum]; !found {
// if Signature version '4' use NewV4 directly.
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, config.SessionToken)
// if Signature version '2' use NewV2 directly.
if strings.ToUpper(config.Signature) == "S3V2" {
creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "")
}

getTransport := func() http.RoundTripper {
var transport http.RoundTripper

if config.Transport != nil {
Expand Down Expand Up @@ -223,6 +218,16 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
}
}

return transport
}
if api, found = clientCache[confSum]; !found {
// if Signature version '4' use NewV4 directly.
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, config.SessionToken)
// if Signature version '2' use NewV2 directly.
if strings.ToUpper(config.Signature) == "S3V2" {
creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "")
}

// Not found. Instantiate a new MinIO
var e error

Expand All @@ -231,7 +236,7 @@ func newFactory() func(config *Config) (Client, *probe.Error) {
Secure: useTLS,
Region: os.Getenv("MC_REGION"),
BucketLookup: config.Lookup,
Transport: transport,
Transport: getTransport(),
}

api, e = minio.New(hostName, &options)
Expand All @@ -253,6 +258,7 @@ func newFactory() func(config *Config) (Client, *probe.Error) {

// Store the new api object.
s3Clnt.api = api
s3Clnt.zcnClient = NewZcnClient(config, getTransport(), targetURL)

return s3Clnt, nil
}
Expand Down Expand Up @@ -978,6 +984,55 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
return 0, probe.NewError(BucketNameEmpty{})
}

opts, err := c.getMinioPutOptions(putOpts, progress)
if err != nil {
return 0, err
}

ui, e := c.api.PutObject(ctx, bucket, object, reader, size, opts)
// ui, e := c.api.PutMultipleObjects(ctx, bucket, []io.Reader{reader}, []int64{size}, []minio.PutObjectOptions{opts})
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
return ui.Size, probe.NewError(UnexpectedEOF{
TotalSize: size,
TotalWritten: ui.Size,
})
}
if errResponse.Code == "AccessDenied" {
return ui.Size, probe.NewError(PathInsufficientPermission{
Path: c.targetURL.String(),
})
}
if errResponse.Code == "MethodNotAllowed" {
return ui.Size, probe.NewError(ObjectAlreadyExists{
Object: object,
})
}
if errResponse.Code == "XMinioObjectExistsAsDirectory" {
return ui.Size, probe.NewError(ObjectAlreadyExistsAsDirectory{
Object: object,
})
}
if errResponse.Code == "NoSuchBucket" {
return ui.Size, probe.NewError(BucketDoesNotExist{
Bucket: bucket,
})
}
if errResponse.Code == "InvalidBucketName" {
return ui.Size, probe.NewError(BucketInvalid{
Bucket: bucket,
})
}
if errResponse.Code == "NoSuchKey" {
return ui.Size, probe.NewError(ObjectMissing{})
}
return ui.Size, probe.NewError(e)
}
return ui.Size, nil
}

func (c *S3Client) getMinioPutOptions(putOpts PutOptions, progress io.Reader) (minio.PutObjectOptions, *probe.Error) {
metadata := make(map[string]string, len(putOpts.metadata))
for k, v := range putOpts.metadata {
metadata[k] = v
Expand Down Expand Up @@ -1019,7 +1074,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
if ok {
tagsSet, e := tags.Parse(tagsHdr, true)
if e != nil {
return 0, probe.NewError(e)
return minio.PutObjectOptions{}, probe.NewError(e)
}
tagsMap = tagsSet.ToMap()
delete(metadata, "X-Amz-Tagging")
Expand All @@ -1044,8 +1099,8 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
opts := minio.PutObjectOptions{
UserMetadata: metadata,
UserTags: tagsMap,
Progress: progress,
ContentType: contentType,
Progress: progress,
CacheControl: cacheControl,
ContentDisposition: contentDisposition,
ContentEncoding: contentEncoding,
Expand Down Expand Up @@ -1074,12 +1129,32 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
opts.SendContentMd5 = true
}

ui, e := c.api.PutObject(ctx, bucket, object, reader, size, opts)
return opts, nil
}

func (c *S3Client) PutMultiple(ctx context.Context, readers []io.Reader, sizes []int64, progress io.Reader, putOpts []PutOptions) (int64, *probe.Error) {
// do not use the object name from here.
bucket, _ := c.url2BucketAndObject()
if bucket == "" {
return 0, probe.NewError(BucketNameEmpty{})
}

opts, e := c.getMinioPutOptions(putOpts[0], progress)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
return 0, e
}

var objects []string
for _, opt := range putOpts {
_, object := url2BucketAndObject(&opt.targetUrl)
objects = append(objects, object)
}
ui, err := c.zcnClient.PutMultipleObjects(ctx, bucket, objects, readers, sizes, opts)
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "UnexpectedEOF" || err == io.EOF {
return ui.Size, probe.NewError(UnexpectedEOF{
TotalSize: size,
TotalSize: sizes[0],
TotalWritten: ui.Size,
})
}
Expand All @@ -1088,14 +1163,9 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
Path: c.targetURL.String(),
})
}
if errResponse.Code == "MethodNotAllowed" {
return ui.Size, probe.NewError(ObjectAlreadyExists{
Object: object,
})
}
if errResponse.Code == "XMinioObjectExistsAsDirectory" {
return ui.Size, probe.NewError(ObjectAlreadyExistsAsDirectory{
Object: object,
Object: objects[0],
})
}
if errResponse.Code == "NoSuchBucket" {
Expand All @@ -1111,8 +1181,9 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
if errResponse.Code == "NoSuchKey" {
return ui.Size, probe.NewError(ObjectMissing{})
}
return ui.Size, probe.NewError(e)
return ui.Size, probe.NewError(err)
}

return ui.Size, nil
}

Expand Down Expand Up @@ -1740,7 +1811,7 @@ func (c *S3Client) splitPath(path string) (bucketName, objectName string) {
return tokens[0], tokens[1]
}

/// Bucket API operations.
// / Bucket API operations.

func (c *S3Client) listVersions(ctx context.Context, b, o string, opts ListOptions) chan minio.ObjectInfo {
objectInfoCh := make(chan minio.ObjectInfo)
Expand Down
2 changes: 2 additions & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type PutOptions struct {
multipartSize uint64
multipartThreads uint
concurrentStream bool
targetUrl ClientURL
}

// StatOptions holds options of the HEAD operation
Expand Down Expand Up @@ -126,6 +127,7 @@ type Client interface {
// I/O operations with metadata.
Get(ctx context.Context, opts GetOptions) (reader io.ReadCloser, err *probe.Error)
Put(ctx context.Context, reader io.Reader, size int64, progress io.Reader, opts PutOptions) (n int64, err *probe.Error)
PutMultiple(ctx context.Context, readers []io.Reader, sizes []int64, progress io.Reader, opts []PutOptions) (n int64, err *probe.Error)

// Object Locking related API
PutObjectRetention(ctx context.Context, versionID string, mode minio.RetentionMode, retainUntilDate time.Time, bypassGovernance bool) *probe.Error
Expand Down
Loading

0 comments on commit c4a4e75

Please sign in to comment.