diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index e8d7a45e3cf49..f72488079d30a 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -62,65 +62,8 @@ func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, a return partNumber, result[1], actualSize, nil } -// Appends parts to an appendFile sequentially. -func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) { - fs.appendFileMapMu.Lock() - logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID) - file := fs.appendFileMap[uploadID] - if file == nil { - file = &fsAppendFile{ - filePath: fmt.Sprintf("%s.%s", uploadID, mustGetUUID()), - } - fs.appendFileMap[uploadID] = file - } - fs.appendFileMapMu.Unlock() - - file.Lock() - defer file.Unlock() - - // Since we append sequentially nextPartNumber will always be len(file.parts)+1 - nextPartNumber := len(file.parts) + 1 - uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID) - parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID) - if err != nil { - logger.LogIf(ctx, err) - return - } - - for _, part := range parts { - partNumber, etag, actualSize := part.Number, part.ETag, part.ActualSize - if partNumber < nextPartNumber { - // Part already appended. - continue - } - if partNumber > nextPartNumber { - // Required part number is not yet uploaded. - return - } - - partPath := pathJoin(uploadIDPath, fs.encodePartFile(partNumber, etag, actualSize)) - entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath) - if err != nil { - reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath) - reqInfo.AppendTags("filepath", file.filePath) - logger.LogIf(ctx, err) - return - } - err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), file.filePath, entryBuf) - if err != nil { - reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath) - reqInfo.AppendTags("filepath", file.filePath) - logger.LogIf(ctx, err) - return - } - - file.parts = append(file.parts, partNumber) - nextPartNumber++ - } -} - // Get uploaded chunks from multipart folder in metabucket -func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uploadID string) ([]ObjectPartInfo, error) { +func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) { // Setting count to -1 will read everything. foundFiles, err := fs.disk.ListDir(ctx, pathJoin(minioMetaMultipartBucket, fs.getMultipartSHADir(bucket, object)), uploadID, -1) if err != nil { @@ -138,6 +81,11 @@ func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uplo continue } + if number < nextPartNumber { + // Part already appended. + continue + } + uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID) fi, err := fs.disk.StatObject(ctx, pathJoin(minioMetaMultipartBucket, uploadIDPath, file)) if err != nil { @@ -437,8 +385,6 @@ func (fs *FSObjects) PutObjectPart(rctx context.Context, bucket, object, uploadI return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } - go fs.backgroundAppend(wctx, bucket, object, uploadID) - return PartInfo{ PartNumber: partID, LastModified: UTCNow(), @@ -535,7 +481,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload result.PartNumberMarker = partNumberMarker result.UserDefined = cloneMSS(fi.Metadata) - parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID) + parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1) if err != nil { return ListPartsInfo{}, toObjectErr(err, bucket, object) } @@ -635,7 +581,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Allocate parts similar to incoming slice. fi.Parts = make([]ObjectPartInfo, len(parts)) - chunks, err := fs.getUploadedChunks(ctx, bucket, object, uploadID) + chunks, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1) if err != nil { return ObjectInfo{}, err } @@ -688,63 +634,27 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } } - appendFallback := true // In case background-append did not append the required parts. appendFilePath := fmt.Sprintf("%s.%s", uploadID, mustGetUUID()) - // Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend() - // to take care of the following corner case: - // 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet. - // 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending - // from the beginning - fs.backgroundAppend(ctx, bucket, object, uploadID) - fs.appendFileMapMu.Lock() file := fs.appendFileMap[uploadID] delete(fs.appendFileMap, uploadID) fs.appendFileMapMu.Unlock() if file != nil { - file.Lock() - defer file.Unlock() - // Verify that appendFile has all the parts. - if len(file.parts) == len(parts) { - for i := range parts { - partIdx := objectPartIndex(chunks, i) - // All parts should have same part number. - if partIdx == -1 { - break - } - metaPart := chunks[partIdx] - if canonicalizeETag(parts[i].ETag) != metaPart.ETag { - break - } - if parts[i].PartNumber != metaPart.Number { - break - } - if i == len(parts)-1 { - appendFilePath = file.filePath - appendFallback = false - } - } - } + fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false) } - - if appendFallback { - if file != nil { - fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false) + for _, fiPart := range fi.Parts { + partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize)) + entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath) + if err != nil { + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) } - for _, fiPart := range fi.Parts { - partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize)) - entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath) - if err != nil { - logger.LogIf(ctx, err) - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf) - if err != nil { - logger.LogIf(ctx, err) - return ObjectInfo{}, toObjectErr(err, bucket, object) - } + err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf) + if err != nil { + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) } } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index b9da995766db9..ab751e5bc8b09 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -76,7 +76,6 @@ type FSObjects struct { // Represents the background append file. type fsAppendFile struct { sync.Mutex - parts []int // List of parts appended. filePath string // Absolute path of the file in the temp location. }