From d066ac9d4525db10b04018ed961e96e05d40e477 Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Wed, 9 Dec 2015 10:47:52 -0800 Subject: [PATCH 1/9] first stab at adding shard metadata --- triton/store.go | 98 ++++++++++++++++++++++++++++++++++------- triton/store_test.go | 15 +++++++ triton/stream_reader.go | 37 ++++++++++------ triton/uploader.go | 22 +++++++++ 4 files changed, 143 insertions(+), 29 deletions(-) diff --git a/triton/store.go b/triton/store.go index 4a6a149..79dd2cf 100644 --- a/triton/store.go +++ b/triton/store.go @@ -2,14 +2,16 @@ package triton import ( "bytes" + "encoding/json" "fmt" + "github.com/golang/snappy" + "github.com/skarademir/naturalsort" + "github.com/tinylib/msgp/msgp" "io" "log" "os" + "sort" "time" - - "github.com/golang/snappy" - "github.com/tinylib/msgp/msgp" ) type CheckpointService interface { @@ -18,17 +20,16 @@ type CheckpointService interface { // A store manages buffering records together into files, and uploading them somewhere. type Store struct { - name string - reader StreamReader + name string + reader StreamReader + streamMetadata *streamMetadata // Our uploaders manages sending our datafiles somewhere - uploader *S3Uploader - + uploader *S3Uploader currentLogTime time.Time currentWriter io.WriteCloser currentFilename *string - - buf *bytes.Buffer + buf *bytes.Buffer } func (s *Store) closeWriter() error { @@ -48,7 +49,8 @@ func (s *Store) closeWriter() error { s.currentWriter = nil if s.uploader != nil { - err = s.uploader.Upload(*s.currentFilename, s.generateKeyname()) + keyName := s.generateKeyname() + err = s.uploader.Upload(*s.currentFilename, keyName) if err != nil { log.Println("Failed to upload:", err) return fmt.Errorf("Failed to upload") @@ -59,17 +61,32 @@ func (s *Store) closeWriter() error { log.Println("Failed to cleanup:", err) return fmt.Errorf("Failed to cleanup writer") } - + err = s.uploadMetadata(keyName) + if err != nil { + return fmt.Errorf("failed to upload metadata: %s", err.Error()) + } } s.currentFilename = nil - s.reader.Checkpoint() } + s.streamMetadata = newStreamMetadata() return nil } +func (s *Store) uploadMetadata(keyName string) (err error) { + // upload the metadata + var metadataBuf bytes.Buffer + err = json.NewEncoder(&metadataBuf).Encode(&s.streamMetadata) + if err != nil { + err = fmt.Errorf("failed to upload metadata: %s", err.Error()) + return + } + s.uploader.UploadBuf(&metadataBuf, keyName+".metadata") + return +} + func (s *Store) openWriter(fname string) (err error) { if s.currentWriter != nil { return fmt.Errorf("Existing writer still open") @@ -205,11 +222,60 @@ func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store) { buf := bytes.NewBuffer(b) s = &Store{ - name: name, - reader: r, - buf: buf, - uploader: up, + name: name, + reader: r, + buf: buf, + uploader: up, + streamMetadata: newStreamMetadata(), } return } + +type streamMetadata struct { + // shard ID => shardInfo + Shards map[string]*shardInfo `json:"shards"` +} + +func newStreamMetadata() *streamMetadata { + return &streamMetadata{ + Shards: make(map[string]*shardInfo), + } +} + +func (s *streamMetadata) noteSequenceNumber(sequenceNum string, shardID string) { + sh := s.Shards[shardID] + if sh == nil { + sh = &shardInfo{} + s.Shards[shardID] = sh + } + sh.noteSequenceNumber(sequenceNum) +} + +type shardInfo struct { + MinSequenceNumber string `json:"min_sequence_number"` + MaxSequenceNumber string `json:"max_sequence_number"` +} + +func (s *shardInfo) noteSequenceNumber(sequenceNum string) { + if s.MinSequenceNumber 
== "" { + s.MinSequenceNumber = sequenceNum + } else { + nums := naturalsort.NaturalSort([]string{ + sequenceNum, + s.MinSequenceNumber, + }) + sort.Sort(nums) + s.MinSequenceNumber = nums[0] + } + if s.MaxSequenceNumber == "" { + s.MaxSequenceNumber = sequenceNum + } else { + nums := naturalsort.NaturalSort([]string{ + sequenceNum, + s.MaxSequenceNumber, + }) + sort.Sort(nums) + s.MaxSequenceNumber = nums[1] + } +} diff --git a/triton/store_test.go b/triton/store_test.go index 95390fa..6accc75 100644 --- a/triton/store_test.go +++ b/triton/store_test.go @@ -112,3 +112,18 @@ func TestPut(t *testing.T) { } } } + +func TestShardInfo(t *testing.T) { + si := &shardInfo{} + si.noteSequenceNumber("12345") + si.noteSequenceNumber("01234") + si.noteSequenceNumber("11") + + if si.MinSequenceNumber != "11" { + t.Fatalf("expecting the min sequence number to be 11 but got %q", si.MinSequenceNumber) + } + if si.MaxSequenceNumber != "12345" { + t.Fatalf("expecting the max sequence number to be 12345 but got %q", si.MaxSequenceNumber) + } + +} diff --git a/triton/stream_reader.go b/triton/stream_reader.go index fe5fc8d..f82e03e 100644 --- a/triton/stream_reader.go +++ b/triton/stream_reader.go @@ -22,7 +22,7 @@ type StreamReader interface { type multiShardStreamReader struct { checkpointer Checkpointer readers []*ShardStreamReader - recStream chan map[string]interface{} + recStream chan *shardRecord allWg sync.WaitGroup done chan struct{} quit chan struct{} @@ -37,13 +37,19 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) { return } -func (msr *multiShardStreamReader) ReadRecord() (rec map[string]interface{}, err error) { +func (msr *multiShardStreamReader) ReadRecord() (result map[string]interface{}, err error) { + shardRecord, err := msr.readShardRecord() + result = shardRecord.record + return +} + +func (msr *multiShardStreamReader) readShardRecord() (result *shardRecord, err error) { select { - case rec = <-msr.recStream: - return rec, nil + case result = <-msr.recStream: case <-msr.done: - return nil, io.EOF + err = io.EOF } + return } func (msr *multiShardStreamReader) Stop() { @@ -56,12 +62,12 @@ const maxShards int = 100 func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error) { msr := multiShardStreamReader{ - c, - make([]*ShardStreamReader, 0), - make(chan map[string]interface{}), - sync.WaitGroup{}, - make(chan struct{}), - make(chan struct{}, maxShards), + checkpointer: c, + readers: make([]*ShardStreamReader, 0), + recStream: make(chan *shardRecord), + allWg: sync.WaitGroup{}, + done: make(chan struct{}), + quit: make(chan struct{}, maxShards), } shards, err := ListShards(svc, streamName) @@ -118,7 +124,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr return } -func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface{}, done chan struct{}) { +func processStreamToChan(r *ShardStreamReader, recChan chan *shardRecord, done chan struct{}) { for { select { case <-done: @@ -150,9 +156,14 @@ func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface log.Println("Extra bytes in stream record", len(eb)) return } + shardRec := &shardRecord{ + record: rec, + shard: string(r.ShardID), + sequence: *kRec.SequenceNumber, + } select { - case recChan <- rec: + case recChan <- shardRec: case <-done: return } diff --git a/triton/uploader.go b/triton/uploader.go index e023feb..e0fd3c6 100644 --- a/triton/uploader.go +++ b/triton/uploader.go @@ -2,6 +2,7 @@ package 
triton import ( "fmt" + "io" "log" "os" @@ -27,6 +28,7 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) { } log.Println("Uploading", fileName) + ui := s3manager.UploadInput{ Bucket: aws.String(s.bucketName), Key: aws.String(keyName), @@ -45,6 +47,26 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) { return } +func (s *S3Uploader) UploadBuf(r io.Reader, keyName string) (err error) { + log.Println("Uploading", keyName) + + ui := s3manager.UploadInput{ + Bucket: aws.String(s.bucketName), + Key: aws.String(keyName), + Body: r, + } + + _, err = s.uploader.Upload(&ui) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + return fmt.Errorf("Failed to upload: %v (%v)", awsErr.Code(), awsErr.Message()) + } + return + } else { + log.Println("Completed upload to", keyName) + } + return +} func NewUploader(c client.ConfigProvider, bucketName string) *S3Uploader { m := s3manager.NewUploader(c) From 634562d41466e686d4a80d9ae5072089dd23036b Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Wed, 9 Dec 2015 10:51:22 -0800 Subject: [PATCH 2/9] Forgot this one --- triton/shard_record.go | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 triton/shard_record.go diff --git a/triton/shard_record.go b/triton/shard_record.go new file mode 100644 index 0000000..40774f1 --- /dev/null +++ b/triton/shard_record.go @@ -0,0 +1,11 @@ +package triton + +type shardRecord struct { + record map[string]interface{} + shard string + sequence string +} + +type shardReader interface { + readShardRecord() (result *shardRecord, err error) +} From db02b28d1282c0db53bdafab841703eeae0b4b6f Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Wed, 9 Dec 2015 15:09:00 -0800 Subject: [PATCH 3/9] add a benchmark --- triton/store_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/triton/store_test.go b/triton/store_test.go index 6accc75..77a0d7f 100644 --- a/triton/store_test.go +++ b/triton/store_test.go @@ -127,3 +127,10 @@ func TestShardInfo(t *testing.T) { } } + +func BenchmarkShardInfo(b *testing.B) { + si := &shardInfo{} + for i := 0; i < b.N; i++ { + si.noteSequenceNumber("12345") + } +} From 5e551d65911069cf850cad0cd729c75bfddcb2c4 Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Thu, 10 Dec 2015 13:35:08 -0800 Subject: [PATCH 4/9] code review fixes --- triton/shard_record.go | 12 +++++------ triton/store.go | 46 ++++++++++++++++++++++------------------- triton/stream_reader.go | 22 +++++++++++--------- triton/uploader.go | 2 +- 4 files changed, 44 insertions(+), 38 deletions(-) diff --git a/triton/shard_record.go b/triton/shard_record.go index 40774f1..d7f415a 100644 --- a/triton/shard_record.go +++ b/triton/shard_record.go @@ -1,11 +1,11 @@ package triton -type shardRecord struct { - record map[string]interface{} - shard string - sequence string +type ShardRecord struct { + Record map[string]interface{} + ShardID ShardID + SequenceNumber SequenceNumber } -type shardReader interface { - readShardRecord() (result *shardRecord, err error) +type ShardReader interface { + ReadShardRecord() (result *ShardRecord, err error) } diff --git a/triton/store.go b/triton/store.go index 79dd2cf..d912478 100644 --- a/triton/store.go +++ b/triton/store.go @@ -11,17 +11,19 @@ import ( "log" "os" "sort" + "sync" "time" ) -type CheckpointService interface { - Checkpoint(string) error +type ShardReaderCheckpointer interface { + ShardReader + Checkpoint() error } // A store manages buffering records together into files, and uploading them somewhere. 
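//
// Typical use (a sketch; reader and uploader are assumed to be an already
// configured ShardReaderCheckpointer and S3Uploader):
//
//	s := NewStore("test_stream-archive", reader, uploader)
//	if err := s.Store(); err != nil {
//		log.Fatal(err)
//	}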
type Store struct { name string - reader StreamReader + reader ShardReaderCheckpointer streamMetadata *streamMetadata // Our uploaders manages sending our datafiles somewhere @@ -83,7 +85,7 @@ func (s *Store) uploadMetadata(keyName string) (err error) { err = fmt.Errorf("failed to upload metadata: %s", err.Error()) return } - s.uploader.UploadBuf(&metadataBuf, keyName+".metadata") + s.uploader.UploadData(&metadataBuf, keyName+".metadata") return } @@ -101,7 +103,6 @@ func (s *Store) openWriter(fname string) (err error) { s.currentFilename = &fname s.currentWriter = f s.currentLogTime = time.Now() - return } @@ -197,7 +198,7 @@ func (s *Store) Store() (err error) { // TODO: We're unmarshalling and then marshalling msgpack here when // there is not real reason except that's a more useful general // interface. We should add another that is ReadRaw - rec, err := s.reader.ReadRecord() + shardRec, err := s.reader.ReadShardRecord() if err != nil { if err == io.EOF { break @@ -205,8 +206,8 @@ func (s *Store) Store() (err error) { return err } } - - err = s.PutRecord(rec) + s.streamMetadata.noteSequenceNumber(shardRec.ShardID, shardRec.SequenceNumber) + err = s.PutRecord(shardRec.Record) if err != nil { return err } @@ -217,7 +218,7 @@ func (s *Store) Store() (err error) { const BUFFER_SIZE int = 1024 * 1024 -func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store) { +func NewStore(name string, r ShardReaderCheckpointer, up *S3Uploader) (s *Store) { b := make([]byte, 0, BUFFER_SIZE) buf := bytes.NewBuffer(b) @@ -234,16 +235,19 @@ func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store) { type streamMetadata struct { // shard ID => shardInfo - Shards map[string]*shardInfo `json:"shards"` + Shards map[ShardID]*shardInfo `json:"shards"` + sync.Mutex } func newStreamMetadata() *streamMetadata { return &streamMetadata{ - Shards: make(map[string]*shardInfo), + Shards: make(map[ShardID]*shardInfo), } } -func (s *streamMetadata) noteSequenceNumber(sequenceNum string, shardID string) { +func (s *streamMetadata) noteSequenceNumber(shardID ShardID, sequenceNum SequenceNumber) { + s.Lock() + defer s.Unlock() sh := s.Shards[shardID] if sh == nil { sh = &shardInfo{} @@ -253,29 +257,29 @@ func (s *streamMetadata) noteSequenceNumber(sequenceNum string, shardID string) } type shardInfo struct { - MinSequenceNumber string `json:"min_sequence_number"` - MaxSequenceNumber string `json:"max_sequence_number"` + MinSequenceNumber SequenceNumber `json:"min_sequence_number"` + MaxSequenceNumber SequenceNumber `json:"max_sequence_number"` } -func (s *shardInfo) noteSequenceNumber(sequenceNum string) { +func (s *shardInfo) noteSequenceNumber(sequenceNum SequenceNumber) { if s.MinSequenceNumber == "" { s.MinSequenceNumber = sequenceNum } else { nums := naturalsort.NaturalSort([]string{ - sequenceNum, - s.MinSequenceNumber, + string(sequenceNum), + string(s.MinSequenceNumber), }) sort.Sort(nums) - s.MinSequenceNumber = nums[0] + s.MinSequenceNumber = SequenceNumber(nums[0]) } if s.MaxSequenceNumber == "" { s.MaxSequenceNumber = sequenceNum } else { nums := naturalsort.NaturalSort([]string{ - sequenceNum, - s.MaxSequenceNumber, + string(sequenceNum), + string(s.MaxSequenceNumber), }) sort.Sort(nums) - s.MaxSequenceNumber = nums[1] + s.MaxSequenceNumber = SequenceNumber(nums[1]) } } diff --git a/triton/stream_reader.go b/triton/stream_reader.go index f82e03e..00e7a8b 100644 --- a/triton/stream_reader.go +++ b/triton/stream_reader.go @@ -22,7 +22,7 @@ type StreamReader interface { type 
multiShardStreamReader struct { checkpointer Checkpointer readers []*ShardStreamReader - recStream chan *shardRecord + recStream chan *ShardRecord allWg sync.WaitGroup done chan struct{} quit chan struct{} @@ -38,12 +38,14 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) { } func (msr *multiShardStreamReader) ReadRecord() (result map[string]interface{}, err error) { - shardRecord, err := msr.readShardRecord() - result = shardRecord.record + shardRecord, err := msr.ReadShardRecord() + if err != nil { + result = shardRecord.Record + } return } -func (msr *multiShardStreamReader) readShardRecord() (result *shardRecord, err error) { +func (msr *multiShardStreamReader) ReadShardRecord() (result *ShardRecord, err error) { select { case result = <-msr.recStream: case <-msr.done: @@ -64,7 +66,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr msr := multiShardStreamReader{ checkpointer: c, readers: make([]*ShardStreamReader, 0), - recStream: make(chan *shardRecord), + recStream: make(chan *ShardRecord), allWg: sync.WaitGroup{}, done: make(chan struct{}), quit: make(chan struct{}, maxShards), @@ -124,7 +126,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr return } -func processStreamToChan(r *ShardStreamReader, recChan chan *shardRecord, done chan struct{}) { +func processStreamToChan(r *ShardStreamReader, recChan chan *ShardRecord, done chan struct{}) { for { select { case <-done: @@ -156,10 +158,10 @@ func processStreamToChan(r *ShardStreamReader, recChan chan *shardRecord, done c log.Println("Extra bytes in stream record", len(eb)) return } - shardRec := &shardRecord{ - record: rec, - shard: string(r.ShardID), - sequence: *kRec.SequenceNumber, + shardRec := &ShardRecord{ + Record: rec, + ShardID: r.ShardID, + SequenceNumber: SequenceNumber(*kRec.SequenceNumber), } select { diff --git a/triton/uploader.go b/triton/uploader.go index e0fd3c6..1368e71 100644 --- a/triton/uploader.go +++ b/triton/uploader.go @@ -47,7 +47,7 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) { return } -func (s *S3Uploader) UploadBuf(r io.Reader, keyName string) (err error) { +func (s *S3Uploader) UploadData(r io.Reader, keyName string) (err error) { log.Println("Uploading", keyName) ui := s3manager.UploadInput{ From 594597748fd247ffd70e208fc56c63e70f2d18ce Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Fri, 18 Dec 2015 11:48:37 -0500 Subject: [PATCH 5/9] Add archive tailing --- triton/archive_key.go | 82 +++++++++++ triton/archive_key_test.go | 20 +++ triton/archive_repository.go | 100 ++++++++++++++ triton/record.go | 5 + triton/stream_metadata.go | 98 +++++++++++++ triton/tail.go | 259 +++++++++++++++++++++++++++++++++++ triton/tail/tail.go | 194 ++++++++++++++++++++++++++ triton/tail/tail_test.go | 16 +++ triton/tail_test.go | 97 +++++++++++++ 9 files changed, 871 insertions(+) create mode 100644 triton/archive_key.go create mode 100644 triton/archive_key_test.go create mode 100644 triton/archive_repository.go create mode 100644 triton/record.go create mode 100644 triton/stream_metadata.go create mode 100644 triton/tail.go create mode 100644 triton/tail/tail.go create mode 100644 triton/tail/tail_test.go create mode 100644 triton/tail_test.go diff --git a/triton/archive_key.go b/triton/archive_key.go new file mode 100644 index 0000000..5b7935d --- /dev/null +++ b/triton/archive_key.go @@ -0,0 +1,82 @@ +package triton + +import ( + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +// ArchiveKey is a 
struct representing the path value for the Triton S3 keys +type ArchiveKey struct { + Client string + Stream string + Time time.Time +} + +// Path encodes the ArchiveKey to a string path +func (a ArchiveKey) Path() string { + return fmt.Sprintf("%04d%02d%02d/%s-%d.tri", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName(), a.Time.Unix()) +} + +const ( + metadataSuffix = ".metadata" +) + +// MetadataPath encodes the ArchiveKey to a string path with the metadata suffix applied +func (a ArchiveKey) MetadataPath() string { + return a.Path() + metadataSuffix +} + +// fullStreamName returns the full stream name (stream + "-" + client) if there is a client name or just stream +func (a ArchiveKey) fullStreamName() (stream string) { + stream = a.Stream + if a.Client != "" { + stream += "-" + a.Client + } + return +} + +// PathPrefix returns the string key prefix without the timestamp +func (a ArchiveKey) PathPrefix() string { + return fmt.Sprintf("%04d%02d%02d/%s-", a.Time.Year(), a.Time.Month(), a.Time.Day(), a.fullStreamName()) +} + +func (a ArchiveKey) Equal(other ArchiveKey) (result bool) { + if a.Stream != other.Stream { + return false + } + if a.Time.Truncate(time.Second) != other.Time.Truncate(time.Second) { + return false + } + if a.Client != other.Client { + return false + } + return true +} + +var archiveKeyPattern = regexp.MustCompile(`^/?(?P\d{8})\/(?P.+)\-(?P\d+)\.tri$`) + +// Decode an archive S3 key into an ArchiveKey +func DecodeArchiveKey(keyName string) (a ArchiveKey, err error) { + res := archiveKeyPattern.FindStringSubmatch(keyName) + if res == nil { + err = fmt.Errorf("Invalid key name") + return + } + ts, err := strconv.ParseInt(res[3], 10, 64) + if err != nil { + err = fmt.Errorf("Failed to parse timestamp value: %s", err.Error()) + return + } + a.Time = time.Unix(ts, 0) + nameParts := strings.Split(res[2], "-") + if len(nameParts) != 2 { + err = fmt.Errorf("Failure parsing stream name: %v", res[2]) + return + } + a.Stream = nameParts[0] + a.Client = nameParts[1] + return +} diff --git a/triton/archive_key_test.go b/triton/archive_key_test.go new file mode 100644 index 0000000..f688df6 --- /dev/null +++ b/triton/archive_key_test.go @@ -0,0 +1,20 @@ +package triton + +import ( + "testing" + "time" +) + +func TestArchiveKeyPathCodec(t *testing.T) { + aTime := time.Now() + archiveKey := ArchiveKey{Time: aTime, Stream: "a", Client: "b"} + archiveKey2, err := DecodeArchiveKey(archiveKey.Path()) + + if err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } + if !archiveKey.Equal(archiveKey2) { + t.Fatalf("expecting %+v == %+v", archiveKey, archiveKey2) + } + +} diff --git a/triton/archive_repository.go b/triton/archive_repository.go new file mode 100644 index 0000000..d5c2bdc --- /dev/null +++ b/triton/archive_repository.go @@ -0,0 +1,100 @@ +package triton + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "io" + "io/ioutil" + "sort" + "strings" + "time" +) + +// ArchiveRepository manages reading and writing Archives +type ArchiveRepository struct { + s3Service S3Service + s3Uploader S3UploaderService + stream string + bucket string + client string +} + +func NewArchiveRepository(s3Service S3Service, s3Uploader S3UploaderService, bucket string, stream string, client string) *ArchiveRepository { + return &ArchiveRepository{ + s3Service: s3Service, + s3Uploader: s3Uploader, + bucket: bucket, + 
stream: stream, + client: client, + } +} + +// Upload the archive for a stream at Time t +func (ar *ArchiveRepository) Upload(t time.Time, contents io.ReadCloser, metadata *StreamMetadata) (err error) { + archiveKey := ArchiveKey{Stream: ar.stream, Time: t, Client: ar.client} + _, err = ar.s3Uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(ar.bucket), + Key: aws.String(archiveKey.Path()), + Body: contents, + }) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + return fmt.Errorf("Failed to upload: %v (%v)", awsErr.Code(), awsErr.Message()) + } + return + } + var buf bytes.Buffer + err = json.NewEncoder(&buf).Encode(metadata) + if err != nil { + return + } + _, err = ar.s3Uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(ar.bucket), + Key: aws.String(archiveKey.MetadataPath()), + Body: ioutil.NopCloser(&buf), + }) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + return fmt.Errorf("Failed to upload metadata: %v (%v)", awsErr.Code(), awsErr.Message()) + } + return + } + return +} + +// ArchivesAtDate lists all the archives for a stream stored at a UTC date represented by aDate +func (ar *ArchiveRepository) ArchivesAtDate(aDate time.Time) (result []StoreArchive, err error) { + keyPrefix := ArchiveKey{Time: aDate, Stream: ar.stream, Client: ar.client}.PathPrefix() + keys := []string{} + err = ar.s3Service.ListObjectsPages(&s3.ListObjectsInput{ + Bucket: aws.String(ar.bucket), + Prefix: aws.String(keyPrefix), + }, func(output *s3.ListObjectsOutput, lastPage bool) (shouldContinue bool) { + for _, object := range output.Contents { + keys = append(keys, *object.Key) + } + return true + }) + if err != nil { + return + } + sort.Sort(sort.StringSlice(keys)) + for _, key := range keys { + if strings.HasSuffix(key, metadataSuffix) { + continue + } + var sa StoreArchive + sa, err = NewStoreArchive(ar.bucket, key, ar.s3Service) + if err != nil { + err = fmt.Errorf("failed to create store archive for %q: %s", key, err) + return + } + result = append(result, sa) + } + return +} diff --git a/triton/record.go b/triton/record.go new file mode 100644 index 0000000..f99b601 --- /dev/null +++ b/triton/record.go @@ -0,0 +1,5 @@ +package triton + +type Record map[string]interface{} + +func NewRecord() Record { return make(Record) } diff --git a/triton/stream_metadata.go b/triton/stream_metadata.go new file mode 100644 index 0000000..e598d52 --- /dev/null +++ b/triton/stream_metadata.go @@ -0,0 +1,98 @@ +package triton + +import ( + "encoding/json" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "sort" + "sync" +) + +type StreamMetadata struct { + // shard ID => ShardInfo + Shards map[ShardID]*ShardInfo `json:"shards"` + sync.Mutex `json:"-"` +} + +func NewStreamMetadata() *StreamMetadata { + return &StreamMetadata{ + Shards: make(map[ShardID]*ShardInfo), + } +} + +func (s *StreamMetadata) noteSequenceNumber(shardID ShardID, sequenceNum SequenceNumber) { + s.Lock() + defer s.Unlock() + sh := s.Shards[shardID] + if sh == nil { + sh = &ShardInfo{} + s.Shards[shardID] = sh + } + sh.noteSequenceNumber(sequenceNum) +} + +type ShardInfo struct { + MinSequenceNumber SequenceNumber `json:"min_sequence_number"` + MaxSequenceNumber SequenceNumber `json:"max_sequence_number"` +} + +func (s *ShardInfo) noteSequenceNumber(sequenceNum SequenceNumber) { + if s.MinSequenceNumber == "" { + s.MinSequenceNumber = sequenceNum + } else { + nums := []string{ + string(sequenceNum), + string(s.MinSequenceNumber), + } + 
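+		// NOTE: plain lexicographic ordering replaces the earlier natural
+		// sort; it is numerically correct only while sequence numbers have
+		// the same string length, which is assumed here.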
sort.Sort(sort.StringSlice(nums)) + s.MinSequenceNumber = SequenceNumber(nums[0]) + } + if s.MaxSequenceNumber == "" { + s.MaxSequenceNumber = sequenceNum + } else { + nums := []string{ + string(sequenceNum), + string(s.MaxSequenceNumber), + } + sort.Sort(sort.StringSlice(nums)) + s.MaxSequenceNumber = SequenceNumber(nums[1]) + } +} + +// ReadStreamMetadataParams are params for ReadStreamMetadata +type ReadStreamMetadataParams struct { + s3Service S3Service // S3 client + Bucket string // S3 Bucket + ArchiveKey string // The base s3 key of the archive. We store the metadata like $s3key.metadata +} + +const ( + streamMetadataExt = ".metadata" + awsNoSuchEntity = "404" +) + +// ReadStreamMetadata loads the metadata for a stream archive +// +// Returns result=>nil on no metadata + +func ReadStreamMetadata(client S3Service, bucket, key string) (result *StreamMetadata, err error) { + getObjectOutput, err := client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key + streamMetadataExt), + }) + if err != nil { + if awsError, ok := err.(awserr.Error); ok { + // Return nil on 404/NoSuchEntity + if awsError.Code() == awsNoSuchEntity { + err = nil + return + } + } + return + } + defer getObjectOutput.Body.Close() + result = NewStreamMetadata() + err = json.NewDecoder(getObjectOutput.Body).Decode(result) + return +} diff --git a/triton/tail.go b/triton/tail.go new file mode 100644 index 0000000..817bc7c --- /dev/null +++ b/triton/tail.go @@ -0,0 +1,259 @@ +package triton + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/tinylib/msgp/msgp" + "io" + "log" + "sync" + "time" +) + +type TailAt struct { + stream string + streamInited bool + bucket string + client string + at time.Time + closed bool + records chan Record + errors chan error + region string + kinesisService KinesisService + s3Service S3Service + pollInterval time.Duration + emptyPollInterval time.Duration + sync.Mutex +} + +// NewTailAtParams are the parameters for invoking NewTailAt +type NewTailAtParams struct { + S3Service S3Service + KinesisService KinesisService + StreamName string + Bucket string + Client string + At time.Time + PollInterval time.Duration + EmptyPollInterval time.Duration +} + +const ( + // The amount of time to poll for new kinesis records if we encounter an empty stream + DefaultEmptyPollInterval = 10 * time.Second + DefaultPollInterval = 1 * time.Millisecond +) + +// NewTail returns a new tailing stream starting at "at" +func NewTailAt(params *NewTailAtParams) (tail *TailAt) { + if params.EmptyPollInterval == 0 { + params.EmptyPollInterval = DefaultEmptyPollInterval + } + + if params.PollInterval == 0 { + params.PollInterval = DefaultPollInterval + } + + tail = &TailAt{ + at: params.At, + emptyPollInterval: params.EmptyPollInterval, + kinesisService: params.KinesisService, + pollInterval: params.PollInterval, + s3Service: params.S3Service, + stream: params.StreamName, + bucket: params.Bucket, + client: params.Client, + records: make(chan Record), + errors: make(chan error), + } + return +} + +func (t *TailAt) Next() (record Record, err error) { + t.Lock() + defer t.Unlock() + go t.initStream() + select { + case record = <-t.records: + case err = <-t.errors: + } + return +} + +func (t *TailAt) sendArchivedRecords() (lastMetadata *StreamMetadata, err error) { + log.Println("sending archived records") + archiveRepository := NewArchiveRepository(t.s3Service, nil, t.bucket, t.stream, t.client) + aTime := t.at.AddDate(0, 0, -1) + 
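+	// Scan archive keys from one day before the target time through one day
+	// after it, so archives filed under adjacent UTC dates are not missed.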
end := t.at.AddDate(0, 0, 2) + var lastArchive *StoreArchive + for aTime.Before(end) { + if t.closed { + break + } + if !aTime.Before(end) { + break + } + log.Println("checking", aTime) + var archives []StoreArchive + archives, err = archiveRepository.ArchivesAtDate(aTime) + if err != nil { + return + } + log.Println("archives: ", archives) + for _, archive := range archives { + if t.closed { + return + } + lastArchive = &archive + log.Println("archive time:", archive.T, "tail time:", t.at) + if archive.T.Before(t.at) { + log.Println("archive is before target") + continue + } + for { + log.Println("reading records from archive") + var rec map[string]interface{} + rec, err = archive.ReadRecord() + if err == io.EOF { + err = nil + break + } else if err != nil { + return + } + log.Println("record", rec) + t.records <- rec + } + } + aTime = aTime.AddDate(0, 0, 1) + } + log.Println("done sending archives") + if lastArchive != nil { + log.Println("loading metadata") + lastMetadata, err = lastArchive.GetStreamMetadata() + } + return +} + +func (t *TailAt) initStream() { + if t.streamInited { + return + } + t.streamInited = true + + lastStreamMetadata, err := t.sendArchivedRecords() + if err != nil { + t.errors <- err + return + } + log.Println("done sending archived records") + err = t.sendKinesisRecords(lastStreamMetadata) + if err != nil { + t.errors <- err + } +} + +func (t *TailAt) sendKinesisRecords(previousMetadata *StreamMetadata) (err error) { + shards, err := t.listShards() + if err != nil { + return + } + + // send all of the records in `startingKey` + // load metadata for starting key + // then send kinesis records + // load the sequenceNumbers for the last key + for _, shard := range shards { + var lastSequenceNumber SequenceNumber + if previousMetadata != nil && previousMetadata.Shards[shard] != nil { + lastSequenceNumber = previousMetadata.Shards[shard].MaxSequenceNumber + } + go t.sendKinesisRecordsForShard(shard, lastSequenceNumber) + } + return +} + +// sendKinesisRecordsForShard starts sending records to TailAt.records for the shard optionally starting at starting startingSequenceNumber +// +// If a startingSequenceNumber is specified the reader tries to start reading +// records at startingSequenceNumber, otherwise it tries to find a starting +// sequence number at TRIM_HORIZON to begin reading. 
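+//
+// Note: ShardIteratorTypeAtSequenceNumber re-delivers the record bearing
+// startingSequenceNumber itself, so the last archived record of a shard may
+// be emitted a second time; ShardIteratorTypeAfterSequenceNumber would skip
+// it.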
+func (t *TailAt) sendKinesisRecordsForShard(shard ShardID, startingSequenceNumber SequenceNumber) { + log.Println("sendKinesisRecordsForShard", shard, startingSequenceNumber) + + // Start reading from shard and send records to t.records + iterator, err := t.getStreamIterator(shard, startingSequenceNumber) + if err != nil { + t.errors <- err + return + } + for { + if t.closed { + break + } + var getRecordsInput kinesis.GetRecordsInput + getRecordsInput.ShardIterator = aws.String(iterator) + var getRecordsOutput *kinesis.GetRecordsOutput + getRecordsOutput, err = t.kinesisService.GetRecords(&getRecordsInput) + if err != nil { + // catch rate limiting errors + t.errors <- err + return + } + iterator = *getRecordsOutput.NextShardIterator + if len(getRecordsOutput.Records) == 0 { + time.Sleep(t.emptyPollInterval) + } else { + for _, kinesisRecord := range getRecordsOutput.Records { + rec, _, err := msgp.ReadMapStrIntfBytes(kinesisRecord.Data, nil) + if err != nil { + t.errors <- fmt.Errorf("unexpected error decoding record (%s): %s", shard, err.Error()) + return + } + t.records <- rec + } + } + } +} + +// getStreamIterator returns an iterator for the stream and shard, optionally starting at startingSequenceNumber or at TrimHorizon +func (t *TailAt) getStreamIterator(shardID ShardID, startingSequenceNumber SequenceNumber) (iteratorID string, err error) { + // Handle the case where startingSequenceNumber is invalid + var getShardIteratorInput kinesis.GetShardIteratorInput + getShardIteratorInput.ShardId = aws.String(string(shardID)) + getShardIteratorInput.StreamName = aws.String(t.stream) + if startingSequenceNumber != "" { + getShardIteratorInput.StartingSequenceNumber = aws.String(string(startingSequenceNumber)) + getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtSequenceNumber) + } else { + getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon) + } + getShardIteratorOutput, err := t.kinesisService.GetShardIterator(&getShardIteratorInput) + if err != nil { + return + } + iteratorID = *getShardIteratorOutput.ShardIterator + return +} + +// listShardsForStream helper method to list all the shards for a stream +func (t *TailAt) listShards() (result []ShardID, err error) { + describeStreamOutput, err := t.kinesisService.DescribeStream(&kinesis.DescribeStreamInput{ + StreamName: aws.String(t.stream), + }) + if err != nil { + return + } + for _, shard := range describeStreamOutput.StreamDescription.Shards { + result = append(result, ShardID(*shard.ShardId)) + } + return +} + +// Close closes the tail stream +func (t *TailAt) Close() { + t.Lock() + defer t.Unlock() + t.closed = true +} diff --git a/triton/tail/tail.go b/triton/tail/tail.go new file mode 100644 index 0000000..0bd62bc --- /dev/null +++ b/triton/tail/tail.go @@ -0,0 +1,194 @@ +package tail + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/postmates/go-triton/triton" + "io" + "sync" + "time" +) + +type TailAt struct { + stream string + streamInited bool + bucket string + at time.Time + closed bool + records chan triton.Record + errors chan error + region string + kinesisService triton.KinesisService + s3Service triton.S3Service + PollInterval time.Duration + EmptyPollInterval time.Duration + sync.Mutex +} + +// NewTail returns a new tailing stream starting at "at" +func NewTailAt(bucket string, streamName string, at time.Time) (tail *TailAt, err error) { + tail = &TailAt{ + stream: streamName, + at: at, + 
EmptyPollInterval: time.Second * 10, + PollInterval: time.Second, + } + return +} + +func (t *TailAt) Next() (record triton.Record, err error) { + t.Lock() + defer t.Unlock() + go t.initStream() + select { + case record = <-t.records: + case err = <-t.errors: + } + return +} + +func (t *TailAt) sendArchivedRecords() (lastMetadata *triton.StreamMetadata, err error) { + archiveRepository := triton.NewS3StoreArchiveRepository(t.s3Service, t.bucket, t.stream) + aTime := t.at.AddDate(0, 0, -1) + end := t.at.AddDate(0, 0, 2) + var lastArchive *triton.StoreArchive + for !t.closed && aTime.Before(end) { + var archives []triton.StoreArchive + archives, err = archiveRepository.ArchivesAtDate(t.at) + if err != nil { + return + } + for _, archive := range archives { + if t.closed { + return + } + lastArchive = &archive + if archive.T.Before(t.at) { + continue + } + for { + var rec map[string]interface{} + rec, err = archive.ReadRecord() + if err == io.EOF { + err = nil + break + } else if err != nil { + return + } + t.records <- rec + } + } + aTime = t.at.AddDate(0, 0, 1) + } + if lastArchive != nil { + lastMetadata, err = lastArchive.GetStreamMetadata() + } + return +} + +func (t *TailAt) initStream() { + if t.streamInited { + return + } + t.streamInited = true + + lastStreamMetadata, err := t.sendArchivedRecords() + if err != nil { + t.errors <- err + return + } + err = t.sendKinesisRecords(lastStreamMetadata) + if err != nil { + t.errors <- err + } +} + +func (t *TailAt) sendKinesisRecords(previousMetadata *triton.StreamMetadata) (err error) { + shards, err := t.listShards() + if err != nil { + return + } + + // send all of the records in `startingKey` + // load metadata for starting key + // then send kinesis records + // load the sequenceNumbers for the last key + for _, shard := range shards { + var lastSequenceNumber triton.SequenceNumber + if previousMetadata != nil && previousMetadata.Shards[shard] != nil { + lastSequenceNumber = previousMetadata.Shards[shard].MaxSequenceNumber + } + go t.sendKinesisRecordsForShard(shard, lastSequenceNumber) + } + return +} + +// sendKinesisRecordsForShard starts sending records to TailAt.records for the shard optionally starting at starting startingSequenceNumber +// +// If a startingSequenceNumber is specified the reader tries to start reading +// records at startingSequenceNumber, otherwise it tries to find a starting +// sequence number at TRIM_HORIZON to begin reading. 
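+//
+// Note: in this package's copy the read loop only advances the shard
+// iterator; fetched records are not yet decoded or forwarded to t.records.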
+func (t *TailAt) sendKinesisRecordsForShard(shard triton.ShardID, startingSequenceNumber triton.SequenceNumber) { + // Start reading from shard and send records to t.records + iterator, err := t.getStreamIterator(shard, startingSequenceNumber) + if err != nil { + t.errors <- err + return + } + for !t.closed { + var getRecordsInput kinesis.GetRecordsInput + getRecordsInput.ShardIterator = aws.String(iterator) + var getRecordsOutput *kinesis.GetRecordsOutput + getRecordsOutput, err = t.kinesisService.GetRecords(&getRecordsInput) + if err != nil { + // catch rate limiting errors + t.errors <- err + return + } + iterator = *getRecordsOutput.NextShardIterator + if len(getRecordsOutput.Records) == 0 { + time.Sleep(t.EmptyPollInterval) + } + } +} + +// getStreamIterator returns an iterator for the stream and shard, optionally starting at startingSequenceNumber or at LATEST +func (t *TailAt) getStreamIterator(shardID triton.ShardID, startingSequenceNumber triton.SequenceNumber) (iteratorID string, err error) { + // Handle the case where startingSequenceNumber is invalid + var getShardIteratorInput kinesis.GetShardIteratorInput + getShardIteratorInput.ShardId = aws.String(string(shardID)) + getShardIteratorInput.StreamName = aws.String(t.stream) + if startingSequenceNumber != "" { + getShardIteratorInput.StartingSequenceNumber = aws.String(string(startingSequenceNumber)) + getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtSequenceNumber) + } else { + getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon) + } + getShardIteratorOutput, err := t.kinesisService.GetShardIterator(&getShardIteratorInput) + if err != nil { + return + } + iteratorID = *getShardIteratorOutput.ShardIterator + return +} + +// listShardsForStream helper method to list all the shards for a stream +func (t *TailAt) listShards() (result []triton.ShardID, err error) { + describeStreamOutput, err := t.kinesisService.DescribeStream(&kinesis.DescribeStreamInput{ + StreamName: aws.String(t.stream), + }) + if err != nil { + return + } + for _, shard := range describeStreamOutput.StreamDescription.Shards { + result = append(result, triton.ShardID(*shard.ShardId)) + } + return +} + +// Close closes the tail stream +func (t *TailAt) Close() { + t.Lock() + defer t.Unlock() + t.closed = true +} diff --git a/triton/tail/tail_test.go b/triton/tail/tail_test.go new file mode 100644 index 0000000..084298e --- /dev/null +++ b/triton/tail/tail_test.go @@ -0,0 +1,16 @@ +package tail + +import ( + "testing" + "time" +) + +var testBucket *testin + +func TestTailStream(t *testing.T) { + +} + +func BenchmarkParseKey(b *testing.B) { + +} diff --git a/triton/tail_test.go b/triton/tail_test.go new file mode 100644 index 0000000..545abf8 --- /dev/null +++ b/triton/tail_test.go @@ -0,0 +1,97 @@ +package triton + +import ( + "bytes" + "github.com/golang/snappy" + "github.com/tinylib/msgp/msgp" + "io" + "io/ioutil" + "log" + "testing" + "time" +) + +type recordEncoder struct { + out *msgp.Writer +} + +func newRecordEncoder(w io.Writer) *recordEncoder { + return &recordEncoder{ + out: msgp.NewWriter(snappy.NewWriter(w)), + } +} + +func (e *recordEncoder) Encode(r Record) (err error) { + if err = e.out.WriteMapStrIntf(map[string]interface{}(r)); err != nil { + return + } + err = e.out.Flush() + return +} + +func TestTailStream(t *testing.T) { + s3Service := newTestS3Service() + s3UploaderService := s3Service.uploader() + kinesisService := newTestKinesisService() + stream := "test_stream" + 
bucket := "testbucket" + tailTime := time.Now() + ar := NewArchiveRepository(s3Service, s3UploaderService, bucket, stream, "archive") + rec1 := NewRecord() + rec1["key"] = "1" + rec2 := NewRecord() + rec2["key"] = "2" + rec3 := NewRecord() + rec3["key"] = "3" + + // prepare s3 + + // send day1 + recBuf := bytes.Buffer{} + newRecordEncoder(&recBuf).Encode(rec1) + metadata := NewStreamMetadata() + metadata.noteSequenceNumber(ShardID("A"), SequenceNumber("1")) + ar.Upload(tailTime.AddDate(0, 0, -1), ioutil.NopCloser(&recBuf), metadata) + + // send day2 + recBuf = bytes.Buffer{} + newRecordEncoder(&recBuf).Encode(rec2) + metadata = NewStreamMetadata() + metadata.noteSequenceNumber(ShardID("A"), SequenceNumber("2")) + ar.Upload(tailTime.Add(time.Hour), ioutil.NopCloser(&recBuf), metadata) + log.Println("uploaded records") + + // put day3 record on kinesis: + st := newTestKinesisStream("test_stream") + s1 := newTestKinesisShard() + s1.AddRecord(SequenceNumber("3"), rec3) + st.AddShard(ShardID("A"), s1) + kinesisService.AddStream(st) + + // run the tail: + tailAt := NewTailAt(&NewTailAtParams{ + S3Service: s3Service, + KinesisService: kinesisService, + Bucket: bucket, + StreamName: stream, + At: tailTime, + }) + log.Println("loading record") + record, err := tailAt.Next() + if err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } + + if record["key"] != "2" { + t.Fatalf("unexpected record: %s", record["key"]) + } + + record, err = tailAt.Next() + if err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } + + if record["key"] != "3" { + t.Fatalf("unexpected record: %s", record["key"]) + } +} From 57fc259b1593890821fa4111b6535ad58e84472c Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Fri, 18 Dec 2015 13:16:15 -0500 Subject: [PATCH 6/9] Add triton tailing, fix tests --- .gitignore | 2 + Makefile | 4 + triton/archive.go | 42 +++----- triton/archive_test.go | 38 ++++--- triton/aws.go | 6 ++ triton/config_test.go | 2 - triton/shard_record.go | 2 +- triton/store.go | 67 +----------- triton/store_reader.go | 6 +- triton/store_test.go | 21 ++-- triton/stream_reader.go | 4 +- triton/stream_reader_test.go | 4 +- triton/tail.go | 16 +-- triton/tail/tail.go | 194 ----------------------------------- triton/tail/tail_test.go | 16 --- triton/test_util.go | 119 +++++++++++++++++++++ triton/uploader.go | 1 - 17 files changed, 191 insertions(+), 353 deletions(-) delete mode 100644 triton/tail/tail.go delete mode 100644 triton/tail/tail_test.go diff --git a/.gitignore b/.gitignore index 23015e5..43e0158 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ build/ src/github.com/ src/gopkg.in/ +/.go/ +cscope.* diff --git a/Makefile b/Makefile index 4325f11..7af4a93 100644 --- a/Makefile +++ b/Makefile @@ -16,3 +16,7 @@ build: clean: rm -rf build + +cscope: + find $$GOPATH/src -type f -iname "*.go"> cscope.files + cscope -b -k diff --git a/triton/archive.go b/triton/archive.go index ec1d695..3dc71e5 100644 --- a/triton/archive.go +++ b/triton/archive.go @@ -1,9 +1,6 @@ package triton import ( - "fmt" - "regexp" - "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -17,8 +14,7 @@ type StoreArchive struct { Key string ClientName string - T time.Time - SortValue int + T time.Time s3Svc S3Service rdr Reader @@ -43,39 +39,31 @@ func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) { } func (sa *StoreArchive) parseKeyName(keyName string) (err error) { - re := regexp.MustCompile(`(?P\d{8})\/(?P.+)\-(?P\d+)\.tri$`) - res := re.FindAllStringSubmatch(keyName, -1) - - if 
len(res) != 1 { - return fmt.Errorf("Invalid key name") - } - - sa.T, err = time.Parse("20060102", res[0][1]) - - n, err := fmt.Sscanf(res[0][3], "%d", &sa.SortValue) - if n != 1 { - return fmt.Errorf("Failed to parse sort value") + key, err := DecodeArchiveKey(keyName) + if err != nil { + return } + sa.T = key.Time + sa.StreamName = key.Stream + sa.ClientName = key.Client + return +} - nameParts := strings.Split(res[0][2], "-") - if len(nameParts) != 2 { - return fmt.Errorf("Failure parsing stream name: %v", res[0][2]) - } - sa.StreamName = nameParts[0] - sa.ClientName = nameParts[1] +// Read the stream metadata associated with this store archive instance +func (sa *StoreArchive) GetStreamMetadata() (result *StreamMetadata, err error) { + result, err = ReadStreamMetadata(sa.s3Svc, sa.Bucket, sa.Key) return } +// NewStoreArchive returns a StoreArchive instance func NewStoreArchive(bucketName, keyName string, svc S3Service) (sa StoreArchive, err error) { sa.Bucket = bucketName sa.Key = keyName sa.s3Svc = svc - err = sa.parseKeyName(keyName) if err != nil { - return sa, err + return } - - return sa, nil + return } diff --git a/triton/archive_test.go b/triton/archive_test.go index a78b0e1..e176170 100644 --- a/triton/archive_test.go +++ b/triton/archive_test.go @@ -1,19 +1,24 @@ package triton import ( + "fmt" "io" "testing" "time" ) +var ( + streamTime = time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC) + streamKeyPath = fmt.Sprintf("%04d%02d%02d/test_stream-archive-%d.tri", streamTime.Year(), streamTime.Month(), streamTime.Day(), streamTime.Unix()) +) + func TestNewArchive(t *testing.T) { - key := "20150801/test_stream-archive-123455.tri" - sa, err := NewStoreArchive("foo", key, nil) + sa, err := NewStoreArchive("foo", streamKeyPath, nil) if err != nil { - t.Fatal("Error creating sa", err) + t.Fatal("Error creating sa:", err.Error()) } - if sa.Key != key { + if sa.Key != streamKeyPath { t.Error("Failed to store key") } @@ -24,45 +29,38 @@ func TestNewArchive(t *testing.T) { if sa.ClientName != "archive" { t.Error("Should have a client name") } - - if sa.T != time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC) { - t.Error("StreamName mismatch", sa.StreamName) + if !sa.T.Equal(streamTime) { + t.Errorf("Stream time mismatch: %s != %s", sa.T, streamTime) } if sa.Bucket != "foo" { - t.Error("bucket name mismatch") + t.Error("bucket name mismatch, %s != %s", sa.Bucket, "foo") } - if sa.SortValue != 123455 { - t.Error("Sort value mismatch") - } } func TestNewArchiveShard(t *testing.T) { - sa, err := NewStoreArchive("foo", "20150801/test_stream-store_test-123455.tri", nil) + sa, err := NewStoreArchive("foo", streamKeyPath, nil) if err != nil { - t.Fatal("Error creating sa", err) + t.Fatalf("Error creating sa", err) } if sa.StreamName != "test_stream" { t.Error("StreamName mismatch", sa.StreamName) } - if sa.ClientName != "store_test" { + if sa.ClientName != "archive" { t.Error("Should have a client name") } - if sa.T != time.Date(2015, time.August, 1, 0, 0, 0, 0, time.UTC) { - t.Error("StreamName mismatch", sa.StreamName) + if !sa.T.Equal(streamTime) { + t.Errorf("StreamName mismatch %s != %s", sa.T, streamTime) } - if sa.SortValue != 123455 { - t.Error("Sort value mismatch") - } } func TestReadEmpty(t *testing.T) { - sa, err := NewStoreArchive("foo", "20150801/test_stream-store_test-123455.tri", &nullS3Service{}) + sa, err := NewStoreArchive("foo", streamKeyPath, &nullS3Service{}) if err != nil { t.Fatal("Error creating sa", err) } diff --git a/triton/aws.go b/triton/aws.go index 
5bc86ad..e550738 100644 --- a/triton/aws.go +++ b/triton/aws.go @@ -22,6 +22,7 @@ type KinesisService interface { type S3Service interface { GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error) + ListObjectsPages(*s3.ListObjectsInput, func(*s3.ListObjectsOutput, bool) bool) error } type S3UploaderService interface { @@ -45,3 +46,8 @@ func (s *nullS3Service) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput loo := &s3.ListObjectsOutput{} return loo, nil } + +func (s *nullS3Service) ListObjectsPages(input *s3.ListObjectsInput, f func(*s3.ListObjectsOutput, bool) bool) error { + f(&s3.ListObjectsOutput{}, true) + return nil +} diff --git a/triton/config_test.go b/triton/config_test.go index cf547b1..c41a266 100644 --- a/triton/config_test.go +++ b/triton/config_test.go @@ -40,9 +40,7 @@ func TestNewConfigFromFile(t *testing.T) { } func TestMissingStream(t *testing.T) { - c := Config{} - _, err := c.ConfigForName("foo") if err != nil { // all good diff --git a/triton/shard_record.go b/triton/shard_record.go index d7f415a..1eede23 100644 --- a/triton/shard_record.go +++ b/triton/shard_record.go @@ -1,7 +1,7 @@ package triton type ShardRecord struct { - Record map[string]interface{} + Record Record ShardID ShardID SequenceNumber SequenceNumber } diff --git a/triton/store.go b/triton/store.go index d912478..ae9b584 100644 --- a/triton/store.go +++ b/triton/store.go @@ -5,13 +5,10 @@ import ( "encoding/json" "fmt" "github.com/golang/snappy" - "github.com/skarademir/naturalsort" "github.com/tinylib/msgp/msgp" "io" "log" "os" - "sort" - "sync" "time" ) @@ -24,7 +21,7 @@ type ShardReaderCheckpointer interface { type Store struct { name string reader ShardReaderCheckpointer - streamMetadata *streamMetadata + streamMetadata *StreamMetadata // Our uploaders manages sending our datafiles somewhere uploader *S3Uploader @@ -72,7 +69,7 @@ func (s *Store) closeWriter() error { s.currentFilename = nil s.reader.Checkpoint() } - s.streamMetadata = newStreamMetadata() + s.streamMetadata = NewStreamMetadata() return nil } @@ -113,12 +110,7 @@ func (s *Store) generateFilename() (name string) { } func (s *Store) generateKeyname() (name string) { - day_s := s.currentLogTime.Format("20060102") - ts_s := fmt.Sprintf("%d", s.currentLogTime.Unix()) - - name = fmt.Sprintf("%s/%s-%s.tri", day_s, s.name, ts_s) - - return + return ArchiveKey{Time: s.currentLogTime, Stream: s.name}.Path() } func (s *Store) getCurrentWriter() (w io.Writer, err error) { @@ -227,59 +219,8 @@ func NewStore(name string, r ShardReaderCheckpointer, up *S3Uploader) (s *Store) reader: r, buf: buf, uploader: up, - streamMetadata: newStreamMetadata(), + streamMetadata: NewStreamMetadata(), } return } - -type streamMetadata struct { - // shard ID => shardInfo - Shards map[ShardID]*shardInfo `json:"shards"` - sync.Mutex -} - -func newStreamMetadata() *streamMetadata { - return &streamMetadata{ - Shards: make(map[ShardID]*shardInfo), - } -} - -func (s *streamMetadata) noteSequenceNumber(shardID ShardID, sequenceNum SequenceNumber) { - s.Lock() - defer s.Unlock() - sh := s.Shards[shardID] - if sh == nil { - sh = &shardInfo{} - s.Shards[shardID] = sh - } - sh.noteSequenceNumber(sequenceNum) -} - -type shardInfo struct { - MinSequenceNumber SequenceNumber `json:"min_sequence_number"` - MaxSequenceNumber SequenceNumber `json:"max_sequence_number"` -} - -func (s *shardInfo) noteSequenceNumber(sequenceNum SequenceNumber) { - if s.MinSequenceNumber == "" { - s.MinSequenceNumber = 
sequenceNum - } else { - nums := naturalsort.NaturalSort([]string{ - string(sequenceNum), - string(s.MinSequenceNumber), - }) - sort.Sort(nums) - s.MinSequenceNumber = SequenceNumber(nums[0]) - } - if s.MaxSequenceNumber == "" { - s.MaxSequenceNumber = sequenceNum - } else { - nums := naturalsort.NaturalSort([]string{ - string(sequenceNum), - string(s.MaxSequenceNumber), - }) - sort.Sort(nums) - s.MaxSequenceNumber = SequenceNumber(nums[1]) - } -} diff --git a/triton/store_reader.go b/triton/store_reader.go index 5143b64..1531f63 100644 --- a/triton/store_reader.go +++ b/triton/store_reader.go @@ -44,11 +44,7 @@ func (l StoreArchiveList) Swap(i, j int) { } func (l StoreArchiveList) Less(i, j int) bool { - if l[i].T != l[j].T { - return l[i].T.Before(l[j].T) - } else { - return l[i].SortValue < l[j].SortValue - } + return l[i].T.Before(l[j].T) } func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, startDate, endDate time.Time) (Reader, error) { diff --git a/triton/store_test.go b/triton/store_test.go index 77a0d7f..26d5398 100644 --- a/triton/store_test.go +++ b/triton/store_test.go @@ -17,6 +17,10 @@ func (nsr *nullStreamReader) ReadRecord() (map[string]interface{}, error) { return nil, io.EOF } +func (nsr *nullStreamReader) ReadShardRecord() (*ShardRecord, error) { + return nil, io.EOF +} + func (nsr *nullStreamReader) Checkpoint() error { return nil } @@ -39,7 +43,7 @@ func TestGenerateKeyname(t *testing.T) { s.currentLogTime = time.Date(2015, 6, 30, 2, 45, 0, 0, time.UTC) name := s.generateKeyname() if name != "20150630/test-1435632300.tri" { - t.Errorf("Bad file file %v", name) + t.Errorf("Bad file %v", name) } } @@ -114,22 +118,21 @@ func TestPut(t *testing.T) { } func TestShardInfo(t *testing.T) { - si := &shardInfo{} + si := &ShardInfo{} si.noteSequenceNumber("12345") - si.noteSequenceNumber("01234") - si.noteSequenceNumber("11") + si.noteSequenceNumber("12346") - if si.MinSequenceNumber != "11" { - t.Fatalf("expecting the min sequence number to be 11 but got %q", si.MinSequenceNumber) + if si.MinSequenceNumber != "12345" { + t.Fatalf("expecting the min sequence number to be 12345 but got %q", si.MinSequenceNumber) } - if si.MaxSequenceNumber != "12345" { - t.Fatalf("expecting the max sequence number to be 12345 but got %q", si.MaxSequenceNumber) + if si.MaxSequenceNumber != "12346" { + t.Fatalf("expecting the max sequence number to be 12346 but got %q", si.MaxSequenceNumber) } } func BenchmarkShardInfo(b *testing.B) { - si := &shardInfo{} + si := &ShardInfo{} for i := 0; i < b.N; i++ { si.noteSequenceNumber("12345") } diff --git a/triton/stream_reader.go b/triton/stream_reader.go index 00e7a8b..987968a 100644 --- a/triton/stream_reader.go +++ b/triton/stream_reader.go @@ -17,6 +17,7 @@ type StreamReader interface { Reader Checkpoint() error Stop() + ReadShardRecord() (*ShardRecord, error) } type multiShardStreamReader struct { @@ -40,8 +41,9 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) { func (msr *multiShardStreamReader) ReadRecord() (result map[string]interface{}, err error) { shardRecord, err := msr.ReadShardRecord() if err != nil { - result = shardRecord.Record + return } + result = shardRecord.Record return } diff --git a/triton/stream_reader_test.go b/triton/stream_reader_test.go index 77f1f70..41c0b34 100644 --- a/triton/stream_reader_test.go +++ b/triton/stream_reader_test.go @@ -1,5 +1,6 @@ package triton +import "fmt" import "testing" func TestNewStreamReader(t *testing.T) { @@ -40,9 +41,10 @@ func TestNewStreamReader(t 
*testing.T) { rec1, err := sr.ReadRecord() if err != nil { - t.Error(err) + t.Fatal(err.Error()) return } + fmt.Println("rec1: ", rec1) // Records could be in any order if rec1["value"].(string) == "a" { diff --git a/triton/tail.go b/triton/tail.go index 817bc7c..76d8c87 100644 --- a/triton/tail.go +++ b/triton/tail.go @@ -6,7 +6,6 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/tinylib/msgp/msgp" "io" - "log" "sync" "time" ) @@ -82,8 +81,8 @@ func (t *TailAt) Next() (record Record, err error) { return } +// sendArchivedRecords sends all the archived records between tail at and up to a day afterward. func (t *TailAt) sendArchivedRecords() (lastMetadata *StreamMetadata, err error) { - log.Println("sending archived records") archiveRepository := NewArchiveRepository(t.s3Service, nil, t.bucket, t.stream, t.client) aTime := t.at.AddDate(0, 0, -1) end := t.at.AddDate(0, 0, 2) @@ -95,25 +94,20 @@ func (t *TailAt) sendArchivedRecords() (lastMetadata *StreamMetadata, err error) if !aTime.Before(end) { break } - log.Println("checking", aTime) var archives []StoreArchive archives, err = archiveRepository.ArchivesAtDate(aTime) if err != nil { return } - log.Println("archives: ", archives) for _, archive := range archives { if t.closed { return } lastArchive = &archive - log.Println("archive time:", archive.T, "tail time:", t.at) if archive.T.Before(t.at) { - log.Println("archive is before target") continue } for { - log.Println("reading records from archive") var rec map[string]interface{} rec, err = archive.ReadRecord() if err == io.EOF { @@ -122,20 +116,19 @@ func (t *TailAt) sendArchivedRecords() (lastMetadata *StreamMetadata, err error) } else if err != nil { return } - log.Println("record", rec) t.records <- rec } } aTime = aTime.AddDate(0, 0, 1) } - log.Println("done sending archives") if lastArchive != nil { - log.Println("loading metadata") lastMetadata, err = lastArchive.GetStreamMetadata() } return } +// initStream is internal method that starts sending archived records followed +// by reading from Kinesis shards in parallel func (t *TailAt) initStream() { if t.streamInited { return @@ -147,7 +140,6 @@ func (t *TailAt) initStream() { t.errors <- err return } - log.Println("done sending archived records") err = t.sendKinesisRecords(lastStreamMetadata) if err != nil { t.errors <- err @@ -180,8 +172,6 @@ func (t *TailAt) sendKinesisRecords(previousMetadata *StreamMetadata) (err error // records at startingSequenceNumber, otherwise it tries to find a starting // sequence number at TRIM_HORIZON to begin reading. 
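//
// The per-shard goroutine forwards decoded records to t.records and exits
// on the first error (delivered via t.errors) or once Close marks the tail
// closed.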
func (t *TailAt) sendKinesisRecordsForShard(shard ShardID, startingSequenceNumber SequenceNumber) { - log.Println("sendKinesisRecordsForShard", shard, startingSequenceNumber) - // Start reading from shard and send records to t.records iterator, err := t.getStreamIterator(shard, startingSequenceNumber) if err != nil { diff --git a/triton/tail/tail.go b/triton/tail/tail.go deleted file mode 100644 index 0bd62bc..0000000 --- a/triton/tail/tail.go +++ /dev/null @@ -1,194 +0,0 @@ -package tail - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/postmates/go-triton/triton" - "io" - "sync" - "time" -) - -type TailAt struct { - stream string - streamInited bool - bucket string - at time.Time - closed bool - records chan triton.Record - errors chan error - region string - kinesisService triton.KinesisService - s3Service triton.S3Service - PollInterval time.Duration - EmptyPollInterval time.Duration - sync.Mutex -} - -// NewTail returns a new tailing stream starting at "at" -func NewTailAt(bucket string, streamName string, at time.Time) (tail *TailAt, err error) { - tail = &TailAt{ - stream: streamName, - at: at, - EmptyPollInterval: time.Second * 10, - PollInterval: time.Second, - } - return -} - -func (t *TailAt) Next() (record triton.Record, err error) { - t.Lock() - defer t.Unlock() - go t.initStream() - select { - case record = <-t.records: - case err = <-t.errors: - } - return -} - -func (t *TailAt) sendArchivedRecords() (lastMetadata *triton.StreamMetadata, err error) { - archiveRepository := triton.NewS3StoreArchiveRepository(t.s3Service, t.bucket, t.stream) - aTime := t.at.AddDate(0, 0, -1) - end := t.at.AddDate(0, 0, 2) - var lastArchive *triton.StoreArchive - for !t.closed && aTime.Before(end) { - var archives []triton.StoreArchive - archives, err = archiveRepository.ArchivesAtDate(t.at) - if err != nil { - return - } - for _, archive := range archives { - if t.closed { - return - } - lastArchive = &archive - if archive.T.Before(t.at) { - continue - } - for { - var rec map[string]interface{} - rec, err = archive.ReadRecord() - if err == io.EOF { - err = nil - break - } else if err != nil { - return - } - t.records <- rec - } - } - aTime = t.at.AddDate(0, 0, 1) - } - if lastArchive != nil { - lastMetadata, err = lastArchive.GetStreamMetadata() - } - return -} - -func (t *TailAt) initStream() { - if t.streamInited { - return - } - t.streamInited = true - - lastStreamMetadata, err := t.sendArchivedRecords() - if err != nil { - t.errors <- err - return - } - err = t.sendKinesisRecords(lastStreamMetadata) - if err != nil { - t.errors <- err - } -} - -func (t *TailAt) sendKinesisRecords(previousMetadata *triton.StreamMetadata) (err error) { - shards, err := t.listShards() - if err != nil { - return - } - - // send all of the records in `startingKey` - // load metadata for starting key - // then send kinesis records - // load the sequenceNumbers for the last key - for _, shard := range shards { - var lastSequenceNumber triton.SequenceNumber - if previousMetadata != nil && previousMetadata.Shards[shard] != nil { - lastSequenceNumber = previousMetadata.Shards[shard].MaxSequenceNumber - } - go t.sendKinesisRecordsForShard(shard, lastSequenceNumber) - } - return -} - -// sendKinesisRecordsForShard starts sending records to TailAt.records for the shard optionally starting at starting startingSequenceNumber -// -// If a startingSequenceNumber is specified the reader tries to start reading -// records at startingSequenceNumber, otherwise it tries to 
find a starting -// sequence number at TRIM_HORIZON to begin reading. -func (t *TailAt) sendKinesisRecordsForShard(shard triton.ShardID, startingSequenceNumber triton.SequenceNumber) { - // Start reading from shard and send records to t.records - iterator, err := t.getStreamIterator(shard, startingSequenceNumber) - if err != nil { - t.errors <- err - return - } - for !t.closed { - var getRecordsInput kinesis.GetRecordsInput - getRecordsInput.ShardIterator = aws.String(iterator) - var getRecordsOutput *kinesis.GetRecordsOutput - getRecordsOutput, err = t.kinesisService.GetRecords(&getRecordsInput) - if err != nil { - // catch rate limiting errors - t.errors <- err - return - } - iterator = *getRecordsOutput.NextShardIterator - if len(getRecordsOutput.Records) == 0 { - time.Sleep(t.EmptyPollInterval) - } - } -} - -// getStreamIterator returns an iterator for the stream and shard, optionally starting at startingSequenceNumber or at LATEST -func (t *TailAt) getStreamIterator(shardID triton.ShardID, startingSequenceNumber triton.SequenceNumber) (iteratorID string, err error) { - // Handle the case where startingSequenceNumber is invalid - var getShardIteratorInput kinesis.GetShardIteratorInput - getShardIteratorInput.ShardId = aws.String(string(shardID)) - getShardIteratorInput.StreamName = aws.String(t.stream) - if startingSequenceNumber != "" { - getShardIteratorInput.StartingSequenceNumber = aws.String(string(startingSequenceNumber)) - getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtSequenceNumber) - } else { - getShardIteratorInput.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon) - } - getShardIteratorOutput, err := t.kinesisService.GetShardIterator(&getShardIteratorInput) - if err != nil { - return - } - iteratorID = *getShardIteratorOutput.ShardIterator - return -} - -// listShardsForStream helper method to list all the shards for a stream -func (t *TailAt) listShards() (result []triton.ShardID, err error) { - describeStreamOutput, err := t.kinesisService.DescribeStream(&kinesis.DescribeStreamInput{ - StreamName: aws.String(t.stream), - }) - if err != nil { - return - } - for _, shard := range describeStreamOutput.StreamDescription.Shards { - result = append(result, triton.ShardID(*shard.ShardId)) - } - return -} - -// Close closes the tail stream -func (t *TailAt) Close() { - t.Lock() - defer t.Unlock() - t.closed = true -} diff --git a/triton/tail/tail_test.go b/triton/tail/tail_test.go deleted file mode 100644 index 084298e..0000000 --- a/triton/tail/tail_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package tail - -import ( - "testing" - "time" -) - -var testBucket *testin - -func TestTailStream(t *testing.T) { - -} - -func BenchmarkParseKey(b *testing.B) { - -} diff --git a/triton/test_util.go b/triton/test_util.go index cbd2569..6465ee0 100644 --- a/triton/test_util.go +++ b/triton/test_util.go @@ -4,11 +4,17 @@ package triton import ( "bytes" "fmt" + "io" + "io/ioutil" "log" + "net/http" "strings" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/tinylib/msgp/msgp" ) @@ -140,3 +146,116 @@ func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) return dso, nil } + +// S3 testing: + +type object struct { + content []byte +} + +type bucket struct { + items map[string]*object +} + +func newBucket() *bucket { + return &bucket{ + items: 
make(map[string]*object),
+	}
+}
+
+// testS3Service represents an S3 service as defined by the aws-sdk-go S3 API
+type testS3Service struct {
+	buckets map[string]*bucket
+}
+
+// newTestS3Service creates a new testS3Service instance
+func newTestS3Service() *testS3Service {
+	return &testS3Service{
+		buckets: make(map[string]*bucket),
+	}
+}
+
+// Put is a helper method to set key to a value byte slice
+func (m *testS3Service) Put(bucket string, key string, value []byte) {
+	log.Printf("testS3Service Put: %s/%s %d\n", bucket, key, len(value))
+	b, bucketExists := m.buckets[bucket]
+	if !bucketExists {
+		b = newBucket()
+		m.buckets[bucket] = b
+	}
+	b.items[key] = &object{
+		content: value,
+	}
+}
+
+// GetObject is a minimal mock implementation of the S3 GetObject API method.
+// This currently only supports the Key parameter and the Body response
+// parameter.
+func (m *testS3Service) GetObject(input *s3.GetObjectInput) (output *s3.GetObjectOutput, err error) {
+	bucket, bucketExists := m.buckets[*input.Bucket]
+	if !bucketExists {
+		err = awserr.New(fmt.Sprintf("%d", http.StatusNotFound), "No such entity", fmt.Errorf("bucket %q does not exist", *input.Bucket))
+		return
+	}
+
+	obj, keyExists := bucket.items[*input.Key]
+	if !keyExists {
+		err = awserr.New(fmt.Sprintf("%d", http.StatusNotFound), "No such entity", fmt.Errorf("key %q does not exist", *input.Key))
+		return
+	}
+
+	output = &s3.GetObjectOutput{}
+	output.Body = ioutil.NopCloser(bytes.NewBuffer(obj.content))
+	return
+}
+
+// ListObjects is a mock implementation of the S3 ListObjects method. This
+// currently only supports the Prefix parameter.
+func (m *testS3Service) ListObjects(input *s3.ListObjectsInput) (result *s3.ListObjectsOutput, err error) {
+	log.Println("ListObjects:", *input)
+	result = &s3.ListObjectsOutput{}
+	bucket, bucketExists := m.buckets[*input.Bucket]
+	log.Printf("testS3Service list: %s\n", *input.Bucket)
+	if !bucketExists {
+		err = awserr.New(fmt.Sprintf("%d", http.StatusNotFound), "No such entity", fmt.Errorf("bucket %q does not exist", *input.Bucket))
+		return
+	}
+	for key := range bucket.items {
+		if input.Prefix != nil {
+			if !strings.HasPrefix(key, *input.Prefix) {
+				continue
+			}
+		}
+		result.Contents = append(result.Contents, &s3.Object{
+			Key: aws.String(key),
+		})
+	}
+	return
+}
+
+// ListObjectsPages is an implementation of the S3 ListObjectsPages API method.
+// This currently doesn't support any parameters and just falls through to +// ListObjects() +func (m *testS3Service) ListObjectsPages(input *s3.ListObjectsInput, f func(*s3.ListObjectsOutput, bool) bool) (err error) { + res, err := m.ListObjects(input) + if res != nil { + f(res, true) + } + return +} + +func (m *testS3Service) uploader() *testS3UploaderService { + return &testS3UploaderService{testS3Service: m} + +} + +type testS3UploaderService struct { + testS3Service *testS3Service +} + +func (u *testS3UploaderService) Upload(input *s3manager.UploadInput) (output *s3manager.UploadOutput, err error) { + var buf bytes.Buffer + io.Copy(&buf, input.Body) + u.testS3Service.Put(*input.Bucket, *input.Key, buf.Bytes()) + return +} diff --git a/triton/uploader.go b/triton/uploader.go index 1368e71..ea394bc 100644 --- a/triton/uploader.go +++ b/triton/uploader.go @@ -8,7 +8,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - //"github.com/aws/aws-sdk-go/aws/awsutil" "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) From 2fe1a01e338c5f763ce29feb8758b960372c25d7 Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Mon, 4 Jan 2016 11:09:07 -0800 Subject: [PATCH 7/9] * Miscellaneous cleanups, split modules up a little * Add ShardRecordReader interface * Add shardReader that uses channels * Update tail interface --- Makefile | 3 + triton/archive.go | 3 +- triton/archive_reader.go | 10 +- triton/common.go | 8 ++ triton/multi_shard_reader.go | 198 ++++++++++++++++++++++++++++++ triton/multi_shard_reader_test.go | 30 +++++ triton/reader.go | 37 +----- triton/reader_test.go | 2 +- triton/serial_reader.go | 35 ++++++ triton/shard_reader.go | 165 +++++++++++++++++++++++++ triton/shard_reader_test.go | 78 ++++++++++++ triton/shard_record.go | 4 - triton/store.go | 30 ++--- triton/store_reader.go | 16 +-- triton/store_test.go | 2 +- triton/stream.go | 151 +---------------------- triton/stream_reader.go | 181 +++++---------------------- triton/stream_test.go | 97 ++++----------- triton/tail.go | 51 +++----- triton/util.go | 1 + 20 files changed, 624 insertions(+), 478 deletions(-) create mode 100644 triton/common.go create mode 100644 triton/multi_shard_reader.go create mode 100644 triton/multi_shard_reader_test.go create mode 100644 triton/serial_reader.go create mode 100644 triton/shard_reader.go create mode 100644 triton/shard_reader_test.go create mode 100644 triton/util.go diff --git a/Makefile b/Makefile index 7af4a93..095652d 100644 --- a/Makefile +++ b/Makefile @@ -20,3 +20,6 @@ clean: cscope: find $$GOPATH/src -type f -iname "*.go"> cscope.files cscope -b -k + +tags: + diff --git a/triton/archive.go b/triton/archive.go index 3dc71e5..007dbfa 100644 --- a/triton/archive.go +++ b/triton/archive.go @@ -20,7 +20,7 @@ type StoreArchive struct { rdr Reader } -func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) { +func (sa *StoreArchive) ReadRecord() (rec Record, err error) { if sa.rdr == nil { out, err := sa.s3Svc.GetObject(&s3.GetObjectInput{ Bucket: aws.String(sa.Bucket), @@ -33,7 +33,6 @@ func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) { sa.rdr = NewArchiveReader(out.Body) } - rec, err = sa.rdr.ReadRecord() return } diff --git a/triton/archive_reader.go b/triton/archive_reader.go index f1389b4..2dfa731 100644 --- a/triton/archive_reader.go +++ b/triton/archive_reader.go @@ -13,16 +13,18 @@ type ArchiveReader struct { mr *msgp.Reader } -func (r *ArchiveReader) ReadRecord() 
(rec map[string]interface{}, err error) {
-	rec = make(map[string]interface{})
-
+func (r *ArchiveReader) ReadRecord() (sr Record, err error) {
+	rec := make(map[string]interface{})
 	err = r.mr.ReadMapStrIntf(rec)
+	if err != nil {
+		return
+	}
+	sr = rec
 	return
 }
 
 func NewArchiveReader(ir io.Reader) (or Reader) {
 	sr := snappy.NewReader(ir)
 	mr := msgp.NewReader(sr)
-
 	return &ArchiveReader{mr}
 }
diff --git a/triton/common.go b/triton/common.go
new file mode 100644
index 0000000..b9969c7
--- /dev/null
+++ b/triton/common.go
@@ -0,0 +1,8 @@
+package triton
+
+// Some types to make sure our lists of func args don't get confused
+type ShardID string
+type SequenceNumber string
+
+// For tracking ShardID => last sequence number
+type ShardToSequenceNumber map[ShardID]SequenceNumber
diff --git a/triton/multi_shard_reader.go b/triton/multi_shard_reader.go
new file mode 100644
index 0000000..d30ae34
--- /dev/null
+++ b/triton/multi_shard_reader.go
@@ -0,0 +1,198 @@
+package triton
+
+// Module for a record reader that consumes records from each shard of a stream
+
+import (
+	"io"
+	"sync"
+	"time"
+)
+
+// NewMultiShardReaderParams are the parameters to NewMultiShardReader
+type NewMultiShardReaderParams struct {
+	KinesisService        KinesisService        // The Kinesis service
+	Stream                string                // The stream name, like "courier_activity"
+	ShardToSequenceNumber ShardToSequenceNumber // A mapping from shard ID to sequence number, to start reading a shard after a sequence number
+	ReloadShardsInterval  time.Duration         // Reload the list of shards
+	// and iterators at this interval to handle the case of shards changing while
+	// reading records.
+}
+
+const defaultReloadShardsInterval = time.Minute * 5 // By default, check for new shards every five minutes
+
+// NewMultiShardReader creates a new MultiShardReader
+func NewMultiShardReader(params *NewMultiShardReaderParams) (result *MultiShardReader) {
+	// Passing in a nil kinesis service is a programming error
+	if params.KinesisService == nil {
+		panic("expecting a KinesisService")
+	}
+
+	// Not specifying a stream is a programming error
+	if params.Stream == "" {
+		panic("expecting a stream")
+	}
+
+	if params.ReloadShardsInterval == 0 {
+		params.ReloadShardsInterval = defaultReloadShardsInterval
+	}
+
+	var shardToSequenceNumber ShardToSequenceNumber
+	if params.ShardToSequenceNumber != nil {
+		shardToSequenceNumber = params.ShardToSequenceNumber
+	} else {
+		shardToSequenceNumber = make(ShardToSequenceNumber)
+	}
+
+	result = &MultiShardReader{
+		records:               make(chan *ShardRecord),
+		closeCh:               make(chan bool, 1),
+		shardToSequenceNumber: shardToSequenceNumber,
+		kinesisService:        params.KinesisService,
+		stream:                params.Stream,
+		errors:                make(chan error),
+		resetTimer:            time.NewTicker(params.ReloadShardsInterval),
+	}
+	go result.resetShardReaders()
+	return
+}
+
+// MultiShardReader looks up the available shards for a stream in Kinesis and reads from them.
Records are available from ReadShardRecord() +type MultiShardReader struct { + inited bool + readers []*shardReader + records chan *ShardRecord + closeCh chan bool + mutex sync.Mutex + closed bool + shardToSequenceNumber ShardToSequenceNumber + kinesisService KinesisService + stream string + errors chan error + disconnects []chan bool + resetTimer *time.Ticker +} + +// Stop stops the reader +func (msr *MultiShardReader) Stop() { + msr.mutex.Lock() + defer msr.mutex.Unlock() + msr.closed = true + msr.closeCh <- true +} + +// Reset and disconnect child shard readers +func (msr *MultiShardReader) resetShardReaders() { + msr.mutex.Lock() + defer msr.mutex.Unlock() + // Send each reader the close signal so that it stops trying to write + for _, sr := range msr.readers { + sr.close <- true + } + // Reset the slice of readers + msr.readers = nil + + // Disconnect the shard readers channels from our channels + for _, disconnect := range msr.disconnects { + disconnect <- true + } + msr.disconnects = nil + + // Start new shard readers: + shardIDs, err := ListShards(msr.kinesisService, msr.stream) + if err != nil { + msr.errors <- err + return + } + for _, shardID := range shardIDs { + // Create a new shard reader + sr := newShardReader(&newShardReaderParams{ + stream: msr.stream, + shardID: shardID, + startAfterSequenceNumber: msr.shardToSequenceNumber[shardID], + kinesisService: msr.kinesisService, + }) + // Connect the shard reader channels to our channels + msr.disconnects = append(msr.disconnects, + connectShardRecordChannel(msr.records, sr.records), + connectErrorChannel(msr.errors, sr.errors)) + } + return +} + +// ReadShardRecord returns a ShardRecord or an error if an record was unable to be decoded or if there was an error from Kinesis +func (msr *MultiShardReader) ReadShardRecord() (result *ShardRecord, err error) { + if msr.closed { + // Return an EOF when we are closed + err = io.EOF + return + } + for { + select { + + case <-msr.resetTimer.C: + // Whenever resetTimer fires, reset the shard readers. This allows for + // the case where the list of shards changes due to merges/splits + msr.resetShardReaders() + case err = <-msr.errors: + // Return an error + return + case result = <-msr.records: + // Keep track of shardID => sequence # + msr.shardToSequenceNumber[result.ShardID] = result.SequenceNumber + return + // If we're closed, quit + case <-msr.closeCh: + err = io.EOF + return + } + } + return +} + +// connectShardRecordChannel connects a chan *ShardRecord to another, +// piping each record from src to dst. +// +// Returns a channel which removes the connection on receiving any bool value +func connectShardRecordChannel(dst chan *ShardRecord, src chan *ShardRecord) (stop chan bool) { + stop = make(chan bool, 1) + go func() { + for { + var rec *ShardRecord + select { + case rec = <-src: + case <-stop: + return + } + select { + case dst <- rec: + case <-stop: + return + } + } + }() + return +} + +// connectErrorChannel connects a chan error to another, +// piping each record from src to dst. 
+// +// Returns a channel which removes the connection on receiving any bool value +func connectErrorChannel(dst chan error, src chan error) (stop chan bool) { + stop = make(chan bool, 1) + go func() { + for { + var err error + select { + case err = <-src: + case <-stop: + return + } + select { + case dst <- err: + case <-stop: + return + } + } + }() + return +} diff --git a/triton/multi_shard_reader_test.go b/triton/multi_shard_reader_test.go new file mode 100644 index 0000000..140b86b --- /dev/null +++ b/triton/multi_shard_reader_test.go @@ -0,0 +1,30 @@ +package triton + +import ( + "testing" +) + +func TestReadFromMultiShardReader(t *testing.T) { + svc := newTestKinesisService() + st := newTestKinesisStream("test-stream") + s1 := newTestKinesisShard() + + r1 := map[string]interface{}{"foo": "bar"} + s1.AddRecord(SequenceNumber("a"), r1) + st.AddShard("shard-0000", s1) + svc.AddStream(st) + + s := newShardReader(&newShardReaderParams{ + kinesisService: svc, + stream: "test-stream", + shardID: "shard-0000", + }) + select { + case e := <-s.errors: + t.Fatalf("unexpected error: %s", e.Error()) + case r := <-s.records: + if r.Record["foo"].(string) != "bar" { + t.Fatalf("expecting bar") + } + } +} diff --git a/triton/reader.go b/triton/reader.go index 6f34bee..604a2b3 100644 --- a/triton/reader.go +++ b/triton/reader.go @@ -1,38 +1,7 @@ package triton -import ( - "io" - "log" -) - +// An interface for reading records +// A call to ReadRecord() will return either a record or an error. At the end of a stream io.EOF will be returned. type Reader interface { - ReadRecord() (rec map[string]interface{}, err error) -} - -// A SerialReader let's us read from multiple readers, in sequence -type SerialReader struct { - readers []Reader - r_idx int -} - -func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error) { - for sr.r_idx < len(sr.readers) { - rec, err = sr.readers[sr.r_idx].ReadRecord() - if err != nil { - if err == io.EOF { - log.Println("Archive complete. Next...") - sr.r_idx += 1 - } else { - return - } - } else { - return rec, nil - } - } - - return nil, io.EOF -} - -func NewSerialReader(readers []Reader) Reader { - return &SerialReader{readers, 0} + ReadRecord() (rec Record, err error) } diff --git a/triton/reader_test.go b/triton/reader_test.go index 0ab8306..96053d7 100644 --- a/triton/reader_test.go +++ b/triton/reader_test.go @@ -23,7 +23,7 @@ func TestSerialReaderEmtpy(t *testing.T) { type instantEOFReader struct{} -func (sr *instantEOFReader) ReadRecord() (rec map[string]interface{}, err error) { +func (sr *instantEOFReader) ReadRecord() (rec Record, err error) { return nil, io.EOF } diff --git a/triton/serial_reader.go b/triton/serial_reader.go new file mode 100644 index 0000000..0c58f36 --- /dev/null +++ b/triton/serial_reader.go @@ -0,0 +1,35 @@ +package triton + +import ( + "io" + "log" +) + +// A SerialReader let's us read from multiple readers, in sequence +type SerialReader struct { + readers []Reader + r_idx int +} + +func (sr *SerialReader) ReadRecord() (rec Record, err error) { + for sr.r_idx < len(sr.readers) { + rec, err = sr.readers[sr.r_idx].ReadRecord() + if err != nil { + if err == io.EOF { + log.Println("Archive complete. 
Next...") + err = nil + sr.r_idx += 1 + } else { + return + } + } else { + return + } + } + err = io.EOF + return +} + +func NewSerialReader(readers []Reader) Reader { + return &SerialReader{readers: readers, r_idx: 0} +} diff --git a/triton/shard_reader.go b/triton/shard_reader.go new file mode 100644 index 0000000..3f38414 --- /dev/null +++ b/triton/shard_reader.go @@ -0,0 +1,165 @@ +package triton + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/tinylib/msgp/msgp" + "log" + "time" +) + +const ( + // Read the latest records + shardIteratorTypeLatest = "LATEST" + // Read the records after this sequence number + shardIteratorTypeAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER" + // The small interval to poll shards + minShardPollInterval = time.Second + // The number of records to process at once + recordLimit = 10000 +) + +type ShardReader interface { + ReadShardRecord() (rec *ShardRecord, err error) + Stop() +} + +// newShardReaderParams are params for starting and returning a new shardReader +type newShardReaderParams struct { + stream string + shardID ShardID + startAfterSequenceNumber SequenceNumber + kinesisService KinesisService +} + +// newShardReader creates a new shardReader struct and starts the loop +func newShardReader(params *newShardReaderParams) (sr *shardReader) { + sr = &shardReader{ + newShardReaderParams: *params, + records: make(chan *ShardRecord), + close: make(chan bool, 1), + } + go sr.loop() + return +} + +// shardRecord is a struct for the shardReader state +type shardReader struct { + newShardReaderParams + nextIterator string + records chan *ShardRecord + errors chan error + close chan bool + closed bool +} + +// loop runs the shard reader loop +func (sr *shardReader) loop() { + defer close(sr.records) // After we exit close the records channel + ticker := time.NewTicker(minShardPollInterval) // Create a ticker so that we only read records from the API every minShardPollInterval + defer ticker.Stop() // Close the ticker channel after we exit + kinesisRecords, err := sr.readKinesisRecords() // Read a slice of shardRecord + for !sr.closed { + // Empty shardRecs case: + if len(kinesisRecords) == 0 { + select { + case <-ticker.C: // Wait for the timer + case <-sr.close: // Return if we received a close signal + return + } + // Get some more shard records: + kinesisRecords, err = sr.readKinesisRecords() + if err != nil { + sr.errors <- err + } + } + // If we didn't get any shard records, try again later + if len(kinesisRecords) == 0 { + continue + } + shardRecord, err := sr.convertKinesisRecordToShardRecord(kinesisRecords[0]) + kinesisRecords = kinesisRecords[1:] + if err != nil { + sr.errors <- err + } + select { + case sr.records <- shardRecord: // try to send a record + case <-sr.close: // OR try to close + return + } + } +} + +func (sr *shardReader) convertKinesisRecordToShardRecord(kr *kinesis.Record) (result *ShardRecord, err error) { + result = &ShardRecord{} + rec, _, err := msgp.ReadMapStrIntfBytes(kr.Data, nil) + if err != nil { + return + } + result.Record = Record(rec) + result.SequenceNumber = SequenceNumber(*kr.SequenceNumber) + result.ShardID = ShardID(sr.shardID) + return +} + +// initShardIterator initializes the nextIterator field with a valid iterator +// value or returns an error +func (sr *shardReader) initShardIterator() (err error) { + // If we already have a nextIterator return + if sr.nextIterator != "" { + return + } + + gsi := &kinesis.GetShardIteratorInput{} + // 
shardIteratorType is AFTER_SEQUENCE_NUMBER if startAfterSequenceNumber is + // passed in as a parameter, otherwise LATEST is used + shardIteratorType := shardIteratorTypeLatest + if sr.startAfterSequenceNumber != "" { + shardIteratorType = shardIteratorTypeAfterSequenceNumber + gsi.StartingSequenceNumber = aws.String(string(sr.startAfterSequenceNumber)) + } + gsi.StreamName = aws.String(sr.stream) + gsi.ShardId = aws.String(string(sr.shardID)) + gsi.ShardIteratorType = aws.String(shardIteratorType) + gso, err := sr.kinesisService.GetShardIterator(gsi) + // Handle GetShardIterator errors: + if err != nil { + err = fmt.Errorf("error initializing shard iterator for stream: %q, shard: %q, error: %s", sr.stream, sr.shardID, err.Error()) + return + } + sr.nextIterator = *gso.ShardIterator + return +} + +// readKinesisRecords loads records from the shard using the latest shardIterator +func (s *shardReader) readKinesisRecords() (records []*kinesis.Record, err error) { + // Initialize nextIterator if necessary: + if s.nextIterator == "" { + err = s.initShardIterator() + if err != nil { + log.Println("failed to initialize iterator:", err.Error()) + return + } + } + + // Load the records: + gro, err := s.kinesisService.GetRecords(&kinesis.GetRecordsInput{ + Limit: aws.Int64(recordLimit), + ShardIterator: aws.String(s.nextIterator), + }) + if err != nil { + log.Println("failed to get records:", err.Error()) + return + } + records = gro.Records + + // Update nextIterator if available + if gro.NextShardIterator != nil { + s.nextIterator = *gro.NextShardIterator + } else { + s.nextIterator = "" + s.startAfterSequenceNumber = "" + } + return +} diff --git a/triton/shard_reader_test.go b/triton/shard_reader_test.go new file mode 100644 index 0000000..5e063ad --- /dev/null +++ b/triton/shard_reader_test.go @@ -0,0 +1,78 @@ +package triton + +import ( + "testing" +) + +func TestNewShardStreamReaderFromSequence(t *testing.T) { + svc := NullKinesisService{} + s := newShardReader(&newShardReaderParams{ + kinesisService: &svc, + stream: "test-stream", + shardID: "shard-0001", + startAfterSequenceNumber: SequenceNumber("abc123"), + }) + if s.stream != "test-stream" { + t.Errorf("bad stream name") + } + + if s.shardID != "shard-0001" { + t.Errorf("bad ShardID") + } + + if s.nextIterator != "" { + t.Errorf("bad NextIteratorValue") + } + +} + +func TestReadKinesisRecords(t *testing.T) { + svc := newTestKinesisService() + st := newTestKinesisStream("test-stream") + s1 := newTestKinesisShard() + + r1 := make(map[string]interface{}) + s1.AddRecord(SequenceNumber("a"), r1) + st.AddShard("shard-0000", s1) + svc.AddStream(st) + + s := newShardReader(&newShardReaderParams{ + kinesisService: svc, + stream: "test-stream", + shardID: "shard-0000", + }) + records, err := s.readKinesisRecords() + if err != nil { + t.Errorf("Received error %v", err) + return + } + + if len(records) != 1 { + t.Errorf("Should have a record") + } +} + +func TestReadShardRecords(t *testing.T) { + svc := newTestKinesisService() + st := newTestKinesisStream("test-stream") + s1 := newTestKinesisShard() + + r1 := map[string]interface{}{"foo": "bar"} + s1.AddRecord(SequenceNumber("a"), r1) + st.AddShard("shard-0000", s1) + svc.AddStream(st) + + s := newShardReader(&newShardReaderParams{ + kinesisService: svc, + stream: "test-stream", + shardID: "shard-0000", + }) + select { + case e := <-s.errors: + t.Fatalf("unexpected error: %s", e.Error()) + case r := <-s.records: + if r.Record["foo"].(string) != "bar" { + t.Fatalf("expecting bar") + } + } +} diff 
--git a/triton/shard_record.go b/triton/shard_record.go
index 1eede23..d88bd37 100644
--- a/triton/shard_record.go
+++ b/triton/shard_record.go
@@ -5,7 +5,3 @@ type ShardRecord struct {
 	ShardID        ShardID
 	SequenceNumber SequenceNumber
 }
-
-type ShardReader interface {
-	ReadShardRecord() (result *ShardRecord, err error)
-}
diff --git a/triton/store.go b/triton/store.go
index ae9b584..2f72016 100644
--- a/triton/store.go
+++ b/triton/store.go
@@ -12,15 +12,10 @@ import (
 	"time"
 )
 
-type ShardReaderCheckpointer interface {
-	ShardReader
-	Checkpoint() error
-}
-
 // A store manages buffering records together into files, and uploading them somewhere.
 type Store struct {
 	name           string
-	reader         ShardReaderCheckpointer
+	reader         StreamReader
 	streamMetadata *StreamMetadata
 
 	// Our uploaders manages sending our datafiles somewhere
@@ -171,7 +166,7 @@ func (s *Store) Put(b []byte) (err error) {
 		return
 	}
 
-	if s.buf.Len()+len(b) >= BUFFER_SIZE {
+	if s.buf.Len()+len(b) >= bufferSize {
 		s.flushBuffer()
 	}
 
@@ -187,31 +182,26 @@ func (s *Store) Close() (err error) {
 
 func (s *Store) Store() (err error) {
 	for {
-		// TODO: We're unmarshalling and then marshalling msgpack here when
-		// there is not real reason except that's a more useful general
-		// interface. We should add another that is ReadRaw
-		shardRec, err := s.reader.ReadShardRecord()
+		var shardRec *ShardRecord
+		shardRec, err = s.reader.ReadShardRecord()
 		if err != nil {
 			if err == io.EOF {
-				break
-			} else {
-				return err
+				err = nil
 			}
+			return
 		}
 		s.streamMetadata.noteSequenceNumber(shardRec.ShardID, shardRec.SequenceNumber)
 		err = s.PutRecord(shardRec.Record)
 		if err != nil {
-			return err
+			return
 		}
 	}
-
-	return nil
+	return
 }
 
-const BUFFER_SIZE int = 1024 * 1024
+const bufferSize int = 1024 * 1024
 
-func NewStore(name string, r ShardReaderCheckpointer, up *S3Uploader) (s *Store) {
-	b := make([]byte, 0, BUFFER_SIZE)
+func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store) {
+	b := make([]byte, 0, bufferSize)
 	buf := bytes.NewBuffer(b)
 
 	s = &Store{
diff --git a/triton/store_reader.go b/triton/store_reader.go
index 1531f63..6ebc873 100644
--- a/triton/store_reader.go
+++ b/triton/store_reader.go
@@ -47,7 +47,7 @@ func (l StoreArchiveList) Less(i, j int) bool {
 	return l[i].T.Before(l[j].T)
 }
 
-func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, startDate, endDate time.Time) (Reader, error) {
+func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, startDate, endDate time.Time) (result Reader, err error) {
 	allDates := listDatesFromRange(startDate, endDate)
 	archives := make(StoreArchiveList, 0, len(allDates))
 
@@ -57,13 +57,14 @@ func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, st
 		if clientName != "" {
 			prefix = fmt.Sprintf("%s%s-", prefix, clientName)
 		}
-		resp, err := svc.ListObjects(&s3.ListObjectsInput{
+		var resp *s3.ListObjectsOutput
+		resp, err = svc.ListObjects(&s3.ListObjectsInput{
 			Bucket: aws.String(bucketName),
 			Prefix: aws.String(prefix),
 		})
 
 		if err != nil {
-			return nil, err
+			return
 		}
 
 		for _, o := range resp.Contents {
@@ -90,7 +91,8 @@ func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, st
 		}
 
 		if foundClientName != a.ClientName {
-			return nil, fmt.Errorf("Multiple clients found: %s and %s", foundClientName, a.ClientName)
+			err = fmt.Errorf("Multiple clients found: %s and %s", foundClientName, a.ClientName)
+			return
 		}
 	}
 
@@ -98,10 +100,10 @@ func NewStoreReader(svc S3Service, bucketName, clientName, streamName string, st
 	// Convert to a list of Readers... feels like there should be a better way
 	// here. Is this what generics are for? Or is there an interface for a list?
-	readers := make([]Reader, len(archives))
+	readers := make([]Reader, 0, len(archives))
 	for i := range archives {
-		readers[i] = &archives[i]
+		readers = append(readers, &archives[i])
 	}
-
-	return NewSerialReader(readers), nil
+	result = NewSerialReader(readers)
+	return
 }
diff --git a/triton/store_test.go b/triton/store_test.go
index 26d5398..51b14e1 100644
--- a/triton/store_test.go
+++ b/triton/store_test.go
@@ -13,7 +13,7 @@ import (
 
 type nullStreamReader struct{}
 
-func (nsr *nullStreamReader) ReadRecord() (map[string]interface{}, error) {
+func (nsr *nullStreamReader) ReadRecord() (Record, error) {
 	return nil, io.EOF
 }
 
diff --git a/triton/stream.go b/triton/stream.go
index 4eb4e98..a1bfb27 100644
--- a/triton/stream.go
+++ b/triton/stream.go
@@ -2,7 +2,6 @@ package triton
 
 import (
 	"fmt"
-	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -10,153 +9,6 @@ import (
 	"github.com/aws/aws-sdk-go/service/kinesis"
 )
 
-// Some types to make sure our lists of func args don't get confused
-type ShardID string
-
-type SequenceNumber string
-
-// A ShardStreamReader provides records from a Kinesis stream.
-// It's specific to a single shard. A Stream is blocking, and will avoid
-// overloading a shard by limiting how often it attempts to consume records.
-type ShardStreamReader struct {
-	StreamName         string
-	ShardID            ShardID
-	ShardIteratorType  string
-	NextIteratorValue  *string
-	LastSequenceNumber *SequenceNumber
-
-	service     KinesisService
-	records     []*kinesis.Record
-	lastRequest *time.Time
-}
-
-// Recommended minimum polling interval to keep from overloading a Kinesis
-// shard.
-const MIN_POLL_INTERVAL = 1.0 * time.Second
-
-func (s *ShardStreamReader) initIterator() (err error) {
-	gsi := kinesis.GetShardIteratorInput{
-		StreamName:        aws.String(s.StreamName),
-		ShardId:           aws.String(string(s.ShardID)),
-		ShardIteratorType: aws.String(s.ShardIteratorType),
-	}
-
-	if s.LastSequenceNumber != nil {
-		gsi.StartingSequenceNumber = aws.String(string(*s.LastSequenceNumber))
-	}
-
-	gso, err := s.service.GetShardIterator(&gsi)
-	if err != nil {
-		return err
-	}
-
-	s.NextIteratorValue = gso.ShardIterator
-	return nil
-}
-
-func (s *ShardStreamReader) wait(minInterval time.Duration) {
-	if s.lastRequest != nil {
-		sleepTime := minInterval - time.Since(*s.lastRequest)
-		if sleepTime >= time.Duration(0) {
-			time.Sleep(sleepTime)
-		}
-	}
-
-	n := time.Now()
-	s.lastRequest = &n
-}
-
-func (s *ShardStreamReader) fetchMoreRecords() (err error) {
-	s.wait(MIN_POLL_INTERVAL)
-
-	if s.NextIteratorValue == nil {
-		err := s.initIterator()
-		if err != nil {
-			return err
-		}
-	}
-
-	gri := &kinesis.GetRecordsInput{
-		Limit:         aws.Int64(1000),
-		ShardIterator: s.NextIteratorValue,
-	}
-
-	gro, err := s.service.GetRecords(gri)
-	if err != nil {
-		return err
-	}
-
-	s.records = gro.Records
-	s.NextIteratorValue = gro.NextShardIterator
-
-	return nil
-}
-
-// Get the next record from the Shard Stream
-//
-// If records are already loaded, this returns the next record quickly.
-//
-// If not, it may block fetching them from the underlying API. In the event
-// the API doesn't have any records prepared either, this method will return a
-// nil record. This allows the caller to do other things rather than just
-// blocking in this call forever or needing to pass in other flow control
-// signals.
-func (s *ShardStreamReader) Get() (r *kinesis.Record, err error) { - if len(s.records) == 0 { - err := s.fetchMoreRecords() - if err != nil { - return nil, err - } - } - - if len(s.records) > 0 { - r := s.records[0] - s.records = s.records[1:] - - if r.SequenceNumber == nil { - panic("missing sequence number") - } - - sn := SequenceNumber(*r.SequenceNumber) - s.LastSequenceNumber = &sn - - return r, nil - } else { - return nil, nil - } -} - -// Create a new stream given a specific sequence number -// -// This uses the Kinesis AFTER_SEQUENCE_NUMBER interator type, so this assumes -// the provided sequenceNumber has already been processed, and the caller wants -// records produced since. -func NewShardStreamReaderFromSequence(svc KinesisService, streamName string, sid ShardID, sn SequenceNumber) (s *ShardStreamReader) { - s = &ShardStreamReader{ - StreamName: streamName, - ShardID: sid, - ShardIteratorType: "AFTER_SEQUENCE_NUMBER", - LastSequenceNumber: &sn, - service: svc, - } - - return s -} - -// Create a new stream starting at the latest position -// -// This uses the Kinesis LATEST iterator type and assumes the caller only wants new data. -func NewShardStreamReader(svc KinesisService, streamName string, sid ShardID) (s *ShardStreamReader) { - s = &ShardStreamReader{ - StreamName: streamName, - ShardID: sid, - ShardIteratorType: "LATEST", - service: svc, - } - - return s -} - // Utility function to pick a shard id given an integer shard number. // Use this if you want the 2nd shard, but don't know what the id would be. func PickShardID(svc KinesisService, streamName string, shardNum int) (sid ShardID, err error) { @@ -164,7 +16,8 @@ func PickShardID(svc KinesisService, streamName string, shardNum int) (sid Shard if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == "ResourceNotFoundException" { - return "", fmt.Errorf("Failed to find stream %v", awsErr.Message()) + err = fmt.Errorf("Failed to find stream %q: %v", streamName, awsErr.Message()) + return } } diff --git a/triton/stream_reader.go b/triton/stream_reader.go index 987968a..f0eedc6 100644 --- a/triton/stream_reader.go +++ b/triton/stream_reader.go @@ -1,177 +1,60 @@ package triton -import ( - "fmt" - "io" - "log" - "sync" - - "github.com/tinylib/msgp/msgp" -) - -// A StreamReader is a higher-level interface for reading data from a live Triton stream. -// -// By implementing a Reader interface, we can delivery processed triton data to the client. -// In addition, we provide checkpointing service. 
type StreamReader interface { + ShardReader Reader Checkpoint() error - Stop() - ReadShardRecord() (*ShardRecord, error) } -type multiShardStreamReader struct { - checkpointer Checkpointer - readers []*ShardStreamReader - recStream chan *ShardRecord - allWg sync.WaitGroup - done chan struct{} - quit chan struct{} +type streamReader struct { + reader ShardReader + checkpointer Checkpointer + shardIDToSequenceNumber map[ShardID]SequenceNumber } -func (msr *multiShardStreamReader) Checkpoint() (err error) { - for _, r := range msr.readers { - if r.LastSequenceNumber != nil { - err = msr.checkpointer.Checkpoint(r.ShardID, *r.LastSequenceNumber) - } +func NewStreamReader(kinesisService KinesisService, stream string, c Checkpointer) (rc StreamReader, err error) { + rc = &streamReader{ + reader: NewMultiShardReader(&NewMultiShardReaderParams{ + KinesisService: kinesisService, + Stream: stream, + }), + checkpointer: c, + shardIDToSequenceNumber: make(map[ShardID]SequenceNumber), } return } -func (msr *multiShardStreamReader) ReadRecord() (result map[string]interface{}, err error) { - shardRecord, err := msr.ReadShardRecord() - if err != nil { - return - } - result = shardRecord.Record - return +func (rc *streamReader) Stop() { + rc.reader.Stop() } -func (msr *multiShardStreamReader) ReadShardRecord() (result *ShardRecord, err error) { - select { - case result = <-msr.recStream: - case <-msr.done: - err = io.EOF +func (rc *streamReader) Checkpoint() (err error) { + for shardID, seqNum := range rc.shardIDToSequenceNumber { + err = rc.checkpointer.Checkpoint(shardID, seqNum) + if err != nil { + return + } } return } -func (msr *multiShardStreamReader) Stop() { - msr.quit <- struct{}{} - log.Println("Triggered stop, waiting to complete") - msr.allWg.Wait() -} - -const maxShards int = 100 - -func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error) { - msr := multiShardStreamReader{ - checkpointer: c, - readers: make([]*ShardStreamReader, 0), - recStream: make(chan *ShardRecord), - allWg: sync.WaitGroup{}, - done: make(chan struct{}), - quit: make(chan struct{}, maxShards), - } +// implement the Reader interface - shards, err := ListShards(svc, streamName) +func (rc *streamReader) ReadRecord() (rec Record, err error) { + sr, err := rc.ReadShardRecord() if err != nil { return } - - if len(shards) == 0 { - return nil, fmt.Errorf("No shards found") - } - - sr = &msr - - if len(shards) > maxShards { - // We reserve some data structures. That's a lot of shards - panic("Too many shards") - } - - for _, sid := range shards { - sn, err := c.LastSequenceNumber(sid) - if err != nil { - return nil, err - } - var shardStream *ShardStreamReader - if sn == "" { - shardStream = NewShardStreamReader(svc, streamName, sid) - } else { - shardStream = NewShardStreamReaderFromSequence(svc, streamName, sid, sn) - } - - msr.readers = append(msr.readers, shardStream) - - go func(shardStream *ShardStreamReader) { - msr.allWg.Add(1) - defer msr.allWg.Done() - - log.Printf("Starting stream processing for %s:%s", shardStream.StreamName, shardStream.ShardID) - processStreamToChan(shardStream, msr.recStream, msr.done) - - msr.quit <- struct{}{} - }(shardStream) - } - - go func() { - <-msr.quit - log.Println("Stop triggered, shutdown starting.") - - // Closing the done channel will cause all the worker routines to shutdown. - // But we can't close a channel more than once, so we'll control access - // to it via the quit channel. 
- close(msr.done) - }() - + rec = sr.Record return } -func processStreamToChan(r *ShardStreamReader, recChan chan *ShardRecord, done chan struct{}) { - for { - select { - case <-done: - return - default: - } - - kRec, err := r.Get() - if err != nil { - log.Println("Error reading record", err) - return - } - - // this indicates there were no more records. Rather than block - // forever, the ShardStreamReader graciously gives us the opportunity - // to change our minds. - if kRec == nil { - continue - } - - rec, eb, err := msgp.ReadMapStrIntfBytes(kRec.Data, nil) - if err != nil { - // This will end the stream. If this ever happens, we might need - // some way to repair the stream. - log.Println("Failed to decode record from stream", err) - return - } - if len(eb) > 0 { - log.Println("Extra bytes in stream record", len(eb)) - return - } - shardRec := &ShardRecord{ - Record: rec, - ShardID: r.ShardID, - SequenceNumber: SequenceNumber(*kRec.SequenceNumber), - } - - select { - case recChan <- shardRec: - case <-done: - return - } +// Implement the ShardReader interface +func (rc *streamReader) ReadShardRecord() (rec *ShardRecord, err error) { + rec, err = rc.reader.ReadShardRecord() + if err != nil { + return } + rc.shardIDToSequenceNumber[rec.ShardID] = rec.SequenceNumber + return } - -// TODO: An interface to choose shards diff --git a/triton/stream_test.go b/triton/stream_test.go index 00a4bbc..c58c8ff 100644 --- a/triton/stream_test.go +++ b/triton/stream_test.go @@ -3,7 +3,6 @@ package triton import ( "fmt" "testing" - "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" @@ -33,81 +32,24 @@ func (s *NullKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) func TestNewShardStreamReader(t *testing.T) { svc := NullKinesisService{} - s := NewShardStreamReader(&svc, "test-stream", "shard-0001") - if s.StreamName != "test-stream" { + s := newShardReader(&newShardReaderParams{ + kinesisService: &svc, + stream: "test-stream", + shardID: "shard-0001", + }) + if s.stream != "test-stream" { t.Errorf("bad stream name") } - if s.ShardID != ShardID("shard-0001") { + if s.shardID != ShardID("shard-0001") { t.Errorf("bad ShardID") } - if s.NextIteratorValue != nil { + if s.nextIterator != "" { t.Errorf("bad NextIteratorValue") } } -func TestNewShardStreamReaderFromSequence(t *testing.T) { - svc := NullKinesisService{} - - s := NewShardStreamReaderFromSequence(&svc, "test-stream", "shard-0001", "abc123") - if s.StreamName != "test-stream" { - t.Errorf("bad stream name") - } - - if s.ShardID != "shard-0001" { - t.Errorf("bad ShardID") - } - - if s.NextIteratorValue != nil { - t.Errorf("bad NextIteratorValue") - } - - if *s.LastSequenceNumber != "abc123" { - t.Errorf("bad LastSequenceNumber") - } -} - -func TestStreamWait(t *testing.T) { - svc := NullKinesisService{} - s := NewShardStreamReader(&svc, "test-stream", "shard-0000") - - n := time.Now() - s.wait(100 * time.Millisecond) - if time.Since(n).Seconds() > 0.050 { - t.Errorf("Shouldn't have waited") - } - - s.wait(100 * time.Millisecond) - if time.Since(n).Seconds() < 0.050 { - t.Errorf("Should have waited") - } - -} - -func TestFetchMoreRecords(t *testing.T) { - svc := newTestKinesisService() - st := newTestKinesisStream("test-stream") - s1 := newTestKinesisShard() - - r1 := make(map[string]interface{}) - s1.AddRecord(SequenceNumber("a"), r1) - st.AddShard("shard-0000", s1) - svc.AddStream(st) - - s := NewShardStreamReader(svc, "test-stream", "shard-0000") - - err := s.fetchMoreRecords() - if err != nil { - 
t.Errorf("Received error %v", err) - return - } - - if len(s.records) != 1 { - t.Errorf("Should have a record") - } -} - func TestRead(t *testing.T) { svc := newTestKinesisService() st := newTestKinesisStream("test-stream") @@ -118,16 +60,21 @@ func TestRead(t *testing.T) { st.AddShard("shard-0000", s1) svc.AddStream(st) - s := NewShardStreamReader(svc, "test-stream", "shard-0000") - - r, err := s.Get() - if err != nil { + s := newShardReader(&newShardReaderParams{ + kinesisService: svc, + stream: "test-stream", + shardID: "shard-0000", + }) + defer func() { + s.close <- true + }() + select { + case record := <-s.records: + if record == nil { + t.Errorf("Should be a record") + } + case err := <-s.errors: t.Errorf("Received error %v", err) - return - } - - if r == nil { - t.Errorf("Should be a record") } } diff --git a/triton/tail.go b/triton/tail.go index 76d8c87..d9e7716 100644 --- a/triton/tail.go +++ b/triton/tail.go @@ -108,7 +108,7 @@ func (t *TailAt) sendArchivedRecords() (lastMetadata *StreamMetadata, err error) continue } for { - var rec map[string]interface{} + var rec Record rec, err = archive.ReadRecord() if err == io.EOF { err = nil @@ -140,28 +140,29 @@ func (t *TailAt) initStream() { t.errors <- err return } - err = t.sendKinesisRecords(lastStreamMetadata) - if err != nil { - t.errors <- err - } + t.sendKinesisRecords(lastStreamMetadata) } -func (t *TailAt) sendKinesisRecords(previousMetadata *StreamMetadata) (err error) { - shards, err := t.listShards() - if err != nil { - return +func (t *TailAt) sendKinesisRecords(previousMetadata *StreamMetadata) { + shardToSequenceNumber := make(map[ShardID]SequenceNumber) + if previousMetadata != nil { + for k, v := range previousMetadata.Shards { + shardToSequenceNumber[k] = v.MaxSequenceNumber + } } - // send all of the records in `startingKey` - // load metadata for starting key - // then send kinesis records - // load the sequenceNumbers for the last key - for _, shard := range shards { - var lastSequenceNumber SequenceNumber - if previousMetadata != nil && previousMetadata.Shards[shard] != nil { - lastSequenceNumber = previousMetadata.Shards[shard].MaxSequenceNumber + shardReader := NewMultiShardReader(&NewMultiShardReaderParams{ + KinesisService: t.kinesisService, + Stream: t.stream, + ShardToSequenceNumber: shardToSequenceNumber, + }) + for { + sr, err := shardReader.ReadShardRecord() + if err != nil { + t.errors <- err + } else { + t.records <- sr.Record } - go t.sendKinesisRecordsForShard(shard, lastSequenceNumber) } return } @@ -227,20 +228,6 @@ func (t *TailAt) getStreamIterator(shardID ShardID, startingSequenceNumber Seque return } -// listShardsForStream helper method to list all the shards for a stream -func (t *TailAt) listShards() (result []ShardID, err error) { - describeStreamOutput, err := t.kinesisService.DescribeStream(&kinesis.DescribeStreamInput{ - StreamName: aws.String(t.stream), - }) - if err != nil { - return - } - for _, shard := range describeStreamOutput.StreamDescription.Shards { - result = append(result, ShardID(*shard.ShardId)) - } - return -} - // Close closes the tail stream func (t *TailAt) Close() { t.Lock() diff --git a/triton/util.go b/triton/util.go new file mode 100644 index 0000000..c8d0313 --- /dev/null +++ b/triton/util.go @@ -0,0 +1 @@ +package triton From bcf27c0d816c1752f5aeb5855605f854e86738db Mon Sep 17 00:00:00 2001 From: Brandon Bickford Date: Mon, 4 Jan 2016 12:08:03 -0800 Subject: [PATCH 8/9] Remove dead package --- triton/util.go | 1 - 1 file changed, 1 deletion(-) delete mode 100644 
triton/util.go

diff --git a/triton/util.go b/triton/util.go
deleted file mode 100644
index c8d0313..0000000
--- a/triton/util.go
+++ /dev/null
@@ -1 +0,0 @@
-package triton

From dfdaccdbded585013b64ff0fbcd5892cfe804c88 Mon Sep 17 00:00:00 2001
From: Brandon Bickford
Date: Mon, 4 Jan 2016 13:32:56 -0800
Subject: [PATCH 9/9] Add tailing example

---
 README.md | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 7ae36b3..b922928 100644
--- a/README.md
+++ b/README.md
@@ -4,13 +4,13 @@
 Triton is an opinionated set of tooling for building a data pipeline around an
 AWS stack including [Kinesis](http://aws.amazon.com/kinesis/) and S3.
 It provides the necessary glue for building real applications on top of the
-type of architecture. 
+type of architecture.
 
 ## Overview ##
 
 As your application collects data, write it to Kinesis streams as a series of
 events. Other applications in your infrastructure read from this stream
-providing a solid pattern for services to share data. 
+providing a solid pattern for services to share data.
 
 Triton aims to provide a level of tooling, glue and utility to make this
 ecosystem easy to use. Namely:
@@ -177,7 +177,25 @@
 for {
 ...
 }
 ```
 
+### Tailing from a specific time
+You can tail from a specific point in time. For example:
+
+```go
+
+kinesisClient := ...
+s3Client := ...
+
+tail := triton.NewTailAt(&triton.NewTailAtParams{
+	S3Service:      s3Client,      // S3 client for the region where the triton records live
+	KinesisService: kinesisClient, // Kinesis client
+	StreamName:     "courier_activity_prod",
+	Bucket:         "postmates-triton-prod",
+	Client:         "archive",
+	At:             time.Now().Add(-30 * time.Minute), // 30 minutes ago
+})
+
+```
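+
+Records can then be read off the tail in a loop. This is a minimal sketch,
+assuming `Next` blocks until the next record or an error is available
+(matching the `TailAt.Next` signature above):
+
+```go
+for {
+	rec, err := tail.Next()
+	if err != nil {
+		log.Fatal(err) // handle the error as appropriate for your application
+	}
+	fmt.Println("record:", rec)
+}
+```
 
 ### Other Languages ###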