diff --git a/.travis.yml b/.travis.yml index 283a803..082db21 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,7 @@ language: go go: - - 1.4 - - release - - tip + - "1.9" + - "1.10" script: - go get -t ./... diff --git a/metricsmgr/metrics_manager.go b/metricsmgr/metrics_manager.go index ecc6acb..5387dad 100644 --- a/metricsmgr/metrics_manager.go +++ b/metricsmgr/metrics_manager.go @@ -1,12 +1,11 @@ package metricsmgr import ( + "context" "fmt" "time" - "github.com/goware/go-metrics" "github.com/pressly/chainstore" - "golang.org/x/net/context" ) type metricsManager struct { diff --git a/s3store/readseeker.go b/s3store/readseeker.go new file mode 100644 index 0000000..6ff8c28 --- /dev/null +++ b/s3store/readseeker.go @@ -0,0 +1,51 @@ +package s3store + +import ( + "bytes" + "errors" +) + +type readSeeker struct { + buffer bytes.Buffer + index int64 +} + +func newReadSeeker(v []byte) *readSeeker { + return &readSeeker{bytes.NewBuffer(v), index: 0} +} + +func (rs *readSeeker) Bytes() []byte { + return rs.buffer.Bytes() +} + +func (rs *readSeeker) Read(p []byte) (int, error) { + n, err := bytes.NewBuffer(rs.buffer.Bytes()[rs.index:]).Read(p) + + if err == nil { + if rs.index+int64(len(p)) < int64(rs.buffer.Len()) { + rs.index += int64(len(p)) + } else { + rs.index = int64(rs.buffer.Len()) + } + } + + return n, err +} +func (rs *readSeeker) Seek(offset int64, whence int) (int64, error) { + var err error + var index int64 = 0 + + switch whence { + case 0: + if offset >= int64(rs.buffer.Len()) || offset < 0 { + err = errors.New("Invalid Offset.") + } else { + rs.index = offset + index = offset + } + default: + err = errors.New("Unsupported Seek Method.") + } + + return index, err +} diff --git a/s3store/s3_store.go b/s3store/s3_store.go index 1c81488..d6c9b90 100644 --- a/s3store/s3_store.go +++ b/s3store/s3_store.go @@ -1,16 +1,26 @@ package s3store import ( - "net/http" + "context" - "github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/s3" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" "github.com/pressly/chainstore" - "golang.org/x/net/context" ) +type Config struct { + S3Bucket string + S3AccessKey string + S3SecretKey string + S3Region string + KMSKeyID string +} + type s3Store struct { - BucketID, AccessKey, SecretKey string + conf Config conn *s3.S3 bucket *s3.Bucket @@ -18,66 +28,75 @@ type s3Store struct { } // New returns a S3 based store. -func New(bucketID string, accessKey string, secretKey string) chainstore.Store { - return &s3Store{BucketID: bucketID, AccessKey: accessKey, SecretKey: secretKey} +func New(conf Config) chainstore.Store { + return &s3Store{conf: conf} } -func (s *s3Store) Open() (err error) { - if s.opened { - return +func (s *s3Store) Open() error { + cfg := &aws.Config{ + Region: &s.conf.S3Region, } + session := session.New(cfg) - auth, err := aws.GetAuth(s.AccessKey, s.SecretKey) - if err != nil { - return + if s.conf.S3AccessKey != "" { + session.Config.WithCredentials(credentials.NewStaticCredentials(s.conf.S3AccessKey, s.conf.S3SecretKey, "")) + } else { + session.Config.WithCredentials(ec2rolecreds.NewCredentials(session)) } - s.conn = s3.New(auth, aws.USEast) // TODO: hardcoded region..? - s.conn.HTTPClient = func() *http.Client { - c := &http.Client{} - return c - } - s.bucket = s.conn.Bucket(s.BucketID) - s.opened = true - return + s.conn = s3.New(session) + + return nil } func (s *s3Store) Close() (err error) { - s.opened = false - return // TODO: .. nothing to do here..? + return } func (s *s3Store) Put(ctx context.Context, key string, val []byte) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // TODO: configurable options for acl when making new s3 store - return s.bucket.Put(key, val, `application/octet-stream`, s3.PublicRead) + params := &s3.PutObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), + ACL: aws.String("private"), + ContentType: aws.String(`application/octet-stream`), + Body: newReadSeeker(val), + } + + if s.conf.KMSKeyID != "" { + params.SetSSEKMSKeyId(s.conf.KMSKeyID) + params.SetServerSideEncryption(s3.ServerSideEncryptionAwsKms) } + + _, err := s.conn.PutObjectWithContext(aws.Context(ctx), params) + return err } -func (s *s3Store) Get(ctx context.Context, key string) (val []byte, err error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - val, err = s.bucket.Get(key) - if err != nil { - s3err, ok := err.(*s3.Error) - if ok && s3err.Code != "NoSuchKey" { - return nil, err - } - } - return val, nil +func (s *s3Store) Get(ctx context.Context, key string) ([]byte, error) { + params := &s3.GetObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), } + + resp, err := s.conn.GetObjectWithContext(ctx, params) + if err != nil { + return nil, err + } + + var val []byte + _, err = resp.Body.Read(val) + if err != nil { + return nil, err + } + + return val, nil } func (s *s3Store) Del(ctx context.Context, key string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - return s.bucket.Del(key) + params := s3.DeleteObjectInput{ + Bucket: aws.String(s.conf.S3Bucket), + Key: aws.String(key), } + + _, err := s.conn.DeleteObjectWithContext(ctx, ¶ms) + return err } diff --git a/s3store/s3_store_test.go b/s3store/s3_store_test.go index 90b356d..ea59c29 100644 --- a/s3store/s3_store_test.go +++ b/s3store/s3_store_test.go @@ -3,12 +3,12 @@ package s3store import ( + "context" "os" "testing" "github.com/pressly/chainstore" "github.com/stretchr/testify/assert" - "golang.org/x/net/context" ) var ( @@ -31,7 +31,7 @@ func TestS3Store(t *testing.T) { assert := assert.New(t) - store = chainstore.New(New(bucketID, accessKey, secretKey)) + store = chainstore.New(New(Config{S3Bucket: bucketID, S3AccessKey: accessKey, S3SecretKey: secretKey, S3Region: "us-east-1"})) err = store.Open() assert.Nil(err) defer store.Close() diff --git a/timeout.go b/timeout.go index e424f9a..4b9b9c2 100644 --- a/timeout.go +++ b/timeout.go @@ -1,9 +1,8 @@ package chainstore import ( + "context" "time" - - "golang.org/x/net/context" ) func Timeout(d time.Duration, stores ...Store) Store { @@ -22,19 +21,19 @@ func (s *timeoutManager) Open() (err error) { return s.chain.Open() } func (s *timeoutManager) Close() (err error) { return s.chain.Close() } func (s *timeoutManager) Put(ctx context.Context, key string, val []byte) (err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Put(ctx, key, val) } func (s *timeoutManager) Get(ctx context.Context, key string) (data []byte, err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Get(ctx, key) } func (s *timeoutManager) Del(ctx context.Context, key string) (err error) { - ctx, cancel = context.WithTimeout(ctx, s.timeout) + ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() return s.chain.Del(ctx, key)