From 9ac0ec0d13503a862a18fd541569b23257afad23 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Fri, 23 Sep 2016 15:56:27 -0400 Subject: [PATCH 01/10] Start writing ChanReadWriter --- link.go | 11 ++- stream/io_chan.go | 115 +++++++++++++++++++++++++++++- stream/io_chan_test.go | 61 ++++++++++++++++ stream/redis.go | 155 +++++++++++++++++++++++++++++++++++++++++ toxics/redis.go | 77 ++++++++++++++++++++ toxics/toxic.go | 19 +++-- 6 files changed, 427 insertions(+), 11 deletions(-) create mode 100644 stream/redis.go create mode 100644 toxics/redis.go diff --git a/link.go b/link.go index 69d053cc..12df52bf 100644 --- a/link.go +++ b/link.go @@ -97,6 +97,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { // Interrupt the last toxic so that we don't have a race when moving channels if link.stubs[i-1].InterruptToxic() { link.stubs[i-1].Output = newin + link.stubs[i-1].ReadWriter.SetOutput(newin) if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { link.stubs[i].State = stateful.NewState() @@ -129,8 +130,11 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) { stop <- link.stubs[i-1].InterruptToxic() }() - // Unblock the previous toxic if it is trying to flush - // If the previous toxic is closed, continue flusing until we reach the end. + // Flush toxic's internal buffer + link.stubs[i].ReadWriter.Flush() + + // Unblock the previous toxic if it is trying to flush. + // If the previous toxic is closed, continue flushing until we reach the end. interrupted := false stopped := false for !interrupted { @@ -149,7 +153,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) { } } - // Empty the toxic's buffer if necessary + // Empty the toxic's input buffer if necessary for len(link.stubs[i].Input) > 0 { tmp := <-link.stubs[i].Input if tmp == nil { @@ -160,6 +164,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) { } link.stubs[i-1].Output = link.stubs[i].Output + link.stubs[i-1].ReadWriter.SetOutput(link.stubs[i].Output) link.stubs = append(link.stubs[:i], link.stubs[i+1:]...) go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1]) diff --git a/stream/io_chan.go b/stream/io_chan.go index 4038c32e..89833bf2 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -1,9 +1,12 @@ package stream import ( + "bytes" "fmt" "io" "time" + + "github.com/Sirupsen/logrus" ) type Direction uint8 @@ -32,6 +35,10 @@ func NewChanWriter(output chan<- *StreamChunk) *ChanWriter { // Write `buf` as a StreamChunk to the channel. The full buffer is always written, and error // will always be nil. Calling `Write()` after closing the channel will panic. func (c *ChanWriter) Write(buf []byte) (int, error) { + if len(buf) == 0 { + return 0, nil + } + packet := &StreamChunk{make([]byte, len(buf)), time.Now()} copy(packet.Data, buf) // Make a copy before sending it to the channel c.output <- packet @@ -71,7 +78,7 @@ func (c *ChanReader) Read(out []byte) (int, error) { } n := copy(out, c.buffer) c.buffer = c.buffer[n:] - if len(out) <= len(c.buffer) { + if len(out) == n { return n, nil } else if n > 0 { // We have some data to return, so make the channel read optional @@ -106,3 +113,109 @@ func (c *ChanReader) Read(out []byte) (int, error) { c.buffer = p.Data[n2:] return n + n2, nil } + +type Closer interface { + Close() +} + +// (buffered) +// chan []byte -> ChanReader -> MultiReader -> +// V ^ +// bytes.Buffer ----| +type ChanReadWriter struct { + buffer *bytes.Buffer + bufReader *bytes.Reader + reader *ChanReader + writer *ChanWriter + tee io.Reader + closer Closer +} + +func (c *ChanReadWriter) HandleError(err error) bool { + if err == ErrInterrupted { + c.SeekBack() + return true + } else if err == io.EOF || err == io.ErrUnexpectedEOF { + c.SeekBack() + c.Flush() + if c.closer != nil { + c.closer.Close() + } + return true + } else if err != nil { + c.SeekBack() + c.Flush() + logrus.Warn("Read error in toxic: ", err) + } + return false +} + +func (c *ChanReadWriter) Read(out []byte) (int, error) { + if c.bufReader != nil { + n, err := c.bufReader.Read(out) + if err == io.EOF { + c.bufReader = nil + return n, nil + } + return n, err + } else { + return c.tee.Read(out) + } +} + +func (c *ChanReadWriter) Write(buf []byte) (int, error) { + n, err := c.writer.Write(buf) + c.Checkpoint() + return n, err +} + +func (c *ChanReadWriter) SetOutput(output chan<- *StreamChunk) { + c.writer.output = output +} + +func (c *ChanReadWriter) Flush() { + n := 0 + if c.bufReader != nil { + n = c.bufReader.Len() + } + buf := make([]byte, n+len(c.reader.buffer)) + if n > 0 { + c.bufReader.Read(buf[:n]) + } + if len(buf[n:]) > 0 { + c.reader.Read(buf[n:]) + } + c.writer.Write(buf) + c.bufReader = nil + c.buffer.Reset() +} + +func (c *ChanReadWriter) Checkpoint() { + if c.bufReader != nil { + c.buffer.Next(int(c.bufReader.Size()) - c.bufReader.Len()) + } else { + c.buffer.Reset() + } +} + +func (c *ChanReadWriter) SeekBack() { + c.bufReader = bytes.NewReader(c.buffer.Bytes()) +} + +func (c *ChanReadWriter) SetInterrupt(interrupt <-chan struct{}) { + c.reader.SetInterrupt(interrupt) +} + +func (c *ChanReadWriter) SetCloser(closer Closer) { + c.closer = closer +} + +func NewChanReadWriter(input <-chan *StreamChunk, output chan<- *StreamChunk) *ChanReadWriter { + rw := &ChanReadWriter{ + buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)), + reader: NewChanReader(input), + writer: NewChanWriter(output), + } + rw.tee = io.TeeReader(rw.reader, rw.buffer) + return rw +} diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index 9e140ef1..f4852448 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -153,6 +153,55 @@ func TestMultiWriteWithCopy(t *testing.T) { } } +func TestMultiRead(t *testing.T) { + send := []byte("hello world") + c := make(chan *StreamChunk) + writer := NewChanWriter(c) + reader := NewChanReader(c) + passed := make(chan bool) + go func() { + writer.Write(send) + select { + case c <- &StreamChunk{[]byte("garbage"), time.Now()}: + case <-passed: + } + writer.Close() + }() + buf := make([]byte, len(send)) + + n, err := reader.Read(buf[:8]) + if n != 8 { + t.Fatalf("Read wrong number of bytes: %d expected 8", n) + } + if err != nil { + t.Fatal("Couldn't read from stream", err) + } + if !bytes.Equal(buf[:8], send[:8]) { + t.Fatal("Got wrong message from stream", string(buf[:8])) + } + time.Sleep(10 * time.Millisecond) + + n, err = reader.Read(buf[8:]) + if n != len(buf[8:]) { + t.Fatalf("Read wrong number of bytes: %d expected %d", n, len(buf[8:])) + } + if err != nil { + t.Fatal("Couldn't read from stream", err) + } + if !bytes.Equal(buf, send) { + t.Fatal("Got wrong message from stream", string(buf)) + } + + passed <- true + + n, err = reader.Read(buf) + if n != 0 { + t.Fatalf("Read from channel occured when it shouldn't have: %s", string(buf[:n])) + } else if err != io.EOF { + t.Fatal("Read returned wrong error after close:", err) + } +} + func TestReadInterrupt(t *testing.T) { send := []byte("hello world") c := make(chan *StreamChunk) @@ -199,3 +248,15 @@ func TestReadInterrupt(t *testing.T) { t.Fatal("Got wrong message from stream", string(buf)) } } + +func TestBlankWrite(t *testing.T) { + c := make(chan *StreamChunk, 2) + writer := NewChanWriter(c) + writer.Write([]byte{}) + writer.Write(nil) + writer.Close() + + for v := range c { + t.Fatalf("Unexpected write to channel: %+v", v) + } +} diff --git a/stream/redis.go b/stream/redis.go new file mode 100644 index 00000000..d2cac3a1 --- /dev/null +++ b/stream/redis.go @@ -0,0 +1,155 @@ +package stream + +import ( + "bufio" + "errors" + "fmt" + "io" + "strconv" +) + +const ( + SimpleString = '+' + Error = '-' + Integer = ':' + BulkString = '$' + Array = '*' + Null = '\x00' +) + +var ErrInvalidSyntax = errors.New("invalid RESP syntax") + +type RedisType struct { + Type byte + Value interface{} +} + +func (t RedisType) String() string { + if v, ok := t.Value.(string); ok { + return v + } + return "" +} + +func (t RedisType) Integer() int64 { + if v, ok := t.Value.(int64); ok { + return v + } + return 0 +} + +func (t RedisType) Array() []RedisType { + if v, ok := t.Value.([]RedisType); ok { + return v + } + return nil +} + +func (t RedisType) StringArray() []string { + array := t.Array() + out := make([]string, len(array)) + for i := range array { + out[i] = array[i].String() + } + return out +} + +func (t RedisType) Raw() []byte { + out := []byte{t.Type} + switch t.Type { + case SimpleString, Error: + out = append(out, []byte(t.String())...) + out = append(out, '\r', '\n') + case Integer: + out = append(out, []byte(strconv.FormatInt(t.Integer(), 10))...) + out = append(out, '\r', '\n') + case BulkString: + value := t.String() + out = append(out, []byte(strconv.Itoa(len(value)))...) + out = append(out, '\r', '\n') + out = append(out, []byte(value)...) + out = append(out, '\r', '\n') + case Array: + value := t.Array() + out = append(out, []byte(strconv.Itoa(len(value)))...) + out = append(out, '\r', '\n') + for i := 0; i < len(value); i++ { + out = append(out, value[i].Raw()...) + } + default: + return nil + } + return out +} + +func readLine(reader *bufio.Reader) ([]byte, error) { + line, err := reader.ReadSlice('\n') + if err != nil { + return nil, err + } + + if len(line) < 1 || line[len(line)-2] != '\r' { + return nil, ErrInvalidSyntax + } + return line[:len(line)-2], nil +} + +func ParseRESP(reader *bufio.Reader) (RedisType, error) { + var out RedisType + line, err := readLine(reader) + if err != nil { + return RedisType{}, err + } + + if len(line) > 0 { + out.Type = line[0] + switch line[0] { + case SimpleString, Error: + out.Value = string(line[1:]) + return out, nil + case Integer: + out.Value, err = strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + return RedisType{}, ErrInvalidSyntax + } + return out, nil + case BulkString: + length, err := strconv.Atoi(string(line[1:])) + if err != nil { + return RedisType{}, ErrInvalidSyntax + } + if length < 0 { + return out, nil + } + buf := make([]byte, length+2) + _, err = io.ReadFull(reader, buf) + if err != nil { + return RedisType{}, err + } + out.Value = string(buf[:length]) + return out, nil + case Array: + length, err := strconv.Atoi(string(line[1:])) + if err != nil { + return RedisType{}, ErrInvalidSyntax + } + if length < 0 { + return out, nil + } + array := make([]RedisType, length) + for i := 0; i < length; i++ { + array[i], err = ParseRESP(reader) + if err != nil { + return RedisType{}, err + } + } + out.Value = array + return out, nil + default: + fmt.Println("Format not supported") + return RedisType{}, ErrInvalidSyntax + } + } else { + return RedisType{}, ErrInvalidSyntax + } +} diff --git a/toxics/redis.go b/toxics/redis.go new file mode 100644 index 00000000..7b27cf40 --- /dev/null +++ b/toxics/redis.go @@ -0,0 +1,77 @@ +package toxics + +import ( + "bufio" + "fmt" + "io" + + "github.com/Shopify/toxiproxy/stream" +) + +type RedisToxic struct { + FailWrites bool `json:"fail_writes"` +} + +type RedisToxicState struct { + Command chan stream.RedisType +} + +func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { + state := stub.State.(*RedisToxicState) + + reader := bufio.NewReader(stub.ReadWriter) + for { + cmd, err := stream.ParseRESP(reader) + if stub.ReadWriter.HandleError(err) { + if err == io.EOF { + close(state.Command) + } + return + } else if err == nil { + state.Command <- cmd + str := cmd.StringArray() + fmt.Println("Command:", str) + if len(str) > 0 && str[0] == "SET" { + // Skip the backend server + stub.ReadWriter.Checkpoint() + } else { + stub.ReadWriter.Write(cmd.Raw()) + } + } + } +} + +func (t *RedisToxic) Pipe(stub *ToxicStub) { + state := stub.State.(*RedisToxicState) + + reader := bufio.NewReader(stub.ReadWriter) + for { + resp, err := stream.ParseRESP(reader) + if stub.ReadWriter.HandleError(err) { + return + } else { + select { + case <-stub.Interrupt: + return + case cmd := <-state.Command: + str := cmd.StringArray() + if len(str) > 0 && str[0] == "SET" { + stub.ReadWriter.Write(stream.RedisType{stream.Error, "ERR write failure"}.Raw()) + } else { + stub.ReadWriter.Write(resp.Raw()) + } + default: + } + } + } +} + +func (t *RedisToxic) NewState() interface{} { + return &RedisToxicState{ + Command: make(chan stream.RedisType, 1), + } +} + +func init() { + Register("redis", new(RedisToxic)) +} diff --git a/toxics/toxic.go b/toxics/toxic.go index 342b7e07..f6c601e9 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -51,21 +51,26 @@ type ToxicWrapper struct { } type ToxicStub struct { - Input <-chan *stream.StreamChunk - Output chan<- *stream.StreamChunk - State interface{} - Interrupt chan struct{} - running chan struct{} - closed chan struct{} + Input <-chan *stream.StreamChunk + Output chan<- *stream.StreamChunk + State interface{} + ReadWriter *stream.ChanReadWriter + Interrupt chan struct{} + running chan struct{} + closed chan struct{} } func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub { - return &ToxicStub{ + stub := &ToxicStub{ Interrupt: make(chan struct{}), closed: make(chan struct{}), Input: input, Output: output, } + stub.ReadWriter = stream.NewChanReadWriter(input, output) + stub.ReadWriter.SetInterrupt(stub.Interrupt) + stub.ReadWriter.SetCloser(stub) + return stub } // Begin running a toxic on this stub, can be interrupted. From 4e56917e6afb18611803f33e0186434d06380648 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Mon, 26 Sep 2016 11:54:19 -0400 Subject: [PATCH 02/10] Add some comments and cleanup --- stream/io_chan.go | 43 ++++++++++++++++++++++++++----------------- toxics/redis.go | 2 -- toxics/toxic.go | 3 +-- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/stream/io_chan.go b/stream/io_chan.go index 89833bf2..0f7a37fd 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -114,14 +114,18 @@ func (c *ChanReader) Read(out []byte) (int, error) { return n + n2, nil } +// ToxicStub can't be imported due to an import loop, so we use an interface instead type Closer interface { Close() } -// (buffered) -// chan []byte -> ChanReader -> MultiReader -> -// V ^ -// bytes.Buffer ----| +// Reader: +// chan []byte -> ChanReader -> TeeReader -> Read() -> output +// V ^ +// bytes.Buffer -> bytes.Reader +// +// Writer: +// chan []byte <- ChanWriter <- Write() <- input type ChanReadWriter struct { buffer *bytes.Buffer bufReader *bytes.Reader @@ -131,25 +135,29 @@ type ChanReadWriter struct { closer Closer } +// Handles errors returned by Read(). Returns true if the channel has closed and the caller should exit. +// Unknown errors will flush all data since the last checkpoint to the writer and return false so the +// caller can handle the error. func (c *ChanReadWriter) HandleError(err error) bool { if err == ErrInterrupted { - c.SeekBack() + c.Rollback() return true } else if err == io.EOF || err == io.ErrUnexpectedEOF { - c.SeekBack() + c.Rollback() c.Flush() if c.closer != nil { c.closer.Close() } return true } else if err != nil { - c.SeekBack() + c.Rollback() c.Flush() logrus.Warn("Read error in toxic: ", err) } return false } +// Reads from the input channel either directly, or from a buffer if Rollback() has been called. func (c *ChanReadWriter) Read(out []byte) (int, error) { if c.bufReader != nil { n, err := c.bufReader.Read(out) @@ -163,16 +171,14 @@ func (c *ChanReadWriter) Read(out []byte) (int, error) { } } +// Writes directly to the output channel and sets a checkpoint in the reader. func (c *ChanReadWriter) Write(buf []byte) (int, error) { n, err := c.writer.Write(buf) c.Checkpoint() return n, err } -func (c *ChanReadWriter) SetOutput(output chan<- *StreamChunk) { - c.writer.output = output -} - +// Flushes all buffers in the reader and writes them to the output channel. func (c *ChanReadWriter) Flush() { n := 0 if c.bufReader != nil { @@ -190,6 +196,7 @@ func (c *ChanReadWriter) Flush() { c.buffer.Reset() } +// Sets a checkpoint in the reader. A call to Rollback() will begin reading from this point. func (c *ChanReadWriter) Checkpoint() { if c.bufReader != nil { c.buffer.Next(int(c.bufReader.Size()) - c.bufReader.Len()) @@ -198,23 +205,25 @@ func (c *ChanReadWriter) Checkpoint() { } } -func (c *ChanReadWriter) SeekBack() { +// Rolls back the reader to start from the last checkpoint. +func (c *ChanReadWriter) Rollback() { c.bufReader = bytes.NewReader(c.buffer.Bytes()) } -func (c *ChanReadWriter) SetInterrupt(interrupt <-chan struct{}) { - c.reader.SetInterrupt(interrupt) +func (c *ChanReadWriter) SetOutput(output chan<- *StreamChunk) { + c.writer.output = output } -func (c *ChanReadWriter) SetCloser(closer Closer) { - c.closer = closer +func (c *ChanReadWriter) SetInterrupt(interrupt <-chan struct{}) { + c.reader.SetInterrupt(interrupt) } -func NewChanReadWriter(input <-chan *StreamChunk, output chan<- *StreamChunk) *ChanReadWriter { +func NewChanReadWriter(input <-chan *StreamChunk, output chan<- *StreamChunk, stub Closer) *ChanReadWriter { rw := &ChanReadWriter{ buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)), reader: NewChanReader(input), writer: NewChanWriter(output), + closer: stub, } rw.tee = io.TeeReader(rw.reader, rw.buffer) return rw diff --git a/toxics/redis.go b/toxics/redis.go index 7b27cf40..896bc065 100644 --- a/toxics/redis.go +++ b/toxics/redis.go @@ -51,8 +51,6 @@ func (t *RedisToxic) Pipe(stub *ToxicStub) { return } else { select { - case <-stub.Interrupt: - return case cmd := <-state.Command: str := cmd.StringArray() if len(str) > 0 && str[0] == "SET" { diff --git a/toxics/toxic.go b/toxics/toxic.go index f6c601e9..db658318 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -67,9 +67,8 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream Input: input, Output: output, } - stub.ReadWriter = stream.NewChanReadWriter(input, output) + stub.ReadWriter = stream.NewChanReadWriter(input, output, stub) stub.ReadWriter.SetInterrupt(stub.Interrupt) - stub.ReadWriter.SetCloser(stub) return stub } From a0d76479aa0ec6f0021e66e164c7a09ea2a1854b Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Fri, 30 Sep 2016 13:38:33 -0400 Subject: [PATCH 03/10] Add ReadWriter tests --- stream/io_chan.go | 6 +- stream/io_chan_test.go | 137 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 1 deletion(-) diff --git a/stream/io_chan.go b/stream/io_chan.go index 0f7a37fd..941d7844 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -163,7 +163,11 @@ func (c *ChanReadWriter) Read(out []byte) (int, error) { n, err := c.bufReader.Read(out) if err == io.EOF { c.bufReader = nil - return n, nil + if n > 0 { + return n, nil + } else { + return c.tee.Read(out) + } } return n, err } else { diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index f4852448..b7756304 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -260,3 +260,140 @@ func TestBlankWrite(t *testing.T) { t.Fatalf("Unexpected write to channel: %+v", v) } } + +type mockCloser chan struct{} + +func (c mockCloser) Close() { + close(c) +} + +func NewTestReadWriter() (*ChanReadWriter, chan *StreamChunk, chan struct{}) { + input := make(chan *StreamChunk, 2) + output := make(chan *StreamChunk, 1) + closer := make(mockCloser) + rw := NewChanReadWriter(input, output, closer) + input <- &StreamChunk{[]byte("hello world"), time.Now()} + input <- &StreamChunk{[]byte("foobar"), time.Now()} + close(input) + return rw, output, closer +} + +func AssertRead(t *testing.T, rw *ChanReadWriter, buf []byte, msg string, expectedErr error) (n int, err error, ret bool) { + n, err = rw.Read(buf) + ret = rw.HandleError(err) + + if n != len(msg) { + t.Fatalf("Read wrong number of bytes: %d expected %d", n, len(msg)) + } + + if err != expectedErr { + t.Fatal("Unexpected error during read:", err) + } + if err == io.EOF || err == io.ErrUnexpectedEOF || err == ErrInterrupted { + if !ret { + t.Fatal("HandleError() returned true without error") + } + } else { + if ret { + t.Fatal("HandleError() did not return true for error:", err) + } + } + if !bytes.Equal(buf[:n], []byte(msg)) { + t.Fatalf("Got wrong message from stream: %s expected %s", string(buf[:n]), msg) + } + return +} + +func AssertClosed(t *testing.T, output chan *StreamChunk, closer chan struct{}, expectedOutput []byte) { + select { + case msg := <-output: + if expectedOutput == nil { + t.Fatal("Unexpected message written to output channel:", string(msg.Data)) + } else if !bytes.Equal(msg.Data, expectedOutput) { + t.Fatal("Wrong message written to output channel:", string(msg.Data), "expected", string(expectedOutput)) + } + default: + if expectedOutput != nil { + t.Fatal("Expected message to be written to output channel:", string(expectedOutput)) + } + } + + select { + case <-closer: + default: + t.Fatal("Closer was not closed at end of stream") + } +} + +func TestReadWriterBasicFull(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 32) + + AssertRead(t, rw, buf, "hello world", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, nil) +} + +func TestReadWriterCheckpointRollback(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 8) + + AssertRead(t, rw, buf, "hello wo", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "rldfooba", nil) + rw.Rollback() + AssertRead(t, rw, buf, "rldfooba", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "r", nil) + rw.Rollback() + AssertRead(t, rw, buf, "r", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, nil) +} + +func TestReadWriterFlush(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 8) + + AssertRead(t, rw, buf, "hello wo", nil) + rw.Flush() + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, []byte("rld")) +} + +func TestReadWriterNoCheckpoint(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 32) + + AssertRead(t, rw, buf, "hello world", nil) + AssertRead(t, rw, buf, "foobar", nil) + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, []byte("hello worldfoobar")) +} + +func TestReadWriterInterrupt(t *testing.T) { + rw, output, closer := NewTestReadWriter() + interrupt := make(chan struct{}, 1) + rw.SetInterrupt(interrupt) + buf := make([]byte, 32) + + AssertRead(t, rw, buf, "hello world", nil) + rw.Checkpoint() + interrupt <- struct{}{} + AssertRead(t, rw, buf, "", ErrInterrupted) + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint() + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, nil) +} From b3650a41238d6a80127ebc3590626998f270dec5 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Fri, 30 Sep 2016 15:10:31 -0400 Subject: [PATCH 04/10] Fix checkpoints usage when used with a buffered reader --- stream/io_chan.go | 20 +++++++-- stream/io_chan_test.go | 99 +++++++++++++++++++++++++++++++++++------- toxics/redis.go | 4 +- 3 files changed, 103 insertions(+), 20 deletions(-) diff --git a/stream/io_chan.go b/stream/io_chan.go index 941d7844..49cf8776 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -178,7 +178,6 @@ func (c *ChanReadWriter) Read(out []byte) (int, error) { // Writes directly to the output channel and sets a checkpoint in the reader. func (c *ChanReadWriter) Write(buf []byte) (int, error) { n, err := c.writer.Write(buf) - c.Checkpoint() return n, err } @@ -201,11 +200,26 @@ func (c *ChanReadWriter) Flush() { } // Sets a checkpoint in the reader. A call to Rollback() will begin reading from this point. -func (c *ChanReadWriter) Checkpoint() { +// If offset is negative, the checkpoint will be set N bytes before the current position. +// If the offset is positive, the checkpoint will be set N bytes after the previous checkpoint. +// An offset of 0 will set the checkpoint to the current position. +func (c *ChanReadWriter) Checkpoint(offset int) { + current := c.buffer.Len() if c.bufReader != nil { - c.buffer.Next(int(c.bufReader.Size()) - c.bufReader.Len()) + current = int(c.bufReader.Size()) - c.bufReader.Len() + } + + n := current + if offset > 0 { + n = offset } else { + n = current + offset + } + + if n >= current { c.buffer.Reset() + } else { + c.buffer.Next(n) } } diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index b7756304..1b8d4a95 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -1,6 +1,7 @@ package stream import ( + "bufio" "bytes" "io" "testing" @@ -283,7 +284,10 @@ func AssertRead(t *testing.T, rw *ChanReadWriter, buf []byte, msg string, expect ret = rw.HandleError(err) if n != len(msg) { - t.Fatalf("Read wrong number of bytes: %d expected %d", n, len(msg)) + t.Fatalf("Read wrong number of bytes: %d expected %d (%s expected %s)", n, len(msg), string(buf[:n]), msg) + } + if !bytes.Equal(buf[:n], []byte(msg)) { + t.Fatalf("Got wrong message from stream: %s expected %s", string(buf[:n]), msg) } if err != expectedErr { @@ -298,9 +302,6 @@ func AssertRead(t *testing.T, rw *ChanReadWriter, buf []byte, msg string, expect t.Fatal("HandleError() did not return true for error:", err) } } - if !bytes.Equal(buf[:n], []byte(msg)) { - t.Fatalf("Got wrong message from stream: %s expected %s", string(buf[:n]), msg) - } return } @@ -330,9 +331,9 @@ func TestReadWriterBasicFull(t *testing.T) { buf := make([]byte, 32) AssertRead(t, rw, buf, "hello world", nil) - rw.Checkpoint() + rw.Checkpoint(0) AssertRead(t, rw, buf, "foobar", nil) - rw.Checkpoint() + rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) AssertClosed(t, output, closer, nil) @@ -343,15 +344,15 @@ func TestReadWriterCheckpointRollback(t *testing.T) { buf := make([]byte, 8) AssertRead(t, rw, buf, "hello wo", nil) - rw.Checkpoint() + rw.Checkpoint(0) AssertRead(t, rw, buf, "rldfooba", nil) rw.Rollback() AssertRead(t, rw, buf, "rldfooba", nil) - rw.Checkpoint() + rw.Checkpoint(-3) AssertRead(t, rw, buf, "r", nil) rw.Rollback() - AssertRead(t, rw, buf, "r", nil) - rw.Checkpoint() + AssertRead(t, rw, buf, "obar", nil) + rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) AssertClosed(t, output, closer, nil) @@ -364,7 +365,7 @@ func TestReadWriterFlush(t *testing.T) { AssertRead(t, rw, buf, "hello wo", nil) rw.Flush() AssertRead(t, rw, buf, "foobar", nil) - rw.Checkpoint() + rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) AssertClosed(t, output, closer, []byte("rld")) @@ -382,17 +383,83 @@ func TestReadWriterNoCheckpoint(t *testing.T) { } func TestReadWriterInterrupt(t *testing.T) { - rw, output, closer := NewTestReadWriter() - interrupt := make(chan struct{}, 1) + input := make(chan *StreamChunk, 1) + output := make(chan *StreamChunk, 1) + closer := make(mockCloser) + rw := NewChanReadWriter(input, output, closer) + input <- &StreamChunk{[]byte("hello world"), time.Now()} + interrupt := make(chan struct{}) rw.SetInterrupt(interrupt) buf := make([]byte, 32) AssertRead(t, rw, buf, "hello world", nil) - rw.Checkpoint() - interrupt <- struct{}{} + rw.Checkpoint(0) + go func() { + interrupt <- struct{}{} + input <- &StreamChunk{[]byte("foobar"), time.Now()} + close(input) + }() AssertRead(t, rw, buf, "", ErrInterrupted) AssertRead(t, rw, buf, "foobar", nil) - rw.Checkpoint() + rw.Checkpoint(0) + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, nil) +} + +func TestReadWriterBufferedReader(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 32) + reader := bufio.NewReader(rw) + msg, err := reader.ReadString(' ') + ret := rw.HandleError(err) + + if ret || err != nil { + t.Fatal("Read failed through bufio.Reader:", err, ret) + } + if msg != "hello " { + t.Fatal("Buffered reader read wrong message:", msg) + } + if reader.Buffered() != 5 { + t.Fatal("Unexpected number of buffered bytes in reader:", reader.Buffered()) + } + + rw.Checkpoint(-reader.Buffered()) + rw.Rollback() + + AssertRead(t, rw, buf, "world", nil) + rw.Checkpoint(0) + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint(0) + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, output, closer, nil) +} + +func TestReadWriterBufferedReaderAlternate(t *testing.T) { + rw, output, closer := NewTestReadWriter() + buf := make([]byte, 32) + reader := bufio.NewReader(rw) + msg, err := reader.ReadString(' ') + ret := rw.HandleError(err) + + if ret || err != nil { + t.Fatal("Read failed through bufio.Reader:", err, ret) + } + if msg != "hello " { + t.Fatal("Buffered reader read wrong message:", msg) + } + if reader.Buffered() != 5 { + t.Fatal("Unexpected number of buffered bytes in reader:", reader.Buffered()) + } + + rw.Checkpoint(len(msg)) + rw.Rollback() + + AssertRead(t, rw, buf, "world", nil) + rw.Checkpoint(0) + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) AssertClosed(t, output, closer, nil) diff --git a/toxics/redis.go b/toxics/redis.go index 896bc065..12b36b96 100644 --- a/toxics/redis.go +++ b/toxics/redis.go @@ -33,10 +33,10 @@ func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { fmt.Println("Command:", str) if len(str) > 0 && str[0] == "SET" { // Skip the backend server - stub.ReadWriter.Checkpoint() } else { stub.ReadWriter.Write(cmd.Raw()) } + stub.ReadWriter.Checkpoint(-reader.Buffered()) } } } @@ -59,7 +59,9 @@ func (t *RedisToxic) Pipe(stub *ToxicStub) { stub.ReadWriter.Write(resp.Raw()) } default: + stub.ReadWriter.Write(resp.Raw()) } + stub.ReadWriter.Checkpoint(-reader.Buffered()) } } } From 85b172803a910233e8a57d7f2defb5386ce1e8f4 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Mon, 3 Oct 2016 16:09:38 -0400 Subject: [PATCH 05/10] Separate reader and writer --- link.go | 6 +- stream/io_chan.go | 125 ++++++++++++++-------------------------- stream/io_chan_test.go | 127 ++++++++++++++++++++++------------------- toxics/redis.go | 20 +++---- toxics/toxic.go | 43 +++++++++++--- 5 files changed, 157 insertions(+), 164 deletions(-) diff --git a/link.go b/link.go index 12df52bf..6bffe98f 100644 --- a/link.go +++ b/link.go @@ -97,7 +97,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) { // Interrupt the last toxic so that we don't have a race when moving channels if link.stubs[i-1].InterruptToxic() { link.stubs[i-1].Output = newin - link.stubs[i-1].ReadWriter.SetOutput(newin) + link.stubs[i-1].Writer.SetOutput(newin) if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok { link.stubs[i].State = stateful.NewState() @@ -131,7 +131,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) { }() // Flush toxic's internal buffer - link.stubs[i].ReadWriter.Flush() + link.stubs[i].Reader.FlushTo(link.stubs[i].Writer) // Unblock the previous toxic if it is trying to flush. // If the previous toxic is closed, continue flushing until we reach the end. @@ -164,7 +164,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) { } link.stubs[i-1].Output = link.stubs[i].Output - link.stubs[i-1].ReadWriter.SetOutput(link.stubs[i].Output) + link.stubs[i-1].Writer.SetOutput(link.stubs[i].Output) link.stubs = append(link.stubs[:i], link.stubs[i+1:]...) go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1]) diff --git a/stream/io_chan.go b/stream/io_chan.go index 49cf8776..4e4e41ce 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -5,8 +5,6 @@ import ( "fmt" "io" "time" - - "github.com/Sirupsen/logrus" ) type Direction uint8 @@ -45,6 +43,10 @@ func (c *ChanWriter) Write(buf []byte) (int, error) { return len(buf), nil } +func (c *ChanWriter) SetOutput(output chan<- *StreamChunk) { + c.output = output +} + // Close the output channel func (c *ChanWriter) Close() error { close(c.output) @@ -114,99 +116,73 @@ func (c *ChanReader) Read(out []byte) (int, error) { return n + n2, nil } -// ToxicStub can't be imported due to an import loop, so we use an interface instead -type Closer interface { - Close() -} - -// Reader: +// TransactionalReader is a ChanReader that can rollback its progress to checkpoints. +// This is useful when using other buffered readers, since they may read past the end of a message. +// The buffered reader can later be removed by rolling back any buffered bytes. +// // chan []byte -> ChanReader -> TeeReader -> Read() -> output // V ^ // bytes.Buffer -> bytes.Reader -// -// Writer: -// chan []byte <- ChanWriter <- Write() <- input -type ChanReadWriter struct { +type TransactionalReader struct { buffer *bytes.Buffer bufReader *bytes.Reader reader *ChanReader - writer *ChanWriter tee io.Reader - closer Closer -} - -// Handles errors returned by Read(). Returns true if the channel has closed and the caller should exit. -// Unknown errors will flush all data since the last checkpoint to the writer and return false so the -// caller can handle the error. -func (c *ChanReadWriter) HandleError(err error) bool { - if err == ErrInterrupted { - c.Rollback() - return true - } else if err == io.EOF || err == io.ErrUnexpectedEOF { - c.Rollback() - c.Flush() - if c.closer != nil { - c.closer.Close() - } - return true - } else if err != nil { - c.Rollback() - c.Flush() - logrus.Warn("Read error in toxic: ", err) +} + +func NewTransactionalReader(input <-chan *StreamChunk) *TransactionalReader { + t := &TransactionalReader{ + buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)), + reader: NewChanReader(input), } - return false + t.tee = io.TeeReader(t.reader, t.buffer) + return t } // Reads from the input channel either directly, or from a buffer if Rollback() has been called. -func (c *ChanReadWriter) Read(out []byte) (int, error) { - if c.bufReader != nil { - n, err := c.bufReader.Read(out) +func (t *TransactionalReader) Read(out []byte) (int, error) { + if t.bufReader != nil { + n, err := t.bufReader.Read(out) if err == io.EOF { - c.bufReader = nil + t.bufReader = nil if n > 0 { return n, nil } else { - return c.tee.Read(out) + return t.tee.Read(out) } } return n, err } else { - return c.tee.Read(out) + return t.tee.Read(out) } } -// Writes directly to the output channel and sets a checkpoint in the reader. -func (c *ChanReadWriter) Write(buf []byte) (int, error) { - n, err := c.writer.Write(buf) - return n, err -} - -// Flushes all buffers in the reader and writes them to the output channel. -func (c *ChanReadWriter) Flush() { +// Flushes all buffers in the reader to the specified writer. +func (t *TransactionalReader) FlushTo(writer io.Writer) { n := 0 - if c.bufReader != nil { - n = c.bufReader.Len() + if t.bufReader != nil { + n = t.bufReader.Len() } - buf := make([]byte, n+len(c.reader.buffer)) + buf := make([]byte, n+len(t.reader.buffer)) if n > 0 { - c.bufReader.Read(buf[:n]) + t.bufReader.Read(buf[:n]) } if len(buf[n:]) > 0 { - c.reader.Read(buf[n:]) + t.reader.Read(buf[n:]) } - c.writer.Write(buf) - c.bufReader = nil - c.buffer.Reset() + writer.Write(buf) + t.bufReader = nil + t.buffer.Reset() } // Sets a checkpoint in the reader. A call to Rollback() will begin reading from this point. // If offset is negative, the checkpoint will be set N bytes before the current position. // If the offset is positive, the checkpoint will be set N bytes after the previous checkpoint. // An offset of 0 will set the checkpoint to the current position. -func (c *ChanReadWriter) Checkpoint(offset int) { - current := c.buffer.Len() - if c.bufReader != nil { - current = int(c.bufReader.Size()) - c.bufReader.Len() +func (t *TransactionalReader) Checkpoint(offset int) { + current := t.buffer.Len() + if t.bufReader != nil { + current = int(t.bufReader.Size()) - t.bufReader.Len() } n := current @@ -217,32 +193,17 @@ func (c *ChanReadWriter) Checkpoint(offset int) { } if n >= current { - c.buffer.Reset() + t.buffer.Reset() } else { - c.buffer.Next(n) + t.buffer.Next(n) } } // Rolls back the reader to start from the last checkpoint. -func (c *ChanReadWriter) Rollback() { - c.bufReader = bytes.NewReader(c.buffer.Bytes()) -} - -func (c *ChanReadWriter) SetOutput(output chan<- *StreamChunk) { - c.writer.output = output +func (t *TransactionalReader) Rollback() { + t.bufReader = bytes.NewReader(t.buffer.Bytes()) } -func (c *ChanReadWriter) SetInterrupt(interrupt <-chan struct{}) { - c.reader.SetInterrupt(interrupt) -} - -func NewChanReadWriter(input <-chan *StreamChunk, output chan<- *StreamChunk, stub Closer) *ChanReadWriter { - rw := &ChanReadWriter{ - buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)), - reader: NewChanReader(input), - writer: NewChanWriter(output), - closer: stub, - } - rw.tee = io.TeeReader(rw.reader, rw.buffer) - return rw +func (t *TransactionalReader) SetInterrupt(interrupt <-chan struct{}) { + t.reader.SetInterrupt(interrupt) } diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index 1b8d4a95..75aea7fb 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -262,26 +262,47 @@ func TestBlankWrite(t *testing.T) { } } -type mockCloser chan struct{} +type TestReadWriter struct { + *TransactionalReader + *ChanWriter + input chan *StreamChunk + output chan *StreamChunk + closer chan struct{} +} + +func (c *TestReadWriter) Close() { + close(c.closer) +} -func (c mockCloser) Close() { - close(c) +func NewTestReadWriter() *TestReadWriter { + rw := &TestReadWriter{ + input: make(chan *StreamChunk, 2), + output: make(chan *StreamChunk, 1), + closer: make(chan struct{}), + } + rw.TransactionalReader = NewTransactionalReader(rw.input) + rw.ChanWriter = NewChanWriter(rw.output) + rw.input <- &StreamChunk{[]byte("hello world"), time.Now()} + rw.input <- &StreamChunk{[]byte("foobar"), time.Now()} + close(rw.input) + return rw } -func NewTestReadWriter() (*ChanReadWriter, chan *StreamChunk, chan struct{}) { - input := make(chan *StreamChunk, 2) - output := make(chan *StreamChunk, 1) - closer := make(mockCloser) - rw := NewChanReadWriter(input, output, closer) - input <- &StreamChunk{[]byte("hello world"), time.Now()} - input <- &StreamChunk{[]byte("foobar"), time.Now()} - close(input) - return rw, output, closer +func HandleError(t *testing.T, rw *TestReadWriter, err error, expectedErr error) { + if err != expectedErr { + t.Fatalf("Unexpected error during read: %v != %v", err, expectedErr) + } + if err == ErrInterrupted { + rw.Rollback() + } else if err == io.EOF { + rw.Rollback() + rw.FlushTo(rw) + rw.Close() + } } -func AssertRead(t *testing.T, rw *ChanReadWriter, buf []byte, msg string, expectedErr error) (n int, err error, ret bool) { - n, err = rw.Read(buf) - ret = rw.HandleError(err) +func AssertRead(t *testing.T, rw *TestReadWriter, buf []byte, msg string, expectedErr error) { + n, err := rw.Read(buf) if n != len(msg) { t.Fatalf("Read wrong number of bytes: %d expected %d (%s expected %s)", n, len(msg), string(buf[:n]), msg) @@ -290,24 +311,12 @@ func AssertRead(t *testing.T, rw *ChanReadWriter, buf []byte, msg string, expect t.Fatalf("Got wrong message from stream: %s expected %s", string(buf[:n]), msg) } - if err != expectedErr { - t.Fatal("Unexpected error during read:", err) - } - if err == io.EOF || err == io.ErrUnexpectedEOF || err == ErrInterrupted { - if !ret { - t.Fatal("HandleError() returned true without error") - } - } else { - if ret { - t.Fatal("HandleError() did not return true for error:", err) - } - } - return + HandleError(t, rw, err, expectedErr) } -func AssertClosed(t *testing.T, output chan *StreamChunk, closer chan struct{}, expectedOutput []byte) { +func AssertClosed(t *testing.T, rw *TestReadWriter, expectedOutput []byte) { select { - case msg := <-output: + case msg := <-rw.output: if expectedOutput == nil { t.Fatal("Unexpected message written to output channel:", string(msg.Data)) } else if !bytes.Equal(msg.Data, expectedOutput) { @@ -320,14 +329,14 @@ func AssertClosed(t *testing.T, output chan *StreamChunk, closer chan struct{}, } select { - case <-closer: + case <-rw.closer: default: t.Fatal("Closer was not closed at end of stream") } } func TestReadWriterBasicFull(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 32) AssertRead(t, rw, buf, "hello world", nil) @@ -336,11 +345,11 @@ func TestReadWriterBasicFull(t *testing.T) { rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, nil) + AssertClosed(t, rw, nil) } func TestReadWriterCheckpointRollback(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 8) AssertRead(t, rw, buf, "hello wo", nil) @@ -355,39 +364,43 @@ func TestReadWriterCheckpointRollback(t *testing.T) { rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, nil) + AssertClosed(t, rw, nil) } func TestReadWriterFlush(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 8) AssertRead(t, rw, buf, "hello wo", nil) - rw.Flush() + rw.FlushTo(rw) AssertRead(t, rw, buf, "foobar", nil) rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, []byte("rld")) + AssertClosed(t, rw, []byte("rld")) } func TestReadWriterNoCheckpoint(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 32) AssertRead(t, rw, buf, "hello world", nil) AssertRead(t, rw, buf, "foobar", nil) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, []byte("hello worldfoobar")) + AssertClosed(t, rw, []byte("hello worldfoobar")) } func TestReadWriterInterrupt(t *testing.T) { - input := make(chan *StreamChunk, 1) - output := make(chan *StreamChunk, 1) - closer := make(mockCloser) - rw := NewChanReadWriter(input, output, closer) - input <- &StreamChunk{[]byte("hello world"), time.Now()} + rw := &TestReadWriter{ + input: make(chan *StreamChunk, 1), + output: make(chan *StreamChunk, 1), + closer: make(chan struct{}), + } + rw.TransactionalReader = NewTransactionalReader(rw.input) + rw.ChanWriter = NewChanWriter(rw.output) + rw.input <- &StreamChunk{[]byte("hello world"), time.Now()} + interrupt := make(chan struct{}) rw.SetInterrupt(interrupt) buf := make([]byte, 32) @@ -396,27 +409,24 @@ func TestReadWriterInterrupt(t *testing.T) { rw.Checkpoint(0) go func() { interrupt <- struct{}{} - input <- &StreamChunk{[]byte("foobar"), time.Now()} - close(input) + rw.input <- &StreamChunk{[]byte("foobar"), time.Now()} + close(rw.input) }() AssertRead(t, rw, buf, "", ErrInterrupted) AssertRead(t, rw, buf, "foobar", nil) rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, nil) + AssertClosed(t, rw, nil) } func TestReadWriterBufferedReader(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 32) reader := bufio.NewReader(rw) msg, err := reader.ReadString(' ') - ret := rw.HandleError(err) + HandleError(t, rw, err, nil) - if ret || err != nil { - t.Fatal("Read failed through bufio.Reader:", err, ret) - } if msg != "hello " { t.Fatal("Buffered reader read wrong message:", msg) } @@ -433,19 +443,16 @@ func TestReadWriterBufferedReader(t *testing.T) { rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, nil) + AssertClosed(t, rw, nil) } func TestReadWriterBufferedReaderAlternate(t *testing.T) { - rw, output, closer := NewTestReadWriter() + rw := NewTestReadWriter() buf := make([]byte, 32) reader := bufio.NewReader(rw) msg, err := reader.ReadString(' ') - ret := rw.HandleError(err) + HandleError(t, rw, err, nil) - if ret || err != nil { - t.Fatal("Read failed through bufio.Reader:", err, ret) - } if msg != "hello " { t.Fatal("Buffered reader read wrong message:", msg) } @@ -462,5 +469,5 @@ func TestReadWriterBufferedReaderAlternate(t *testing.T) { rw.Checkpoint(0) AssertRead(t, rw, buf, "", io.EOF) - AssertClosed(t, output, closer, nil) + AssertClosed(t, rw, nil) } diff --git a/toxics/redis.go b/toxics/redis.go index 12b36b96..38ff7d8a 100644 --- a/toxics/redis.go +++ b/toxics/redis.go @@ -19,10 +19,10 @@ type RedisToxicState struct { func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { state := stub.State.(*RedisToxicState) - reader := bufio.NewReader(stub.ReadWriter) + reader := bufio.NewReader(stub.Reader) for { cmd, err := stream.ParseRESP(reader) - if stub.ReadWriter.HandleError(err) { + if stub.HandleReadError(err) { if err == io.EOF { close(state.Command) } @@ -34,9 +34,9 @@ func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { if len(str) > 0 && str[0] == "SET" { // Skip the backend server } else { - stub.ReadWriter.Write(cmd.Raw()) + stub.Writer.Write(cmd.Raw()) } - stub.ReadWriter.Checkpoint(-reader.Buffered()) + stub.Reader.Checkpoint(-reader.Buffered()) } } } @@ -44,24 +44,24 @@ func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { func (t *RedisToxic) Pipe(stub *ToxicStub) { state := stub.State.(*RedisToxicState) - reader := bufio.NewReader(stub.ReadWriter) + reader := bufio.NewReader(stub.Reader) for { resp, err := stream.ParseRESP(reader) - if stub.ReadWriter.HandleError(err) { + if stub.HandleReadError(err) { return } else { select { case cmd := <-state.Command: str := cmd.StringArray() if len(str) > 0 && str[0] == "SET" { - stub.ReadWriter.Write(stream.RedisType{stream.Error, "ERR write failure"}.Raw()) + stub.Writer.Write(stream.RedisType{stream.Error, "ERR write failure"}.Raw()) } else { - stub.ReadWriter.Write(resp.Raw()) + stub.Writer.Write(resp.Raw()) } default: - stub.ReadWriter.Write(resp.Raw()) + stub.Writer.Write(resp.Raw()) } - stub.ReadWriter.Checkpoint(-reader.Buffered()) + stub.Reader.Checkpoint(-reader.Buffered()) } } } diff --git a/toxics/toxic.go b/toxics/toxic.go index db658318..0ad085e0 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -1,11 +1,13 @@ package toxics import ( + "io" "math/rand" "reflect" "sync" "github.com/Shopify/toxiproxy/stream" + "github.com/Sirupsen/logrus" ) // A Toxic is something that can be attatched to a link to modify the way @@ -51,13 +53,14 @@ type ToxicWrapper struct { } type ToxicStub struct { - Input <-chan *stream.StreamChunk - Output chan<- *stream.StreamChunk - State interface{} - ReadWriter *stream.ChanReadWriter - Interrupt chan struct{} - running chan struct{} - closed chan struct{} + Input <-chan *stream.StreamChunk + Output chan<- *stream.StreamChunk + State interface{} + Reader *stream.TransactionalReader + Writer *stream.ChanWriter + Interrupt chan struct{} + running chan struct{} + closed chan struct{} } func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub { @@ -66,9 +69,10 @@ func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.Stream closed: make(chan struct{}), Input: input, Output: output, + Reader: stream.NewTransactionalReader(input), + Writer: stream.NewChanWriter(output), } - stub.ReadWriter = stream.NewChanReadWriter(input, output, stub) - stub.ReadWriter.SetInterrupt(stub.Interrupt) + stub.Reader.SetInterrupt(stub.Interrupt) return stub } @@ -101,6 +105,27 @@ func (s *ToxicStub) Close() { close(s.Output) } +// Handles errors returned by the stub's TransactionalReader. Returns true if the channel +// has closed and the caller should exit. +// Unknown errors will flush all data since the last checkpoint to the writer and return false so the +// caller can handle the error. +func (s *ToxicStub) HandleReadError(err error) bool { + if err == stream.ErrInterrupted { + s.Reader.Rollback() + return true + } else if err == io.EOF || err == io.ErrUnexpectedEOF { + s.Reader.Rollback() + s.Reader.FlushTo(s.Writer) + s.Close() + return true + } else if err != nil { + s.Reader.Rollback() + s.Reader.FlushTo(s.Writer) + logrus.Warn("Read error in toxic: ", err) + } + return false +} + var ToxicRegistry map[string]Toxic var registryMutex sync.RWMutex From cf6492a835a36976051a4b9b2b334a6d8e140d5a Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Tue, 4 Oct 2016 11:33:50 -0400 Subject: [PATCH 06/10] Update Noop example for ChanReader --- CREATING_TOXICS.md | 14 ++++---------- stream/io_chan_test.go | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/CREATING_TOXICS.md b/CREATING_TOXICS.md index ab39bc83..bee128d7 100644 --- a/CREATING_TOXICS.md +++ b/CREATING_TOXICS.md @@ -145,19 +145,13 @@ An implementation of the noop toxic above using the stream package would look so ```go func (t *NoopToxic) Pipe(stub *toxics.ToxicStub) { buf := make([]byte, 32*1024) - writer := stream.NewChanWriter(stub.Output) - reader := stream.NewChanReader(stub.Input) - reader.SetInterrupt(stub.Interrupt) for { - n, err := reader.Read(buf) - if err == stream.ErrInterrupted { - writer.Write(buf[:n]) - return - } else if err == io.EOF { - stub.Close() + n, err := stub.Reader.Read(buf) + if stub.HandleReadError(err) { return } - writer.Write(buf[:n]) + stub.Writer.Write(buf[:n]) + stub.Reader.Checkpoint(0) } } ``` diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index 75aea7fb..f370b526 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -380,6 +380,20 @@ func TestReadWriterFlush(t *testing.T) { AssertClosed(t, rw, []byte("rld")) } +func TestReadWriterDoubleFlush(t *testing.T) { + rw := NewTestReadWriter() + buf := make([]byte, 8) + + AssertRead(t, rw, buf, "hello wo", nil) + rw.FlushTo(rw) + rw.FlushTo(rw) + AssertRead(t, rw, buf, "foobar", nil) + rw.FlushTo(rw) + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, rw, []byte("rld")) +} + func TestReadWriterNoCheckpoint(t *testing.T) { rw := NewTestReadWriter() buf := make([]byte, 32) From 04822788f83542680806a8dca21547f2e761407b Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Tue, 4 Oct 2016 11:35:09 -0400 Subject: [PATCH 07/10] Remove redis toxic --- stream/redis.go | 155 ------------------------------------------------ toxics/redis.go | 77 ------------------------ 2 files changed, 232 deletions(-) delete mode 100644 stream/redis.go delete mode 100644 toxics/redis.go diff --git a/stream/redis.go b/stream/redis.go deleted file mode 100644 index d2cac3a1..00000000 --- a/stream/redis.go +++ /dev/null @@ -1,155 +0,0 @@ -package stream - -import ( - "bufio" - "errors" - "fmt" - "io" - "strconv" -) - -const ( - SimpleString = '+' - Error = '-' - Integer = ':' - BulkString = '$' - Array = '*' - Null = '\x00' -) - -var ErrInvalidSyntax = errors.New("invalid RESP syntax") - -type RedisType struct { - Type byte - Value interface{} -} - -func (t RedisType) String() string { - if v, ok := t.Value.(string); ok { - return v - } - return "" -} - -func (t RedisType) Integer() int64 { - if v, ok := t.Value.(int64); ok { - return v - } - return 0 -} - -func (t RedisType) Array() []RedisType { - if v, ok := t.Value.([]RedisType); ok { - return v - } - return nil -} - -func (t RedisType) StringArray() []string { - array := t.Array() - out := make([]string, len(array)) - for i := range array { - out[i] = array[i].String() - } - return out -} - -func (t RedisType) Raw() []byte { - out := []byte{t.Type} - switch t.Type { - case SimpleString, Error: - out = append(out, []byte(t.String())...) - out = append(out, '\r', '\n') - case Integer: - out = append(out, []byte(strconv.FormatInt(t.Integer(), 10))...) - out = append(out, '\r', '\n') - case BulkString: - value := t.String() - out = append(out, []byte(strconv.Itoa(len(value)))...) - out = append(out, '\r', '\n') - out = append(out, []byte(value)...) - out = append(out, '\r', '\n') - case Array: - value := t.Array() - out = append(out, []byte(strconv.Itoa(len(value)))...) - out = append(out, '\r', '\n') - for i := 0; i < len(value); i++ { - out = append(out, value[i].Raw()...) - } - default: - return nil - } - return out -} - -func readLine(reader *bufio.Reader) ([]byte, error) { - line, err := reader.ReadSlice('\n') - if err != nil { - return nil, err - } - - if len(line) < 1 || line[len(line)-2] != '\r' { - return nil, ErrInvalidSyntax - } - return line[:len(line)-2], nil -} - -func ParseRESP(reader *bufio.Reader) (RedisType, error) { - var out RedisType - line, err := readLine(reader) - if err != nil { - return RedisType{}, err - } - - if len(line) > 0 { - out.Type = line[0] - switch line[0] { - case SimpleString, Error: - out.Value = string(line[1:]) - return out, nil - case Integer: - out.Value, err = strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return RedisType{}, ErrInvalidSyntax - } - return out, nil - case BulkString: - length, err := strconv.Atoi(string(line[1:])) - if err != nil { - return RedisType{}, ErrInvalidSyntax - } - if length < 0 { - return out, nil - } - buf := make([]byte, length+2) - _, err = io.ReadFull(reader, buf) - if err != nil { - return RedisType{}, err - } - out.Value = string(buf[:length]) - return out, nil - case Array: - length, err := strconv.Atoi(string(line[1:])) - if err != nil { - return RedisType{}, ErrInvalidSyntax - } - if length < 0 { - return out, nil - } - array := make([]RedisType, length) - for i := 0; i < length; i++ { - array[i], err = ParseRESP(reader) - if err != nil { - return RedisType{}, err - } - } - out.Value = array - return out, nil - default: - fmt.Println("Format not supported") - return RedisType{}, ErrInvalidSyntax - } - } else { - return RedisType{}, ErrInvalidSyntax - } -} diff --git a/toxics/redis.go b/toxics/redis.go deleted file mode 100644 index 38ff7d8a..00000000 --- a/toxics/redis.go +++ /dev/null @@ -1,77 +0,0 @@ -package toxics - -import ( - "bufio" - "fmt" - "io" - - "github.com/Shopify/toxiproxy/stream" -) - -type RedisToxic struct { - FailWrites bool `json:"fail_writes"` -} - -type RedisToxicState struct { - Command chan stream.RedisType -} - -func (t *RedisToxic) PipeUpstream(stub *ToxicStub) { - state := stub.State.(*RedisToxicState) - - reader := bufio.NewReader(stub.Reader) - for { - cmd, err := stream.ParseRESP(reader) - if stub.HandleReadError(err) { - if err == io.EOF { - close(state.Command) - } - return - } else if err == nil { - state.Command <- cmd - str := cmd.StringArray() - fmt.Println("Command:", str) - if len(str) > 0 && str[0] == "SET" { - // Skip the backend server - } else { - stub.Writer.Write(cmd.Raw()) - } - stub.Reader.Checkpoint(-reader.Buffered()) - } - } -} - -func (t *RedisToxic) Pipe(stub *ToxicStub) { - state := stub.State.(*RedisToxicState) - - reader := bufio.NewReader(stub.Reader) - for { - resp, err := stream.ParseRESP(reader) - if stub.HandleReadError(err) { - return - } else { - select { - case cmd := <-state.Command: - str := cmd.StringArray() - if len(str) > 0 && str[0] == "SET" { - stub.Writer.Write(stream.RedisType{stream.Error, "ERR write failure"}.Raw()) - } else { - stub.Writer.Write(resp.Raw()) - } - default: - stub.Writer.Write(resp.Raw()) - } - stub.Reader.Checkpoint(-reader.Buffered()) - } - } -} - -func (t *RedisToxic) NewState() interface{} { - return &RedisToxicState{ - Command: make(chan stream.RedisType, 1), - } -} - -func init() { - Register("redis", new(RedisToxic)) -} From a323870bf5184bac847c6b96e9cc820584bd908a Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Dec 2016 17:23:52 -0500 Subject: [PATCH 08/10] Remove need for `HandleReadError()` in ToxicStub --- CREATING_TOXICS.md | 5 ++++- stream/io_chan.go | 11 +++++++++-- toxics/toxic.go | 29 ++++++----------------------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/CREATING_TOXICS.md b/CREATING_TOXICS.md index bee128d7..46672088 100644 --- a/CREATING_TOXICS.md +++ b/CREATING_TOXICS.md @@ -147,7 +147,10 @@ func (t *NoopToxic) Pipe(stub *toxics.ToxicStub) { buf := make([]byte, 32*1024) for { n, err := stub.Reader.Read(buf) - if stub.HandleReadError(err) { + if err == stream.ErrInterrupted { + return + } else if err == io.EOF { + stub.Close() return } stub.Writer.Write(buf[:n]) diff --git a/stream/io_chan.go b/stream/io_chan.go index 4e4e41ce..84fa1d7e 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -140,7 +140,14 @@ func NewTransactionalReader(input <-chan *StreamChunk) *TransactionalReader { } // Reads from the input channel either directly, or from a buffer if Rollback() has been called. -func (t *TransactionalReader) Read(out []byte) (int, error) { +// If the reader returns `ErrInterrupted`, it will automatically call Rollback() +func (t *TransactionalReader) Read(out []byte) (n int, err error) { + defer func() { + if err == ErrInterrupted || err == io.EOF { + t.Rollback() + } + }() + if t.bufReader != nil { n, err := t.bufReader.Read(out) if err == io.EOF { @@ -157,7 +164,7 @@ func (t *TransactionalReader) Read(out []byte) (int, error) { } } -// Flushes all buffers in the reader to the specified writer. +// Flushes all buffers past the current position in the reader to the specified writer. func (t *TransactionalReader) FlushTo(writer io.Writer) { n := 0 if t.bufReader != nil { diff --git a/toxics/toxic.go b/toxics/toxic.go index 0ad085e0..1cb0ed49 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -1,13 +1,11 @@ package toxics import ( - "io" "math/rand" "reflect" "sync" "github.com/Shopify/toxiproxy/stream" - "github.com/Sirupsen/logrus" ) // A Toxic is something that can be attatched to a link to modify the way @@ -100,32 +98,17 @@ func (s *ToxicStub) InterruptToxic() bool { } } +func (s *ToxicStub) Flush() { + s.Reader.FlushTo(s.Writer) +} + func (s *ToxicStub) Close() { + s.Flush() + close(s.closed) close(s.Output) } -// Handles errors returned by the stub's TransactionalReader. Returns true if the channel -// has closed and the caller should exit. -// Unknown errors will flush all data since the last checkpoint to the writer and return false so the -// caller can handle the error. -func (s *ToxicStub) HandleReadError(err error) bool { - if err == stream.ErrInterrupted { - s.Reader.Rollback() - return true - } else if err == io.EOF || err == io.ErrUnexpectedEOF { - s.Reader.Rollback() - s.Reader.FlushTo(s.Writer) - s.Close() - return true - } else if err != nil { - s.Reader.Rollback() - s.Reader.FlushTo(s.Writer) - logrus.Warn("Read error in toxic: ", err) - } - return false -} - var ToxicRegistry map[string]Toxic var registryMutex sync.RWMutex From 4c86ebd37780e8a7b0d1b438004b57aaad4f228c Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Wed, 14 Dec 2016 18:14:28 -0500 Subject: [PATCH 09/10] Clean up and improve tests --- stream/io_chan.go | 6 ++++-- stream/io_chan_test.go | 30 +++++++++++++++++++++++++++--- toxics/toxic.go | 7 ++----- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/stream/io_chan.go b/stream/io_chan.go index 84fa1d7e..6e475ce2 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -143,7 +143,7 @@ func NewTransactionalReader(input <-chan *StreamChunk) *TransactionalReader { // If the reader returns `ErrInterrupted`, it will automatically call Rollback() func (t *TransactionalReader) Read(out []byte) (n int, err error) { defer func() { - if err == ErrInterrupted || err == io.EOF { + if err == ErrInterrupted { t.Rollback() } }() @@ -208,7 +208,9 @@ func (t *TransactionalReader) Checkpoint(offset int) { // Rolls back the reader to start from the last checkpoint. func (t *TransactionalReader) Rollback() { - t.bufReader = bytes.NewReader(t.buffer.Bytes()) + if t.buffer.Len() > 0 { + t.bufReader = bytes.NewReader(t.buffer.Bytes()) + } } func (t *TransactionalReader) SetInterrupt(interrupt <-chan struct{}) { diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index f370b526..349313af 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -292,9 +292,7 @@ func HandleError(t *testing.T, rw *TestReadWriter, err error, expectedErr error) if err != expectedErr { t.Fatalf("Unexpected error during read: %v != %v", err, expectedErr) } - if err == ErrInterrupted { - rw.Rollback() - } else if err == io.EOF { + if err == io.EOF { rw.Rollback() rw.FlushTo(rw) rw.Close() @@ -348,6 +346,22 @@ func TestReadWriterBasicFull(t *testing.T) { AssertClosed(t, rw, nil) } +func TestReadWriterNoopRollback(t *testing.T) { + rw := NewTestReadWriter() + buf := make([]byte, 32) + + AssertRead(t, rw, buf, "hello world", nil) + AssertRead(t, rw, buf, "foobar", nil) + rw.Checkpoint(0) + rw.Rollback() + if rw.bufReader != nil { + t.Fatal("bufReader was set when it shouldn't have been") + } + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, rw, nil) +} + func TestReadWriterCheckpointRollback(t *testing.T) { rw := NewTestReadWriter() buf := make([]byte, 8) @@ -418,17 +432,27 @@ func TestReadWriterInterrupt(t *testing.T) { interrupt := make(chan struct{}) rw.SetInterrupt(interrupt) buf := make([]byte, 32) + sync := make(chan struct{}) AssertRead(t, rw, buf, "hello world", nil) rw.Checkpoint(0) go func() { interrupt <- struct{}{} rw.input <- &StreamChunk{[]byte("foobar"), time.Now()} + + <-sync + interrupt <- struct{}{} close(rw.input) }() AssertRead(t, rw, buf, "", ErrInterrupted) AssertRead(t, rw, buf, "foobar", nil) + + close(sync) + // Interrupt should rollback automatically + AssertRead(t, rw, buf, "", ErrInterrupted) + AssertRead(t, rw, buf, "foobar", nil) rw.Checkpoint(0) + AssertRead(t, rw, buf, "", io.EOF) AssertClosed(t, rw, nil) diff --git a/toxics/toxic.go b/toxics/toxic.go index 1cb0ed49..fe004e8f 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -98,12 +98,9 @@ func (s *ToxicStub) InterruptToxic() bool { } } -func (s *ToxicStub) Flush() { - s.Reader.FlushTo(s.Writer) -} - func (s *ToxicStub) Close() { - s.Flush() + s.Reader.Rollback() + s.Reader.FlushTo(s.Writer) close(s.closed) close(s.Output) From bf1f8f4bf3df79ea504b3276917ca2cbd08ff509 Mon Sep 17 00:00:00 2001 From: Jacob Wirth Date: Mon, 19 Dec 2016 14:25:14 -0500 Subject: [PATCH 10/10] Fix bug with setting checkpoint while reading from bufReader --- stream/io_chan.go | 8 +++----- stream/io_chan_test.go | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/stream/io_chan.go b/stream/io_chan.go index 6e475ce2..c4c25fee 100644 --- a/stream/io_chan.go +++ b/stream/io_chan.go @@ -192,14 +192,12 @@ func (t *TransactionalReader) Checkpoint(offset int) { current = int(t.bufReader.Size()) - t.bufReader.Len() } - n := current - if offset > 0 { - n = offset - } else { + n := offset + if offset <= 0 { n = current + offset } - if n >= current { + if n >= t.buffer.Len() { t.buffer.Reset() } else { t.buffer.Next(n) diff --git a/stream/io_chan_test.go b/stream/io_chan_test.go index 349313af..38933a7c 100644 --- a/stream/io_chan_test.go +++ b/stream/io_chan_test.go @@ -381,6 +381,25 @@ func TestReadWriterCheckpointRollback(t *testing.T) { AssertClosed(t, rw, nil) } +func TestReadWriterCheckpointMidBufReader(t *testing.T) { + rw := NewTestReadWriter() + buf := make([]byte, 8) + + AssertRead(t, rw, buf, "hello wo", nil) + AssertRead(t, rw, buf, "rldfooba", nil) + rw.Rollback() + AssertRead(t, rw, buf, "hello wo", nil) + rw.Checkpoint(0) + AssertRead(t, rw, buf, "rldfooba", nil) + rw.Rollback() + AssertRead(t, rw, buf, "rldfooba", nil) + AssertRead(t, rw, buf, "r", nil) + rw.Checkpoint(0) + AssertRead(t, rw, buf, "", io.EOF) + + AssertClosed(t, rw, nil) +} + func TestReadWriterFlush(t *testing.T) { rw := NewTestReadWriter() buf := make([]byte, 8)