From e5587860adc7ddd9c69ef0bf6632112a4d686a0f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Robert=20L=C3=BCtzner?=
Date: Fri, 25 Feb 2022 11:36:58 +0100
Subject: [PATCH] fs-v1-multipart: Remove backgroundAppend operation

The backgroundAppend was meant as a performance optimization where
uploaded chunks would be aggregated to a complete file in the background
while more parts were uploaded at the same time.

This might be an optimization on actual filesystem paths, but on Gluster
it made the operation a lot slower because many redundant filesystem
calls were executed.

The CompleteMultipartUpload request which requires the uploaded chunks
to be put together has to

1. wait for any ongoing backgroundAppend operations to complete,
2. enumerate all chunks and check if they were put together in the
   right order (otherwise start from scratch),
3. move the aggregated file.

Removing this piece of code started out as an experiment, because I
expected the chunks to not be aggregated at all. It turned out that
there is a fallback which is also necessary in case the final object
should have a different order or not contain some of the uploaded
parts. This also ensures that a final aggregate is created.

At least on GlusterFS this makes any CMU request run almost twice as
fast as when using backgroundAppend.

(cherry picked from commit 4ac0b96e2ef1cef69272cb4fe6e22e15d1749c69)
---
 cmd/fs-v1-multipart.go | 117 ++++-------------------------------------
 1 file changed, 11 insertions(+), 106 deletions(-)

diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 05aa765592aee..f72488079d30a 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -62,63 +62,6 @@ func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, a
 	return partNumber, result[1], actualSize, nil
 }
 
-// Appends parts to an appendFile sequentially.
-func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) {
-	fs.appendFileMapMu.Lock()
-	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
-	file := fs.appendFileMap[uploadID]
-	if file == nil {
-		file = &fsAppendFile{
-			filePath: fmt.Sprintf("%s.%s", uploadID, mustGetUUID()),
-		}
-		fs.appendFileMap[uploadID] = file
-	}
-	fs.appendFileMapMu.Unlock()
-
-	file.Lock()
-	defer file.Unlock()
-
-	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
-	nextPartNumber := len(file.parts) + 1
-	uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
-	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, nextPartNumber)
-	if err != nil {
-		logger.LogIf(ctx, err)
-		return
-	}
-
-	for _, part := range parts {
-		partNumber, etag, actualSize := part.Number, part.ETag, part.ActualSize
-		if partNumber < nextPartNumber {
-			// Part already appended.
-			continue
-		}
-		if partNumber > nextPartNumber {
-			// Required part number is not yet uploaded.
-			return
-		}
-
-		partPath := pathJoin(uploadIDPath, fs.encodePartFile(partNumber, etag, actualSize))
-		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), file.filePath, entryBuf)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-
-		file.parts = append(file.parts, partNumber)
-		nextPartNumber++
-	}
-}
-
 // Get uploaded chunks from multipart folder in metabucket
 func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) {
 	// Setting count to -1 will read everything.
@@ -442,8 +385,6 @@ func (fs *FSObjects) PutObjectPart(rctx context.Context, bucket, object, uploadI
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 
-	go fs.backgroundAppend(wctx, bucket, object, uploadID)
-
 	return PartInfo{
 		PartNumber:   partID,
 		LastModified: UTCNow(),
@@ -693,63 +634,27 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		}
 	}
 
-	appendFallback := true // In case background-append did not append the required parts.
 	appendFilePath := fmt.Sprintf("%s.%s", uploadID, mustGetUUID())
 
-	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
-	// to take care of the following corner case:
-	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
-	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
-	//    from the beginning
-	fs.backgroundAppend(ctx, bucket, object, uploadID)
-
 	fs.appendFileMapMu.Lock()
 	file := fs.appendFileMap[uploadID]
 	delete(fs.appendFileMap, uploadID)
 	fs.appendFileMapMu.Unlock()
 
 	if file != nil {
-		file.Lock()
-		defer file.Unlock()
-		// Verify that appendFile has all the parts.
-		if len(file.parts) == len(parts) {
-			for i := range parts {
-				partIdx := objectPartIndex(chunks, i)
-				// All parts should have same part number.
-				if partIdx == -1 {
-					break
-				}
-				metaPart := chunks[partIdx]
-				if canonicalizeETag(parts[i].ETag) != metaPart.ETag {
-					break
-				}
-				if parts[i].PartNumber != metaPart.Number {
-					break
-				}
-				if i == len(parts)-1 {
-					appendFilePath = file.filePath
-					appendFallback = false
-				}
-			}
-		}
+		fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
 	}
-
-	if appendFallback {
-		if file != nil {
-			fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
+	for _, fiPart := range fi.Parts {
+		partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
+		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
-		for _, fiPart := range fi.Parts {
-			partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
-			entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
-			err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
+		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
 	}