diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index 05aa765592aee..f72488079d30a 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -62,63 +62,6 @@ func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, a
 	return partNumber, result[1], actualSize, nil
 }
 
-// Appends parts to an appendFile sequentially.
-func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) {
-	fs.appendFileMapMu.Lock()
-	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
-	file := fs.appendFileMap[uploadID]
-	if file == nil {
-		file = &fsAppendFile{
-			filePath: fmt.Sprintf("%s.%s", uploadID, mustGetUUID()),
-		}
-		fs.appendFileMap[uploadID] = file
-	}
-	fs.appendFileMapMu.Unlock()
-
-	file.Lock()
-	defer file.Unlock()
-
-	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
-	nextPartNumber := len(file.parts) + 1
-	uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
-	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, nextPartNumber)
-	if err != nil {
-		logger.LogIf(ctx, err)
-		return
-	}
-
-	for _, part := range parts {
-		partNumber, etag, actualSize := part.Number, part.ETag, part.ActualSize
-		if partNumber < nextPartNumber {
-			// Part already appended.
-			continue
-		}
-		if partNumber > nextPartNumber {
-			// Required part number is not yet uploaded.
-			return
-		}
-
-		partPath := pathJoin(uploadIDPath, fs.encodePartFile(partNumber, etag, actualSize))
-		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), file.filePath, entryBuf)
-		if err != nil {
-			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
-			reqInfo.AppendTags("filepath", file.filePath)
-			logger.LogIf(ctx, err)
-			return
-		}
-
-		file.parts = append(file.parts, partNumber)
-		nextPartNumber++
-	}
-}
-
 // Get uploaded chunks from multipart folder in metabucket
 func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) {
 	// Setting count to -1 will read everything.
@@ -442,8 +385,6 @@ func (fs *FSObjects) PutObjectPart(rctx context.Context, bucket, object, uploadI
 		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
 	}
 
-	go fs.backgroundAppend(wctx, bucket, object, uploadID)
-
 	return PartInfo{
 		PartNumber:   partID,
 		LastModified: UTCNow(),
@@ -693,63 +634,27 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		}
 	}
 
-	appendFallback := true // In case background-append did not append the required parts.
 	appendFilePath := fmt.Sprintf("%s.%s", uploadID, mustGetUUID())
 
-	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
-	// to take care of the following corner case:
-	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
-	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
-	//    from the beginning
-	fs.backgroundAppend(ctx, bucket, object, uploadID)
-
 	fs.appendFileMapMu.Lock()
 	file := fs.appendFileMap[uploadID]
 	delete(fs.appendFileMap, uploadID)
 	fs.appendFileMapMu.Unlock()
 
 	if file != nil {
-		file.Lock()
-		defer file.Unlock()
-
-		// Verify that appendFile has all the parts.
-		if len(file.parts) == len(parts) {
-			for i := range parts {
-				partIdx := objectPartIndex(chunks, i)
-				// All parts should have same part number.
-				if partIdx == -1 {
-					break
-				}
-				metaPart := chunks[partIdx]
-				if canonicalizeETag(parts[i].ETag) != metaPart.ETag {
-					break
-				}
-				if parts[i].PartNumber != metaPart.Number {
-					break
-				}
-				if i == len(parts)-1 {
-					appendFilePath = file.filePath
-					appendFallback = false
-				}
-			}
-		}
+		fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
 	}
-
-	if appendFallback {
-		if file != nil {
-			fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
+	for _, fiPart := range fi.Parts {
+		partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
+		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
-		for _, fiPart := range fi.Parts {
-			partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
-			entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
-			err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				return ObjectInfo{}, toObjectErr(err, bucket, object)
-			}
+		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
+		if err != nil {
+			logger.LogIf(ctx, err)
+			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
 	}
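
Reviewer note: with `backgroundAppend` removed, parts are no longer staged into the append file concurrently while `PutObjectPart` runs; `CompleteMultipartUpload` now always deletes any leftover staging file up front and concatenates every uploaded part synchronously. The sketch below illustrates that synchronous stitching in isolation; it is a minimal, hypothetical stand-in (plain `os` files, invented `partInfo`/`appendParts` names), not MinIO's `fs.disk` API.

```go
// Hypothetical sketch of the synchronous append flow that replaces
// backgroundAppend: read each staged part in part-number order and
// append it to a single temporary file, failing fast on the first
// read or write error, as the new CompleteMultipartUpload body does.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
)

// partInfo mirrors the fields the completion loop needs from fi.Parts.
type partInfo struct {
	Number int
	Path   string // location of the staged part on disk
}

// appendParts concatenates the parts into appendFilePath.
func appendParts(appendFilePath string, parts []partInfo) error {
	// Parts must be stitched together in ascending part-number order.
	sort.Slice(parts, func(i, j int) bool { return parts[i].Number < parts[j].Number })

	out, err := os.OpenFile(appendFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer out.Close()

	for _, p := range parts {
		buf, err := os.ReadFile(p.Path) // analogous to fs.disk.ReadAll
		if err != nil {
			return fmt.Errorf("part %d: %w", p.Number, err)
		}
		if _, err := out.Write(buf); err != nil { // analogous to fs.disk.AppendFile
			return fmt.Errorf("part %d: %w", p.Number, err)
		}
	}
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "multipart")
	defer os.RemoveAll(dir)

	// Stage two fake parts, then stitch them into one object file.
	p1 := filepath.Join(dir, "part.1")
	p2 := filepath.Join(dir, "part.2")
	os.WriteFile(p1, []byte("hello "), 0o644)
	os.WriteFile(p2, []byte("world"), 0o644)

	target := filepath.Join(dir, "object.append")
	if err := appendParts(target, []partInfo{{2, p2}, {1, p1}}); err != nil {
		panic(err)
	}
	data, _ := os.ReadFile(target)
	fmt.Printf("%s\n", data) // prints "hello world"
}
```

The trade-off is simplicity over completion latency: `CompleteMultipartUpload` now always pays the full read-and-append cost for every part, but there is no goroutine lifecycle, no `appendFileMap` parts bookkeeping, and no `appendFallback` verification path to keep consistent.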