ai/live: Add limited retries for segment publish #3315

Merged · 2 commits · Dec 17, 2024
49 changes: 40 additions & 9 deletions server/ai_live_video.go
@@ -54,31 +54,56 @@
// check for end of stream
if _, eos := reader.(*media.EOSReader); eos {
if err := publisher.Close(); err != nil {
-clog.Infof(ctx, "Error closing trickle publisher. err=%s", err)
+clog.Infof(ctx, "Error closing trickle publisher. err=%v", err)

}
cancel()
return
}
-if _, atMax := slowOrchChecker.BeginSegment(); atMax {
+thisSeq, atMax := slowOrchChecker.BeginSegment()
+if atMax {

clog.Infof(ctx, "Orchestrator is slow - terminating")
cancel()
return
// TODO kill the rest of the processing, including ingest
// TODO switch orchestrators
}
-go func() {
+go func(seq int) {

defer slowOrchChecker.EndSegment()
var r io.Reader = reader
if paymentProcessor != nil {
r = paymentProcessor.process(reader)
}

-clog.V(8).Infof(ctx, "trickle publish writing data")
-// TODO this blocks! very bad!
-if err := publisher.Write(r); err != nil {
-clog.Infof(ctx, "Error writing to trickle publisher. err=%s", err)
+clog.V(8).Infof(ctx, "trickle publish writing data seq=%d", seq)
+segment, err := publisher.Next()
+if err != nil {
+clog.Infof(ctx, "error getting next publish handle; dropping segment err=%v", err)
+return

}
}()
for {
currentSeq := slowOrchChecker.GetCount()
if seq != currentSeq {
clog.Infof(ctx, "Next segment has already started; skipping this one seq=%d currentSeq=%d", seq, currentSeq)
return
}
n, err := segment.Write(r)
if err == nil {
// no error, all done, let's leave
return
}

// Retry segment only if nothing has been sent yet
// and the next segment has not yet started
// otherwise drop
if n > 0 {
clog.Infof(ctx, "Error publishing segment; dropping remainder wrote=%d err=%v", n, err)
return
}
clog.Infof(ctx, "Error publishing segment before writing; retrying err=%v", err)
// Clone in case the read head was incremented somewhere, which cloning resets
r = reader.Clone()
Member: Hmm this seems maybe not 100% guaranteed, since it could happen that something was read from the reader and then failed to be sent to the writer (and we got 0 on L89).

I think there's no really easy option here apart from maybe an additional layer to check whether the reader was used at some point, like a wrapped reader that flags if it was read. A built-in approach could be using an io.TeeReader to read into a buffer as we read from the input (which would also allow us to retry when some bytes have been read).

Anyway, if you think this is too much of a corner case, this LGTM as it's much simpler. But I think that conceptually it could still have this issue 🤔

Collaborator (Author): "since it could happen that something was read from the reader and then failed to be sent to the writer (and we got 0 on L89)."

That is basically why we call Clone() (which resets the read head for the clone), so when we try to read it during the next iteration it starts from the beginning, regardless of the actual read head. Note that this isn't a "standard" IO reader but rather something specialized that allows for clean resets, nonblocking reads/writes, multiple readers, etc., without having to go through hoops like TeeReader.

Member: Ohh that's cool! Didn't know we already had that custom impl. Pretty neat :)

time.Sleep(250 * time.Millisecond)

}
}(thisSeq)
})
clog.Infof(ctx, "trickle pub")
}
@@ -251,7 +276,7 @@
}
}

-clog.Infof(ctx, "Received event for stream=%s event=%+v", stream, event)
+clog.V(8).Infof(ctx, "Received event for stream=%s event=%+v", stream, event)


eventType, ok := event["type"].(string)
if !ok {
Expand Down Expand Up @@ -328,3 +353,9 @@
defer s.mu.Unlock()
s.completeCount += 1
}

func (s *SlowOrchChecker) GetCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.segmentCount

}
77 changes: 61 additions & 16 deletions trickle/trickle_publisher.go
@@ -36,6 +36,10 @@
index int
writer *io.PipeWriter
errCh chan error

