Skip to content
This repository has been archived by the owner on Nov 23, 2018. It is now read-only.

Commit

Permalink
Use aws-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
c2h5oh committed Mar 14, 2018
1 parent 7581f42 commit 69e47f0
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 59 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
language: go
go:
- 1.4
- release
- tip
- "1.9"
- "1.10"

script:
- go get -t ./...
Expand Down
3 changes: 1 addition & 2 deletions metricsmgr/metrics_manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package metricsmgr

import (
"context"
"fmt"
"time"

"github.com/goware/go-metrics"
"github.com/pressly/chainstore"
"golang.org/x/net/context"
)

type metricsManager struct {
Expand Down
51 changes: 51 additions & 0 deletions s3store/readseeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package s3store

import (
"bytes"
"errors"
)

type readSeeker struct {
buffer bytes.Buffer
index int64
}

func newReadSeeker(v []byte) *readSeeker {
return &readSeeker{bytes.NewBuffer(v), index: 0}
}

func (rs *readSeeker) Bytes() []byte {
return rs.buffer.Bytes()
}

func (rs *readSeeker) Read(p []byte) (int, error) {
n, err := bytes.NewBuffer(rs.buffer.Bytes()[rs.index:]).Read(p)

if err == nil {
if rs.index+int64(len(p)) < int64(rs.buffer.Len()) {
rs.index += int64(len(p))
} else {
rs.index = int64(rs.buffer.Len())
}
}

return n, err
}
func (rs *readSeeker) Seek(offset int64, whence int) (int64, error) {
var err error
var index int64 = 0

switch whence {
case 0:
if offset >= int64(rs.buffer.Len()) || offset < 0 {
err = errors.New("Invalid Offset.")
} else {
rs.index = offset
index = offset
}
default:
err = errors.New("Unsupported Seek Method.")
}

return index, err
}
113 changes: 66 additions & 47 deletions s3store/s3_store.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,102 @@
package s3store

import (
"net/http"
"context"

"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pressly/chainstore"
"golang.org/x/net/context"
)

type Config struct {
S3Bucket string
S3AccessKey string
S3SecretKey string
S3Region string
KMSKeyID string
}

type s3Store struct {
BucketID, AccessKey, SecretKey string
conf Config

conn *s3.S3
bucket *s3.Bucket
opened bool
}

// New returns a S3 based store.
func New(bucketID string, accessKey string, secretKey string) chainstore.Store {
return &s3Store{BucketID: bucketID, AccessKey: accessKey, SecretKey: secretKey}
func New(conf Config) chainstore.Store {
return &s3Store{conf: conf}
}

func (s *s3Store) Open() (err error) {
if s.opened {
return
func (s *s3Store) Open() error {
cfg := &aws.Config{
Region: &s.conf.S3Region,
}
session := session.New(cfg)

auth, err := aws.GetAuth(s.AccessKey, s.SecretKey)
if err != nil {
return
if s.conf.S3AccessKey != "" {
session.Config.WithCredentials(credentials.NewStaticCredentials(s.conf.S3AccessKey, s.conf.S3SecretKey, ""))
} else {
session.Config.WithCredentials(ec2rolecreds.NewCredentials(session))
}

s.conn = s3.New(auth, aws.USEast) // TODO: hardcoded region..?
s.conn.HTTPClient = func() *http.Client {
c := &http.Client{}
return c
}
s.bucket = s.conn.Bucket(s.BucketID)
s.opened = true
return
s.conn = s3.New(session)

return nil
}

func (s *s3Store) Close() (err error) {
s.opened = false
return // TODO: .. nothing to do here..?
return
}

func (s *s3Store) Put(ctx context.Context, key string, val []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// TODO: configurable options for acl when making new s3 store
return s.bucket.Put(key, val, `application/octet-stream`, s3.PublicRead)
params := &s3.PutObjectInput{
Bucket: aws.String(s.conf.S3Bucket),
Key: aws.String(key),
ACL: aws.String("private"),
ContentType: aws.String(`application/octet-stream`),
Body: newReadSeeker(val),
}

if s.conf.KMSKeyID != "" {
params.SetSSEKMSKeyId(s.conf.KMSKeyID)
params.SetServerSideEncryption(s3.ServerSideEncryptionAwsKms)
}

_, err := s.conn.PutObjectWithContext(aws.Context(ctx), params)
return err
}

func (s *s3Store) Get(ctx context.Context, key string) (val []byte, err error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
val, err = s.bucket.Get(key)
if err != nil {
s3err, ok := err.(*s3.Error)
if ok && s3err.Code != "NoSuchKey" {
return nil, err
}
}
return val, nil
func (s *s3Store) Get(ctx context.Context, key string) ([]byte, error) {
params := &s3.GetObjectInput{
Bucket: aws.String(s.conf.S3Bucket),
Key: aws.String(key),
}

resp, err := s.conn.GetObjectWithContext(ctx, params)
if err != nil {
return nil, err
}

var val []byte
_, err = resp.Body.Read(val)
if err != nil {
return nil, err
}

return val, nil
}

func (s *s3Store) Del(ctx context.Context, key string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return s.bucket.Del(key)
params := s3.DeleteObjectInput{
Bucket: aws.String(s.conf.S3Bucket),
Key: aws.String(key),
}

_, err := s.conn.DeleteObjectWithContext(ctx, &params)
return err
}
4 changes: 2 additions & 2 deletions s3store/s3_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
package s3store

import (
"context"
"os"
"testing"

"github.com/pressly/chainstore"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)

var (
Expand All @@ -31,7 +31,7 @@ func TestS3Store(t *testing.T) {

assert := assert.New(t)

store = chainstore.New(New(bucketID, accessKey, secretKey))
store = chainstore.New(New(Config{S3Bucket: bucketID, S3AccessKey: accessKey, S3SecretKey: secretKey, S3Region: "us-east-1"}))
err = store.Open()
assert.Nil(err)
defer store.Close()
Expand Down
9 changes: 4 additions & 5 deletions timeout.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package chainstore

import (
"context"
"time"

"golang.org/x/net/context"
)

func Timeout(d time.Duration, stores ...Store) Store {
Expand All @@ -22,19 +21,19 @@ func (s *timeoutManager) Open() (err error) { return s.chain.Open() }
func (s *timeoutManager) Close() (err error) { return s.chain.Close() }

func (s *timeoutManager) Put(ctx context.Context, key string, val []byte) (err error) {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
return s.chain.Put(ctx, key, val)
}

func (s *timeoutManager) Get(ctx context.Context, key string) (data []byte, err error) {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
return s.chain.Get(ctx, key)
}

func (s *timeoutManager) Del(ctx context.Context, key string) (err error) {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
return s.chain.Del(ctx, key)

Expand Down

0 comments on commit 69e47f0

Please sign in to comment.