diff --git a/README.md b/README.md index 7ae36b3..c5fc6e6 100644 --- a/README.md +++ b/README.md @@ -4,13 +4,13 @@ Triton is an opinionated set of tooling for building a data pipeline around an AWS stack including [Kinesis](http://aws.amazon.com/kinesis/) and S3. It provides the necessary glue for building real applications on top of the -type of architecture. +type of architecture. ## Overview ## As your application collects data, write it to Kinesis streams as a series of events. Other applications in your infrastructure read from this stream -providing a solid pattern for services to share data. +providing a solid pattern for services to share data. Triton aims to provide a level of tooling, glue and utility to make this ecosystem easy to use. Namely: @@ -200,4 +200,4 @@ Standard go build commands of course also work. * Metrics/Reporting hooks for easier status checks * Better handle Kinesis shard splitting and combining - * Better patterns for dealing with arbitrary `map[string]interface{}` data + * Better patterns for dealing with arbitrary `Record` data diff --git a/triton/archive.go b/triton/archive.go index ec1d695..31d0c35 100644 --- a/triton/archive.go +++ b/triton/archive.go @@ -24,7 +24,7 @@ type StoreArchive struct { rdr Reader } -func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) { +func (sa *StoreArchive) ReadRecord() (rec Record, err error) { if sa.rdr == nil { out, err := sa.s3Svc.GetObject(&s3.GetObjectInput{ Bucket: aws.String(sa.Bucket), diff --git a/triton/archive_reader.go b/triton/archive_reader.go index f1389b4..aa1b88b 100644 --- a/triton/archive_reader.go +++ b/triton/archive_reader.go @@ -13,8 +13,8 @@ type ArchiveReader struct { mr *msgp.Reader } -func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error) { - rec = make(map[string]interface{}) +func (r *ArchiveReader) ReadRecord() (rec Record, err error) { + rec = make(Record) err = r.mr.ReadMapStrIntf(rec) return diff --git a/triton/aws.go b/triton/aws.go index 5bc86ad..197da11 100644 --- a/triton/aws.go +++ b/triton/aws.go @@ -17,6 +17,7 @@ type KinesisService interface { DescribeStream(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error) GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) + PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) } type S3Service interface { diff --git a/triton/reader.go b/triton/reader.go index 6f34bee..aebd7dd 100644 --- a/triton/reader.go +++ b/triton/reader.go @@ -6,7 +6,7 @@ import ( ) type Reader interface { - ReadRecord() (rec map[string]interface{}, err error) + ReadRecord() (rec Record, err error) } // A SerialReader let's us read from multiple readers, in sequence @@ -15,7 +15,7 @@ type SerialReader struct { r_idx int } -func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error) { +func (sr *SerialReader) ReadRecord() (rec Record, err error) { for sr.r_idx < len(sr.readers) { rec, err = sr.readers[sr.r_idx].ReadRecord() if err != nil { diff --git a/triton/reader_test.go b/triton/reader_test.go index 0ab8306..96053d7 100644 --- a/triton/reader_test.go +++ b/triton/reader_test.go @@ -23,7 +23,7 @@ func TestSerialReaderEmtpy(t *testing.T) { type instantEOFReader struct{} -func (sr *instantEOFReader) ReadRecord() (rec map[string]interface{}, err error) { +func (sr *instantEOFReader) ReadRecord() (rec Record, err error) { return nil, io.EOF } diff --git a/triton/store.go b/triton/store.go index 4a6a149..9eb6dcb 100644 --- a/triton/store.go +++ b/triton/store.go @@ -141,7 +141,7 @@ func (s *Store) flushBuffer() (err error) { return } -func (s *Store) PutRecord(rec map[string]interface{}) (err error) { +func (s *Store) PutRecord(rec Record) (err error) { // TODO: Looks re-usable b := make([]byte, 0, 1024) b, err = msgp.AppendMapStrIntf(b, rec) diff --git a/triton/store_test.go b/triton/store_test.go index 95390fa..7531ec9 100644 --- a/triton/store_test.go +++ b/triton/store_test.go @@ -13,7 +13,7 @@ import ( type nullStreamReader struct{} -func (nsr *nullStreamReader) ReadRecord() (map[string]interface{}, error) { +func (nsr *nullStreamReader) ReadRecord() (Record, error) { return nil, io.EOF } diff --git a/triton/stream_reader.go b/triton/stream_reader.go index fe5fc8d..857f91d 100644 --- a/triton/stream_reader.go +++ b/triton/stream_reader.go @@ -22,7 +22,7 @@ type StreamReader interface { type multiShardStreamReader struct { checkpointer Checkpointer readers []*ShardStreamReader - recStream chan map[string]interface{} + recStream chan Record allWg sync.WaitGroup done chan struct{} quit chan struct{} @@ -37,7 +37,7 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) { return } -func (msr *multiShardStreamReader) ReadRecord() (rec map[string]interface{}, err error) { +func (msr *multiShardStreamReader) ReadRecord() (rec Record, err error) { select { case rec = <-msr.recStream: return rec, nil @@ -58,7 +58,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr msr := multiShardStreamReader{ c, make([]*ShardStreamReader, 0), - make(chan map[string]interface{}), + make(chan Record), sync.WaitGroup{}, make(chan struct{}), make(chan struct{}, maxShards), @@ -118,7 +118,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr return } -func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface{}, done chan struct{}) { +func processStreamToChan(r *ShardStreamReader, recChan chan Record, done chan struct{}) { for { select { case <-done: diff --git a/triton/stream_reader_test.go b/triton/stream_reader_test.go index 77f1f70..3d08f0e 100644 --- a/triton/stream_reader_test.go +++ b/triton/stream_reader_test.go @@ -7,13 +7,13 @@ func TestNewStreamReader(t *testing.T) { st := newTestKinesisStream("test-stream") s1 := newTestKinesisShard() - r1 := make(map[string]interface{}) + r1 := make(Record) r1["value"] = "a" s1.AddRecord(SequenceNumber("a"), r1) st.AddShard(ShardID("0"), s1) s2 := newTestKinesisShard() - r2 := make(map[string]interface{}) + r2 := make(Record) r2["value"] = "b" s2.AddRecord(SequenceNumber("b"), r2) st.AddShard(ShardID("1"), s2) diff --git a/triton/stream_test.go b/triton/stream_test.go index 76a33e5..21f98b5 100644 --- a/triton/stream_test.go +++ b/triton/stream_test.go @@ -31,6 +31,15 @@ func (s *NullKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) return nil, fmt.Errorf("Not Implemented") } +func (s *NullKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) { + results := []*kinesis.PutRecordsResultEntry{} + gso := &kinesis.PutRecordsOutput{ + Records: results, + FailedRecordCount: aws.Int64(0), + } + return gso, nil +} + type FailingKinesisService struct{} func (s *FailingKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { @@ -47,6 +56,11 @@ func (s *FailingKinesisService) GetShardIterator(*kinesis.GetShardIteratorInput) return gso, nil } +func (s *FailingKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) { + err := awserr.New("ProvisionedThroughputExceededException", "slow down dummy", fmt.Errorf("error")) + return nil, err +} + func TestNewShardStreamReader(t *testing.T) { svc := NullKinesisService{} @@ -107,7 +121,7 @@ func TestFetchMoreRecords(t *testing.T) { st := newTestKinesisStream("test-stream") s1 := newTestKinesisShard() - r1 := make(map[string]interface{}) + r1 := make(Record) s1.AddRecord(SequenceNumber("a"), r1) st.AddShard("shard-0000", s1) svc.AddStream(st) @@ -150,7 +164,7 @@ func TestRead(t *testing.T) { st := newTestKinesisStream("test-stream") s1 := newTestKinesisShard() - r1 := make(map[string]interface{}) + r1 := make(Record) s1.AddRecord(SequenceNumber("a"), r1) st.AddShard("shard-0000", s1) svc.AddStream(st) diff --git a/triton/test_util.go b/triton/test_util.go index cbd2569..f04cbf0 100644 --- a/triton/test_util.go +++ b/triton/test_util.go @@ -1,11 +1,13 @@ -// Mock Kinesis Service package triton +// Mock Kinesis Service + import ( "bytes" "fmt" "log" "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" @@ -21,7 +23,7 @@ type testKinesisShard struct { records []testKinesisRecords } -func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec map[string]interface{}) { +func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec Record) { b := bytes.NewBuffer(make([]byte, 0, 1024)) w := msgp.NewWriter(b) err := w.WriteMapStrIntf(rec) @@ -29,10 +31,36 @@ func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec map[string]interface panic(err) } w.Flush() - rs := testKinesisRecords{sn, [][]byte{b.Bytes()}} + s.AddData(sn, b.Bytes()) +} + +func (s *testKinesisShard) AddData(sn SequenceNumber, data []byte) { + rs := testKinesisRecords{sn, [][]byte{data}} s.records = append(s.records, rs) } +func (s *testKinesisShard) PopData() (SequenceNumber, []byte) { + r := s.records[0] + s.records = s.records[1:] + return r.sn, r.recordData[0] +} + +func (s *testKinesisShard) PopRecord() (SequenceNumber, Record) { + sn, data := s.PopData() + + b := bytes.NewBuffer(data) + r := make(Record) + + w := msgp.NewReader(b) + w.ReadMapStrIntf(r) + + return sn, r +} + +func (s *testKinesisShard) NextSequenceNumber() SequenceNumber { + return SequenceNumber(time.Now().String()) +} + func newTestKinesisShard() *testKinesisShard { return &testKinesisShard{make([]testKinesisRecords, 0)} } @@ -117,7 +145,6 @@ func (s *testKinesisService) GetRecords(gri *kinesis.GetRecordsInput) (*kinesis. } func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { - shards := make([]*kinesis.Shard, 0) stream, ok := s.streams[*input.StreamName] if !ok { @@ -125,7 +152,8 @@ func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) return nil, fmt.Errorf("Failed to find stream") } - for sid, _ := range stream.shards { + var shards []*kinesis.Shard + for sid := range stream.shards { shards = append(shards, &kinesis.Shard{ShardId: aws.String(string(sid))}) } @@ -140,3 +168,33 @@ func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) return dso, nil } + +func (s *testKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) { + stream, ok := s.streams[*input.StreamName] + if !ok { + return nil, fmt.Errorf("Failed to find stream") + } + + records := make([]*kinesis.PutRecordsResultEntry, len(input.Records)) + for i, r := range input.Records { + shard, ok := stream.shards[ShardID(*r.PartitionKey)] + if !ok { + return nil, fmt.Errorf("Failed to find shard") + } + + sn := shard.NextSequenceNumber() + shard.AddData(sn, r.Data) + + records[i] = &kinesis.PutRecordsResultEntry{ + SequenceNumber: aws.String(string(sn)), + ShardId: r.PartitionKey, + } + } + + output := &kinesis.PutRecordsOutput{ + Records: records, + FailedRecordCount: aws.Int64(0), + } + + return output, nil +} diff --git a/triton/triton.go b/triton/triton.go new file mode 100644 index 0000000..b9ffd1c --- /dev/null +++ b/triton/triton.go @@ -0,0 +1,4 @@ +package triton + +// Record is the unit of data that is passed through Triton. +type Record map[string]interface{} diff --git a/triton/write_batch_test.go b/triton/write_batch_test.go new file mode 100644 index 0000000..64eab18 --- /dev/null +++ b/triton/write_batch_test.go @@ -0,0 +1,106 @@ +package triton + +import ( + "bytes" + "testing" + "time" +) + +func TestBatchWriterSizeExceeded(t *testing.T) { + configString := bytes.NewBufferString(` + my_stream: + name: test-stream + partition_key: value + region: us-west-1 + `) + c, _ := NewConfigFromFile(configString) + config, _ := c.ConfigForName("my_stream") + + svc := newTestKinesisService() + st := newTestKinesisStream(config.StreamName) + s1 := newTestKinesisShard() + st.AddShard(ShardID("test-value"), s1) + svc.AddStream(st) + + w := NewTestWriter(config, svc, 1) + bw := NewBatchWriterSize(w, 2, 24*time.Hour) + + r := Record(map[string]interface{}{"value": "test-value"}) + bw.WriteRecords(r) + + // Ensure this was not written + if len(s1.records) != 0 { + t.Fatal("Batcher did not wait for size to be exceeded") + } + + bw.WriteRecords(r) + + // wait for write -- this is technically a race condition + time.Sleep(10 * time.Millisecond) + + if len(s1.records) != 2 { + t.Fatal("Batcher did not write when size exceeded") + } +} + +func TestBatchWriterTimeExceeded(t *testing.T) { + configString := bytes.NewBufferString(` + my_stream: + name: test-stream + partition_key: value + region: us-west-1 + `) + c, _ := NewConfigFromFile(configString) + config, _ := c.ConfigForName("my_stream") + + svc := newTestKinesisService() + st := newTestKinesisStream(config.StreamName) + s1 := newTestKinesisShard() + st.AddShard(ShardID("test-value"), s1) + svc.AddStream(st) + + w := NewTestWriter(config, svc, 1) + bw := NewBatchWriterSize(w, 1000, 1*time.Millisecond) + + r := Record(map[string]interface{}{"value": "test-value"}) + bw.WriteRecords(r) + + // Ensure this was not written + if len(s1.records) != 0 { + t.Fatal("Batcher did not wait for time to be exceeded") + } + + // wait for write -- this is technically a race condition + time.Sleep(10 * time.Millisecond) + + if len(s1.records) != 1 { + t.Fatal("Batcher did not write when time exceeded") + } +} + +func TestBatchWriterFailed(t *testing.T) { + configString := bytes.NewBufferString(` + my_stream: + name: test-stream + partition_key: value + region: us-west-1 + `) + c, _ := NewConfigFromFile(configString) + config, _ := c.ConfigForName("my_stream") + + r := Record(map[string]interface{}{"value": "test-value"}) + w := NewTestWriter(config, &FailingKinesisService{}, 1) + bw := NewBatchWriterSize(w, 1, 1*time.Hour) + bw.WriteRecords(r) + + // wait for write -- this is technically a race condition + time.Sleep(1 * time.Millisecond) + + select { + case <-bw.Errors(): + // expected behavior + default: + t.Fatal("Write did not fail as expected") + } + +} diff --git a/triton/writer.go b/triton/writer.go new file mode 100644 index 0000000..3ee1348 --- /dev/null +++ b/triton/writer.go @@ -0,0 +1,115 @@ +package triton + +import ( + "bytes" + "errors" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/cenkalti/backoff" + "github.com/tinylib/msgp/msgp" +) + +// Writer represents a Kinesis writer configured to use a stream and +// partition key. +type Writer interface { + WriteRecords(r ...Record) error +} + +// NewWriter returns a Writer using the given configuration. +// The returned writer is a syncronous writer. +func NewWriter(config *StreamConfig) Writer { + awsConfig := aws.NewConfig().WithRegion(config.RegionName) + sess := session.New(awsConfig) + svc := kinesis.New(sess) + + return &writer{ + config: config, + svc: svc, + maxBackoffWait: 2 * time.Minute, + } +} + +type writer struct { + config *StreamConfig + svc KinesisService + + maxBackoffWait time.Duration +} + +// WriteRecords synchronously writes data to the Kinesis stream. +// +// Writes to Kinesis are expected to take ~100ms. Upon error, WriteRecord +// will retry with exponential backoff. Care must be taken to protect against +// unwanted blocking. +func (w *writer) WriteRecords(rs ...Record) error { + inputs := make([]*recordInput, len(rs)) + for i, r := range rs { + ri, err := w.recordInputFromRecord(r) + if err != nil { + return err + } + inputs[i] = ri + } + + return w.write(inputs...) +} + +func (w *writer) write(inputs ...*recordInput) error { + records := make([]*kinesis.PutRecordsRequestEntry, len(inputs)) + for i, input := range inputs { + records[i] = &kinesis.PutRecordsRequestEntry{ + Data: input.data, + PartitionKey: aws.String(input.partition), + } + } + + params := &kinesis.PutRecordsInput{ + Records: records, + StreamName: aws.String(w.config.StreamName), + } + + writeOp := func() error { + _, err := w.svc.PutRecords(params) + return err + } + + eb := backoff.NewExponentialBackOff() + eb.InitialInterval = 100 * time.Millisecond + eb.MaxElapsedTime = w.maxBackoffWait + + return backoff.Retry(writeOp, eb) +} + +type recordInput struct { + data []byte + partition string +} + +func (w *writer) recordInputFromRecord(r Record) (*recordInput, error) { + // extract partition value + val, ok := r[w.config.PartitionKeyName] + if !ok { + errMsg := fmt.Sprintf("Partition key '%s' does not exist", w.config.PartitionKeyName) + return nil, errors.New(errMsg) + } + + partition := fmt.Sprintf("%v", val) + + // encode to msgpack + buf := new(bytes.Buffer) + encoder := msgp.NewWriter(buf) + if err := encoder.WriteMapStrIntf(r); err != nil { + return nil, err + } + encoder.Flush() + + ri := &recordInput{ + data: buf.Bytes(), + partition: partition, + } + return ri, nil +} diff --git a/triton/writer_batch.go b/triton/writer_batch.go new file mode 100644 index 0000000..9e75a8b --- /dev/null +++ b/triton/writer_batch.go @@ -0,0 +1,158 @@ +package triton + +import "time" + +const defaultBatchSize = 10 +const defaultFlushInterval = 1 * time.Second + +// MaxBatchSize is the limit Kinesis has on a PutRecords call +const MaxBatchSize = 500 + +// NewBatchWriter creates a batch version of an existing Writer using default +// values for size and interval. +func NewBatchWriter(w Writer) *BatchWriter { + return NewBatchWriterSize(w, defaultBatchSize, defaultFlushInterval) +} + +// NewBatchWriterSize creates a batch writer using the given size and interval. +func NewBatchWriterSize(w Writer, size int, intr time.Duration) *BatchWriter { + if size <= 0 { + size = defaultBatchSize + } else if size > MaxBatchSize { + size = MaxBatchSize + } + + if intr <= 0 { + intr = defaultFlushInterval + } + + bufferSize := 10000 + + bw := &BatchWriter{ + w: w, + buf: make(chan Record, bufferSize), + signal: make(chan struct{}, bufferSize), + flushSignal: make(chan struct{}, 0), // must be blocking write/read + errors: make(chan error, 1), + + size: size, + intr: intr, + + ticker: time.NewTicker(intr), + } + + go bw.writeLoop() + + return bw +} + +// BatchWriter implements an asyncronous writer that writes records in batches. +// A batch is written when either the buffer size is exceeded or the time +// interval since the last write has been exceeded. +// +// Write errors are written to the Errors() channel. It is highly recommended +// that you actively monitor this channel because the writer will not stop +// after an error. +type BatchWriter struct { + w Writer + buf chan Record + errors chan error + + size int + intr time.Duration + + ticker *time.Ticker + signal chan struct{} + flushSignal chan struct{} +} + +func (bw *BatchWriter) writeLoop() { + for { + select { + case _, ok := <-bw.signal: + if !ok { + return + } + + if len(bw.buf) >= bw.size { + bw.flush() + } + case <-bw.ticker.C: + bw.flush() + case _, ok := <-bw.flushSignal: + if ok { + bw.flush() + bw.flushSignal <- struct{}{} + } + } + } +} + +// flush writes everything in the current buffer. To prevent concurrent flushes, +// only write_loop() is allowed to call this. +func (bw *BatchWriter) flush() { + numBuffered := len(bw.buf) + if numBuffered == 0 { + return + } + + records := make([](Record), len(bw.buf)) + for i := 0; i < numBuffered; i++ { + records[i] = <-bw.buf // this will never block + } + + // write in blocks of batch size + for n := 0; n < numBuffered; n += bw.size { + end := n + bw.size + if end > numBuffered { + end = numBuffered + } + + if err := bw.w.WriteRecords(records[n:end]...); err != nil { + bw.writeError(err) + } + } +} + +// Flush forces all buffered records to be sent. +// If there is an error it will have been written to the Errors chan. +func (bw *BatchWriter) Flush() { + bw.flushSignal <- struct{}{} // ask to flush + <-bw.flushSignal // block until finish +} + +// Close prevents future writes and flushes all currently buffered records. +// If there is an error it will have been written to the Errors chan. +func (bw *BatchWriter) Close() { + bw.ticker.Stop() + close(bw.signal) + bw.Flush() + close(bw.flushSignal) + close(bw.errors) +} + +// Errors returns the channel that errors will be returned on. It is highly +// recommended that you monitor this channel. +func (bw *BatchWriter) Errors() <-chan error { + return bw.errors +} + +// writeError trys to write to error chan but will not block. +func (bw *BatchWriter) writeError(err error) { + select { + case bw.errors <- err: + default: + } +} + +// WriteRecords performs an asyncronous write to Kinesis. +// +// The returned error will always be nil in the current implementation. +// It is recommended you read errors from Errors(). +func (bw *BatchWriter) WriteRecords(rs ...Record) error { + for _, r := range rs { + bw.buf <- r + } + bw.signal <- struct{}{} // signal that buffer changed + return nil +} diff --git a/triton/writer_test.go b/triton/writer_test.go new file mode 100644 index 0000000..27cebe1 --- /dev/null +++ b/triton/writer_test.go @@ -0,0 +1,60 @@ +package triton + +import ( + "bytes" + "testing" + "time" +) + +func NewTestWriter(config *StreamConfig, s KinesisService, backoffWait time.Duration) Writer { + return &writer{ + config: config, + svc: s, + maxBackoffWait: backoffWait, + } +} + +func TestWriteRecords(t *testing.T) { + configString := bytes.NewBufferString(` + my_stream: + name: test-stream + partition_key: value + region: us-west-1 + `) + c, _ := NewConfigFromFile(configString) + config, _ := c.ConfigForName("my_stream") + + svc := newTestKinesisService() + st := newTestKinesisStream(config.StreamName) + s1 := newTestKinesisShard() + st.AddShard(ShardID("test-value"), s1) + svc.AddStream(st) + + r := Record(map[string]interface{}{"value": "test-value"}) + w := NewTestWriter(config, svc, 1) + if err := w.WriteRecords(r); err != nil { + t.Fatal(err) + } + + _, storedRecord := s1.PopRecord() + if r["value"] != storedRecord["value"] { + t.Fatal("Records are not equal", r, storedRecord) + } +} + +func TestWriteRecordsFailing(t *testing.T) { + configString := bytes.NewBufferString(` + my_stream: + name: test-stream + partition_key: value + region: us-west-1 + `) + c, _ := NewConfigFromFile(configString) + config, _ := c.ConfigForName("my_stream") + + r := Record(map[string]interface{}{"value": "test-value"}) + w := NewTestWriter(config, &FailingKinesisService{}, 1) + if err := w.WriteRecords(r); err == nil { + t.Fatal("Write did not fail as expected") + } +}