Skip to content

Commit

Permalink
mint: Add concurrent upload test (#1506)
Browse files Browse the repository at this point in the history
Fails before minio/minio#12541 - fixed with PR.
  • Loading branch information
klauspost authored Jun 22, 2021
1 parent 410eb9e commit 0823af6
Showing 1 changed file with 310 additions and 0 deletions.
310 changes: 310 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -1054,6 +1055,153 @@ func testGetObjectWithVersioning() {
successLogger(testName, function, args, startTime).Info()
}

func testPutObjectWithVersioning() {
// initialize logging params
startTime := time.Now()
testName := getFuncName()
function := "GetObject()"
args := map[string]interface{}{}

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.New(os.Getenv(serverEndpoint),
&minio.Options{
Creds: credentials.NewStaticV4(os.Getenv(accessKey), os.Getenv(secretKey), ""),
Secure: mustParseBool(os.Getenv(enableHTTPS)),
})
if err != nil {
logError(testName, function, args, startTime, "", "MinIO client object creation failed", err)
return
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("MinIO-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test-")
args["bucketName"] = bucketName

// Make a new bucket.
err = c.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{Region: "us-east-1", ObjectLocking: true})
if err != nil {
logError(testName, function, args, startTime, "", "Make bucket failed", err)
return
}

err = c.EnableVersioning(context.Background(), bucketName)
if err != nil {
logError(testName, function, args, startTime, "", "Enable versioning failed", err)
return
}

objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
args["objectName"] = objectName

const n = 10
// Read input...

// Save the data concurrently.
var wg sync.WaitGroup
wg.Add(n)
var buffers = make([][]byte, n)
var errs [n]error
for i := 0; i < n; i++ {
r := newRandomReader(int64((1<<20)*i+i), int64(i))
buf, err := ioutil.ReadAll(r)
if err != nil {
logError(testName, function, args, startTime, "", "unexpected failure", err)
return
}
buffers[i] = buf

go func(i int) {
defer wg.Done()
_, errs[i] = c.PutObject(context.Background(), bucketName, objectName, bytes.NewReader(buf), int64(len(buf)), minio.PutObjectOptions{PartSize: 5 << 20})
}(i)
}
wg.Wait()
for _, err := range errs {
if err != nil {
logError(testName, function, args, startTime, "", "PutObject failed", err)
return
}
}

objectsInfo := c.ListObjects(context.Background(), bucketName, minio.ListObjectsOptions{WithVersions: true, Recursive: true})
var results []minio.ObjectInfo
for info := range objectsInfo {
if info.Err != nil {
logError(testName, function, args, startTime, "", "Unexpected error during listing objects", err)
return
}
results = append(results, info)
}

if len(results) != n {
logError(testName, function, args, startTime, "", "Unexpected number of Version elements in listing objects", nil)
return
}

sort.Slice(results, func(i, j int) bool {
return results[i].Size < results[j].Size
})

sort.Slice(buffers, func(i, j int) bool {
return len(buffers[i]) < len(buffers[j])
})

for i := 0; i < len(results); i++ {
opts := minio.GetObjectOptions{VersionID: results[i].VersionID}
reader, err := c.GetObject(context.Background(), bucketName, objectName, opts)
if err != nil {
logError(testName, function, args, startTime, "", "error during GET object", err)
return
}
statInfo, err := reader.Stat()
if err != nil {
logError(testName, function, args, startTime, "", "error during calling reader.Stat()", err)
return
}
if statInfo.ETag != results[i].ETag {
logError(testName, function, args, startTime, "", "error during HEAD object, unexpected ETag", err)
return
}
if statInfo.LastModified.Unix() != results[i].LastModified.Unix() {
logError(testName, function, args, startTime, "", "error during HEAD object, unexpected Last-Modified", err)
return
}
if statInfo.Size != results[i].Size {
logError(testName, function, args, startTime, "", "error during HEAD object, unexpected Content-Length", err)
return
}

tmpBuffer := bytes.NewBuffer([]byte{})
_, err = io.Copy(tmpBuffer, reader)
if err != nil {
logError(testName, function, args, startTime, "", "unexpected io.Copy()", err)
return
}

if !bytes.Equal(tmpBuffer.Bytes(), buffers[i]) {
logError(testName, function, args, startTime, "", "unexpected content of GetObject()", err)
return
}
}

// Delete all objects and their versions as long as the bucket itself
if err = cleanupVersionedBucket(bucketName, c); err != nil {
logError(testName, function, args, startTime, "", "CleanupBucket failed", err)
return
}

successLogger(testName, function, args, startTime).Info()
}

func testCopyObjectWithVersioning() {
// initialize logging params
startTime := time.Now()
Expand Down Expand Up @@ -1191,6 +1339,166 @@ func testCopyObjectWithVersioning() {
successLogger(testName, function, args, startTime).Info()
}

func testConcurrentCopyObjectWithVersioning() {
// initialize logging params
startTime := time.Now()
testName := getFuncName()
function := "CopyObject()"
args := map[string]interface{}{}

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.New(os.Getenv(serverEndpoint),
&minio.Options{
Creds: credentials.NewStaticV4(os.Getenv(accessKey), os.Getenv(secretKey), ""),
Secure: mustParseBool(os.Getenv(enableHTTPS)),
})
if err != nil {
logError(testName, function, args, startTime, "", "MinIO client object creation failed", err)
return
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("MinIO-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test-")
args["bucketName"] = bucketName

// Make a new bucket.
err = c.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{Region: "us-east-1", ObjectLocking: true})
if err != nil {
logError(testName, function, args, startTime, "", "Make bucket failed", err)
return
}

err = c.EnableVersioning(context.Background(), bucketName)
if err != nil {
logError(testName, function, args, startTime, "", "Enable versioning failed", err)
return
}

objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
args["objectName"] = objectName

var testFiles = []string{"datafile-10-kB"}
for _, testFile := range testFiles {
r := getDataReader(testFile)
buf, err := ioutil.ReadAll(r)
if err != nil {
logError(testName, function, args, startTime, "", "unexpected failure", err)
return
}
r.Close()
_, err = c.PutObject(context.Background(), bucketName, objectName, bytes.NewReader(buf), int64(len(buf)), minio.PutObjectOptions{})
if err != nil {
logError(testName, function, args, startTime, "", "PutObject failed", err)
return
}
}

objectsInfo := c.ListObjects(context.Background(), bucketName, minio.ListObjectsOptions{WithVersions: true, Recursive: true})
var infos []minio.ObjectInfo
for info := range objectsInfo {
if info.Err != nil {
logError(testName, function, args, startTime, "", "Unexpected error during listing objects", err)
return
}
infos = append(infos, info)
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Size < infos[j].Size
})

reader, err := c.GetObject(context.Background(), bucketName, objectName, minio.GetObjectOptions{VersionID: infos[0].VersionID})
if err != nil {
logError(testName, function, args, startTime, "", "GetObject of the oldest version content failed", err)
return
}

oldestContent, err := ioutil.ReadAll(reader)
if err != nil {
logError(testName, function, args, startTime, "", "Reading the oldest object version failed", err)
return
}

// Copy Source
srcOpts := minio.CopySrcOptions{
Bucket: bucketName,
Object: objectName,
VersionID: infos[0].VersionID,
}
args["src"] = srcOpts

dstOpts := minio.CopyDestOptions{
Bucket: bucketName,
Object: objectName + "-copy",
}
args["dst"] = dstOpts

// Perform the Copy concurrently
const n = 10
var wg sync.WaitGroup
wg.Add(n)
var errs [n]error
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
_, errs[i] = c.CopyObject(context.Background(), dstOpts, srcOpts)
}(i)
}
wg.Wait()
for _, err := range errs {
if err != nil {
logError(testName, function, args, startTime, "", "CopyObject failed", err)
return
}
}

