diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d92175db6..71a71005de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## 4.47.1 - 2025-02-11 + +### Fixed + +- Fix an issue with left over staging files being left around in the `snowflake_streaming` output. (@rockwotj) + ## 4.47.0 - 2025-02-07 ### Added diff --git a/internal/impl/snowflake/streaming/uploader.go b/internal/impl/snowflake/streaming/uploader.go index d82d81ca34..0d1c1b10b7 100644 --- a/internal/impl/snowflake/streaming/uploader.go +++ b/internal/impl/snowflake/streaming/uploader.go @@ -28,7 +28,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/cenkalti/backoff/v4" "golang.org/x/oauth2" @@ -70,9 +69,8 @@ func newUploader(fileLocationInfo fileLocationInfo) (uploader, error) { if err != nil { return nil, err } - uploader := manager.NewUploader(client) return &s3Uploader{ - client: uploader, + client: client, bucket: bucket, pathPrefix: pathPrefix, }, nil @@ -146,19 +144,20 @@ func (u *azureUploader) upload(ctx context.Context, path string, encrypted, md5H } type s3Uploader struct { - client *manager.Uploader + client *s3.Client bucket, pathPrefix string } func (u *s3Uploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error { input := &s3.PutObjectInput{ - Bucket: &u.bucket, - Key: aws.String(filepath.Join(u.pathPrefix, path)), - Body: bytes.NewReader(encrypted), - Metadata: metadata, - ContentMD5: aws.String(base64.StdEncoding.EncodeToString(md5Hash)), + Bucket: &u.bucket, + Key: aws.String(filepath.Join(u.pathPrefix, path)), + ContentLength: aws.Int64(int64(len(encrypted))), + Body: bytes.NewReader(encrypted), + Metadata: metadata, + ContentMD5: aws.String(base64.StdEncoding.EncodeToString(md5Hash)), } - _, err := u.client.Upload(ctx, input) + _, err := u.client.PutObject(ctx, input) return err } @@ -174,6 +173,9 @@ func (u *gcsUploader) upload(ctx context.Context, path string, encrypted, md5Has ow := object.NewWriter(ctx) ow.Metadata = metadata ow.MD5 = md5Hash + // Prevent resumable uploads and staging files in the bucket by removing the chunk size. + // https://cloud.google.com/storage/docs/uploading-objects-from-memory#storage-upload-object-from-memory-go + ow.ChunkSize = 0 for len(encrypted) > 0 { n, err := ow.Write(encrypted) if err != nil {