Skip to content

Commit

Permalink
ai/live: Add limited publish retries (#3315)
Browse files Browse the repository at this point in the history
Only retry if the error occurs before sending any data, and
if the next segment hasn't arrived yet.
  • Loading branch information
j0sh authored Dec 17, 2024
1 parent 3ac5972 commit 706fda1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 26 deletions.
49 changes: 40 additions & 9 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,56 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
// check for end of stream
if _, eos := reader.(*media.EOSReader); eos {
if err := publisher.Close(); err != nil {
clog.Infof(ctx, "Error closing trickle publisher. err=%s", err)
clog.Infof(ctx, "Error closing trickle publisher. err=%v", err)
}
cancel()
return
}
if _, atMax := slowOrchChecker.BeginSegment(); atMax {
thisSeq, atMax := slowOrchChecker.BeginSegment()
if atMax {
clog.Infof(ctx, "Orchestrator is slow - terminating")
cancel()
return
// TODO kill the rest of the processing, including ingest
// TODO switch orchestrators
}
go func() {
go func(seq int) {
defer slowOrchChecker.EndSegment()
var r io.Reader = reader
if paymentProcessor != nil {
r = paymentProcessor.process(reader)
}

clog.V(8).Infof(ctx, "trickle publish writing data")
// TODO this blocks! very bad!
if err := publisher.Write(r); err != nil {
clog.Infof(ctx, "Error writing to trickle publisher. err=%s", err)
clog.V(8).Infof(ctx, "trickle publish writing data seq=%d", seq)
segment, err := publisher.Next()
if err != nil {
clog.Infof(ctx, "error getting next publish handle; dropping segment err=%v", err)
return
}
}()
for {
currentSeq := slowOrchChecker.GetCount()
if seq != currentSeq {
clog.Infof(ctx, "Next segment has already started; skipping this one seq=%d currentSeq=%d", seq, currentSeq)
return
}
n, err := segment.Write(r)
if err == nil {
// no error, all done, let's leave
return
}
// Retry segment only if nothing has been sent yet
// and the next segment has not yet started
// otherwise drop
if n > 0 {
clog.Infof(ctx, "Error publishing segment; dropping remainder wrote=%d err=%v", n, err)
return
}
clog.Infof(ctx, "Error publishing segment before writing; retrying err=%v", err)
// Clone in case read head was incremented somewhere, which cloning ressets
r = reader.Clone()
time.Sleep(250 * time.Millisecond)
}
}(thisSeq)
})
clog.Infof(ctx, "trickle pub")
}
Expand Down Expand Up @@ -251,7 +276,7 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar
}
}

clog.Infof(ctx, "Received event for stream=%s event=%+v", stream, event)
clog.V(8).Infof(ctx, "Received event for stream=%s event=%+v", stream, event)

eventType, ok := event["type"].(string)
if !ok {
Expand Down Expand Up @@ -328,3 +353,9 @@ func (s *SlowOrchChecker) EndSegment() {
defer s.mu.Unlock()
s.completeCount += 1
}

func (s *SlowOrchChecker) GetCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.segmentCount
}
77 changes: 61 additions & 16 deletions trickle/trickle_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type pendingPost struct {
index int
writer *io.PipeWriter
errCh chan error

// needed to help with reconnects
written bool
client *TricklePublisher
}

// NewTricklePublisher creates a new trickle stream client
Expand All @@ -53,7 +57,6 @@ func NewTricklePublisher(url string) (*TricklePublisher, error) {
return c, nil
}

// Acquire lock to manage access to pendingPost and index
// NB expects to have the lock already since we mutate the index
func (c *TricklePublisher) preconnect() (*pendingPost, error) {

Expand Down Expand Up @@ -113,6 +116,7 @@ func (c *TricklePublisher) preconnect() (*pendingPost, error) {
writer: pw,
index: index,
errCh: errCh,
client: c,
}, nil
}

Expand All @@ -136,41 +140,72 @@ func (c *TricklePublisher) Close() error {
return nil
}

// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion
func (c *TricklePublisher) Write(data io.Reader) error {

func (c *TricklePublisher) Next() (*pendingPost, error) {

Check warning on line 143 in trickle/trickle_publisher.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

exported method Next returns unexported type *trickle.pendingPost, which can be annoying to use
// Acquire lock to manage access to pendingPost and index
c.writeLock.Lock()
defer c.writeLock.Unlock()

// Get the writer to use
pp := c.pendingPost
if pp == nil {
p, err := c.preconnect()
if err != nil {
c.writeLock.Unlock()
return err
return nil, err
}
pp = p
}
writer := pp.writer
index := pp.index
errCh := pp.errCh

// Set up the next connection
nextPost, err := c.preconnect()
if err != nil {
c.writeLock.Unlock()
return err
return nil, err
}
c.pendingPost = nextPost

// Now unlock so the copy does not block
c.writeLock.Unlock()
return pp, nil
}

func (p *pendingPost) reconnect() (*pendingPost, error) {
// This is a little gnarly but works for now:
// Set the publisher's sequence sequence to the intended reconnect
// Call publisher's preconnect (which increments its sequence)
// then reset publisher's sequence back to the original
//slog.Info("Re-connecting", "url", p.client.baseURL, "seq", p.client.index)
p.client.writeLock.Lock()
defer p.client.writeLock.Unlock()
currentSeq := p.client.index
p.client.index = p.index
pp, err := p.client.preconnect()
p.client.index = currentSeq
return pp, err
}

func (p *pendingPost) Write(data io.Reader) (int64, error) {

// If writing multiple times, reconnect
if p.written {
pp, err := p.reconnect()
if err != nil {
return 0, err
}
p = pp
}

var (
writer = p.writer
index = p.index
errCh = p.errCh
)

// Mark as written
p.written = true

// before writing, check for error from preconnects
select {
case err := <-errCh:
return err
return 0, err
default:
// no error, continue
}
Expand All @@ -192,18 +227,28 @@ func (c *TricklePublisher) Write(data io.Reader) error {
// also prioritize errors over this channel compared to io errors
// such as "read/write on closed pipe"
if err := <-errCh; err != nil {
return err
return n, err
}

if ioError != nil {
return fmt.Errorf("error streaming data to segment %d: %w", index, err)
return n, fmt.Errorf("error streaming data to segment %d: %w", index, ioError)
}

if closeErr != nil {
return fmt.Errorf("error closing writer for segment %d: %w", index, err)
return n, fmt.Errorf("error closing writer for segment %d: %w", index, closeErr)
}

return nil
return n, nil
}

// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion
func (c *TricklePublisher) Write(data io.Reader) error {
pp, err := c.Next()
if err != nil {
return err
}
_, err = pp.Write(data)
return err
}

func humanBytes(bytes int64) string {
Expand Down
6 changes: 5 additions & 1 deletion trickle/trickle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ func (s *Stream) handlePost(w http.ResponseWriter, r *http.Request, idx int) {
if exists {
slog.Warn("Overwriting existing entry", "idx", idx)
// Overwrite anything that exists now. TODO figure out a safer behavior?
return
// TODO fix concurrent writes to the same segment; would be very bad
segment.buffer.Reset()
segment.closed = false
}

// Wrap the request body with the custom timeoutReader so we can send
Expand Down Expand Up @@ -527,6 +529,8 @@ func (s *Segment) readData(startPos int) ([]byte, bool) {
}
if startPos > totalLen {
slog.Info("Invalid start pos, invoking eof")
// This might happen if the buffer was reset
// eg because of a repeated POST
return nil, true
}
if s.closed {
Expand Down

0 comments on commit 706fda1

Please sign in to comment.