Skip to content

Commit

Permalink
playback: improve /list performance (#3663) (#4102)
Browse files Browse the repository at this point in the history
Segments are now parsed in parallel.
  • Loading branch information
aler9 authored Jan 3, 2025
1 parent ac0ddc9 commit 21b5031
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 54 deletions.
10 changes: 5 additions & 5 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func seekAndMux(

segmentStartOffset := start.Sub(segments[0].Start)

segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
segmentDuration, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil {
return err
}

segmentEnd = start.Add(segmentMaxElapsed)
segmentEnd = start.Add(segmentDuration)

for _, seg := range segments[1:] {
f, err = os.Open(seg.Fpath)
Expand All @@ -92,13 +92,13 @@ func seekAndMux(

segmentStartOffset := seg.Start.Sub(start)

var segmentMaxElapsed time.Duration
segmentMaxElapsed, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
var segmentDuration time.Duration
segmentDuration, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil {
return err
}

segmentEnd = start.Add(segmentMaxElapsed)
segmentEnd = start.Add(segmentDuration)
}

err = m.flush()
Expand Down
142 changes: 93 additions & 49 deletions internal/playback/on_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,110 @@ func (d listEntryDuration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).Seconds())
}

type parsedSegment struct {
start time.Time
init *fmp4.Init
duration time.Duration
}

func parseSegment(seg *recordstore.Segment) (*parsedSegment, error) {
f, err := os.Open(seg.Fpath)
if err != nil {
return nil, err
}
defer f.Close()

init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return nil, err
}

// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return nil, err
}
}

return &parsedSegment{
start: seg.Start,
init: init,
duration: duration,
}, nil
}

func parseSegments(segments []*recordstore.Segment) ([]*parsedSegment, error) {
parsed := make([]*parsedSegment, len(segments))
ch := make(chan error)

// process segments in parallel.
// parallel random access should improve performance in most cases.
// ref: https://pkolaczk.github.io/disk-parallelism/
for i, seg := range segments {
go func(i int, seg *recordstore.Segment) {
var err error
parsed[i], err = parseSegment(seg)
ch <- err
}(i, seg)
}

var err error

for range segments {
err2 := <-ch
if err2 != nil {
err = err2
}
}

return parsed, err
}

type listEntry struct {
Start time.Time `json:"start"`
Duration listEntryDuration `json:"duration"`
URL string `json:"url"`
}

func readDurationAndConcatenate(
func concatenateSegments(parsed []*parsedSegment) []listEntry {
out := []listEntry{}
var prevInit *fmp4.Init

for _, parsed := range parsed {
if len(out) != 0 && segmentFMP4CanBeConcatenated(
prevInit,
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
parsed.init,
parsed.start) {
prevStart := out[len(out)-1].Start
curEnd := parsed.start.Add(parsed.duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: parsed.start,
Duration: listEntryDuration(parsed.duration),
})
}

prevInit = parsed.init
}

return out
}

func parseAndConcatenate(
recordFormat conf.RecordFormat,
segments []*recordstore.Segment,
) ([]listEntry, error) {
if recordFormat == conf.RecordFormatFMP4 {
out := []listEntry{}
var prevInit *fmp4.Init

for _, seg := range segments {
err := func() error {
f, err := os.Open(seg.Fpath)
if err != nil {
return err
}
defer f.Close()

init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return err
}

// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return err
}
}

if len(out) != 0 && segmentFMP4CanBeConcatenated(
prevInit,
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
init,
seg.Start) {
prevStart := out[len(out)-1].Start
curEnd := seg.Start.Add(duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: seg.Start,
Duration: listEntryDuration(duration),
})
}

prevInit = init

return nil
}()
if err != nil {
return nil, err
}
parsed, err := parseSegments(segments)
if err != nil {
return nil, err
}

out := concatenateSegments(parsed)
return out, nil
}

Expand Down Expand Up @@ -135,7 +179,7 @@ func (s *Server) onList(ctx *gin.Context) {
return
}

entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments)
entries, err := parseAndConcatenate(pathConf.RecordFormat, segments)
if err != nil {
s.writeError(ctx, http.StatusInternalServerError, err)
return
Expand Down

0 comments on commit 21b5031

Please sign in to comment.