Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ChanReader / ChanWriter for use inside toxics #134

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions CREATING_TOXICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,16 @@ An implementation of the noop toxic above using the stream package would look so
```go
func (t *NoopToxic) Pipe(stub *toxics.ToxicStub) {
buf := make([]byte, 32*1024)
writer := stream.NewChanWriter(stub.Output)
reader := stream.NewChanReader(stub.Input)
reader.SetInterrupt(stub.Interrupt)
for {
n, err := reader.Read(buf)
n, err := stub.Reader.Read(buf)
if err == stream.ErrInterrupted {
writer.Write(buf[:n])
return
} else if err == io.EOF {
stub.Close()
return
}
writer.Write(buf[:n])
stub.Writer.Write(buf[:n])
stub.Reader.Checkpoint(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Checkpoint(0)? Should it not be Checkpoint(-stub.Reader.Buffered())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example where I used .Buffered() was for a bufio.Reader attached to the TransactionalReader.
In this case, the amount of buffered data is always 0. Since everything read is being used, we can set the checkpoint to the last read byte, rather than the (last read) - (amount buffered by bufio.Reader)

}
}
```
Expand Down
11 changes: 8 additions & 3 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
// Interrupt the last toxic so that we don't have a race when moving channels
if link.stubs[i-1].InterruptToxic() {
link.stubs[i-1].Output = newin
link.stubs[i-1].Writer.SetOutput(newin)

if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
Expand Down Expand Up @@ -129,8 +130,11 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
stop <- link.stubs[i-1].InterruptToxic()
}()

// Unblock the previous toxic if it is trying to flush
// If the previous toxic is closed, continue flusing until we reach the end.
// Flush toxic's internal buffer
link.stubs[i].Reader.FlushTo(link.stubs[i].Writer)

// Unblock the previous toxic if it is trying to flush.
// If the previous toxic is closed, continue flushing until we reach the end.
interrupted := false
stopped := false
for !interrupted {
Expand All @@ -149,7 +153,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
}
}

