Enhancing Transfer Performance: Parallel Multipart Upload for Distributing Large Files Across the Cluster (#16) (#17)

* feat: support large file transfer in cluster mode

* fix: fix the src prefix in single part transfer

* fix: fix the src prefix in single part transfer 2

* fix: fix the multipart split issue

* chore: use SFN to control the part merging

* feat: add network throttling detection

* chore: remove the throttling detection feature and add SFN check

* fix: fix the updateItem issue

* doc: update the readme for giant object transfer
YikaiHu authored Jan 4, 2024
1 parent a1e55b4 commit 18b7f76
Showing 12 changed files with 631 additions and 28 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -19,6 +19,7 @@ ENV SOURCE_TYPE Amazon_S3

ENV JOB_TABLE_NAME ''
ENV JOB_QUEUE_NAME ''
ENV SINGLE_PART_TABLE_NAME ''

ENV SRC_BUCKET ''
ENV SRC_PREFIX ''
5 changes: 5 additions & 0 deletions README.md
@@ -59,8 +59,13 @@ destRegion: cn-north-1

jobTableName: test-table
jobQueueName: test-queue

singlePartTableName: single-part-test-table
sfnArn: sfn-arn
```
The `sfnArn` is the ARN of the Step Functions state machine created by the [Data Transfer Hub S3 Plugin](https://github.com/awslabs/data-transfer-hub/blob/main/docs/S3_PLUGIN.md).
By default, this tool tries to read a `config.yaml` in the same folder. If your configuration file is in a different folder or has a different name, use the extra option `--config xxx.yaml` to load it.


7 changes: 7 additions & 0 deletions cmd/root.go
@@ -80,6 +80,7 @@ func initConfig() {

viper.SetDefault("options.chunkSize", dth.DefaultChunkSize)
viper.SetDefault("options.multipartThreshold", dth.DefaultMultipartThreshold)
viper.SetDefault("options.giantFileThreshold", dth.DefaultGiantFileThreshold)
viper.SetDefault("options.maxKeys", dth.DefaultMaxKeys)
viper.SetDefault("options.messageBatchSize", dth.DefaultMessageBatchSize)
viper.SetDefault("options.finderDepth", dth.DefaultFinderDepth)
@@ -108,10 +109,13 @@ func initConfig() {

viper.BindEnv("jobTableName", "JOB_TABLE_NAME")
viper.BindEnv("jobQueueName", "JOB_QUEUE_NAME")
viper.BindEnv("singlePartTableName", "SINGLE_PART_TABLE_NAME")
viper.BindEnv("sfnArn", "SFN_ARN")

viper.BindEnv("options.maxKeys", "MAX_KEYS")
viper.BindEnv("options.chunkSize", "CHUNK_SIZE")
viper.BindEnv("options.multipartThreshold", "MULTIPART_THRESHOLD")
viper.BindEnv("options.giantFileThreshold", "GIANT_FILE_THRESHOLD")
viper.BindEnv("options.messageBatchSize", "MESSAGE_BATCH_SIZE")
viper.BindEnv("options.finderDepth", "FINDER_DEPTH")
viper.BindEnv("options.finderNumber", "FINDER_NUMBER")
@@ -138,6 +142,7 @@
options := &dth.JobOptions{
ChunkSize: viper.GetInt("options.chunkSize"),
MultipartThreshold: viper.GetInt("options.multipartThreshold"),
GiantFileThreshold: viper.GetInt("options.giantFileThreshold"),
MaxKeys: viper.GetInt32("options.maxKeys"),
MessageBatchSize: viper.GetInt("options.messageBatchSize"),
FinderDepth: viper.GetInt("options.finderDepth"),
@@ -166,6 +171,8 @@
DestInCurrentAccount: viper.GetBool("destInCurrentAccount"),
JobTableName: viper.GetString("jobTableName"),
JobQueueName: viper.GetString("jobQueueName"),
SinglePartTableName: viper.GetString("singlePartTableName"),
SfnArn: viper.GetString("sfnArn"),
JobOptions: options,
}

3 changes: 3 additions & 0 deletions config-example.yaml
@@ -18,10 +18,13 @@ destAcl: bucket-owner-full-control

jobTableName: test-table
jobQueueName: test-queue
singlePartTableName: single-part-test-table
sfnArn: sfn-arn

options:
chunkSize: 5
multipartThreshold: 10
giantFileThreshold: 1024
maxKeys: 1000
messageBatchSize: 10
finderDepth: 0
40 changes: 40 additions & 0 deletions dth/client.go
@@ -41,6 +41,7 @@ type Client interface {
// READ
HeadObject(ctx context.Context, key *string) *Metadata
GetObject(ctx context.Context, key *string, size, start, chunkSize int64, version string) ([]byte, error)
GetObjectPart(ctx context.Context, key *string, bodyRange string) ([]byte, error)
ListObjects(ctx context.Context, continuationToken, prefix *string, maxKeys int32) ([]*Object, error)
ListCommonPrefixes(ctx context.Context, depth int, maxKeys int32) (prefixes []*string)
ListParts(ctx context.Context, key, uploadID *string) (parts map[int]*Part)
@@ -209,6 +210,45 @@ func (c *S3Client) GetObject(ctx context.Context, key *string, size, start, chun

}

// GetObjectPart is a function to get (download) a specific byte range of an object from Amazon S3
func (c *S3Client) GetObjectPart(ctx context.Context, key *string, bodyRange string) ([]byte, error) {
// log.Printf("S3> Downloading %s with range %s\n", *key, bodyRange)
var input *s3.GetObjectInput

if c.isPayerRequest {
input = &s3.GetObjectInput{
Bucket: &c.bucket,
Key: key,
Range: &bodyRange,
RequestPayer: types.RequestPayerRequester,
}
} else {
input = &s3.GetObjectInput{
Bucket: &c.bucket,
Key: key,
Range: &bodyRange,
}
}

output, err := c.client.GetObject(ctx, input, getClientCredentialsModifyFn(c.isSrcClient, SRC_CRED, DST_CRED))
if err != nil {
log.Printf("S3> Unable to download %s with range: %s - %s\n", *key, bodyRange, err.Error())
return nil, err
}

defer output.Body.Close()

// Read response body
s, err := io.ReadAll(output.Body)

if err != nil {
log.Printf("S3> Unable to read the content of %s - %s\n", *key, err.Error())
return nil, err
}
return s, nil

}

// Internal func to call API to list objects.
func (c *S3Client) listObjectFn(ctx context.Context, continuationToken, prefix, delimiter *string, maxKeys int32) (*s3.ListObjectsV2Output, error) {

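Note: to make the new `GetObjectPart` method concrete, here is a minimal usage sketch. It is not part of this commit; the helper name `downloadPart` and its placement in the `dth` package are assumptions, and it only shows how a part number and chunk size translate into the `bodyRange` argument that `GetObjectPart` expects.

```go
package dth

import (
	"context"
	"fmt"
)

// downloadPart is an illustrative helper (not part of this commit). It fetches one
// fixed-size part of a giant object through the new GetObjectPart API. Parts are
// numbered from 1, matching Object.PartNumber; for the last part, S3 simply returns
// the remaining bytes of the object.
func downloadPart(ctx context.Context, c Client, key string, partNumber int, chunkSizeBytes int64) ([]byte, error) {
	start := int64(partNumber-1) * chunkSizeBytes
	// S3 expects an inclusive HTTP Range header: "bytes=<first>-<last>".
	bodyRange := fmt.Sprintf("bytes=%d-%d", start, start+chunkSizeBytes-1)
	return c.GetObjectPart(ctx, &key, bodyRange)
}
```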
51 changes: 48 additions & 3 deletions dth/common.go
@@ -47,9 +47,13 @@ var (

// Object represents an object to be replicated.
type Object struct {
Key string `json:"key"`
Size int64 `json:"size"`
Sequencer string `json:"sequencer,omitempty"`
Key string `json:"key"`
Size int64 `json:"size"`
Sequencer string `json:"sequencer,omitempty"`
PartNumber int `json:"partNumber,omitempty"`
TotalPartsCount int `json:"totalPartsCount,omitempty"`
UploadID string `json:"uploadID,omitempty"`
BodyRange string `json:"bodyRange,omitempty"`
}

// S3Event represents a basic structure of an S3 Event Message
@@ -63,6 +67,15 @@ type S3Event struct {
}
}

// SinglePartTransferEvent represents a basic structure of a SinglePart Transfer Event Message
type SinglePartTransferEvent struct {
ObjectKey string `json:"objectKey"`
PartNumber int `json:"partNumber"`
TotalPartsCount int `json:"totalPartsCount"`
UploadID string `json:"uploadID"`
BodyRange string `json:"bodyRange"`
}

// Part represents a part for multipart upload
type Part struct {
partNumber int
@@ -132,6 +145,20 @@ func newS3Event(str *string) (e *S3Event) {
return
}

// Helper function to create a single part transfer event based on a JSON string
func newSinglePartTransferEvent(str *string) (e *SinglePartTransferEvent) {

e = new(SinglePartTransferEvent)
err := json.Unmarshal([]byte(*str), e)

if err != nil {
log.Printf("Unable to convert string to single part transfer job - %s", err.Error())
return nil
}
// log.Printf("ObjectKey: %s, PartNumber: %d\n", e.ObjectKey, e.PartNumber)
return
}
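
For illustration, a message describing one part would deserialize with the helper above as in the sketch below. This is not part of the commit; the object key, upload ID, part counts, and byte range are invented values that merely line up with the `SinglePartTransferEvent` JSON tags.

```go
package dth

// exampleSinglePartEvent is illustrative only: a hand-written message body whose
// fields match the SinglePartTransferEvent JSON tags, parsed via
// newSinglePartTransferEvent. The byte range covers part 3 of a 5 MiB-chunked object.
func exampleSinglePartEvent() *SinglePartTransferEvent {
	msg := `{
	  "objectKey": "prefix/giant-object.bin",
	  "partNumber": 3,
	  "totalPartsCount": 200,
	  "uploadID": "example-upload-id",
	  "bodyRange": "bytes=10485760-15728639"
	}`
	return newSinglePartTransferEvent(&msg)
}
```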

func getHex(str *string) int64 {
i64, _ := strconv.ParseInt(*str, 16, 64)
return i64
@@ -169,3 +196,21 @@ func appendPrefix(key, prefix *string) *string {
newkey := fmt.Sprintf("%s%s%s", *prefix, delimiter, *key)
return &newkey
}

func calculateCompletedBytes(bodyRange string) int64 {
// bodyRange format: "bytes=startByte-endByte"
parts := strings.Split(bodyRange, "=")
if len(parts) != 2 {
return 0
}

rangeParts := strings.Split(parts[1], "-")
if len(rangeParts) != 2 {
return 0
}

startByte, _ := strconv.ParseInt(rangeParts[0], 10, 64)
endByte, _ := strconv.ParseInt(rangeParts[1], 10, 64)

return endByte - startByte + 1
}
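
A quick sanity check of the range arithmetic (illustrative only, not part of the commit): the Range header is inclusive on both ends, so a 5 MiB part reports exactly 5 MiB of completed bytes.

```go
package dth

import "testing"

// TestCalculateCompletedBytesExample is an illustrative test, not part of this commit:
// "bytes=10485760-15728639" spans 15728639 - 10485760 + 1 = 5242880 bytes (5 MiB).
func TestCalculateCompletedBytesExample(t *testing.T) {
	if got := calculateCompletedBytes("bytes=10485760-15728639"); got != 5*1024*1024 {
		t.Fatalf("expected %d completed bytes, got %d", 5*1024*1024, got)
	}
}
```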
14 changes: 9 additions & 5 deletions dth/config.go
@@ -34,10 +34,14 @@ const (
// DefaultMaxKeys is the maximum number of keys returned per listing request, default is 1000
DefaultMaxKeys int32 = 1000

// DefaultMultipartThreshold is the threshold size (in MB) to determine to use multipart upload or not.
// DefaultMultipartThreshold is the threshold size (in MB) that determines whether to use multipart upload (within a single worker node).
// When the object size is greater than or equal to MultipartThreshold, multipart upload will be used.
DefaultMultipartThreshold int = 10

// DefaultGiantFileThreshold is the threshold size (in MB) that determines whether to distribute a multipart upload across the whole worker cluster.
// When the object size is greater than or equal to GiantFileThreshold, the object will be split and transferred by the whole cluster.
DefaultGiantFileThreshold int = 1024

// DefaultChunkSize is the chunk size (in MB) for each part when using multipart upload
DefaultChunkSize int = 5
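
Taken together, the two thresholds yield three transfer modes. The sketch below is illustrative only; `chooseTransferMode` does not exist in the codebase and simply restates the comparisons described in the comments above.

```go
package dth

// chooseTransferMode is an illustrative sketch of how the two thresholds route an
// object: a plain single-part transfer, a multipart upload inside one worker, or a
// multipart upload whose parts are distributed across the whole worker cluster.
// Thresholds are configured in MB; sizes are in bytes.
func chooseTransferMode(sizeBytes int64, multipartThresholdMB, giantFileThresholdMB int) string {
	const mb = int64(1024 * 1024)
	switch {
	case sizeBytes >= int64(giantFileThresholdMB)*mb:
		return "cluster-wide multipart"
	case sizeBytes >= int64(multipartThresholdMB)*mb:
		return "single-worker multipart"
	default:
		return "single-part transfer"
	}
}
```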

@@ -59,16 +63,16 @@

// JobOptions is General Job Info
type JobOptions struct {
ChunkSize, MultipartThreshold, MessageBatchSize, FinderDepth, FinderNumber, WorkerNumber int
MaxKeys int32
IncludeMetadata bool
ChunkSize, MultipartThreshold, GiantFileThreshold, MessageBatchSize, FinderDepth, FinderNumber, WorkerNumber int
MaxKeys int32
IncludeMetadata bool
}

// JobConfig is General Job Info
type JobConfig struct {
SrcType, SrcBucket, SrcPrefix, SrcPrefixList, SrcRegion, SrcEndpoint, SrcCredential string
DestBucket, DestPrefix, DestRegion, DestCredential, DestStorageClass, DestAcl string
JobTableName, JobQueueName string
JobTableName, JobQueueName, SinglePartTableName, SfnArn string
SrcInCurrentAccount, DestInCurrentAccount, SkipCompare, PayerRequest bool
*JobOptions
}