Skip to content

Commit

Permalink
upgrade nio to v3, fix large files upload to s3 issue, add verbose logging
Browse files Browse the repository at this point in the history

Signed-off-by: aharonh <[email protected]>
  • Loading branch information
aharonh committed Jun 12, 2024
1 parent cd1c414 commit 55bd6e9
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# HAS_DEP := $(shell command -v dep;)
# DEP_VERSION := v0.5.0
GIT_TAG := $(shell git describe --tags --always)
GIT_TAG := $(shell git tag | tail -n 1)
GIT_COMMIT := $(shell git rev-parse --short HEAD)
LDFLAGS := "-X main.GitTag=${GIT_TAG} -X main.GitCommit=${GIT_COMMIT}"
DIST := $(CURDIR)/dist
Expand Down
16 changes: 11 additions & 5 deletions cmd/skbn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ func NewRootCmd(args []string) *cobra.Command {
}

type cpCmd struct {
src string
dst string
parallel int
bufferSize float64
src string
dst string
parallel int
bufferSize float64
s3partSize int64
s3maxUploadParts int
verbose bool

out io.Writer
}
Expand All @@ -52,7 +55,7 @@ func NewCpCmd(out io.Writer) *cobra.Command {
Short: "Copy files or directories Kubernetes and Cloud storage",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
if err := skbn.Copy(c.src, c.dst, c.parallel, c.bufferSize); err != nil {
if err := skbn.Copy(c.src, c.dst, c.parallel, c.bufferSize, c.s3partSize, c.s3maxUploadParts, c.verbose); err != nil {
log.Fatal(err)
}
},
Expand All @@ -63,6 +66,9 @@ func NewCpCmd(out io.Writer) *cobra.Command {
f.StringVar(&c.dst, "dst", "", "path to copy to. Example: s3://<bucketName>/path/to/copyto")
f.IntVarP(&c.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism")
f.Float64VarP(&c.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)")
f.Int64VarP(&c.s3partSize, "s3-part-size", "s", 128*1024*1024, "size of each part in bytes for multipart upload to S3. Default is 128MB. Consider that the default MaxUploadParts is 10000 so max file size with default s3 settings is 1.28TB.")
f.IntVarP(&c.s3maxUploadParts, "s3-max-upload-parts", "m", 10000, "maximum number of parts for multipart upload to S3. Default is 10000.")
f.BoolVarP(&c.verbose, "verbose", "v", false, "verbose output")

cmd.MarkFlagRequired("src")
cmd.MarkFlagRequired("dst")
Expand Down
9 changes: 6 additions & 3 deletions examples/code/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (
func main() {
src := "k8s://namespace/pod/container/path/to/copy/from"
dst := "s3://bucket/path/to/copy/to"
parallel := 0 // all at once
bufferSize := 1.0 // 1GB of in memory buffer size
parallel := 0 // all at once
bufferSize := 1.0 // 1GB of in memory buffer size
s3partSize := int64(5 * 1024 * 1024) // 5MB
s3maxUploadParts := 10000
verbose := true

if err := skbn.Copy(src, dst, parallel, bufferSize); err != nil {
if err := skbn.Copy(src, dst, parallel, bufferSize, s3partSize, s3maxUploadParts, verbose); err != nil {
log.Fatal(err)
}
}
16 changes: 10 additions & 6 deletions examples/in-cluster/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,28 @@ spec:
labels:
name: test-skbn
app: test-skbn
spotinst.io/restrict-scale-down: "true"
annotations:
iam.amazonaws.com/role: test-dbbackup-operator
iam.amazonaws.com/role: test-dbbackup-operator

spec:
restartPolicy: OnFailure
serviceAccount: opsflow
containers:
- name: test-skbn
image: nuvo/skbn:0.4.2
image: nuvo/skbn:0.5.6
resources:
limits:
memory: "4Gi"
cpu: "4000m"
memory: "3Gi"
cpu: "2000m"
command: ["skbn"]
args: [
"cp",
"--parallel", "15",
"--parallel", "1",
"--src", "k8s://invu-sre/cassandra-primary-r1-sts-0/cassandra/var/lib/cassandra/test.file",
"--dst", "s3://nuvo-dev-dbbackup/test.file"
"--dst", "s3://nuvo-dev-dbbackup/test.file",
"--buffer-size", "512",
"--verbose"
]
env:
- name: AWS_REGION
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.53.20
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/spf13/cobra v1.8.0
gopkg.in/djherbis/nio.v2 v2.0.3
k8s.io/api v0.0.0-20181204000039-89a74a8d264d
k8s.io/apimachinery v0.0.0-20181127025237-2b1284ed4c93
k8s.io/client-go v10.0.0+incompatible
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ github.com/aws/aws-sdk-go v1.53.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3Tj
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/djherbis/buffer v1.1.0/go.mod h1:VwN8VdFkMY0DCALdY8o00d3IZ6Amz/UNVMWcSaJT44o=
github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ=
github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE=
github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4=
github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/elazarl/goproxy v0.0.0-20231117061959-7cc037d33fb5 h1:m62nsMU279qRD9PQSWD1l66kmkXzuYcnVJqL4XLeV2M=
Expand Down Expand Up @@ -115,8 +118,6 @@ google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/djherbis/nio.v2 v2.0.3 h1:GV76XfYUQjScV0wVKruEyqN48jrng+g63ckfkDOnlHA=
gopkg.in/djherbis/nio.v2 v2.0.3/go.mod h1:APzEZFGm9Q+QzSl8yResRU/4xnWJtY3onKsxwnQdeNM=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
Expand Down
4 changes: 2 additions & 2 deletions pkg/skbn/abs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string
}

// DownloadFromAbs downloads a single file from azure blob storage
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string, writer io.Writer) error {
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string, writer io.Writer, verbose bool) error {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
Expand Down Expand Up @@ -100,7 +100,7 @@ func DownloadFromAbs(ctx context.Context, iClient interface{}, path string, writ
}

// UploadToAbs uploads a single file to azure blob storage
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, reader io.Reader) error {
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, reader io.Reader, verbose bool) error {
pSplit := strings.Split(toPath, "/")
if err := validateAbsPath(pSplit); err != nil {
return err
Expand Down
16 changes: 14 additions & 2 deletions pkg/skbn/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -105,7 +106,7 @@ func GetListOfFilesFromK8s(iClient interface{}, path, findType, findName string)
}

// DownloadFromK8s downloads a single file from Kubernetes
func DownloadFromK8s(iClient interface{}, path string, writer io.Writer) error {
func DownloadFromK8s(iClient interface{}, path string, writer io.Writer, verbose bool) error {
client := *iClient.(*K8sClient)
pSplit := strings.Split(path, "/")
if err := validateK8sPath(pSplit); err != nil {
Expand All @@ -118,13 +119,24 @@ func DownloadFromK8s(iClient interface{}, path string, writer io.Writer) error {
attempt := 0
for attempt < attempts {
attempt++
if verbose {
log.Printf("Attempt %d to download file from %s/%s/%s:%s", attempt, namespace, podName, containerName, pathToCopy)
}

stderr, err := Exec(client, namespace, podName, containerName, command, nil, writer)

if (verbose && len(stderr) != 0) || err != nil {
log.Printf("STDERR: %s", stderr)
log.Printf("Error: %v", err)
}
if attempt == attempts {
log.Printf("this was last attempt")
if len(stderr) != 0 {
log.Printf("STDERR: %s", stderr)
return fmt.Errorf("STDERR: " + (string)(stderr))
}
if err != nil {
log.Printf("Error: %v", err)
return err
}
}
Expand All @@ -138,7 +150,7 @@ func DownloadFromK8s(iClient interface{}, path string, writer io.Writer) error {
}

// UploadToK8s uploads a single file to Kubernetes
func UploadToK8s(iClient interface{}, toPath, fromPath string, reader io.Reader) error {
func UploadToK8s(iClient interface{}, toPath, fromPath string, reader io.Reader, verbose bool) error {
client := *iClient.(*K8sClient)
pSplit := strings.Split(toPath, "/")
if err := validateK8sPath(pSplit); err != nil {
Expand Down
57 changes: 54 additions & 3 deletions pkg/skbn/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package skbn
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -80,10 +81,13 @@ func GetListOfFilesFromS3(iClient interface{}, path string) ([]string, error) {
}

// DownloadFromS3 downloads a single file from S3
func DownloadFromS3(iClient interface{}, path string, writer io.Writer) error {
func DownloadFromS3(iClient interface{}, path string, writer io.Writer, verbose bool) error {
s := iClient.(*session.Session)
pSplit := strings.Split(path, "/")
if err := validateS3Path(pSplit); err != nil {
if verbose {
log.Printf("validate s3 path error: %s", err)
}
return err
}
bucket, s3Path := initS3Variables(pSplit)
Expand All @@ -93,6 +97,10 @@ func DownloadFromS3(iClient interface{}, path string, writer io.Writer) error {
for attempt < attempts {
attempt++

if verbose {
log.Printf("Attempt %d to download file from s3://%s/%s", attempt, bucket, s3Path)
}

downloader := s3manager.NewDownloader(s)
downloader.Concurrency = 1 // support writerWrapper

Expand All @@ -101,8 +109,19 @@ func DownloadFromS3(iClient interface{}, path string, writer io.Writer) error {
Bucket: aws.String(bucket),
Key: aws.String(s3Path),
})

if verbose {
log.Printf("Downloaded file from s3://%s/%s", bucket, s3Path)
}
if err != nil {
if verbose {
log.Printf("Error: %v", err)
log.Printf("Attempt: %v", attempt)
}
if attempt == attempts {
if verbose {
log.Printf("This was last attempt")
}
return err
}
utils.Sleep(attempt)
Expand All @@ -123,10 +142,13 @@ func (ww writerWrapper) WriteAt(p []byte, off int64) (n int, err error) {
}

// UploadToS3 uploads a single file to S3
func UploadToS3(iClient interface{}, toPath, fromPath string, reader io.Reader) error {
func UploadToS3(iClient interface{}, toPath, fromPath string, reader io.Reader, s3partSize int64, s3maxUploadParts int, verbose bool) error {
s := iClient.(*session.Session)
pSplit := strings.Split(toPath, "/")
if err := validateS3Path(pSplit); err != nil {
if verbose {
log.Printf("validate s3 path error: %s", err)
}
return err
}
if len(pSplit) == 1 {
Expand All @@ -140,15 +162,34 @@ func UploadToS3(iClient interface{}, toPath, fromPath string, reader io.Reader)
for attempt < attempts {
attempt++

uploader := s3manager.NewUploader(s)
if verbose {
log.Printf("Attempt %d to upload file to s3://%s/%s", attempt, bucket, s3Path)
}

// uploader := s3manager.NewUploader(s)
uploader := s3manager.NewUploader(s, func(u *s3manager.Uploader) {
u.PartSize = s3partSize
u.MaxUploadParts = s3maxUploadParts
})

_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(s3Path),
Body: reader,
})

if verbose {
log.Printf("Uploaded file to s3://%s/%s", bucket, s3Path)
}
if err != nil {
if verbose {
log.Printf("Error: %v", err)
log.Printf("Attempt: %v", attempt)
}
if attempt == attempts {
if verbose {
log.Printf("This was last attempt")
}
return err
}
utils.Sleep(attempt)
Expand All @@ -160,6 +201,16 @@ func UploadToS3(iClient interface{}, toPath, fromPath string, reader io.Reader)
return nil
}

// calculatePartSize returns a part size (in bytes) suitable for an S3
// multipart upload of a file of the given total size.
//
// S3 allows at most 10000 parts per multipart upload, so the part size is
// fileSize divided by that limit, rounded up so the entire file fits within
// the allowed number of parts. The result is clamped to the S3 minimum
// part size of 5 MiB.
func calculatePartSize(fileSize int64) int64 {
	const (
		maxParts    = 10000
		minPartSize = 5 * 1024 * 1024 // S3 minimum multipart part size (5 MiB)
	)
	// Use ceiling division: truncating division could leave a remainder
	// that would require a 10001st part, which S3 rejects.
	partSize := (fileSize + maxParts - 1) / maxParts
	if partSize < minPartSize {
		partSize = minPartSize
	}
	return partSize
}

func getNewSession() (*session.Session, error) {

awsConfig := &aws.Config{}
Expand Down
Loading

0 comments on commit 55bd6e9

Please sign in to comment.