objectsInfo = c.ListObjects(context.Background(), bucketName, minio.ListObjectsOptions{WithVersions: true, Recursive: false, Prefix: dstOpts.Object})
infos = []minio.ObjectInfo{}
for info := range objectsInfo {
// Destination object
readerCopy, err := c.GetObject(context.Background(), bucketName, objectName+"-copy", minio.GetObjectOptions{VersionID: info.VersionID})
if err != nil {
logError(testName, function, args, startTime, "", "GetObject failed", err)
return
}
defer readerCopy.Close()

newestContent, err := ioutil.ReadAll(readerCopy)
if err != nil {
logError(testName, function, args, startTime, "", "Reading from GetObject reader failed", err)
return
}

if len(newestContent) == 0 || !bytes.Equal(oldestContent, newestContent) {
logError(testName, function, args, startTime, "", "Unexpected destination object content", err)
return
}
infos = append(infos, info)
}

if len(infos) != n {
logError(testName, function, args, startTime, "", "Unexpected number of Version elements in listing objects", nil)
return
}

// Delete all objects and their versions as long as the bucket itself
if err = cleanupVersionedBucket(bucketName, c); err != nil {
logError(testName, function, args, startTime, "", "CleanupBucket failed", err)
return
}

successLogger(testName, function, args, startTime).Info()
}

func testComposeObjectWithVersioning() {
// initialize logging params
startTime := time.Now()
Expand Down Expand Up @@ -11416,6 +11724,7 @@ func main() {
testFPutObjectContextV2()
testFGetObjectContextV2()
testPutObjectContextV2()
testPutObjectWithVersioning()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
Expand Down Expand Up @@ -11453,6 +11762,7 @@ func main() {
testStatObjectWithVersioning()
testGetObjectWithVersioning()
testCopyObjectWithVersioning()
testConcurrentCopyObjectWithVersioning()
testComposeObjectWithVersioning()
testRemoveObjectWithVersioning()
testRemoveObjectsWithVersioning()
Expand Down

0 comments on commit 0823af6

Please sign in to comment.