From 10d25b081862f73bf921243aaf2f855bf40eec1d Mon Sep 17 00:00:00 2001 From: Maciej Lisiewski Date: Wed, 14 Mar 2018 18:29:19 -0400 Subject: [PATCH 1/4] Use aws-sdk --- .travis.yml | 5 +- metricsmgr/metrics_manager.go | 3 +- s3store/readseeker.go | 51 +++++++++++++++ s3store/s3_store.go | 113 ++++++++++++++++++++-------------- s3store/s3_store_test.go | 4 +- timeout.go | 9 ++- 6 files changed, 126 insertions(+), 59 deletions(-) create mode 100644 s3store/readseeker.go diff --git a/.travis.yml b/.travis.yml index 283a803..082db21 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,7 @@ language: go go: - - 1.4 - - release - - tip + - "1.9" + - "1.10" script: - go get -t ./... diff --git a/metricsmgr/metrics_manager.go b/metricsmgr/metrics_manager.go index ecc6acb..5387dad 100644 --- a/metricsmgr/metrics_manager.go +++ b/metricsmgr/metrics_manager.go @@ -1,12 +1,11 @@ package metricsmgr import ( + "context" "fmt" "time" - "github.com/goware/go-metrics" "github.com/pressly/chainstore" - "golang.org/x/net/context" ) type metricsManager struct { diff --git a/s3store/readseeker.go b/s3store/readseeker.go new file mode 100644 index 0000000..809bc8b --- /dev/null +++ b/s3store/readseeker.go @@ -0,0 +1,51 @@ +package s3store + +import ( + "bytes" + "errors" +) + +type readSeeker struct { + buffer *bytes.Buffer + index int64 +} + +func newReadSeeker(v []byte) *readSeeker { + return &readSeeker{buffer: bytes.NewBuffer(v), index: 0} +} + +func (rs *readSeeker) Bytes() []byte { + return rs.buffer.Bytes() +} + +func (rs *readSeeker) Read(p []byte) (int, error) { + n, err := bytes.NewBuffer(rs.buffer.Bytes()[rs.index:]).Read(p) + + if err == nil { + if rs.index+int64(len(p)) < int64(rs.buffer.Len()) { + rs.index += int64(len(p)) + } else { + rs.index = int64(rs.buffer.Len()) + } + } + + return n, err +} +func (rs *readSeeker) Seek(offset int64, whence int) (int64, error) { + var err error + var index int64 = 0 + + switch whence { + case 0: + if offset >= int64(rs.buffer.Len()) || offset < 0 { + err = errors.New("Invalid Offset.") + } else { + rs.index = offset + index = offset + } + default: + err = errors.New("Unsupported Seek Method.") + } + + return index, err +} diff --git a/s3store/s3_store.go b/s3store/s3_store.go index 1c81488..d6c9b90 100644 --- a/s3store/s3_store.go +++ b/s3store/s3_store.go @@ -1,16 +1,26 @@ package s3store import ( - "net/http" + "context" - "github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/s3" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" "github.com/pressly/chainstore" - "golang.org/x/net/context" ) +type Config struct { + S3Bucket string + S3AccessKey string + S3SecretKey string + S3Region string + KMSKeyID string +} + type s3Store struct { - BucketID, AccessKey, SecretKey string + conf Config conn *s3.S3 bucket *s3.Bucket @@ -18,66 +28,75 @@ type s3Store struct { } // New returns a S3 based store. -func New(bucketID string, accessKey string, secretKey string) chainstore.Store { - return &s3Store{BucketID: bucketID, AccessKey: accessKey, SecretKey: secretKey} +func New(conf Config) chainstore.Store { + return &s3Store{conf: conf} } -func (s *s3Store) Open() (err error) { - if s.opened { - return +func (s *s3Store) Open() error { + cfg := &aws.Config{ + Region: &s.conf.S3Region, } + session := session.New(cfg) - auth, err := aws.GetAuth(s.AccessKey, s.SecretKey) - if err != nil { - return + if s.conf.S3AccessKey != "" { + session.Config.WithCredentials(credentials.NewStaticCredentials(s.conf.S3AccessKey, s.conf.S3SecretKey, "")) + } else { + session.Config.WithCredentials(ec2rolecreds.NewCredentials(session)) } - s.conn = s3.New(auth, aws.USEast) // TODO: hardcoded region..? - s.conn.HTTPClient = func() *http.Client { - c := &http.Client{} - return c - } - s.bucket = s.conn.Bucket(s.BucketID) - s.opened = true - return + s.conn = s3.New(session) + + return nil } func (s *s3Store) Close() (err error) { - s.opened = false - return // TODO: .. nothing to do here..? + return } func (s *s3Store) Put(ctx context.Context, key string, val []byte) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // TODO: configurable options for acl when making new s3 store - return s.bucket.Put(key, val, `application/octet-stream`, s3.PublicRead) + params := &s3.PutObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), + ACL: aws.String("private"), + ContentType: aws.String(`application/octet-stream`), + Body: newReadSeeker(val), + } + + if s.conf.KMSKeyID != "" { + params.SetSSEKMSKeyId(s.conf.KMSKeyID) + params.SetServerSideEncryption(s3.ServerSideEncryptionAwsKms) } + + _, err := s.conn.PutObjectWithContext(aws.Context(ctx), params) + return err } -func (s *s3Store) Get(ctx context.Context, key string) (val []byte, err error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - val, err = s.bucket.Get(key) - if err != nil { - s3err, ok := err.(*s3.Error) - if ok && s3err.Code != "NoSuchKey" { - return nil, err - } - } - return val, nil +func (s *s3Store) Get(ctx context.Context, key string) ([]byte, error) { + params := &s3.GetObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), } + + resp, err := s.conn.GetObjectWithContext(ctx, params) + if err != nil { + return nil, err + } + + var val []byte + _, err = resp.Body.Read(val) + if err != nil { + return nil, err + } + + return val, nil } func (s *s3Store) Del(ctx context.Context, key string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - return s.bucket.Del(key) + params := s3.DeleteObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), } + + _, err := s.conn.DeleteObjectWithContext(ctx, ¶ms) + return err } diff --git a/s3store/s3_store_test.go b/s3store/s3_store_test.go index 90b356d..ea59c29 100644 --- a/s3store/s3_store_test.go +++ b/s3store/s3_store_test.go @@ -3,12 +3,12 @@ package s3store import ( + "context" "os" "testing" "github.com/pressly/chainstore" "github.com/stretchr/testify/assert" - "golang.org/x/net/context" ) var ( @@ -31,7 +31,7 @@ func TestS3Store(t *testing.T) { assert := assert.New(t) - store = chainstore.New(New(bucketID, accessKey, secretKey)) + store = chainstore.New(New(Config{S3Bucket: bucketID, S3AccessKey: accessKey, S3SecretKey: secretKey, S3Region: "us-east-1"})) err = store.Open() assert.Nil(err) defer store.Close() diff --git a/timeout.go b/timeout.go index e424f9a..4b9b9c2 100644 --- a/timeout.go +++ b/timeout.go @@ -1,9 +1,8 @@ package chainstore import ( + "context" "time" - - "golang.org/x/net/context" ) func Timeout(d time.Duration, stores ...Store) Store { @@ -22,19 +21,19 @@ func (s *timeoutManager) Open() (err error) { return s.chain.Open() } func (s *timeoutManager) Close() (err error) { return s.chain.Close() } func (s *timeoutManager) Put(ctx context.Context, key string, val []byte) (err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Put(ctx, key, val) } func (s *timeoutManager) Get(ctx context.Context, key string) (data []byte, err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Get(ctx, key) } func (s *timeoutManager) Del(ctx context.Context, key string) (err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Del(ctx, key) From 20f521997e5b000a034fbad33b61e40a1d8809e2 Mon Sep 17 00:00:00 2001 From: Maciej Lisiewski Date: Wed, 14 Mar 2018 19:00:30 -0400 Subject: [PATCH 2/4] update go-metrix import --- metricsmgr/metrics_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metricsmgr/metrics_manager.go b/metricsmgr/metrics_manager.go index 5387dad..4cf13c1 100644 --- a/metricsmgr/metrics_manager.go +++ b/metricsmgr/metrics_manager.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pressly/chainstore" + metrics "github.com/rcrowley/go-metrics" ) type metricsManager struct { From 834599a5712bb08595c2ff88d92d8a6058b48a99 Mon Sep 17 00:00:00 2001 From: Maciej Lisiewski Date: Wed, 14 Mar 2018 19:07:11 -0400 Subject: [PATCH 3/4] On second thought this is a silly package, let's not use it --- chainstore_test.go | 3 +- metricsmgr/metrics_manager.go | 84 ------------------------------ metricsmgr/metrics_manager_test.go | 35 ------------- 3 files changed, 1 insertion(+), 121 deletions(-) delete mode 100644 metricsmgr/metrics_manager.go delete mode 100644 metricsmgr/metrics_manager_test.go diff --git a/chainstore_test.go b/chainstore_test.go index a1c3434..199d351 100644 --- a/chainstore_test.go +++ b/chainstore_test.go @@ -15,7 +15,6 @@ import ( "github.com/pressly/chainstore/logmgr" "github.com/pressly/chainstore/lrumgr" "github.com/pressly/chainstore/memstore" - "github.com/pressly/chainstore/metricsmgr" "github.com/stretchr/testify/assert" "golang.org/x/net/context" ) @@ -100,7 +99,7 @@ func TestAsyncChain(t *testing.T) { }, logmgr.New(logger, "async"), &testStore{}, - metricsmgr.New("chaintest", + chainstore.New( fs, lrumgr.New(100, bs), ), diff --git a/metricsmgr/metrics_manager.go b/metricsmgr/metrics_manager.go deleted file mode 100644 index 4cf13c1..0000000 --- a/metricsmgr/metrics_manager.go +++ /dev/null @@ -1,84 +0,0 @@ -package metricsmgr - -import ( - "context" - "fmt" - "time" - - "github.com/pressly/chainstore" - metrics "github.com/rcrowley/go-metrics" -) - -type metricsManager struct { - namespace string - chain chainstore.Store -} - -// New returns a metrics store. -func New(namespace string, stores ...chainstore.Store) chainstore.Store { - return &metricsManager{ - namespace: namespace, - chain: chainstore.New(stores...), - } -} - -func (m *metricsManager) Open() (err error) { - _, err = m.measure("Open", func() ([]byte, error) { - err := m.chain.Open() - return nil, err - }) - return -} - -func (m *metricsManager) Close() (err error) { - _, err = m.measure("Close", func() ([]byte, error) { - err := m.chain.Close() - return nil, err - }) - return -} - -func (m *metricsManager) Put(ctx context.Context, key string, val []byte) (err error) { - select { - case <-ctx.Done(): - return ctx.Err() - default: - _, err = m.measure("Put", func() ([]byte, error) { - err := m.chain.Put(ctx, key, val) - return nil, err - }) - return - } -} - -func (m *metricsManager) Get(ctx context.Context, key string) (val []byte, err error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - val, err = m.measure("Get", func() ([]byte, error) { - val, err := m.chain.Get(ctx, key) - return val, err - }) - return - } -} - -func (m *metricsManager) Del(ctx context.Context, key string) (err error) { - select { - case <-ctx.Done(): - return ctx.Err() - default: - _, err = m.measure("Del", func() ([]byte, error) { - err := m.chain.Del(ctx, key) - return nil, err - }) - return - } -} - -func (m *metricsManager) measure(method string, fn func() ([]byte, error)) ([]byte, error) { - name := fmt.Sprintf("%s.%s", m.namespace, method) - defer metrics.MeasureSince([]string{name}, time.Now()) - return fn() -} diff --git a/metricsmgr/metrics_manager_test.go b/metricsmgr/metrics_manager_test.go deleted file mode 100644 index fe70ae3..0000000 --- a/metricsmgr/metrics_manager_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package metricsmgr - -import ( - "testing" - - "github.com/pressly/chainstore" - "github.com/stretchr/testify/assert" - "golang.org/x/net/context" -) - -func TestMetricsMgrStore(t *testing.T) { - var store chainstore.Store - var err error - - ctx := context.Background() - - assert := assert.New(t) - - store = chainstore.New(New("ns")) - err = store.Open() - assert.Nil(err) - defer store.Close() - - // Put a bunch of objects - e1 := store.Put(ctx, "hi", []byte{1, 2, 3}) - e2 := store.Put(ctx, "bye", []byte{4, 5, 6}) - assert.Nil(e1) - assert.Nil(e2) - - // Delete those objects - e1 = store.Del(ctx, "hi") - e2 = store.Del(ctx, "bye") - assert.Equal(e1, nil) - assert.Equal(e2, nil) -} From 7182a3497a907088e3ddf8e02c3758d4f4c44f00 Mon Sep 17 00:00:00 2001 From: Maciej Lisiewski Date: Mon, 19 Mar 2018 12:07:14 -0400 Subject: [PATCH 4/4] Fix test/example --- README.md | 35 ++++++++++++----------------- chainstore.go | 2 +- example/main.go | 41 +++++++++++++++++----------------- s3store/readseeker.go | 51 ------------------------------------------- s3store/s3_store.go | 11 +++++----- 5 files changed, 41 insertions(+), 99 deletions(-) delete mode 100644 s3store/readseeker.go diff --git a/README.md b/README.md index e7071f1..f3af13f 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,6 @@ import ( "github.com/pressly/chainstore" "github.com/pressly/chainstore/boltstore" "github.com/pressly/chainstore/lrumgr" - "github.com/pressly/chainstore/metricsmgr" "github.com/pressly/chainstore/s3store" "golang.org/x/net/context" ) @@ -47,15 +46,15 @@ func main() { ctx := context.Background() diskStore := lrumgr.New(500*1024*1024, // 500MB of working data - metricsmgr.New("chainstore.ex.bolt", - boltstore.New("/tmp/store.db", "myBucket"), - ), + boltstore.New("/tmp/store.db", "myBucket"), ) - remoteStore := metricsmgr.New("chainstore.ex.s3", - // NOTE: you'll have to supply your own keys in order for this example to work properly - s3store.New(bucketID, accessKey, secretKey), - ) + remoteStore := s3store.New(s3store.Config{ + S3Bucket: bucketID, + S3AccessKey: accessKey, + S3SecretKey: secretKey, + S3Region: region, + }) dataStore := chainstore.New(diskStore, remoteStore) @@ -63,14 +62,15 @@ func main() { /* dataStore := chainstore.New( lrumgr.New(500*1024*1024, // 500MB of working data - metricsmgr.New("chainstore.ex.bolt", - boltstore.New("/tmp/store.db", "myBucket"), - ), + boltstore.New("/tmp/store.db", "myBucket"), ), - metricsmgr.New("chainstore.ex.s3", + s3store.New(s3store.Config{ // NOTE: you'll have to supply your own keys in order for this example to work properly - s3store.New("myBucket", "access-key", "secret-key"), - ), + S3Bucket: bucketID, + S3AccessKey: accessKey, + S3SecretKey: secretKey, + S3Region: region, + }) ) */ @@ -81,13 +81,6 @@ func main() { log.Fatalf("Open: %q", err) } - // Since we've used the metricsManager above (metricsmgr), any calls to the boltstore - // and s3store will be measured. Next is to send metrics to librato, graphite, influxdb, - // whatever.. via github.com/goware/go-metrics - // go librato.Librato(metrics.DefaultRegistry, 10e9, ...) - - //-- - // Save the object in the chain. It will be Put() synchronously into diskStore, // the boltdb engine, and then immediately dispatch background Put()'s to the // other stores down the chain, in this case S3. diff --git a/chainstore.go b/chainstore.go index b6512dd..18ba94e 100644 --- a/chainstore.go +++ b/chainstore.go @@ -7,7 +7,7 @@ import ( ) var ( - keyInvalidator = regexp.MustCompile(`(i?)[^a-z0-9\/_\-:\.]`) + keyInvalidator = regexp.MustCompile(`[^a-zA-Z0-9\/_\-:\.]`) ) const ( diff --git a/example/main.go b/example/main.go index 89e8852..fe7095d 100644 --- a/example/main.go +++ b/example/main.go @@ -9,7 +9,6 @@ import ( "github.com/pressly/chainstore" "github.com/pressly/chainstore/boltstore" "github.com/pressly/chainstore/lrumgr" - "github.com/pressly/chainstore/metricsmgr" "github.com/pressly/chainstore/s3store" "golang.org/x/net/context" ) @@ -18,27 +17,33 @@ var ( bucketID string accessKey string secretKey string + region string ) func init() { bucketID = os.Getenv("S3_BUCKET") accessKey = os.Getenv("S3_ACCESS_KEY") secretKey = os.Getenv("S3_SECRET_KEY") + region = os.Getenv("S3_REGION") + + if region == "" { + region = "us-east-1" + } } func main() { ctx := context.Background() diskStore := lrumgr.New(500*1024*1024, // 500MB of working data - metricsmgr.New("chainstore.ex.bolt", - boltstore.New("/tmp/store.db", "myBucket"), - ), + boltstore.New("/tmp/store.db", "myBucket"), ) - remoteStore := metricsmgr.New("chainstore.ex.s3", - // NOTE: you'll have to supply your own keys in order for this example to work properly - s3store.New(bucketID, accessKey, secretKey), - ) + remoteStore := s3store.New(s3store.Config{ + S3Bucket: bucketID, + S3AccessKey: accessKey, + S3SecretKey: secretKey, + S3Region: region, + }) dataStore := chainstore.New(diskStore, remoteStore) @@ -46,14 +51,15 @@ func main() { /* dataStore := chainstore.New( lrumgr.New(500*1024*1024, // 500MB of working data - metricsmgr.New("chainstore.ex.bolt", - boltstore.New("/tmp/store.db", "myBucket"), - ), + boltstore.New("/tmp/store.db", "myBucket"), ), - metricsmgr.New("chainstore.ex.s3", + s3store.New(s3store.Config{ // NOTE: you'll have to supply your own keys in order for this example to work properly - s3store.New("myBucket", "access-key", "secret-key"), - ), + S3Bucket: bucketID, + S3AccessKey: accessKey, + S3SecretKey: secretKey, + S3Region: region, + }) ) */ @@ -64,13 +70,6 @@ func main() { log.Fatalf("Open: %q", err) } - // Since we've used the metricsManager above (metricsmgr), any calls to the boltstore - // and s3store will be measured. Next is to send metrics to librato, graphite, influxdb, - // whatever.. via github.com/goware/go-metrics - // go librato.Librato(metrics.DefaultRegistry, 10e9, ...) - - //-- - // Save the object in the chain. It will be Put() synchronously into diskStore, // the boltdb engine, and then immediately dispatch background Put()'s to the // other stores down the chain, in this case S3. diff --git a/s3store/readseeker.go b/s3store/readseeker.go deleted file mode 100644 index 809bc8b..0000000 --- a/s3store/readseeker.go +++ /dev/null @@ -1,51 +0,0 @@ -package s3store - -import ( - "bytes" - "errors" -) - -type readSeeker struct { - buffer *bytes.Buffer - index int64 -} - -func newReadSeeker(v []byte) *readSeeker { - return &readSeeker{buffer: bytes.NewBuffer(v), index: 0} -} - -func (rs *readSeeker) Bytes() []byte { - return rs.buffer.Bytes() -} - -func (rs *readSeeker) Read(p []byte) (int, error) { - n, err := bytes.NewBuffer(rs.buffer.Bytes()[rs.index:]).Read(p) - - if err == nil { - if rs.index+int64(len(p)) < int64(rs.buffer.Len()) { - rs.index += int64(len(p)) - } else { - rs.index = int64(rs.buffer.Len()) - } - } - - return n, err -} -func (rs *readSeeker) Seek(offset int64, whence int) (int64, error) { - var err error - var index int64 = 0 - - switch whence { - case 0: - if offset >= int64(rs.buffer.Len()) || offset < 0 { - err = errors.New("Invalid Offset.") - } else { - rs.index = offset - index = offset - } - default: - err = errors.New("Unsupported Seek Method.") - } - - return index, err -} diff --git a/s3store/s3_store.go b/s3store/s3_store.go index d6c9b90..89faed2 100644 --- a/s3store/s3_store.go +++ b/s3store/s3_store.go @@ -1,7 +1,9 @@ package s3store import ( + "bytes" "context" + "io/ioutil" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -59,7 +61,7 @@ func (s *s3Store) Put(ctx context.Context, key string, val []byte) error { Key: aws.String(key), ACL: aws.String("private"), ContentType: aws.String(`application/octet-stream`), - Body: newReadSeeker(val), + Body: bytes.NewReader(val), } if s.conf.KMSKeyID != "" { @@ -77,13 +79,12 @@ func (s *s3Store) Get(ctx context.Context, key string) ([]byte, error) { Key: aws.String(key), } - resp, err := s.conn.GetObjectWithContext(ctx, params) + resp, err := s.conn.GetObjectWithContext(aws.Context(ctx), params) if err != nil { return nil, err } - var val []byte - _, err = resp.Body.Read(val) + val, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } @@ -97,6 +98,6 @@ func (s *s3Store) Del(ctx context.Context, key string) error { Key: aws.String(key), } - _, err := s.conn.DeleteObjectWithContext(ctx, ¶ms) + _, err := s.conn.DeleteObjectWithContext(aws.Context(ctx), ¶ms) return err }