From b9ff2677e2e97a0649f6cbd3ac6ea9f358aecb4c Mon Sep 17 00:00:00 2001 From: Kervin Hu <34271744+YikaiHu@users.noreply.github.com> Date: Fri, 1 Sep 2023 11:39:42 +0800 Subject: [PATCH] Enhancing Transfer Performance: Parallel Multipart Upload for Distributing Large Files Across the Cluster (#16) * feat: support large file transfer in cluster mode * fix: fix the src prefix in single part transfer * fix: fix the src prefix in single part transfer 2 * fix: fix the multipart split issue * chore: using sfn to control the part merging * feat: add network throttling detect * chore: remove the throttle detect feature and add sfn check * fix: fix the updateItem issue * doc: update the readme for giant object transfer --- Dockerfile | 1 + README.md | 5 + cmd/root.go | 7 + config-example.yaml | 3 + dth/client.go | 40 ++++++ dth/common.go | 51 +++++++- dth/config.go | 14 +- dth/job.go | 303 +++++++++++++++++++++++++++++++++++++++++--- dth/job_test.go | 46 +++++++ dth/service.go | 173 +++++++++++++++++++++++++ go.mod | 5 +- go.sum | 11 ++ 12 files changed, 631 insertions(+), 28 deletions(-) create mode 100644 dth/job_test.go diff --git a/Dockerfile b/Dockerfile index 7293477..87c0282 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,7 @@ ENV SOURCE_TYPE Amazon_S3 ENV JOB_TABLE_NAME '' ENV JOB_QUEUE_NAME '' +ENV SINGLE_PART_TABLE_NAME '' ENV SRC_BUCKET '' ENV SRC_PREFIX '' diff --git a/README.md b/README.md index 157bef3..710e688 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,13 @@ destRegion: cn-north-1 jobTableName: test-table jobQueueName: test-queue + +singlePartTableName: single-part-test-table +sfnArn: sfn-arn ``` +The `sfnArn` is the ARN of a Step Function created by the [Data Transfer Hub S3 Plugin](https://github.com/awslabs/data-transfer-hub/blob/main/docs/S3_PLUGIN.md). + By default, this tool will try to read a `config.yaml` in the same folder, if you create the configuration file in a different folder or with a different file name, please use extra option `--config xxx.yaml` to load your config file.
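To see where the new settings fit before reading the code, here is a minimal, self-contained Go sketch (an illustration only, not code from this patch) of the size-based routing that `startMigration` in dth/job.go performs with the `multipartThreshold` and `giantFileThreshold` options:

```go
// Simplified sketch of the routing this patch adds in dth/job.go (startMigration);
// thresholds are configured in MB, sizes are in bytes.
package main

import "fmt"

const MB = 1024 * 1024

func chooseTransferPath(size int64, multipartThresholdMB, giantFileThresholdMB int) string {
	switch {
	case size <= int64(multipartThresholdMB*MB):
		return "migrateSmallFile" // single PUT on one worker
	case size < int64(giantFileThresholdMB*MB):
		return "migrateBigFile" // regular multipart upload on one worker
	default:
		return "migrateGiantFile" // split into parts and fanned out to the whole cluster via SQS
	}
}

func main() {
	// With the defaults used below (multipartThreshold=10, giantFileThreshold=1024),
	// an 8 MB object is copied directly, a 200 MB object uses multipart upload on
	// one worker, and a 5 GB object is distributed across the cluster.
	for _, size := range []int64{8 * MB, 200 * MB, 5 * 1024 * MB} {
		fmt.Printf("%d bytes -> %s\n", size, chooseTransferPath(size, 10, 1024))
	}
}
```

Objects at or above `giantFileThreshold` are split into parts, each part goes back onto the job queue as its own message, and the Step Function referenced by `sfnArn` later completes the multipart upload once the parts recorded in the single-part DynamoDB table are done.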
diff --git a/cmd/root.go b/cmd/root.go index e91247e..3128755 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -80,6 +80,7 @@ func initConfig() { viper.SetDefault("options.chunkSize", dth.DefaultChunkSize) viper.SetDefault("options.multipartThreshold", dth.DefaultMultipartThreshold) + viper.SetDefault("options.giantFileThreshold", dth.DefaultGiantFileThreshold) viper.SetDefault("options.maxKeys", dth.DefaultMaxKeys) viper.SetDefault("options.messageBatchSize", dth.DefaultMessageBatchSize) viper.SetDefault("options.finderDepth", dth.DefaultFinderDepth) @@ -108,10 +109,13 @@ func initConfig() { viper.BindEnv("jobTableName", "JOB_TABLE_NAME") viper.BindEnv("jobQueueName", "JOB_QUEUE_NAME") + viper.BindEnv("singlePartTableName", "SINGLE_PART_TABLE_NAME") + viper.BindEnv("sfnArn", "SFN_ARN") viper.BindEnv("options.maxKeys", "MAX_KEYS") viper.BindEnv("options.chunkSize", "CHUNK_SIZE") viper.BindEnv("options.multipartThreshold", "MULTIPART_THRESHOLD") + viper.BindEnv("options.giantFileThreshold", "GIANT_FILE_THRESHOLD") viper.BindEnv("options.messageBatchSize", "MESSAGE_BATCH_SIZE") viper.BindEnv("options.finderDepth", "FINDER_DEPTH") viper.BindEnv("options.finderNumber", "FINDER_NUMBER") @@ -138,6 +142,7 @@ func initConfig() { options := &dth.JobOptions{ ChunkSize: viper.GetInt("options.chunkSize"), MultipartThreshold: viper.GetInt("options.multipartThreshold"), + GiantFileThreshold: viper.GetInt("options.giantFileThreshold"), MaxKeys: viper.GetInt32("options.maxKeys"), MessageBatchSize: viper.GetInt("options.messageBatchSize"), FinderDepth: viper.GetInt("options.finderDepth"), @@ -166,6 +171,8 @@ func initConfig() { DestInCurrentAccount: viper.GetBool("destInCurrentAccount"), JobTableName: viper.GetString("jobTableName"), JobQueueName: viper.GetString("jobQueueName"), + SinglePartTableName: viper.GetString("singlePartTableName"), + SfnArn: viper.GetString("sfnArn"), JobOptions: options, } diff --git a/config-example.yaml b/config-example.yaml index 0a9bdc3..1b2d792 100644 --- a/config-example.yaml +++ b/config-example.yaml @@ -18,10 +18,13 @@ destAcl: bucket-owner-full-control jobTableName: test-table jobQueueName: test-queue +singlePartTableName: single-part-test-table +sfnArn: sfn-arn options: chunkSize: 5 multipartThreshold: 10 + giantFileThreshold: 1024 maxKeys: 1000 messageBatchSize: 10 finderDepth: 0 diff --git a/dth/client.go b/dth/client.go index 8cad6bb..e402d99 100644 --- a/dth/client.go +++ b/dth/client.go @@ -41,6 +41,7 @@ type Client interface { // READ HeadObject(ctx context.Context, key *string) *Metadata GetObject(ctx context.Context, key *string, size, start, chunkSize int64, version string) ([]byte, error) + GetObjectPart(ctx context.Context, key *string, bodyRange string) ([]byte, error) ListObjects(ctx context.Context, continuationToken, prefix *string, maxKeys int32) ([]*Object, error) ListCommonPrefixes(ctx context.Context, depth int, maxKeys int32) (prefixes []*string) ListParts(ctx context.Context, key, uploadID *string) (parts map[int]*Part) @@ -209,6 +210,45 @@ func (c *S3Client) GetObject(ctx context.Context, key *string, size, start, chun } +// GetObjectPart is a function to get (download) object from Amazon S3 +func (c *S3Client) GetObjectPart(ctx context.Context, key *string, bodyRange string) ([]byte, error) { + // log.Printf("S3> Downloading %s with %d bytes start from %d\n", key, size, start) + var input *s3.GetObjectInput + + if c.isPayerRequest { + input = &s3.GetObjectInput{ + Bucket: &c.bucket, + Key: key, + Range: &bodyRange, + RequestPayer: 
types.RequestPayerRequester, + } + } else { + input = &s3.GetObjectInput{ + Bucket: &c.bucket, + Key: key, + Range: &bodyRange, + } + } + + output, err := c.client.GetObject(ctx, input, getClientCredentialsModifyFn(c.isSrcClient, SRC_CRED, DST_CRED)) + if err != nil { + log.Printf("S3> Unable to download %s with range: %s - %s\n", *key, bodyRange, err.Error()) + return nil, err + } + + defer output.Body.Close() + + // Read response body + s, err := io.ReadAll(output.Body) + + if err != nil { + log.Printf("S3> Unable to read the content of %s - %s\n", *key, err.Error()) + return nil, err + } + return s, nil + +} + // Internal func to call API to list objects. func (c *S3Client) listObjectFn(ctx context.Context, continuationToken, prefix, delimiter *string, maxKeys int32) (*s3.ListObjectsV2Output, error) { diff --git a/dth/common.go b/dth/common.go index 21e243e..7986f55 100644 --- a/dth/common.go +++ b/dth/common.go @@ -47,9 +47,13 @@ var ( // Object represents an object to be replicated. type Object struct { - Key string `json:"key"` - Size int64 `json:"size"` - Sequencer string `json:"sequencer,omitempty"` + Key string `json:"key"` + Size int64 `json:"size"` + Sequencer string `json:"sequencer,omitempty"` + PartNumber int `json:"partNumber,omitempty"` + TotalPartsCount int `json:"totalPartsCount,omitempty"` + UploadID string `json:"uploadID,omitempty"` + BodyRange string `json:"bodyRange,omitempty"` } // S3Event represents a basic structure of a S3 Event Message @@ -63,6 +67,15 @@ type S3Event struct { } } +// SinglePartTransferEvent represents a basic structure of a SinglePart Transfer Event Message +type SinglePartTransferEvent struct { + ObjectKey string `json:"objectKey"` + PartNumber int `json:"partNumber"` + TotalPartsCount int `json:"totalPartsCount"` + UploadID string `json:"uploadID"` + BodyRange string `json:"bodyRange"` +} + // Part represents a part for multipart upload type Part struct { partNumber int @@ -132,6 +145,20 @@ func newS3Event(str *string) (e *S3Event) { return } +// Helper function to create single part transfer event base on Json string +func newSinglePartTransferEvent(str *string) (e *SinglePartTransferEvent) { + + e = new(SinglePartTransferEvent) + err := json.Unmarshal([]byte(*str), e) + + if err != nil { + log.Printf("Unable to convert string to single part transfer job - %s", err.Error()) + return nil + } + // log.Printf("Key: %s, Size: %d\n", m.Key, m.Size) + return +} + func getHex(str *string) int64 { i64, _ := strconv.ParseInt(*str, 16, 64) return i64 @@ -169,3 +196,21 @@ func appendPrefix(key, prefix *string) *string { newkey := fmt.Sprintf("%s%s%s", *prefix, delimiter, *key) return &newkey } + +func calculateCompletedBytes(bodyRange string) int64 { + // bodyRange format: "bytes=startByte-endByte" + parts := strings.Split(bodyRange, "=") + if len(parts) != 2 { + return 0 + } + + rangeParts := strings.Split(parts[1], "-") + if len(rangeParts) != 2 { + return 0 + } + + startByte, _ := strconv.ParseInt(rangeParts[0], 10, 64) + endByte, _ := strconv.ParseInt(rangeParts[1], 10, 64) + + return endByte - startByte + 1 +} diff --git a/dth/config.go b/dth/config.go index 4a3e953..5004158 100644 --- a/dth/config.go +++ b/dth/config.go @@ -34,10 +34,14 @@ const ( // DefaultMaxKeys is the maximum number of keys returned per listing request, default is 1000 DefaultMaxKeys int32 = 1000 - // DefaultMultipartThreshold is the threshold size (in MB) to determine to use multipart upload or not. 
+ // DefaultMultipartThreshold is the threshold size (in MB) to determine whether to use multipart upload (on a single worker node). // When object size is greater or equals to MultipartThreshold, multipart upload will be used. DefaultMultipartThreshold int = 10 + // DefaultGiantFileThreshold is the threshold size (in MB) to determine whether to split the transfer across the whole worker cluster. + // When the object size is greater than or equal to GiantFileThreshold, the object will be split and transferred by the whole cluster. + DefaultGiantFileThreshold int = 1024 + // DefaultChunkSize is the chunk size (in MB) for each part when using multipart upload DefaultChunkSize int = 5 @@ -59,16 +63,16 @@ const ( // JobOptions is General Job Info type JobOptions struct { - ChunkSize, MultipartThreshold, MessageBatchSize, FinderDepth, FinderNumber, WorkerNumber int - MaxKeys int32 - IncludeMetadata bool + ChunkSize, MultipartThreshold, GiantFileThreshold, MessageBatchSize, FinderDepth, FinderNumber, WorkerNumber int + MaxKeys int32 + IncludeMetadata bool } // JobConfig is General Job Info type JobConfig struct { SrcType, SrcBucket, SrcPrefix, SrcPrefixList, SrcRegion, SrcEndpoint, SrcCredential string DestBucket, DestPrefix, DestRegion, DestCredential, DestStorageClass, DestAcl string - JobTableName, JobQueueName string + JobTableName, JobQueueName, SinglePartTableName, SfnArn string SrcInCurrentAccount, DestInCurrentAccount, SkipCompare, PayerRequest bool *JobOptions } diff --git a/dth/job.go b/dth/job.go index 7655b56..9d9f5e1 100644 --- a/dth/job.go +++ b/dth/job.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "log" "math" "strings" @@ -39,6 +40,9 @@ const ( // Transfer is an action to transfer an object Transfer + + // TransferSinglePart is an action to transfer a single part of a giant object + TransferSinglePart ) // Job is an interface of a process to run by this tool @@ -53,6 +57,7 @@ type Finder struct { srcClient, desClient Client sqs *SqsService cfg *JobConfig + sfn *SfnService } // Worker is an implemenation of Job interface @@ -62,13 +67,38 @@ type Worker struct { cfg *JobConfig sqs *SqsService db *DBService + splitPartDb *DBService + sfn *SfnService } // TransferResult stores the result after transfer. type TransferResult struct { - status string - etag *string - err error + status string + splitPartsCount int + etag *string + err error +} + +type PartTransferJobParams struct { + SrcBucket string + DstBucket string + ObjectKey string + PartNumber int + TotalPartsCount int + StartByte int64 + EndByte int64 + UploadID string + BodyRange string +} + +type PartTransferResult struct { + ObjectKey string + PartNumber int + TotalPartsCount int + UploadID string + ETag string + Status string + Error string } // helper function to check credentials @@ -106,6 +136,7 @@ func getCredentials(ctx context.Context, param string, inCurrentAccount bool, sm // NewFinder creates a new Finder instance func NewFinder(ctx context.Context, cfg *JobConfig) (f *Finder) { sqs, _ := NewSqsService(ctx, cfg.JobQueueName) + sfn, _ := NewSfnService(ctx, cfg.SfnArn) sm, err := NewSecretService(ctx) if err != nil { log.Printf("Warning - Unable to load credentials, use default setting - %s\n", err.Error()) @@ -131,6 +162,7 @@ func NewFinder(ctx context.Context, cfg *JobConfig) (f *Finder) { f = &Finder{ srcClient: srcClient, desClient: desClient, + sfn: sfn, sqs: sqs, cfg: cfg, } @@ -169,6 +201,10 @@ func (f *Finder) Run(ctx context.Context) { log.Fatalf("Queue might not be empty ... 
Please try again later") } + if !f.sfn.IsNoRunningTask(ctx) { + log.Fatalf("There might still be ongoing tasks to merge giant objects ... Please try again later") + } + // Maximum number of queued batches to be sent to SQS var bufferSize int = 500 @@ -450,6 +486,9 @@ func NewWorker(ctx context.Context, cfg *JobConfig) (w *Worker) { sqs, _ := NewSqsService(ctx, cfg.JobQueueName) db, _ := NewDBService(ctx, cfg.JobTableName) + splitPartDb, _ := NewDBService(ctx, cfg.SinglePartTableName) + + sfn, _ := NewSfnService(ctx, cfg.SfnArn) sm, err := NewSecretService(ctx) if err != nil { @@ -474,11 +513,13 @@ func NewWorker(ctx context.Context, cfg *JobConfig) (w *Worker) { DST_CRED = desCred return &Worker{ - srcClient: srcClient, - desClient: desClient, - sqs: sqs, - db: db, - cfg: cfg, + srcClient: srcClient, + desClient: desClient, + sqs: sqs, + db: db, + splitPartDb: splitPartDb, + sfn: sfn, + cfg: cfg, } } @@ -518,7 +559,7 @@ func (w *Worker) Run(ctx context.Context) { destKey := appendPrefix(&obj.Key, &w.cfg.DestPrefix) - if action == Transfer { + if action == Transfer || action == TransferSinglePart { processCh <- struct{}{} go w.startMigration(ctx, obj, rh, destKey, transferCh, processCh) } @@ -589,6 +630,17 @@ func (w *Worker) processMessage(ctx context.Context, msg, rh *string) (obj *Obje } else { log.Printf("Unknown S3 Event %s, do nothing", event.Records[0].EventName) } + } else if strings.Contains(*msg, `"bodyRange":`) { + // Simply check if msg body contains "bodyRange" to determine if it's a message for single part transfer + singlePartTransferEvent := newSinglePartTransferEvent(msg) + obj = new(Object) + obj.Key = singlePartTransferEvent.ObjectKey // we do not need to use unescape again + obj.Size = 0 // set 0 for single part transfer + obj.PartNumber = singlePartTransferEvent.PartNumber + obj.TotalPartsCount = singlePartTransferEvent.TotalPartsCount + obj.UploadID = singlePartTransferEvent.UploadID + obj.BodyRange = singlePartTransferEvent.BodyRange + action = TransferSinglePart } else { obj = newObject(msg) action = Transfer @@ -603,17 +655,28 @@ func (w *Worker) startMigration(ctx context.Context, obj *Object, rh, destKey *s log.Printf("Migrating from %s/%s to %s/%s\n", w.cfg.SrcBucket, obj.Key, w.cfg.DestBucket, *destKey) - // Log in DynamoDB - w.db.PutItem(ctx, obj) - // Start a heart beat go w.heartBeat(ctx1, &obj.Key, rh) var res *TransferResult - if obj.Size <= int64(w.cfg.MultipartThreshold*MB) { - res = w.migrateSmallFile(ctx, obj, destKey, transferCh) + + // handle single part transfer + if obj.BodyRange != "" { + // Log in DynamoDB + w.splitPartDb.PutSinglePartItem(ctx, obj) + res = w.migrateSinglePart(ctx, obj, destKey, transferCh) } else { - res = w.migrateBigFile(ctx, obj, destKey, transferCh) + // Log in DynamoDB + w.db.PutItem(ctx, obj) + + if obj.Size <= int64(w.cfg.MultipartThreshold*MB) { + res = w.migrateSmallFile(ctx, obj, destKey, transferCh) + } else if obj.Size < int64(w.cfg.GiantFileThreshold*MB) { + res = w.migrateBigFile(ctx, obj, destKey, transferCh) + } else { + log.Printf("Migrating GiantFile with size %d\n", obj.Size) + res = w.migrateGiantFile(ctx, obj, destKey, transferCh) + } } w.processResult(ctx, obj, rh, res) @@ -645,11 +708,20 @@ func (w *Worker) startDelete(ctx context.Context, obj *Object, rh, destKey *stri } // processResult is a function to process transfer result, including delete the message, log to DynamoDB +// For status == PART_DONE, we do not update the main DDB, it will be updated by multi-part controller Lambda func (w *Worker) 
processResult(ctx context.Context, obj *Object, rh *string, res *TransferResult) { // log.Println("Processing result...") - log.Printf("----->Transferred 1 object %s with status %s\n", obj.Key, res.status) - w.db.UpdateItem(ctx, &obj.Key, res) + if res.status == "SPLIT_DONE" { + log.Printf("----->Split 1 object %s into %d parts\n", obj.Key, res.splitPartsCount) + w.db.UpdateItem(ctx, &obj.Key, res) + } else if res.status == "PART_DONE" || res.status == "PART_ERROR" || res.status == "PART_CANCEL" { + log.Printf("----->(Multi-part) Transferred 1 part %s with status %s\n", obj.Key, res.status) + w.splitPartDb.UpdateSinglePartItem(ctx, obj, res) + } else { + log.Printf("----->Transferred 1 object %s with status %s\n", obj.Key, res.status) + w.db.UpdateItem(ctx, &obj.Key, res) + } if res.status == "ERROR" { if strings.Contains(res.err.Error(), "403") { @@ -658,7 +730,8 @@ func (w *Worker) processResult(ctx context.Context, obj *Object, rh *string, res } } - if res.status == "DONE" || res.status == "CANCEL" { + if res.status == "DONE" || res.status == "CANCEL" || res.status == "SPLIT_DONE" || + res.status == "PART_DONE" || res.status == "PART_CANCEL" { w.sqs.DeleteMessage(ctx, rh) } } @@ -712,6 +785,147 @@ func (w *Worker) migrateSmallFile(ctx context.Context, obj *Object, destKey *str } +// Internal func to deal with the transferring of a single part of a large file. +// Simply transfer a single part +func (w *Worker) migrateSinglePart(ctx context.Context, obj *Object, destKey *string, transferCh chan struct{}) *TransferResult { + + // Add a transferring record + transferCh <- struct{}{} + + result := w.transferSinglePart(ctx, obj, destKey) + + // Remove the transferring record after transfer is completed + <-transferCh + + return result +} + +// Internal func to calculate the appropriate part size for cluster concurrent transfer. +// Considering the size of the cluster and the maximum size of a single S3 object is 5TB, +// set the maximum number of part fragments to 10000, and set the minimum part size to 50MB. +// This function will calculate the total number of parts and the corresponding part size under the above restrictions. +func (w *Worker) calculatePartSize(totalSize int64) (int, int64) { + // Max number of Parts allowed by Amazon S3 is 10000 + maxPartFragments := 10000 + minPartSize := int64(50 * 1024 * 1024) // 50MB in bytes + + // Calculate the maximum allowed part size based on the maximum number of part fragments + maxAllowedPartSize := (totalSize / int64(maxPartFragments)) + 1 + + // Ensure the calculated part size is within the allowed range + partSize := maxAllowedPartSize + if partSize < minPartSize { + partSize = minPartSize + } + + // Calculate the number of parts based on the adjusted part size + numParts := int(math.Ceil(float64(totalSize) / float64(partSize))) + + return numParts, partSize +} + +// Internal func to split a large object and send the sub-part transfer job messages back to SQS. +// First need to create/get an uploadID, then use the UploadID to create the part transfer job messages. +// Finally, send all the transfer job messages back to SQS. 
+func (w *Worker) generateMultiPartTransferJobs(ctx context.Context, obj *Object, destKey *string) (int, error) { + var err error + totalSize := obj.Size + totalPartsCount, partSize := w.calculatePartSize(totalSize) + + // log.Printf("totalPartsCount: %d, partSize: %d\n", totalPartsCount, partSize) + + // Get existing upload ID or initiate multipart upload + uploadID := w.desClient.GetUploadID(ctx, destKey) + if uploadID == nil { + // Add metadata to CreateMultipartUpload func. + var meta *Metadata + if w.cfg.IncludeMetadata { + meta = w.srcClient.HeadObject(ctx, &obj.Key) + } + + uploadID, err = w.desClient.CreateMultipartUpload(ctx, destKey, &w.cfg.DestStorageClass, &w.cfg.DestAcl, meta) + if err != nil { + log.Printf("Failed to create upload ID - %s for %s\n", err.Error(), *destKey) + return 0, err + } + } + + // Create and send transfer job messages for each part + for partNumber := 1; partNumber <= totalPartsCount; partNumber++ { + startByte := int64(int64(partNumber-1) * partSize) + endByte := int64(int64(partNumber)*partSize) - 1 + if endByte > totalSize { + endByte = totalSize + } + + params := PartTransferJobParams{ + SrcBucket: w.cfg.SrcBucket, + DstBucket: w.cfg.DestBucket, + ObjectKey: obj.Key, + PartNumber: partNumber, + TotalPartsCount: totalPartsCount, + StartByte: startByte, + EndByte: endByte, + UploadID: *uploadID, + BodyRange: fmt.Sprintf("bytes=%d-%d", startByte, endByte), + } + + // Send transfer job message to SQS + w.sendTransferJobMessage(ctx, params) + } + err = w.InvokeStepFunction(ctx, *uploadID, totalPartsCount, obj.Key) + + if err != nil { + log.Printf("Failed to invoke Step Function - %s\n", err.Error()) + return 0, err + } + return totalPartsCount, nil +} + +// Internal func to send single part transfer job message to sqs. +func (w *Worker) sendTransferJobMessage(ctx context.Context, params PartTransferJobParams) { + // Create a message body with transfer job parameters + messageBody := fmt.Sprintf( + `{ + "objectKey": "%s", + "partNumber": %d, + "totalPartsCount": %d, + "uploadID": "%s", + "bodyRange": "%s" + }`, + params.ObjectKey, + params.PartNumber, + params.TotalPartsCount, + params.UploadID, + params.BodyRange, + ) + + // Send message to SQS + w.sqs.SendMessage(ctx, &messageBody) +} + +// Internal func to deal with the transferring of giant file. +func (w *Worker) migrateGiantFile(ctx context.Context, obj *Object, destKey *string, transferCh chan struct{}) *TransferResult { + var err error + var splitPartsCount int + splitPartsCount, err = w.generateMultiPartTransferJobs(ctx, obj, destKey) + + if err != nil { + log.Printf("Failed to create upload ID - %s for %s\n", err.Error(), *destKey) + return &TransferResult{ + status: "ERROR", + err: err, + } + } + + return &TransferResult{ + status: "SPLIT_DONE", + splitPartsCount: splitPartsCount, + etag: nil, + err: err, + } +} + // Internal func to deal with the transferring of large file. // First need to create/get an uploadID, then use UploadID to upload each parts // Finally, need to combine all parts into a single file. 
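To make the giant-object splitting arithmetic above concrete, the following standalone sketch (an illustration only, not part of the patch; it mirrors calculatePartSize and the message shape produced by sendTransferJobMessage and consumed by newSinglePartTransferEvent, with a placeholder key and upload ID) prints the part plan for a 1 TiB object and the job message for its first part:

```go
// Standalone illustration of the part-splitting math used for giant objects.
package main

import (
	"fmt"
	"math"
)

// Mirrors calculatePartSize in dth/job.go: at most 10000 parts per object,
// with a floor on the part size (the 50MB value exercised by dth/job_test.go).
func calculatePartSize(totalSize int64) (int, int64) {
	maxPartFragments := 10000
	minPartSize := int64(50 * 1024 * 1024)

	partSize := (totalSize / int64(maxPartFragments)) + 1
	if partSize < minPartSize {
		partSize = minPartSize
	}
	numParts := int(math.Ceil(float64(totalSize) / float64(partSize)))
	return numParts, partSize
}

func main() {
	totalSize := int64(1024) * 1024 * 1024 * 1024 // 1 TiB
	numParts, partSize := calculatePartSize(totalSize)
	fmt.Printf("1 TiB object -> %d parts of %d bytes each\n", numParts, partSize)
	// Prints 10000 parts of 109951163 bytes (~105 MiB), matching the 1TB case in dth/job_test.go.

	// SQS message body for part 1, in the JSON shape read by newSinglePartTransferEvent.
	// The object key and upload ID below are placeholders.
	fmt.Printf("{\"objectKey\": \"example/large.bin\", \"partNumber\": 1, \"totalPartsCount\": %d, \"uploadID\": \"example-upload-id\", \"bodyRange\": \"bytes=%d-%d\"}\n",
		numParts, 0, partSize-1)
}
```

Each such message goes back onto the job queue so any idle worker can download that byte range and upload it with UploadPart; the Step Function started through InvokeStepFunction then completes the multipart upload once the parts have been transferred.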
@@ -904,3 +1118,56 @@ func (w *Worker) transfer(ctx context.Context, obj *Object, destKey *string, sta } } + +// transfer is a process to download data from source and upload to destination +func (w *Worker) transferSinglePart(ctx context.Context, obj *Object, destKey *string) (result *TransferResult) { + var etag *string + var err error + + log.Printf("----->Downloading single part %s from %s/%s - Part %d\n", obj.BodyRange, w.cfg.SrcBucket, obj.Key, obj.PartNumber) + + body, err := w.srcClient.GetObjectPart(ctx, &obj.Key, obj.BodyRange) + if err != nil { + + var ae *types.NoSuchKey + if errors.As(err, &ae) { + log.Printf("No such key %s, the object might be deleted. Cancelling...", obj.Key) + return &TransferResult{ + status: "PART_CANCEL", + err: err, + } + } + // status = "ERROR" + return &TransferResult{ + status: "PART_ERROR", + err: err, + } + } + + // Use UploadPart for multipart upload + log.Printf("----->Uploading range: %s to %s/%s - Part %d\n", obj.BodyRange, w.cfg.DestBucket, *destKey, obj.PartNumber) + etag, err = w.desClient.UploadPart(ctx, destKey, body, &obj.UploadID, obj.PartNumber) + + body = nil // release memory + + if err != nil { + return &TransferResult{ + status: "PART_ERROR", + err: err, + } + } + + completedBytes := calculateCompletedBytes(obj.BodyRange) + + log.Printf("----->Completed %d Bytes for range: %s from %s/%s to %s/%s - Part %d\n", completedBytes, obj.BodyRange, w.cfg.SrcBucket, obj.Key, w.cfg.DestBucket, *destKey, obj.PartNumber) + return &TransferResult{ + status: "PART_DONE", + etag: etag, + } +} + +func (w *Worker) InvokeStepFunction(ctx context.Context, uploadID string, totalPartsCount int, objectKey string) error { + log.Printf("Invoke Step Function for %s with uploadID %s and totalPartsCount %d\n", objectKey, uploadID, totalPartsCount) + err := w.sfn.InvokeStepFunction(ctx, uploadID, totalPartsCount, objectKey) + return err +} diff --git a/dth/job_test.go b/dth/job_test.go new file mode 100644 index 0000000..f290bc6 --- /dev/null +++ b/dth/job_test.go @@ -0,0 +1,46 @@ +/* +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package dth + +import ( + "testing" +) + +func TestCalculatePartSize(t *testing.T) { + w := Worker{} + + tests := []struct { + totalSize int64 + expectedParts int + expectedPartSize int64 + }{ + {1024 * 1024 * 1024 * 1, 21, 52428800}, // 1GB + {1024 * 1024 * 1024 * 5, 103, 52428800}, // 5GB + {1024 * 1024 * 1024 * 10, 205, 52428800}, // 10GB + {1024 * 1024 * 1024 * 100, 2048, 52428800}, // 100GB + {1024 * 1024 * 1024 * 1024, 10000, 109951163}, // 1TB + {1024 * 1024 * 1024 * 1024 * 5, 10000, 549755814}, // 5TB + } + + for _, test := range tests { + numParts, partSize := w.calculatePartSize(test.totalSize) + if numParts != test.expectedParts || partSize != test.expectedPartSize { + t.Errorf("For total size %d, expected parts: %d, expected part size: %d; got parts: %d, part size: %d, part size in MB: %d", + test.totalSize, test.expectedParts, test.expectedPartSize, numParts, partSize, partSize/1024/1024) + } + } +} diff --git a/dth/service.go b/dth/service.go index 366708d..c9cafc6 100644 --- a/dth/service.go +++ b/dth/service.go @@ -19,6 +19,7 @@ package dth import ( "context" "encoding/base64" + "encoding/json" "fmt" "log" "strconv" @@ -28,6 +29,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" dtype "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" sm "github.com/aws/aws-sdk-go-v2/service/secretsmanager" + stepFunction "github.com/aws/aws-sdk-go-v2/service/sfn" + sfnType "github.com/aws/aws-sdk-go-v2/service/sfn/types" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) @@ -41,6 +44,18 @@ type Item struct { // ExtraInfo Metadata } +// SinglePartItem holds info about the single part info to be stored in DynamoDB +type SinglePartItem struct { + UploadId string + PartNumber int + ObjectKey string + TotalPartsCount int + RetryCount int + JobStatus, Etag string + StartTimestamp, EndTimestamp, SpentTime int64 + StartTime, EndTime string +} + // DBService is a wrapper service used to interact with Amazon DynamoDB type DBService struct { tableName string @@ -58,6 +73,12 @@ type SecretService struct { client *sm.Client } +// SFNService is a wrapper service used to interact with Amazon Step Function +type SfnService struct { + sfnArn string + client *stepFunction.Client +} + // NewSecretService is a helper func to create a SecretService instance func NewSecretService(ctx context.Context) (*SecretService, error) { @@ -113,6 +134,20 @@ func NewSqsService(ctx context.Context, queueName string) (*SqsService, error) { return &SqsService, nil } +// NewSfnService is a helper func to create a SfnService instance +func NewSfnService(ctx context.Context, sfnArn string) (*SfnService, error) { + + cfg := loadDefaultConfig(ctx) + + // Create an Amazon Step Function client. 
+ client := stepFunction.NewFromConfig(cfg) + + return &SfnService{ + sfnArn: sfnArn, + client: client, + }, nil +} + // SendMessage function sends 1 message at a time to the Queue func (ss *SqsService) SendMessage(ctx context.Context, body *string) { // log.Printf("Sending 1 Message to Queue") @@ -331,6 +366,41 @@ func (db *DBService) PutItem(ctx context.Context, o *Object) error { return err } +// PutSinglePartItem is a function to create a new item, or replace an old item with a new item, in DynamoDB +// Restarting a transfer of a single part will replace the old item with new info +func (db *DBService) PutSinglePartItem(ctx context.Context, o *Object) error { + + item := &SinglePartItem{ + UploadId: o.UploadID, + PartNumber: o.PartNumber, + TotalPartsCount: o.TotalPartsCount, + ObjectKey: o.Key, + RetryCount: 0, + JobStatus: "STARTED", + StartTime: time.Now().Format("2006/01/02 15:04:05"), + StartTimestamp: time.Now().Unix(), + } + + itemAttr, err := attributevalue.MarshalMap(item) + + if err != nil { + log.Printf("Unable to Marshal DynamoDB attributes for %s - %s\n", o.Key, err.Error()) + } else { + input := &dynamodb.PutItemInput{ + TableName: &db.tableName, + Item: itemAttr, + } + + _, err = db.client.PutItem(ctx, input) + + if err != nil { + log.Printf("Failed to put item for %s in DynamoDB - %s\n", o.Key, err.Error()) + } + } + + return err +} + // UpdateItem is a function to update an item in DynamoDB func (db *DBService) UpdateItem(ctx context.Context, key *string, result *TransferResult) error { // log.Printf("Update item for %s in DynamoDB\n", *key) @@ -372,6 +442,48 @@ func (db *DBService) UpdateItem(ctx context.Context, key *string, result *Transf return err } +// UpdateSinglePartItem is a function to update a single part item in DynamoDB +func (db *DBService) UpdateSinglePartItem(ctx context.Context, o *Object, result *TransferResult) error { + etag := "" + if result.etag != nil { + etag = *result.etag + } + + expr := "set JobStatus = :s, Etag = :tg, EndTime = :et, EndTimestamp = :etm, SpentTime = :etm - StartTimestamp" + if result.status == "PART_ERROR" { + expr += ", RetryCount = RetryCount + :inc" + } + + input := &dynamodb.UpdateItemInput{ + TableName: &db.tableName, + Key: map[string]dtype.AttributeValue{ + "UploadId": &dtype.AttributeValueMemberS{Value: o.UploadID}, + "PartNumber": &dtype.AttributeValueMemberN{Value: fmt.Sprintf("%d", o.PartNumber)}, + }, + ExpressionAttributeValues: map[string]dtype.AttributeValue{ + ":s": &dtype.AttributeValueMemberS{Value: result.status}, + ":tg": &dtype.AttributeValueMemberS{Value: etag}, + ":et": &dtype.AttributeValueMemberS{Value: time.Now().Format("2006/01/02 15:04:05")}, + ":etm": &dtype.AttributeValueMemberN{Value: fmt.Sprintf("%d", time.Now().Unix())}, + }, + ReturnValues: "NONE", + UpdateExpression: &expr, + } + + if result.status == "PART_ERROR" { + input.ExpressionAttributeValues[":inc"] = &dtype.AttributeValueMemberN{ + Value: fmt.Sprintf("%d", 1), + } + } + + _, err := db.client.UpdateItem(ctx, input) + + if err != nil { + log.Printf("Failed to update item for %s - UploadId %s - Part number %d in Single Part DynamoDB - %s\n", o.Key, o.UploadID, o.PartNumber, err.Error()) + } + return err +} + // UpdateSequencer is a function to update an item with new Sequencer in DynamoDB func (db *DBService) UpdateSequencer(ctx context.Context, key, sequencer *string) error { // log.Printf("Update Sequencer for %s in DynamoDB\n", *key) @@ -437,3 +549,64 @@ func (db *DBService) QueryItem(ctx context.Context, key *string) (*Item, error) return 
item, nil } + +// InvokeStepFunction is a function to start the Step Function execution that merges the parts of a giant object +func (sfn *SfnService) InvokeStepFunction(ctx context.Context, uploadID string, totalPartsCount int, objectKey string) error { + + input := map[string]interface{}{ + "arguments": map[string]interface{}{ + "uploadID": uploadID, + "totalPartsCount": totalPartsCount, + "objectKey": objectKey, + }, + } + inputJSON := generateJSONInput(input) + + _, err := sfn.client.StartExecution(ctx, &stepFunction.StartExecutionInput{ + StateMachineArn: &sfn.sfnArn, + Input: &inputJSON, + }) + + if err != nil { + log.Printf("Error invoking Step Function for %s's uploadId %s - %s\n", objectKey, uploadID, err.Error()) + } + + return err +} + +// CountCurrentRunningTasks is a function to count the number of currently running tasks in the Step Function +func (sfn *SfnService) CountCurrentRunningTasks(ctx context.Context) (int, error) { + response, err := sfn.client.ListExecutions(ctx, &stepFunction.ListExecutionsInput{ + StateMachineArn: &sfn.sfnArn, + StatusFilter: sfnType.ExecutionStatusRunning, + }) + if err != nil { + log.Printf("Error listing running tasks in Step Function - %s\n", err.Error()) + return 0, err + } + + return len(response.Executions), nil +} + +// IsNoRunningTask is a function to check if there is no running task in the Step Function +func (sfn *SfnService) IsNoRunningTask(ctx context.Context) (isNoRunning bool) { + isNoRunning = false + runningTasksCount, err := sfn.CountCurrentRunningTasks(ctx) + if err != nil { + log.Printf("Error listing running tasks in Step Function %s - %s\n", sfn.sfnArn, err.Error()) + return + } + + log.Printf("Giant object merging Step Function %s has %d not completed task(s)\n", sfn.sfnArn, runningTasksCount) + + if runningTasksCount <= 0 { + isNoRunning = true + } + return +} + +func generateJSONInput(inputMap map[string]interface{}) string { + jsonBytes, _ := json.Marshal(inputMap) + return string(jsonBytes) +} diff --git a/go.mod b/go.mod index 715dce9..606c8fb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module golang.a2z.com/dthcli go 1.16 require ( - github.com/aws/aws-sdk-go-v2 v1.8.1 + github.com/aws/aws-sdk-go-v2 v1.21.0 github.com/aws/aws-sdk-go-v2/config v1.6.1 github.com/aws/aws-sdk-go-v2/credentials v1.3.3 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.1.1 @@ -11,8 +11,9 @@ require ( github.com/aws/aws-sdk-go-v2/service/dynamodb v1.3.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.13.0 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.3.1 + github.com/aws/aws-sdk-go-v2/service/sfn v1.19.5 github.com/aws/aws-sdk-go-v2/service/sqs v1.4.1 - github.com/aws/smithy-go v1.7.0 + github.com/aws/smithy-go v1.14.2 github.com/spf13/cobra v1.1.3 github.com/spf13/viper v1.7.1 ) diff --git a/go.sum b/go.sum index f2b487e..800cfda 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj github.com/aws/aws-sdk-go-v2 v1.6.0/go.mod h1:tI4KhsR5VkzlUa2DZAdwx7wCAYGwkZZ1H31PYrBFx1w= github.com/aws/aws-sdk-go-v2 v1.8.1 h1:GcFgQl7MsBygmeeqXyV1ivrTEmsVz/rdFJaTcltG9ag= github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0= +github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= +github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= github.com/aws/aws-sdk-go-v2/config v1.6.1 h1:qrZINaORyr78syO1zfD4l7r4tZjy0Z1l0sy4jiysyOM= github.com/aws/aws-sdk-go-v2/config 
v1.6.1/go.mod h1:t/y3UPu0XEDy0cEw6mvygaBQaPzWiYAxfP2SzgtvclA= github.com/aws/aws-sdk-go-v2/credentials v1.3.3 h1:A13QPatmUl41SqUfnuT3V0E3XiNGL6qNTOINbE8cZL4= @@ -33,6 +35,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1 h1:rc+fRGvlKbeSd9IFhFS1KWBs github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1/go.mod h1:+GTydg3uHmVlQdkRoetz6VHKbOMEYof70m19IpMLifc= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.4.1 h1:iJMRP1dYMC0FNzPmNKJOEv7ZqyIfQl1H0+1Dpv+z3Bs= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.4.1/go.mod h1:Ir1Mb3X4bIw0dxJQVW55CdXGlVy+t+I9zZpqPJKjHto= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 h1:22dGT7PneFMx4+b3pz7lMTRyN8ZKH7M2cW4GP9yUS2g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41/go.mod h1:CrObHAuPneJBlfEJ5T3szXOUkLEThaGfvnhTf33buas= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 h1:SijA0mgjV8E+8G45ltVHs0fvKpTj8xmZJ3VwhGKtUSI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35/go.mod h1:SJC1nEVVva1g3pHAIdCp7QsRIkMmLAgoDquQ9Rr8kYw= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1 h1:IkqRRUZTKaS16P2vpX+FNc2jq3JWa3c478gykQp4ow4= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1/go.mod h1:Pv3WenDjI0v2Jl7UaMFIIbPOBbhn33RmmAmGgkXDoqY= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.3.1 h1:uAn7miAtNHewvTnGbXOA0dW1yE/3spgq9lB2D3sfpeQ= @@ -50,6 +56,8 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.13.0 h1:2oMLrNpOSpkDTocIVv3Fut1XrmlbK github.com/aws/aws-sdk-go-v2/service/s3 v1.13.0/go.mod h1:Tzxhu3GnCpj45WJqXyxcLF2gUHzTcmY7CzpQ9x9KVls= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.3.1 h1:atHdsCczZyM/y9QIoCQnxudoKk8+ya2EPIplDOofkjw= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.3.1/go.mod h1:ayQUSrG5QyIl2jRSB0YnoJ1e9swNxsBWaCK3hNI2caI= +github.com/aws/aws-sdk-go-v2/service/sfn v1.19.5 h1:uGuCRiB/3dCGb0iInxJJJTeMvTxM7wIdFv9R0uSFLKQ= +github.com/aws/aws-sdk-go-v2/service/sfn v1.19.5/go.mod h1:k+qjkSU3GMlQ2w66KFtWHQF74MG+LhwhSHX60FBp4rA= github.com/aws/aws-sdk-go-v2/service/sqs v1.4.1 h1:BBW0EAZrJh3pesbs9m7ghgC8kylLjTPmWN3yG7POjnA= github.com/aws/aws-sdk-go-v2/service/sqs v1.4.1/go.mod h1:iNDdLqeQrLCNX00hEi9lh5nNtQmjpigLgYwaTBLa0Ss= github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 h1:K2gCnGvAASpz+jqP9iyr+F/KNjmTYf8aWOtTQzhmZ5w= @@ -59,6 +67,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5 github.com/aws/smithy-go v1.4.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg= github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ= +github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= @@ -102,6 +112,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian 
v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=