// needed to help with reconnects
written bool
client *TricklePublisher
}

// NewTricklePublisher creates a new trickle stream client
@@ -53,7 +57,6 @@
return c, nil
}

-// Acquire lock to manage access to pendingPost and index
// NB expects to have the lock already since we mutate the index
func (c *TricklePublisher) preconnect() (*pendingPost, error) {

@@ -113,6 +116,7 @@
writer: pw,
index: index,
errCh: errCh,
client: c,

}, nil
}

@@ -136,41 +140,72 @@
return nil
}

-// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion
-func (c *TricklePublisher) Write(data io.Reader) error {
+func (c *TricklePublisher) Next() (*pendingPost, error) {

// Acquire lock to manage access to pendingPost and index
c.writeLock.Lock()
defer c.writeLock.Unlock()


// Get the writer to use
pp := c.pendingPost
if pp == nil {
p, err := c.preconnect()
if err != nil {
-c.writeLock.Unlock()
-return err
+return nil, err

}
pp = p
}
writer := pp.writer
index := pp.index
errCh := pp.errCh

// Set up the next connection
nextPost, err := c.preconnect()
if err != nil {
-c.writeLock.Unlock()
-return err
+return nil, err

}
c.pendingPost = nextPost

-// Now unlock so the copy does not block
-c.writeLock.Unlock()
+return pp, nil

}

func (p *pendingPost) reconnect() (*pendingPost, error) {
// This is a little gnarly but works for now:
// Set the publisher's sequence to the intended reconnect index
// Call publisher's preconnect (which increments its sequence)
// then reset publisher's sequence back to the original
//slog.Info("Re-connecting", "url", p.client.baseURL, "seq", p.client.index)
p.client.writeLock.Lock()
defer p.client.writeLock.Unlock()
currentSeq := p.client.index
p.client.index = p.index
pp, err := p.client.preconnect()
p.client.index = currentSeq
return pp, err

}

func (p *pendingPost) Write(data io.Reader) (int64, error) {

// If writing multiple times, reconnect
if p.written {
pp, err := p.reconnect()
if err != nil {
return 0, err
}
p = pp

}

var (
writer = p.writer
index = p.index
errCh = p.errCh
)

// Mark as written
p.written = true


// before writing, check for error from preconnects
select {
case err := <-errCh:
-return err
+return 0, err

default:
// no error, continue
}
Expand All @@ -192,18 +227,28 @@
// also prioritize errors over this channel compared to io errors
// such as "read/write on closed pipe"
if err := <-errCh; err != nil {
-return err
+return n, err

}

if ioError != nil {
-return fmt.Errorf("error streaming data to segment %d: %w", index, err)
+return n, fmt.Errorf("error streaming data to segment %d: %w", index, ioError)

}

if closeErr != nil {
-return fmt.Errorf("error closing writer for segment %d: %w", index, err)
+return n, fmt.Errorf("error closing writer for segment %d: %w", index, closeErr)

}

-return nil
+return n, nil

}

// Write sends data to the current segment, sets up the next segment concurrently, and blocks until completion
func (c *TricklePublisher) Write(data io.Reader) error {
pp, err := c.Next()
if err != nil {
return err
}
_, err = pp.Write(data)
return err

}

func humanBytes(bytes int64) string {
6 changes: 5 additions & 1 deletion trickle/trickle_server.go
@@ -333,7 +333,9 @@
if exists {
slog.Warn("Overwriting existing entry", "idx", idx)
// Overwrite anything that exists now. TODO figure out a safer behavior?
-return
+// TODO fix concurrent writes to the same segment; would be very bad
+segment.buffer.Reset()
+segment.closed = false

}

// Wrap the request body with the custom timeoutReader so we can send
@@ -527,6 +529,8 @@
}
if startPos > totalLen {
slog.Info("Invalid start pos, invoking eof")
// This might happen if the buffer was reset
// eg because of a repeated POST

return nil, true
}
if s.closed {