Fix/slow cmu #19

Merged
merged 3 commits into from
Jun 1, 2022
128 changes: 19 additions & 109 deletions cmd/fs-v1-multipart.go
@@ -62,65 +62,8 @@ func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, a
	return partNumber, result[1], actualSize, nil
}

// Appends parts to an appendFile sequentially.
func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploadID string) {
	fs.appendFileMapMu.Lock()
	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
	file := fs.appendFileMap[uploadID]
	if file == nil {
		file = &fsAppendFile{
			filePath: fmt.Sprintf("%s.%s", uploadID, mustGetUUID()),
		}
		fs.appendFileMap[uploadID] = file
	}
	fs.appendFileMapMu.Unlock()

	file.Lock()
	defer file.Unlock()

	// Since we append sequentially nextPartNumber will always be len(file.parts)+1
	nextPartNumber := len(file.parts) + 1
	uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
	parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
	if err != nil {
		logger.LogIf(ctx, err)
		return
	}

	for _, part := range parts {
		partNumber, etag, actualSize := part.Number, part.ETag, part.ActualSize
		if partNumber < nextPartNumber {
			// Part already appended.
			continue
		}
		if partNumber > nextPartNumber {
			// Required part number is not yet uploaded.
			return
		}

		partPath := pathJoin(uploadIDPath, fs.encodePartFile(partNumber, etag, actualSize))
		entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
		if err != nil {
			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
			reqInfo.AppendTags("filepath", file.filePath)
			logger.LogIf(ctx, err)
			return
		}
		err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), file.filePath, entryBuf)
		if err != nil {
			reqInfo := logger.GetReqInfo(ctx).AppendTags("partPath", partPath)
			reqInfo.AppendTags("filepath", file.filePath)
			logger.LogIf(ctx, err)
			return
		}

		file.parts = append(file.parts, partNumber)
		nextPartNumber++
	}
}
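The helper removed above was launched from PutObjectPart (note the deleted go fs.backgroundAppend call further down) and serialized appends through a per-upload fsAppendFile kept in fs.appendFileMap: the map mutex only guards the lookup and lazy creation, each file's own mutex guards the appends, and every invocation re-lists the uploaded chunks before appending the next part. A minimal standalone sketch of just that bookkeeping pattern, with made-up names (appendTracker, getOrCreate, the .tmp suffix) rather than MinIO APIs:

package main

import (
	"fmt"
	"sync"
)

type appendFile struct {
	sync.Mutex
	filePath string
	parts    []int // part numbers appended so far, in order
}

type appendTracker struct {
	mu    sync.Mutex
	files map[string]*appendFile // keyed by uploadID
}

// getOrCreate mirrors the lazy-init under appendFileMapMu in the removed code:
// the map lock covers only the lookup; the per-file lock covers the appends.
func (t *appendTracker) getOrCreate(uploadID string) *appendFile {
	t.mu.Lock()
	defer t.mu.Unlock()
	f := t.files[uploadID]
	if f == nil {
		f = &appendFile{filePath: fmt.Sprintf("%s.tmp", uploadID)}
		t.files[uploadID] = f
	}
	return f
}

func main() {
	t := &appendTracker{files: map[string]*appendFile{}}
	f := t.getOrCreate("upload-1")
	f.Lock()
	// appends are sequential, so the next expected part is always len(parts)+1
	next := len(f.parts) + 1
	f.parts = append(f.parts, next)
	f.Unlock()
	fmt.Println("appended part", next)
}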

// Get uploaded chunks from multipart folder in metabucket
func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uploadID string) ([]ObjectPartInfo, error) {
func (fs *FSObjects) getRemainingUploadedChunks(ctx context.Context, bucket, object, uploadID string, nextPartNumber int) ([]ObjectPartInfo, error) {
	// Setting count to -1 will read everything.
	foundFiles, err := fs.disk.ListDir(ctx, pathJoin(minioMetaMultipartBucket, fs.getMultipartSHADir(bucket, object)), uploadID, -1)
	if err != nil {
@@ -138,6 +81,11 @@ func (fs *FSObjects) getUploadedChunks(ctx context.Context, bucket, object, uplo
			continue
		}

		if number < nextPartNumber {
			// Part already appended.
			continue
		}

		uploadIDPath := fs.getUploadIDDir(bucket, object, uploadID)
		fi, err := fs.disk.StatObject(ctx, pathJoin(minioMetaMultipartBucket, uploadIDPath, file))
		if err != nil {
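The renamed helper takes a nextPartNumber cursor so a caller that has already appended a prefix of the parts can skip re-reading them; passing -1 keeps the old list-everything behaviour, which is what the ListObjectParts and CompleteMultipartUpload call sites below do. A rough, self-contained illustration of that skip logic only (ObjectPartInfo is simplified here and remainingParts is an invented name, not the actual method):

package main

import "fmt"

// ObjectPartInfo is a trimmed-down stand-in for the real metadata struct.
type ObjectPartInfo struct {
	Number int
	ETag   string
}

// remainingParts keeps only parts at or beyond the nextPartNumber cursor;
// a cursor of -1 (or anything below the first part number) keeps everything.
func remainingParts(all []ObjectPartInfo, nextPartNumber int) []ObjectPartInfo {
	var out []ObjectPartInfo
	for _, p := range all {
		if p.Number < nextPartNumber {
			// Part already appended.
			continue
		}
		out = append(out, p)
	}
	return out
}

func main() {
	parts := []ObjectPartInfo{{1, "a"}, {2, "b"}, {3, "c"}}
	fmt.Println(remainingParts(parts, 3))  // [{3 c}]
	fmt.Println(remainingParts(parts, -1)) // all three parts, as with the -1 callers
}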
@@ -437,8 +385,6 @@ func (fs *FSObjects) PutObjectPart(rctx context.Context, bucket, object, uploadI
		return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
	}

	go fs.backgroundAppend(wctx, bucket, object, uploadID)

	return PartInfo{
		PartNumber:   partID,
		LastModified: UTCNow(),
@@ -535,7 +481,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
	result.PartNumberMarker = partNumberMarker
	result.UserDefined = cloneMSS(fi.Metadata)

	parts, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
	parts, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1)
	if err != nil {
		return ListPartsInfo{}, toObjectErr(err, bucket, object)
	}
@@ -635,7 +581,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
	// Allocate parts similar to incoming slice.
	fi.Parts = make([]ObjectPartInfo, len(parts))

	chunks, err := fs.getUploadedChunks(ctx, bucket, object, uploadID)
	chunks, err := fs.getRemainingUploadedChunks(ctx, bucket, object, uploadID, -1)
	if err != nil {
		return ObjectInfo{}, err
	}
@@ -688,63 +634,27 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
		}
	}

	appendFallback := true // In case background-append did not append the required parts.
	appendFilePath := fmt.Sprintf("%s.%s", uploadID, mustGetUUID())

	// Most of the times appendFile would already be fully appended by now. We call fs.backgroundAppend()
	// to take care of the following corner case:
	// 1. The last PutObjectPart triggers go-routine fs.backgroundAppend, this go-routine has not started yet.
	// 2. Now CompleteMultipartUpload gets called which sees that lastPart is not appended and starts appending
	//    from the beginning
	fs.backgroundAppend(ctx, bucket, object, uploadID)

	fs.appendFileMapMu.Lock()
	file := fs.appendFileMap[uploadID]
	delete(fs.appendFileMap, uploadID)
	fs.appendFileMapMu.Unlock()

	if file != nil {
		file.Lock()
		defer file.Unlock()
		// Verify that appendFile has all the parts.
		if len(file.parts) == len(parts) {
			for i := range parts {
				partIdx := objectPartIndex(chunks, i)
				// All parts should have same part number.
				if partIdx == -1 {
					break
				}
				metaPart := chunks[partIdx]
				if canonicalizeETag(parts[i].ETag) != metaPart.ETag {
					break
				}
				if parts[i].PartNumber != metaPart.Number {
					break
				}
				if i == len(parts)-1 {
					appendFilePath = file.filePath
					appendFallback = false
				}
			}
		}
		fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
	}

	if appendFallback {
		if file != nil {
			fs.disk.Delete(ctx, fs.disk.MetaTmpBucket(), file.filePath, false)
		}
		for _, fiPart := range fi.Parts {
			partPath := pathJoin(uploadIDPath, fs.encodePartFile(fiPart.Number, fiPart.ETag, fiPart.ActualSize))
			entryBuf, err := fs.disk.ReadAll(ctx, minioMetaMultipartBucket, partPath)
			if err != nil {
				logger.LogIf(ctx, err)
				return ObjectInfo{}, toObjectErr(err, bucket, object)
			}
			err = fs.disk.AppendFile(ctx, fs.disk.MetaTmpBucket(), appendFilePath, entryBuf)
			if err != nil {
				logger.LogIf(ctx, err)
				return ObjectInfo{}, toObjectErr(err, bucket, object)
			}
		}
	}
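With the background-append path gone, CompleteMultipartUpload always takes what used to be the fallback route: read every part file in order and append it to a freshly named temp file before committing the object, so each part is touched once at complete time instead of the upload directory being re-scanned after every PutObjectPart. A simplified, self-contained sketch of that concatenation step, using plain os calls in place of fs.disk.ReadAll and fs.disk.AppendFile (concatParts and the on-disk layout here are illustrative only):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// concatParts appends each part file, in the given order, onto appendFilePath.
func concatParts(partPaths []string, appendFilePath string) error {
	out, err := os.OpenFile(appendFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer out.Close()

	for _, p := range partPaths {
		buf, err := os.ReadFile(p) // analogous to fs.disk.ReadAll on the part path
		if err != nil {
			return err
		}
		if _, err := out.Write(buf); err != nil { // analogous to fs.disk.AppendFile
			return err
		}
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "parts")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	var parts []string
	for i := 1; i <= 3; i++ {
		p := filepath.Join(dir, fmt.Sprintf("part.%d", i))
		if err := os.WriteFile(p, []byte(fmt.Sprintf("part-%d|", i)), 0o644); err != nil {
			panic(err)
		}
		parts = append(parts, p)
	}

	target := filepath.Join(dir, "upload-1.object")
	if err := concatParts(parts, target); err != nil {
		panic(err)
	}
	data, _ := os.ReadFile(target)
	fmt.Println(string(data)) // part-1|part-2|part-3|
}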

1 change: 0 additions & 1 deletion cmd/fs-v1.go
@@ -76,7 +76,6 @@ type FSObjects struct {
// Represents the background append file.
type fsAppendFile struct {
	sync.Mutex
	parts    []int  // List of parts appended.
	filePath string // Absolute path of the file in the temp location.
}
