From e6462103098439e70737b454bcd3bdc87cb3ee2d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20L=C3=BCtzner?=
Date: Tue, 22 Feb 2022 08:21:43 +0100
Subject: [PATCH 1/3] fs-v1-multipart: Only stat remaining object parts

There's no need to continuously stat all object parts on disk if most of
them have already been appended.

(cherry picked from commit 3d4fb6aa92235671f23aa1cd2e2d91ba77b3f3ae)
---
 cmd/fs-v1-multipart.go | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index e8d7a45e3cf49..05aa765592aee 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -81,7 +81,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
 	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
 	nextPartNumber := len(file.parts) + 1
 	uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
-	parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
+	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, nextPartNumber)
 	if err != nil {
 		logger.LogIf(ctx, err)
 		return
@@ -120,7 +120,7 @@
 }
 
 // Get uploaded chunks from multipart folder in metabucket
-func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uploadID string) ([]ObjectPartInfo, error) {
+func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) {
 	// Setting count to -1 will read everything.
 	foundFiles, err := fs.disk.ListDir(ctx, pathJoin(minioMetaMultipartBucket, fs.getMultipartSHADir(bucket, object)), uploadID, -1)
 	if err != nil {
@@ -138,6 +138,11 @@ func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uplo
 			continue
 		}
 
+		if number < nextPartNumber {
+			// Part already appended.
+			continue
+		}
+
 		uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
 		fi, err := fs.disk.StatObject(ctx, pathJoin(minioMetaMultipartBucket, uploadIDPath, file))
 		if err != nil {
@@ -535,7 +540,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
 	result.PartNumberMarker = partNumberMarker
 	result.UserDefined = cloneMSS(fi.Metadata)
 
-	parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
+	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1)
 	if err != nil {
 		return ListPartsInfo{}, toObjectErr(err, bucket, object)
 	}
@@ -635,7 +640,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	// Allocate parts similar to incoming slice.
 	fi.Parts = make([]ObjectPartInfo, len(parts))
 
-	chunks, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
+	chunks, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1)
 	if err != nil {
 		return ObjectInfo{}, err
 	}
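A minimal, self-contained Go sketch of the idea behind patch 1: decode the
part number from each part file name first, and only stat files at or beyond
nextPartNumber, because everything below that has already been appended.
decodePartName and remainingParts are illustrative stand-ins, not the actual
helpers in cmd/fs-v1-multipart.go.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// decodePartName extracts the part number from a name such as "3.etag.1048576".
// It is a simplified stand-in for the real part-file decoding in fs-v1-multipart.go.
func decodePartName(name string) (int, error) {
	fields := strings.SplitN(name, ".", 2)
	return strconv.Atoi(fields[0])
}

// remainingParts keeps only the part files that still need to be stat'ed,
// i.e. those whose part number is >= nextPartNumber; lower numbers have
// already been appended, so statting them again would be wasted work.
func remainingParts(files []string, nextPartNumber int) []string {
	var out []string
	for _, f := range files {
		number, err := decodePartName(f)
		if err != nil {
			// Not a part file, skip it.
			continue
		}
		if number < nextPartNumber {
			// Part already appended.
			continue
		}
		out = append(out, f)
	}
	return out
}

func main() {
	files := []string{"1.aaa.5", "2.bbb.5", "3.ccc.5", "fs.json"}
	// Parts 1 and 2 are already appended, so only part 3 is left to stat.
	fmt.Println(remainingParts(files, 3)) // [3.ccc.5]
}
```

The skipped entries are exactly the parts the background append has already
consumed, so the per-call StatObject traffic shrinks as the upload progresses.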
From e5587860adc7ddd9c69ef0bf6632112a4d686a0f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20L=C3=BCtzner?=
Date: Fri, 25 Feb 2022 11:36:58 +0100
Subject: [PATCH 2/3] fs-v1-multipart: Remove backgroundAppend operation

The backgroundAppend operation was meant as a performance optimization:
uploaded chunks were aggregated into a complete file in the background
while further parts were still being uploaded. This might be an
optimization on actual filesystem paths, but on Gluster it made the
operation a lot slower because many redundant filesystem calls were
executed.

A CompleteMultipartUpload request, which needs the uploaded chunks put
together into the final object, has to

1. wait for any ongoing backgroundAppend operations to complete,
2. enumerate all chunks and check that they were appended in the right
   order (otherwise start from scratch), and
3. move the aggregated file.

Removing this piece of code started out as an experiment; I expected the
chunks would then not be aggregated at all. It turned out that
CompleteMultipartUpload already has a fallback that assembles the parts
itself. That fallback is needed anyway whenever the final object uses a
different part order or omits some of the uploaded parts, and it ensures
that a complete aggregate is always created.

At least on GlusterFS this makes a CompleteMultipartUpload request run
almost twice as fast as with backgroundAppend.

(cherry picked from commit 4ac0b96e2ef1cef69272cb4fe6e22e15d1749c69)
---
 cmd/fs-v1-multipart.go | 117 ++++------------------------------------
 1 file changed, 11 insertions(+), 106 deletions(-)

diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 05aa765592aee..f72488079d30a 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -62,63 +62,6 @@ func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, a
 	return partNumber, result[1], actualSize, nil
 }
 
-// Appends parts to an appendFile sequentially.
-func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) {
-	fs.appendFileMapMu.Lock()
-	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
-	file := fs.appendFileMap[uploadID]
-	if file == nil {
-		file = &fsAppendFile{
-			filePath: fmt.Sprintf("%s.%s", uploadID, mustGetUUID()),
-		}
-		fs.appendFileMap[uploadID] = file
-	}
-	fs.appendFileMapMu.Unlock()
-
-	file.Lock()
-	defer file.Unlock()
-
-	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
-	nextPartNumber := len(file.parts) + 1
-	uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
-	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, nextPartNumber)
-	if err != nil {
-		logger.LogIf(ctx, err)
-		return
-	}
-
-	for _, part := range parts {
-		partNumber, etag, actualSize := part.Number, part.ETag, part.ActualSize
-		if partNumber < nextPartNumber {
-			// Part already appended.
-			continue
-		}
-		if partNumber > nextPartNumber {
-			// Required part number is not yet uploaded.
-			return
-		}
-
-		partPath := pathJoin(uploadIDPath, fs.encodePartFile(partNumber, etag, actualSize))
-		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), file.filePath, entryBuf)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-
-		file.parts = append(file.parts, partNumber)
-		nextPartNumber++
-	}
-}
-
 // Get uploaded chunks from multipart folder in metabucket
 func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) {
 	// Setting count to -1 will read everything.
@@ -442,8 +385,6 @@ func (fs *FSObjects) PutObjectPart(rctx context.Context, bucket, object, uploadI
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 
-	go fs.backgroundAppend(wctx, bucket, object, uploadID)
-
 	return PartInfo{
 		PartNumber:   partID,
 		LastModified: UTCNow(),
@@ -693,63 +634,27 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		}
 	}
 
-	appendFallback := true // In case background-append did not append the required parts.
 	appendFilePath := fmt.Sprintf("%s.%s", uploadID, mustGetUUID())
 
-	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
-	// to take care of the following corner case:
-	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
-	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
-	//    from the beginning
-	fs.backgroundAppend(ctx, bucket, object, uploadID)
-
 	fs.appendFileMapMu.Lock()
 	file := fs.appendFileMap[uploadID]
 	delete(fs.appendFileMap, uploadID)
 	fs.appendFileMapMu.Unlock()
 
 	if file != nil {
-		file.Lock()
-		defer file.Unlock()
-		// Verify that appendFile has all the parts.
-		if len(file.parts) == len(parts) {
-			for i := range parts {
-				partIdx := objectPartIndex(chunks, i)
-				// All parts should have same part number.
-				if partIdx == -1 {
-					break
-				}
-				metaPart := chunks[partIdx]
-				if canonicalizeETag(parts[i].ETag) != metaPart.ETag {
-					break
-				}
-				if parts[i].PartNumber != metaPart.Number {
-					break
-				}
-				if i == len(parts)-1 {
-					appendFilePath = file.filePath
-					appendFallback = false
-				}
-			}
-		}
+		fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
 	}
-
-	if appendFallback {
-		if file != nil {
-			fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
+	for _, fiPart := range fi.Parts {
+		partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
+		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
-		for _, fiPart := range fi.Parts {
-			partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
-			entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
-			err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
+		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
 	}
 

From a00b604cbcea451bd3c386336e64fe85f18c4934 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20L=C3=BCtzner?=
Date: Wed, 1 Jun 2022 10:12:00 +0200
Subject: [PATCH 3/3] fs-v1: Remove unused property

(cherry picked from commit 9137a44e1280137ea6207d982a48bdda00061df7)
---
 cmd/fs-v1.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index b9da995766db9..ab751e5bc8b09 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -76,7 +76,6 @@ type FSObjects struct {
 // Represents the background append file.
 type fsAppendFile struct {
 	sync.Mutex
-	parts    []int  // List of parts appended.
 	filePath string // Absolute path of the file in the temp location.
 }
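A condensed Go sketch of the assembly flow that remains after patch 2: with
backgroundAppend gone, CompleteMultipartUpload simply reads every part listed
in the object's metadata and appends it, in order, to a fresh temporary file.
assembleParts and the plain os calls below are illustrative assumptions that
stand in for the fs.disk.ReadAll / fs.disk.AppendFile helpers in the diff.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// assembleParts concatenates the given part files into dstPath in the order
// provided, mirroring the single fallback loop that CompleteMultipartUpload
// keeps once backgroundAppend is removed.
func assembleParts(dstPath string, partPaths []string) error {
	dst, err := os.OpenFile(dstPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer dst.Close()

	for _, p := range partPaths {
		buf, err := os.ReadFile(p)
		if err != nil {
			return fmt.Errorf("read part %s: %w", p, err)
		}
		if _, err := dst.Write(buf); err != nil {
			return fmt.Errorf("append part %s: %w", p, err)
		}
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "mpu")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Two fake uploaded parts, assembled in part-number order.
	p1 := filepath.Join(dir, "part.1")
	p2 := filepath.Join(dir, "part.2")
	os.WriteFile(p1, []byte("hello "), 0o644)
	os.WriteFile(p2, []byte("world\n"), 0o644)

	object := filepath.Join(dir, "object")
	if err := assembleParts(object, []string{p1, p2}); err != nil {
		panic(err)
	}
	data, _ := os.ReadFile(object)
	fmt.Print(string(data)) // hello world
}
```

Doing this once, sequentially, at completion time is what avoids the repeated
stats and re-verification the background path required, which is where the
commit message's roughly 2x speedup on GlusterFS comes from.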