diff --git a/api-datatypes.go b/api-datatypes.go index 09b03af26..5a8d473c6 100644 --- a/api-datatypes.go +++ b/api-datatypes.go @@ -84,6 +84,12 @@ type UploadInfo struct { // not to be confused with `Expires` HTTP header. Expiration time.Time ExpirationRuleID string + + // Verified checksum values, if any. + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string } // RestoreInfo contains information of the restore operation of an archived object @@ -148,6 +154,12 @@ type ObjectInfo struct { Restore *RestoreInfo + // Checksum values + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string + // Error Err error `json:"-"` } diff --git a/api-get-options.go b/api-get-options.go index 184ef9f86..08ae95c42 100644 --- a/api-get-options.go +++ b/api-get-options.go @@ -38,6 +38,12 @@ type GetObjectOptions struct { ServerSideEncryption encrypt.ServerSide VersionID string PartNumber int + + // Include any checksums, if object was uploaded with checksum. + // For multipart objects this is a checksum of part checksums. + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + Checksum bool + // To be not used by external applications Internal AdvancedGetOptions } @@ -60,6 +66,9 @@ func (o GetObjectOptions) Header() http.Header { if o.Internal.ReplicationProxyRequest != "" { headers.Set(minIOBucketReplicationProxyRequest, o.Internal.ReplicationProxyRequest) } + if o.Checksum { + headers.Set("x-amz-checksum-mode", "ENABLED") + } return headers } diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index abcbd2981..5fec17a1c 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -24,6 +24,7 @@ import ( "encoding/hex" "encoding/xml" "fmt" + "hash/crc32" "io" "io/ioutil" "net/http" @@ -79,11 +80,23 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj return UploadInfo{}, err } + // Choose hash algorithms to be calculated by hashCopyN, + // avoid sha256 with non-v4 signature request or + // HTTPS connection. + hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256) + if len(hashSums) == 0 { + if opts.UserMetadata == nil { + opts.UserMetadata = make(map[string]string, 1) + } + opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" + } + // Initiate a new multipart upload. uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) if err != nil { return UploadInfo{}, err } + delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") defer func() { if err != nil { @@ -100,12 +113,12 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj // Create a buffer. buf := make([]byte, partSize) + // Create checksums + // CRC32C is ~50% faster on AMD64 @ 30GB/s + var crcBytes []byte + customHeader := make(http.Header) + crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) for partNumber <= totalPartsCount { - // Choose hash algorithms to be calculated by hashCopyN, - // avoid sha256 with non-v4 signature request or - // HTTPS connection. - hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256) - length, rErr := readFull(reader, buf) if rErr == io.EOF && partNumber > 1 { break @@ -131,18 +144,23 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj md5Base64 string sha256Hex string ) + if hashSums["md5"] != nil { md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"]) } if hashSums["sha256"] != nil { sha256Hex = hex.EncodeToString(hashSums["sha256"]) } + if len(hashSums) == 0 { + crc.Reset() + crc.Write(buf[:length]) + cSum := crc.Sum(nil) + customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) + crcBytes = append(crcBytes, cSum...) + } // Proceed to upload the part. - objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, - md5Base64, sha256Hex, int64(length), - opts.ServerSideEncryption, - !opts.DisableContentSha256) + objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader) if uerr != nil { return UploadInfo{}, uerr } @@ -171,15 +189,25 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) } complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: part.ETag, - PartNumber: part.PartNumber, + ETag: part.ETag, + PartNumber: part.PartNumber, + ChecksumCRC32: part.ChecksumCRC32, + ChecksumCRC32C: part.ChecksumCRC32C, + ChecksumSHA1: part.ChecksumSHA1, + ChecksumSHA256: part.ChecksumSHA256, }) } // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - - uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{}) + opts = PutObjectOptions{} + if len(crcBytes) > 0 { + // Add hash of hashes. + crc.Reset() + crc.Write(crcBytes) + opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} + } + uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) if err != nil { return UploadInfo{}, err } @@ -242,9 +270,7 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, object } // uploadPart - Uploads a part in a multipart upload. -func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader, - partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool, -) (ObjectPart, error) { +func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName string, uploadID string, reader io.Reader, partNumber int, md5Base64 string, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool, customHeader http.Header) (ObjectPart, error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { return ObjectPart{}, err @@ -273,7 +299,9 @@ func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadI urlValues.Set("uploadId", uploadID) // Set encryption headers, if any. - customHeader := make(http.Header) + if customHeader == nil { + customHeader = make(http.Header) + } // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html // Server-side encryption is supported by the S3 Multipart Upload actions. // Unless you are using a customer-provided encryption key, you don't need @@ -306,11 +334,17 @@ func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadI } } // Once successfully uploaded, return completed part. - objPart := ObjectPart{} + h := resp.Header + objPart := ObjectPart{ + ChecksumCRC32: h.Get("x-amz-checksum-crc32"), + ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), + ChecksumSHA1: h.Get("x-amz-checksum-sha1"), + ChecksumSHA256: h.Get("x-amz-checksum-sha256"), + } objPart.Size = size objPart.PartNumber = partNumber // Trim off the odd double quotes from ETag in the beginning and end. - objPart.ETag = trimEtag(resp.Header.Get("ETag")) + objPart.ETag = trimEtag(h.Get("ETag")) return objPart, nil } diff --git a/api-put-object-streaming.go b/api-put-object-streaming.go index 11b3a5255..464bde7f3 100644 --- a/api-put-object-streaming.go +++ b/api-put-object-streaming.go @@ -22,6 +22,7 @@ import ( "context" "encoding/base64" "fmt" + "hash/crc32" "io" "net/http" "net/url" @@ -38,9 +39,8 @@ import ( // // Following code handles these types of readers. // -// - *minio.Object -// - Any reader which has a method 'ReadAt()' -// +// - *minio.Object +// - Any reader which has a method 'ReadAt()' func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions, ) (info UploadInfo, err error) { @@ -184,12 +184,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) // Proceed to upload the part. - objPart, err := c.uploadPart(ctx, bucketName, objectName, - uploadID, sectionReader, uploadReq.PartNum, - "", "", partSize, - opts.ServerSideEncryption, - !opts.DisableContentSha256, - ) + objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil) if err != nil { uploadedPartsCh <- uploadedPartRes{ Error: err, @@ -260,6 +255,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b return UploadInfo{}, err } + if !opts.SendContentMd5 { + if opts.UserMetadata == nil { + opts.UserMetadata = make(map[string]string, 1) + } + opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" + } + // Calculate the optimal parts info for a given size. totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) if err != nil { @@ -270,6 +272,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b if err != nil { return UploadInfo{}, err } + delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") // Aborts the multipart upload if the function returns // any error, since we do not resume we should purge @@ -281,6 +284,14 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b } }() + // Create checksums + // CRC32C is ~50% faster on AMD64 @ 30GB/s + var crcBytes []byte + customHeader := make(http.Header) + crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + md5Hash := c.md5Hasher() + defer md5Hash.Close() + // Total data read and written to server. should be equal to 'size' at the end of the call. var totalUploadedSize int64 @@ -292,7 +303,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b // Avoid declaring variables in the for loop var md5Base64 string - var hookReader io.Reader // Part number always starts with '1'. var partNumber int @@ -303,37 +313,34 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b partSize = lastPartSize } - if opts.SendContentMd5 { - length, rerr := readFull(reader, buf) - if rerr == io.EOF && partNumber > 1 { - break - } - - if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { - return UploadInfo{}, rerr - } + length, rerr := readFull(reader, buf) + if rerr == io.EOF && partNumber > 1 { + break + } - // Calculate md5sum. - hash := c.md5Hasher() - hash.Write(buf[:length]) - md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) - hash.Close() + if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { + return UploadInfo{}, rerr + } - // Update progress reader appropriately to the latest offset - // as we read from the source. - hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress) + // Calculate md5sum. + if opts.SendContentMd5 { + md5Hash.Reset() + md5Hash.Write(buf[:length]) + md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) } else { - // Update progress reader appropriately to the latest offset - // as we read from the source. - hookReader = newHook(reader, opts.Progress) + // Add CRC32C instead. + crc.Reset() + crc.Write(buf[:length]) + cSum := crc.Sum(nil) + customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) + crcBytes = append(crcBytes, cSum...) } - objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, - io.LimitReader(hookReader, partSize), - partNumber, md5Base64, "", partSize, - opts.ServerSideEncryption, - !opts.DisableContentSha256, - ) + // Update progress reader appropriately to the latest offset + // as we read from the source. + hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress) + + objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader) if uerr != nil { return UploadInfo{}, uerr } @@ -363,15 +370,26 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) } complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: part.ETag, - PartNumber: part.PartNumber, + ETag: part.ETag, + PartNumber: part.PartNumber, + ChecksumCRC32: part.ChecksumCRC32, + ChecksumCRC32C: part.ChecksumCRC32C, + ChecksumSHA1: part.ChecksumSHA1, + ChecksumSHA256: part.ChecksumSHA256, }) } // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{}) + opts = PutObjectOptions{} + if len(crcBytes) > 0 { + // Add hash of hashes. + crc.Reset() + crc.Write(crcBytes) + opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} + } + uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) if err != nil { return UploadInfo{}, err } @@ -490,14 +508,20 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, // extract lifecycle expiry date and rule ID expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) - + h := resp.Header return UploadInfo{ Bucket: bucketName, Key: objectName, - ETag: trimEtag(resp.Header.Get("ETag")), - VersionID: resp.Header.Get(amzVersionID), + ETag: trimEtag(h.Get("ETag")), + VersionID: h.Get(amzVersionID), Size: size, Expiration: expTime, ExpirationRuleID: ruleID, + + // Checksum values + ChecksumCRC32: h.Get("x-amz-checksum-crc32"), + ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), + ChecksumSHA1: h.Get("x-amz-checksum-sha1"), + ChecksumSHA256: h.Get("x-amz-checksum-sha256"), }, nil } diff --git a/api-put-object.go b/api-put-object.go index 9328fb6c1..321ad00aa 100644 --- a/api-put-object.go +++ b/api-put-object.go @@ -23,6 +23,7 @@ import ( "encoding/base64" "errors" "fmt" + "hash/crc32" "io" "net/http" "sort" @@ -215,18 +216,18 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part // // You must have WRITE permissions on a bucket to create an object. // -// - For size smaller than 16MiB PutObject automatically does a -// single atomic PUT operation. +// - For size smaller than 16MiB PutObject automatically does a +// single atomic PUT operation. // -// - For size larger than 16MiB PutObject automatically does a -// multipart upload operation. +// - For size larger than 16MiB PutObject automatically does a +// multipart upload operation. // -// - For size input as -1 PutObject does a multipart Put operation -// until input stream reaches EOF. Maximum object size that can -// be uploaded through this operation will be 5TiB. +// - For size input as -1 PutObject does a multipart Put operation +// until input stream reaches EOF. Maximum object size that can +// be uploaded through this operation will be 5TiB. // -// WARNING: Passing down '-1' will use memory and these cannot -// be reused for best outcomes for PutObject(), pass the size always. +// WARNING: Passing down '-1' will use memory and these cannot +// be reused for best outcomes for PutObject(), pass the size always. // // NOTE: Upon errors during upload multipart operation is entirely aborted. func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, @@ -299,11 +300,20 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam if err != nil { return UploadInfo{}, err } + + if !opts.SendContentMd5 { + if opts.UserMetadata == nil { + opts.UserMetadata = make(map[string]string, 1) + } + opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" + } + // Initiate a new multipart upload. uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) if err != nil { return UploadInfo{}, err } + delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") defer func() { if err != nil { @@ -320,6 +330,12 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam // Create a buffer. buf := make([]byte, partSize) + // Create checksums + // CRC32C is ~50% faster on AMD64 @ 30GB/s + var crcBytes []byte + customHeader := make(http.Header) + crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + for partNumber <= totalPartsCount { length, rerr := readFull(reader, buf) if rerr == io.EOF && partNumber > 1 { @@ -337,6 +353,12 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam hash.Write(buf[:length]) md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) hash.Close() + } else { + crc.Reset() + crc.Write(buf[:length]) + cSum := crc.Sum(nil) + customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) + crcBytes = append(crcBytes, cSum...) } // Update progress reader appropriately to the latest offset @@ -344,11 +366,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) // Proceed to upload the part. - objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, - md5Base64, "", int64(length), - opts.ServerSideEncryption, - !opts.DisableContentSha256, - ) + objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, "", int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader) if uerr != nil { return UploadInfo{}, uerr } @@ -377,15 +395,26 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) } complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: part.ETag, - PartNumber: part.PartNumber, + ETag: part.ETag, + PartNumber: part.PartNumber, + ChecksumCRC32: part.ChecksumCRC32, + ChecksumCRC32C: part.ChecksumCRC32C, + ChecksumSHA1: part.ChecksumSHA1, + ChecksumSHA256: part.ChecksumSHA256, }) } // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{}) + opts = PutObjectOptions{} + if len(crcBytes) > 0 { + // Add hash of hashes. + crc.Reset() + crc.Write(crcBytes) + opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} + } + uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) if err != nil { return UploadInfo{}, err } diff --git a/api-s3-datatypes.go b/api-s3-datatypes.go index 592d4cdcc..b1b848f4a 100644 --- a/api-s3-datatypes.go +++ b/api-s3-datatypes.go @@ -261,6 +261,12 @@ type ObjectPart struct { // Size of the uploaded part data. Size int64 + + // Checksum values of each part. + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string } // ListObjectPartsResult container for ListObjectParts response. @@ -299,6 +305,12 @@ type completeMultipartUploadResult struct { Bucket string Key string ETag string + + // Checksum values, hash of hashes of parts. + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string } // CompletePart sub container lists individual part numbers and their @@ -309,6 +321,12 @@ type CompletePart struct { // Part number identifies the part. PartNumber int ETag string + + // Checksum values + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string } // completeMultipartUpload container for completing multipart upload. diff --git a/core.go b/core.go index 148671eec..f6132e79a 100644 --- a/core.go +++ b/core.go @@ -89,7 +89,7 @@ func (c Core) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarke // PutObjectPart - Upload an object part. func (c Core) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data io.Reader, size int64, md5Base64, sha256Hex string, sse encrypt.ServerSide) (ObjectPart, error) { streamSha256 := true - return c.uploadPart(ctx, bucket, object, uploadID, data, partID, md5Base64, sha256Hex, size, sse, streamSha256) + return c.uploadPart(ctx, bucket, object, uploadID, data, partID, md5Base64, sha256Hex, size, sse, streamSha256, nil) } // ListObjectParts - List uploaded parts of an incomplete upload.x diff --git a/functional_tests.go b/functional_tests.go index 59f347eff..483b5cb11 100644 --- a/functional_tests.go +++ b/functional_tests.go @@ -24,8 +24,11 @@ import ( "archive/zip" "bytes" "context" + "crypto/sha1" + "encoding/base64" "errors" "fmt" + "hash" "hash/crc32" "io" "io/ioutil" @@ -46,6 +49,7 @@ import ( "github.com/dustin/go-humanize" jsoniter "github.com/json-iterator/go" + "github.com/minio/sha256-simd" log "github.com/sirupsen/logrus" "github.com/minio/minio-go/v7" @@ -1991,6 +1995,167 @@ func testObjectTaggingWithVersioning() { successLogger(testName, function, args, startTime).Info() } +// Test PutObject with custom checksums. +func testPutObjectWithChecksums() { + // initialize logging params + startTime := time.Now() + testName := getFuncName() + function := "PutObject(bucketName, objectName, reader,size, opts)" + args := map[string]interface{}{ + "bucketName": "", + "objectName": "", + "opts": "minio.PutObjectOptions{UserMetadata: metadata, Progress: progress}", + } + + if !isFullMode() { + ignoredLog(testName, function, args, startTime, "Skipping functional tests for short/quick runs").Info() + return + } + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Instantiate new minio client object. + c, err := minio.New(os.Getenv(serverEndpoint), + &minio.Options{ + Creds: credentials.NewStaticV4(os.Getenv(accessKey), os.Getenv(secretKey), ""), + Secure: mustParseBool(os.Getenv(enableHTTPS)), + }) + if err != nil { + logError(testName, function, args, startTime, "", "MinIO client object creation failed", err) + return + } + + // Enable tracing, write to stderr. + //c.TraceOn(os.Stderr) + + // Set user agent. + c.SetAppInfo("MinIO-go-FunctionalTest", "0.1.0") + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test-") + args["bucketName"] = bucketName + + // Make a new bucket. + err = c.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{Region: "us-east-1"}) + if err != nil { + logError(testName, function, args, startTime, "", "Make bucket failed", err) + return + } + + defer cleanupBucket(bucketName, c) + tests := []struct { + header string + hasher hash.Hash + + // Checksum values + ChecksumCRC32 string + ChecksumCRC32C string + ChecksumSHA1 string + ChecksumSHA256 string + }{ + {header: "x-amz-checksum-crc32", hasher: crc32.NewIEEE(), ChecksumCRC32: "yXTVFQ=="}, + {header: "x-amz-checksum-crc32c", hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)), ChecksumCRC32C: "zXqj7Q=="}, + {header: "x-amz-checksum-sha1", hasher: sha1.New(), ChecksumSHA1: "SwmAs3F75Sw/sE4dHehkvYtn9UE="}, + {header: "x-amz-checksum-sha256", hasher: sha256.New(), ChecksumSHA256: "8Tlu9msuw/cpmWNEnQx97axliBjiE6gK1doiY0N9WuA="}, + } + + for i, test := range tests { + bufSize := dataFileMap["datafile-129-MB"] + + // Save the data + objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "") + args["objectName"] = objectName + + cmpChecksum := func(got, want string) { + if want != got { + logError(testName, function, args, startTime, "", "checksum mismatch", fmt.Errorf("want %s, got %s", want, got)) + return + } + } + + meta := map[string]string{} + reader := getDataReader("datafile-129-MB") + b, err := io.ReadAll(reader) + if err != nil { + logError(testName, function, args, startTime, "", "Read failed", err) + return + } + h := test.hasher + h.Reset() + // Wrong CRC. + meta[test.header] = base64.StdEncoding.EncodeToString(h.Sum(nil)) + args["metadata"] = meta + + resp, err := c.PutObject(context.Background(), bucketName, objectName, bytes.NewReader(b), int64(bufSize), minio.PutObjectOptions{ + DisableMultipart: true, + UserMetadata: meta, + }) + if err == nil { + if i == 0 && resp.ChecksumCRC32 == "" { + ignoredLog(testName, function, args, startTime, "Checksums does not appear to be supported by backend").Info() + return + } + logError(testName, function, args, startTime, "", "PutObject failed", err) + return + } + + // Set correct CRC. + h.Write(b) + meta[test.header] = base64.StdEncoding.EncodeToString(h.Sum(nil)) + reader.Close() + + resp, err = c.PutObject(context.Background(), bucketName, objectName, bytes.NewReader(b), int64(bufSize), minio.PutObjectOptions{ + DisableMultipart: true, + UserMetadata: meta, + }) + if err != nil { + logError(testName, function, args, startTime, "", "PutObject failed", err) + return + } + cmpChecksum(resp.ChecksumSHA256, test.ChecksumSHA256) + cmpChecksum(resp.ChecksumSHA1, test.ChecksumSHA1) + cmpChecksum(resp.ChecksumCRC32, test.ChecksumCRC32) + cmpChecksum(resp.ChecksumCRC32C, test.ChecksumCRC32C) + + // Read the data back + gopts := minio.GetObjectOptions{Checksum: true} + r, err := c.GetObject(context.Background(), bucketName, objectName, gopts) + if err != nil { + logError(testName, function, args, startTime, "", "GetObject failed", err) + return + } + + st, err := r.Stat() + if err != nil { + logError(testName, function, args, startTime, "", "Stat failed", err) + return + } + + cmpChecksum(st.ChecksumSHA256, test.ChecksumSHA256) + cmpChecksum(st.ChecksumSHA1, test.ChecksumSHA1) + cmpChecksum(st.ChecksumCRC32, test.ChecksumCRC32) + cmpChecksum(st.ChecksumCRC32C, test.ChecksumCRC32C) + + if st.Size != int64(bufSize) { + logError(testName, function, args, startTime, "", "Number of bytes returned by PutObject does not match GetObject, expected "+string(bufSize)+" got "+string(st.Size), err) + return + } + + if err := r.Close(); err != nil { + logError(testName, function, args, startTime, "", "Object Close failed", err) + return + } + if err := r.Close(); err == nil { + logError(testName, function, args, startTime, "", "Object already closed, should respond with error", err) + return + } + delete(args, "metadata") + } + + successLogger(testName, function, args, startTime).Info() +} + // Test PutObject using a large data to trigger multipart readat func testPutObjectWithMetadata() { // initialize logging params @@ -12128,6 +12293,7 @@ func main() { testComposeObjectErrorCasesV2() testCompose10KSourcesV2() testUserMetadataCopyingV2() + testPutObjectWithChecksums() testPutObject0ByteV2() testPutObjectNoLengthV2() testPutObjectsUnknownV2() diff --git a/utils.go b/utils.go index 11a4f3403..f32f84ab0 100644 --- a/utils.go +++ b/utils.go @@ -376,6 +376,12 @@ func ToObjectInfo(bucketName string, objectName string, h http.Header) (ObjectIn UserTags: userTags, UserTagCount: tagCount, Restore: restore, + + // Checksum values + ChecksumCRC32: h.Get("x-amz-checksum-crc32"), + ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), + ChecksumSHA1: h.Get("x-amz-checksum-sha1"), + ChecksumSHA256: h.Get("x-amz-checksum-sha256"), }, nil } @@ -501,7 +507,7 @@ func isSSEHeader(headerKey string) bool { func isAmzHeader(headerKey string) bool { key := strings.ToLower(headerKey) - return strings.HasPrefix(key, "x-amz-meta-") || strings.HasPrefix(key, "x-amz-grant-") || key == "x-amz-acl" || isSSEHeader(headerKey) + return strings.HasPrefix(key, "x-amz-meta-") || strings.HasPrefix(key, "x-amz-grant-") || key == "x-amz-acl" || isSSEHeader(headerKey) || strings.HasPrefix(key, "x-amz-checksum-") } var (