// Empty the toxic's buffer if necessary
// Empty the toxic's input buffer if necessary
for len(link.stubs[i].Input) > 0 {
tmp := <-link.stubs[i].Input
if tmp == nil {
Expand All @@ -160,6 +164,7 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
}

link.stubs[i-1].Output = link.stubs[i].Output
link.stubs[i-1].Writer.SetOutput(link.stubs[i].Output)
link.stubs = append(link.stubs[:i], link.stubs[i+1:]...)

go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
Expand Down
112 changes: 111 additions & 1 deletion stream/io_chan.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"bytes"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -32,12 +33,20 @@ func NewChanWriter(output chan<- *StreamChunk) *ChanWriter {
// Write `buf` as a StreamChunk to the channel. The full buffer is always written, and error
// will always be nil. Calling `Write()` after closing the channel will panic.
func (c *ChanWriter) Write(buf []byte) (int, error) {
if len(buf) == 0 {
return 0, nil
}

packet := &StreamChunk{make([]byte, len(buf)), time.Now()}
copy(packet.Data, buf) // Make a copy before sending it to the channel
c.output <- packet
return len(buf), nil
}

func (c *ChanWriter) SetOutput(output chan<- *StreamChunk) {
c.output = output
}

// Close the output channel
func (c *ChanWriter) Close() error {
close(c.output)
Expand Down Expand Up @@ -71,7 +80,7 @@ func (c *ChanReader) Read(out []byte) (int, error) {
}
n := copy(out, c.buffer)
c.buffer = c.buffer[n:]
if len(out) <= len(c.buffer) {
if len(out) == n {
return n, nil
} else if n > 0 {
// We have some data to return, so make the channel read optional
Expand Down Expand Up @@ -106,3 +115,104 @@ func (c *ChanReader) Read(out []byte) (int, error) {
c.buffer = p.Data[n2:]
return n + n2, nil
}

// TransactionalReader is a ChanReader that can rollback its progress to checkpoints.
// This is useful when using other buffered readers, since they may read past the end of a message.
// The buffered reader can later be removed by rolling back any buffered bytes.
//
// chan []byte -> ChanReader -> TeeReader -> Read() -> output
// V ^
// bytes.Buffer -> bytes.Reader
type TransactionalReader struct {
buffer *bytes.Buffer
bufReader *bytes.Reader
reader *ChanReader
tee io.Reader
}

func NewTransactionalReader(input <-chan *StreamChunk) *TransactionalReader {
t := &TransactionalReader{
buffer: bytes.NewBuffer(make([]byte, 0, 32*1024)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you choose 32K sized buffer for a specific reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose 32K because it's the size used by io.Copy(), however bytes.Buffer will grow the size of the buffer as necessary.
This is just a nice default size so in most cases it doesn't need to be reallocated.

reader: NewChanReader(input),
}
t.tee = io.TeeReader(t.reader, t.buffer)
return t
}

// Reads from the input channel either directly, or from a buffer if Rollback() has been called.
// If the reader returns `ErrInterrupted`, it will automatically call Rollback()
func (t *TransactionalReader) Read(out []byte) (n int, err error) {
defer func() {
if err == ErrInterrupted {
t.Rollback()
}
}()

if t.bufReader != nil {
n, err := t.bufReader.Read(out)
if err == io.EOF {
t.bufReader = nil
if n > 0 {
return n, nil
} else {
return t.tee.Read(out)
}
}
return n, err
} else {
return t.tee.Read(out)
}
}

// Flushes all buffers past the current position in the reader to the specified writer.
func (t *TransactionalReader) FlushTo(writer io.Writer) {
n := 0
if t.bufReader != nil {
n = t.bufReader.Len()
}
buf := make([]byte, n+len(t.reader.buffer))
if n > 0 {
t.bufReader.Read(buf[:n])
}
if len(buf[n:]) > 0 {
t.reader.Read(buf[n:])
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? Why are we trying to flush the reader? It's not buffered is it? I'll look at this again later and probably understand right away.

writer.Write(buf)
t.bufReader = nil
t.buffer.Reset()
}

// Sets a checkpoint in the reader. A call to Rollback() will begin reading from this point.
// If offset is negative, the checkpoint will be set N bytes before the current position.
// If the offset is positive, the checkpoint will be set N bytes after the previous checkpoint.
// An offset of 0 will set the checkpoint to the current position.
func (t *TransactionalReader) Checkpoint(offset int) {
current := t.buffer.Len()
if t.bufReader != nil {
current = int(t.bufReader.Size()) - t.bufReader.Len()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So current represents how much of the buffer we've already read.

  • 0 will say... okay we've read everything! Lock it in!
  • -5 will say we've read everything except for the last 5 bytes. Let's start reading again from the start of those last 5.
  • 5 will say we successfully read 5 bytes but not the rest. Let's start reading from the rest.
  • t.buffer.Len() will do the same thing is 0.

Okay I found this confusing but LGTM


n := current
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initializing n to current seems to be pointless here? Why not var n int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, looks like this is redundant. I'll see if I can refactor this function and make it a little simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually found another bug while looking at this. if bufReader != nil, t.buffer.Reset() shouldn't be called unless the offset is big enough.
I fixed this, and added a test for it.

if offset > 0 {
n = offset
} else {
n = current + offset
}

if n >= current {
t.buffer.Reset()
} else {
t.buffer.Next(n)
}
}

// Rolls back the reader to start from the last checkpoint.
func (t *TransactionalReader) Rollback() {
if t.buffer.Len() > 0 {
t.bufReader = bytes.NewReader(t.buffer.Bytes())
}
}

func (t *TransactionalReader) SetInterrupt(interrupt <-chan struct{}) {
t.reader.SetInterrupt(interrupt)
}
Loading