From 5d63214a798cf9c0112660e33915b2285d2c8cd4 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Bergeron Date: Wed, 22 Jan 2025 19:55:00 -0500 Subject: [PATCH 1/4] Add GOP cache. --- apidocs/openapi.yaml | 2 + internal/conf/conf.go | 1 + internal/core/core.go | 1 + internal/core/path.go | 2 + internal/core/path_manager.go | 2 + internal/protocols/hls/from_stream_test.go | 2 + internal/protocols/mpegts/from_stream_test.go | 2 + internal/protocols/rtmp/from_stream_test.go | 2 + internal/protocols/webrtc/from_stream_test.go | 3 + internal/recorder/recorder_test.go | 4 + internal/servers/hls/server_test.go | 3 + internal/servers/rtmp/server_test.go | 2 + internal/servers/rtsp/server_test.go | 2 + internal/servers/rtsp/session.go | 14 + internal/servers/srt/server_test.go | 2 + internal/servers/webrtc/server_test.go | 260 +++++++++++++++--- internal/stream/stream.go | 80 +++++- internal/stream/stream_format.go | 55 +++- internal/stream/stream_media.go | 2 + internal/test/medias.go | 11 + internal/test/source_tester.go | 1 + mediamtx.yml | 3 + 22 files changed, 412 insertions(+), 44 deletions(-) diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 5d25d6fcdfa..5377a5ff45a 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -71,6 +71,8 @@ components: type: boolean runOnDisconnect: type: string + gopCache: + type: boolean # Authentication authMethod: diff --git a/internal/conf/conf.go b/internal/conf/conf.go index bc1e13c2a20..3c2139e515b 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -169,6 +169,7 @@ type Conf struct { RunOnConnect string `json:"runOnConnect"` RunOnConnectRestart bool `json:"runOnConnectRestart"` RunOnDisconnect string `json:"runOnDisconnect"` + GopCache bool `json:"gopCache"` // Authentication AuthMethod AuthMethod `json:"authMethod"` diff --git a/internal/core/core.go b/internal/core/core.go index 976173e4095..ef124c53df4 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -347,6 +347,7 @@ func (p *Core) 
createResources(initial bool) error { pathConfs: p.conf.Paths, externalCmdPool: p.externalCmdPool, parent: p, + gopCache: p.conf.GopCache, } p.pathManager.initialize() diff --git a/internal/core/path.go b/internal/core/path.go index b214be66748..911e062d256 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -77,6 +77,7 @@ type path struct { wg *sync.WaitGroup externalCmdPool *externalcmd.Pool parent pathParent + gopCache bool ctx context.Context ctxCancel func() @@ -716,6 +717,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error desc, allocateEncoder, logger.NewLimitedLogger(pa.source), + pa.gopCache, ) if err != nil { return err diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index bbd1bb7b0a3..d0d4adac74e 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -59,6 +59,7 @@ type pathManager struct { pathConfs map[string]*conf.Path externalCmdPool *externalcmd.Pool parent pathManagerParent + gopCache bool ctx context.Context ctxCancel func() @@ -352,6 +353,7 @@ func (pm *pathManager) createPath( wg: &pm.wg, externalCmdPool: pm.externalCmdPool, parent: pm, + gopCache: pm.gopCache, } pa.initialize() diff --git a/internal/protocols/hls/from_stream_test.go b/internal/protocols/hls/from_stream_test.go index 53bd483a750..ecafc722490 100644 --- a/internal/protocols/hls/from_stream_test.go +++ b/internal/protocols/hls/from_stream_test.go @@ -23,6 +23,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { }}}, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -56,6 +57,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }}, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/protocols/mpegts/from_stream_test.go b/internal/protocols/mpegts/from_stream_test.go index 62c695632ee..0dc5ef2ac71 100644 --- a/internal/protocols/mpegts/from_stream_test.go +++ b/internal/protocols/mpegts/from_stream_test.go @@ -22,6 +22,7 @@ func 
TestFromStreamNoSupportedCodecs(t *testing.T) { }}}, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -49,6 +50,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }}, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/protocols/rtmp/from_stream_test.go b/internal/protocols/rtmp/from_stream_test.go index d1d0d1eb709..e5226ff18c3 100644 --- a/internal/protocols/rtmp/from_stream_test.go +++ b/internal/protocols/rtmp/from_stream_test.go @@ -25,6 +25,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { }}}, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -56,6 +57,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }}, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/protocols/webrtc/from_stream_test.go b/internal/protocols/webrtc/from_stream_test.go index 6b5188bc8c9..3325a355a08 100644 --- a/internal/protocols/webrtc/from_stream_test.go +++ b/internal/protocols/webrtc/from_stream_test.go @@ -22,6 +22,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { }}}, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -49,6 +50,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }}, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -84,6 +86,7 @@ func TestFromStream(t *testing.T) { }, false, test.NilLogger, + false, ) require.NoError(t, err) defer stream.Close() diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index a5b76b3dd29..26a04223974 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -130,6 +130,7 @@ func TestRecorder(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) defer stream.Close() @@ -343,6 +344,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) defer stream.Close() @@ -430,6 +432,7 @@ func TestRecorderSkipTracksPartial(t *testing.T) { desc, true, 
test.NilLogger, + false, ) require.NoError(t, err) defer stream.Close() @@ -490,6 +493,7 @@ func TestRecorderSkipTracksFull(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) defer stream.Close() diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index d1460a248d6..cdad47e1915 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -160,6 +160,7 @@ func TestServerRead(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -259,6 +260,7 @@ func TestServerRead(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) @@ -363,6 +365,7 @@ func TestDirectory(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index b6537187706..e5065362e50 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -45,6 +45,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre req.Desc, true, test.NilLogger, + false, ) if err != nil { return nil, err @@ -200,6 +201,7 @@ func TestServerRead(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/servers/rtsp/server_test.go b/internal/servers/rtsp/server_test.go index ecec3230c0f..15e0b48cc09 100644 --- a/internal/servers/rtsp/server_test.go +++ b/internal/servers/rtsp/server_test.go @@ -45,6 +45,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre req.Desc, true, test.NilLogger, + false, ) if err != nil { return nil, err @@ -169,6 +170,7 @@ func TestServerRead(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 248efee3743..8fa669c99d3 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go 
@@ -11,6 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v4" rtspauth "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/google/uuid" "github.com/pion/rtp" @@ -269,6 +270,19 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e s.state = gortsplib.ServerSessionStatePlay s.transport = s.rsession.SetuppedTransport() s.mutex.Unlock() + + for _, medi := range s.stream.Desc().Medias { + if medi.Type == description.MediaTypeVideo { + for _, u := range s.stream.CachedUnits { + for _, pkt := range u.GetRTPPackets() { + err := s.rsession.WritePacketRTP(medi, pkt) + if err != nil { + break + } + } + } + } + } } return &base.Response{ diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index ae3db7722c0..ab5c4648cc3 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -42,6 +42,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre req.Desc, true, test.NilLogger, + false, ) if err != nil { return nil, err @@ -171,6 +172,7 @@ func TestServerRead(t *testing.T) { desc, true, test.NilLogger, + false, ) require.NoError(t, err) diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 0717153376d..3b642d96915 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -59,6 +59,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre req.Desc, true, test.NilLogger, + false, ) if err != nil { return nil, err @@ -351,8 +352,9 @@ func TestServerRead(t *testing.T) { for _, ca := range []struct { name string medias []*description.Media - unit unit.Unit + unit []unit.Unit outRTPPayload []byte + gopCache bool }{ { "av1", @@ -362,10 +364,13 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.AV1{ - TU: [][]byte{{1, 2}}, 
+ []unit.Unit{ + &unit.AV1{ + TU: [][]byte{{1, 2}}, + }, }, []byte{0, 2, 1, 2}, + false, }, { "vp9", @@ -375,14 +380,17 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.VP9{ - Frame: []byte{0x82, 0x49, 0x83, 0x42, 0x0, 0x77, 0xf0, 0x32, 0x34}, + []unit.Unit{ + &unit.VP9{ + Frame: []byte{0x82, 0x49, 0x83, 0x42, 0x0, 0x77, 0xf0, 0x32, 0x34}, + }, }, []byte{ 0x8f, 0xa0, 0xfd, 0x18, 0x07, 0x80, 0x03, 0x24, 0x01, 0x14, 0x01, 0x82, 0x49, 0x83, 0x42, 0x00, 0x77, 0xf0, 0x32, 0x34, }, + false, }, { "vp8", @@ -392,17 +400,22 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.VP8{ - Frame: []byte{1, 2}, + []unit.Unit{ + &unit.VP8{ + Frame: []byte{1, 2}, + }, }, []byte{0x10, 1, 2}, + false, }, { "h264", []*description.Media{test.MediaH264}, - &unit.H264{ - AU: [][]byte{ - {5, 1}, + []unit.Unit{ + &unit.H264{ + AU: [][]byte{ + {5, 1}, + }, }, }, []byte{ @@ -412,6 +425,145 @@ func TestServerRead(t *testing.T) { 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, 0x07, 0x08, 0x00, 0x02, 0x05, 0x01, }, + false, + }, + { + "h264 with gop cache", + []*description.Media{test.MediaH264}, + []unit.Unit{ + // ffmpeg -f lavfi -i color=blue:s=2x2 -vframes 10 -c:v libx264 out.264 + &unit.H264{ + AU: [][]byte{ + { + 0x65, 0x88, 0x84, 0x00, 0x37, 0xff, 0xfe, 0xe1, + 0x03, 0xf8, 0x14, 0xd7, 0x4d, 0xfe, 0x63, 0x8f, + 0x43, 0xd9, 0x01, 0x68, 0xc1, + }, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0x24, 0x6c, 0x43, 0x7f, 0xfe, 0xe0}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9e, 0x42, 0x78, 0x85, 0xff, 0xc1, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0x61, 0x74, 0x42, 0xbf, 0xc4, 0x80}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0x63, 0x6a, 0x42, 0xbf, 0xc4, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0x68, 0x49, 0xa8, 0x41, 0x68, 0x99, 0x4c, 0x08, 0x5f, 0xff, 0xfe, 0xe1}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9e, 0x86, 0x45, 0x11, 0x2c, 0x2f, 0xff, 0xc1, 0x81}, + }, + }, + 
&unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0xa5, 0x74, 0x42, 0xbf, 0xc4, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0xa7, 0x6a, 0x42, 0xbf, 0xc4, 0x80}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0xa9, 0x49, 0xa8, 0x41, 0x6c, 0x99, 0x4c, 0x08, 0x57, 0xff, 0xfe, 0xc0}, + }, + }, + }, + []byte{ + 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, + 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, + 0x07, 0x08, 0x00, 0x15, 0x65, 0x88, 0x84, 0x00, 0x37, 0xff, 0xfe, 0xe1, 0x03, 0xf8, 0x14, 0xd7, + 0x4d, 0xfe, 0x63, 0x8f, 0x43, 0xd9, 0x01, 0x68, 0xc1, + }, + true, + }, + { + "h265 with gop cache", + []*description.Media{test.MediaH265}, + []unit.Unit{ + // ffmpeg -f lavfi -i color=blue:s=16x16 -vframes 10 -c:v libx265 out.265 + &unit.H265{ + AU: [][]byte{ + { + 0x28, 0x01, 0xaf, 0x1d, 0x80, 0xf0, 0x0e, 0x9e, 0x0f, 0xfd, 0x7d, 0x3a, 0x39, 0xb1, + 0xc7, 0x6f, 0x98, + }, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xd0, 0x29, 0x4b, 0xe1, 0x0c, 0x63, 0x90, 0xfa, 0x84}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xe0, 0x64, 0x9d, 0x78, 0x61, 0x24, 0xc5, 0x60}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x24, 0xf5, 0x5f, 0xa2, 0xc2, 0x98, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x44, 0xd7, 0x5f, 0xa2, 0xc2, 0x88, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x86, 0xb7, 0xfd, 0x46, 0x14, 0xc0, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xd0, 0x48, 0x92, 0x55, 0xfd, 0xc4, 0x30, 0x18, 0xec, 0xfa, 0x84}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xe0, 0xe2, 0x25, 0x57, 0x5f, 0x71, 0x84, 0x90, 0xc5, 0x60}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0xc6, 0xf5, 0xd7, 0xd2, 0x2c, 0x29, 0x80, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe1, 0x02, 0x2d, 0x57, 0xf7, 0x18, 0x51, 0xc8, 
0x20}, + }, + }, + }, + []byte{ + 0x60, 0x0, 0x0, 0x18, 0x40, 0x1, 0xc, 0x1, 0xff, 0xff, 0x2, 0x20, 0x0, 0x0, 0x3, 0x0, 0xb0, 0x0, + 0x0, 0x3, 0x0, 0x0, 0x3, 0x0, 0x7b, 0x18, 0xb0, 0x24, 0x0, 0x3c, 0x42, 0x1, 0x1, 0x2, 0x20, 0x0, + 0x0, 0x3, 0x0, 0xb0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x3, 0x0, 0x7b, 0xa0, 0x7, 0x82, 0x0, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, 0xa2, 0x23, 0xff, 0x0, 0x1, 0x0, 0x1, 0x6a, 0x2, + 0x2, 0x2, 0x1, 0x0, 0x8, 0x44, 0x1, 0xc0, 0x25, 0x2f, 0x5, 0x32, 0x40, 0x0, 0x11, 0x28, 0x1, 0xaf, + 0x1d, 0x80, 0xf0, 0xe, 0x9e, 0xf, 0xfd, 0x7d, 0x3a, 0x39, 0xb1, 0xc7, 0x6f, 0x98, + }, + true, }, { "opus", @@ -422,10 +574,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.Opus{ - Packets: [][]byte{{1, 2}}, + []unit.Unit{ + &unit.Opus{ + Packets: [][]byte{{1, 2}}, + }, }, []byte{1, 2}, + false, }, { "g722", @@ -433,22 +588,25 @@ func TestServerRead(t *testing.T) { Type: description.MediaTypeAudio, Formats: []format.Format{&format.G722{}}, }}, - &unit.Generic{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 9, - SequenceNumber: 1123, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{1, 2}, - }}, + []unit.Unit{ + &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 9, + SequenceNumber: 1123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1, 2}, + }}, + }, }, }, []byte{1, 2}, + false, }, { "g711 8khz mono", @@ -460,10 +618,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 1, }}, }}, - &unit.G711{ - Samples: []byte{1, 2, 3}, + []unit.Unit{ + &unit.G711{ + Samples: []byte{1, 2, 3}, + }, }, []byte{1, 2, 3}, + false, }, { "g711 16khz stereo", @@ -475,10 +636,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.G711{ - Samples: []byte{1, 
2, 3, 4}, + []unit.Unit{ + &unit.G711{ + Samples: []byte{1, 2, 3, 4}, + }, }, []byte{0x86, 0x84, 0x8a, 0x84, 0x8e, 0x84, 0x92, 0x84}, + false, }, { "lpcm", @@ -491,10 +655,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.LPCM{ - Samples: []byte{1, 2, 3, 4}, + []unit.Unit{ + &unit.LPCM{ + Samples: []byte{1, 2, 3, 4}, + }, }, []byte{1, 2, 3, 4}, + false, }, } { t.Run(ca.name, func(t *testing.T) { @@ -504,8 +671,9 @@ func TestServerRead(t *testing.T) { 512, 1460, desc, - reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}), + reflect.TypeOf(ca.unit[0]) != reflect.TypeOf(&unit.Generic{}), test.NilLogger, + ca.gopCache, ) require.NoError(t, err) @@ -570,16 +738,26 @@ func TestServerRead(t *testing.T) { go func() { defer close(writerDone) - str.WaitRunningReader() - - r := reflect.New(reflect.TypeOf(ca.unit).Elem()) - r.Elem().Set(reflect.ValueOf(ca.unit).Elem()) + // When testing for gopCache, start pushing packets before the client connects + if !ca.gopCache { + str.WaitRunningReader() + } - if g, ok := r.Interface().(*unit.Generic); ok { - clone := *g.RTPPackets[0] - str.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0) - } else { - str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) + for i, u := range ca.unit { + r := reflect.New(reflect.TypeOf(u).Elem()) + r.Elem().Set(reflect.ValueOf(u).Elem()) + + // When testing for gopCache, wait until half-way before pushing the rest of segments. 
+ if i == len(ca.unit)/2 && ca.gopCache { + str.WaitRunningReader() + } + + if g, ok := r.Interface().(*unit.Generic); ok { + clone := *g.RTPPackets[0] + str.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0) + } else { + str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) + } } }() diff --git a/internal/stream/stream.go b/internal/stream/stream.go index dfb0204eb30..7ac4a671f6e 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -38,6 +38,8 @@ type Stream struct { streamReaders map[Reader]*streamReader readerRunning chan struct{} + + CachedUnits []unit.Unit } // New allocates a Stream. @@ -47,6 +49,7 @@ func New( desc *description.Session, generateRTPPackets bool, decodeErrLogger logger.Writer, + gopCache bool, ) (*Stream, error) { s := &Stream{ writeQueueSize: writeQueueSize, @@ -61,7 +64,7 @@ func New( for _, media := range desc.Medias { var err error - s.streamMedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger) + s.streamMedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger, gopCache) if err != nil { return nil, err } @@ -184,9 +187,82 @@ func (s *Stream) StartReader(reader Reader) { sr.start() - for _, sm := range s.streamMedias { + for m, sm := range s.streamMedias { for _, sf := range sm.formats { sf.startReader(sr) + if m.Type == description.MediaTypeVideo { + cb := sf.runningReaders[sr] + if cb == nil { + continue + } + + framesWithAU := 0 + for _, u := range s.CachedUnits { + if !isEmptyAU(u) { + framesWithAU++ + } + } + if framesWithAU == 0 { + continue + } + + // The previous p-frames must be sent at a certain speed to avoid the video freezing. + // We must update the PTS of the p-frames to have them played back real quick, but not instantly. + // If we do not update the PTS, the client will pause by an amount equal to the time between the p-frames. 
+ // This is an issue because we want to send the p-frames as fast as possible. + playbackFPS := int64(100) + msPerFrame := 1000 / playbackFPS + ticksPerMs := int64(90000 / 1000) + pts := s.CachedUnits[len(s.CachedUnits)-1].GetPTS() - ticksPerMs*int64(framesWithAU)*msPerFrame + start := time.Now() + for _, u := range s.CachedUnits { + if isEmptyAU(u) { + continue + } + pts += ticksPerMs * msPerFrame + start = start.Add(time.Millisecond * time.Duration(msPerFrame)) + + var clonedU unit.Unit + switch tunit := u.(type) { + case *unit.H264: + clonedU = &unit.H264{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + { + Header: rtp.Header{ + Timestamp: uint32(pts), + }, + }, + }, + PTS: pts, + }, + AU: tunit.AU, + } + case *unit.H265: + clonedU = &unit.H265{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + { + Header: rtp.Header{ + Timestamp: uint32(pts), + }, + }, + }, + PTS: pts, + }, + AU: tunit.AU, + } + } + until := start + sr.push(func() error { + size := unitSize(clonedU) + atomic.AddUint64(s.bytesSent, size) + err := cb(clonedU) + time.Sleep(time.Until(until)) + return err + }) + } + } } } diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index cea65f6bd3f..7b22a794a60 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -6,6 +6,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/formatprocessor" @@ -13,6 +15,10 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) +const ( + maxCachedGOPSize int = 512 +) + func unitSize(u unit.Unit) uint64 { n := uint64(0) for _, pkt := range u.GetRTPPackets() { @@ -21,6 +27,26 @@ func unitSize(u unit.Unit) uint64 { return n } +func isKeyFrame(u unit.Unit) bool { + switch tunit := u.(type) { + case *unit.H264: + return 
h264.IDRPresent(tunit.AU) + case *unit.H265: + return h265.IsRandomAccess(tunit.AU) + } + return false +} + +func isEmptyAU(u unit.Unit) bool { + switch tunit := u.(type) { + case *unit.H264: + return len(tunit.AU) == 0 + case *unit.H265: + return len(tunit.AU) == 0 + } + return true +} + type streamFormat struct { udpMaxPayloadSize int format format.Format @@ -30,6 +56,7 @@ type streamFormat struct { proc formatprocessor.Processor pausedReaders map[*streamReader]ReadFunc runningReaders map[*streamReader]ReadFunc + gopCache bool } func (sf *streamFormat) initialize() error { @@ -78,7 +105,7 @@ func (sf *streamFormat) writeRTPPacket( ntp time.Time, pts int64, ) { - hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 + hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 || sf.gopCache u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders) if err != nil { @@ -94,6 +121,32 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni atomic.AddUint64(s.bytesReceived, size) + if sf.gopCache && medi.Type == description.MediaTypeVideo { + if isKeyFrame(u) { + if s.CachedUnits == nil { + // Initialize the cache and enable caching + s.CachedUnits = make([]unit.Unit, 0, maxCachedGOPSize) + } else { + // Keep the last packets that were used to generate the key frame. + // This is to send a full key frame in the RTSP stream. 
+ i := len(s.CachedUnits) + for ; i > 0; i-- { + if !isEmptyAU(s.CachedUnits[i-1]) { + break + } + } + s.CachedUnits = s.CachedUnits[i:] + } + } + if s.CachedUnits != nil { + s.CachedUnits = append(s.CachedUnits, u) + } + l := len(s.CachedUnits) + if l > maxCachedGOPSize { + s.CachedUnits = s.CachedUnits[l-maxCachedGOPSize:] + } + } + if s.rtspStream != nil { for _, pkt := range u.GetRTPPackets() { s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index f406968384e..cc65664f658 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -15,6 +15,7 @@ func newStreamMedia(udpMaxPayloadSize int, medi *description.Media, generateRTPPackets bool, decodeErrLogger logger.Writer, + gopCache bool, ) (*streamMedia, error) { sm := &streamMedia{ formats: make(map[format.Format]*streamFormat), @@ -26,6 +27,7 @@ func newStreamMedia(udpMaxPayloadSize int, format: forma, generateRTPPackets: generateRTPPackets, decodeErrLogger: decodeErrLogger, + gopCache: gopCache, } err := sf.initialize() if err != nil { diff --git a/internal/test/medias.go b/internal/test/medias.go index 1fa1d79f9c5..ca76c2a6b88 100644 --- a/internal/test/medias.go +++ b/internal/test/medias.go @@ -8,6 +8,9 @@ import ( // MediaH264 is a dummy H264 media. var MediaH264 = UniqueMediaH264() +// MediaH265 is a dummy H265 media. +var MediaH265 = UniqueMediaH265() + // MediaMPEG4Audio is a dummy MPEG-4 audio media. var MediaMPEG4Audio = UniqueMediaMPEG4Audio() @@ -19,6 +22,14 @@ func UniqueMediaH264() *description.Media { } } +// UniqueMediaH265 is a dummy H265 media. +func UniqueMediaH265() *description.Media { + return &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{FormatH265}, + } +} + // UniqueMediaMPEG4Audio is a dummy MPEG-4 audio media. 
func UniqueMediaMPEG4Audio() *description.Media { return &description.Media{ diff --git a/internal/test/source_tester.go b/internal/test/source_tester.go index 03d59a4b424..3b2c024dfc3 100644 --- a/internal/test/source_tester.go +++ b/internal/test/source_tester.go @@ -70,6 +70,7 @@ func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathS req.Desc, req.GenerateRTPPackets, t, + false, ) t.reader = NilLogger diff --git a/mediamtx.yml b/mediamtx.yml index 22e35ddccaa..8093d61697a 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -23,6 +23,9 @@ writeQueueSize: 512 # Maximum size of outgoing UDP packets. # This can be decreased to avoid fragmentation on networks with a low UDP MTU. udpMaxPayloadSize: 1472 +# Cache the most recent video GOP (H264/H265 frames since the last key frame) and send it to new clients when they connect, to improve their initial playback experience. +# Note: this increases memory usage, since the cached frames are kept in memory. +gopCache: false # Command to run when a client connects to the server. # This is terminated with SIGINT when a client disconnects from the server. From 4b104c1c360c124db4d0cb4ec0f2f0fca072328a Mon Sep 17 00:00:00 2001 From: Jean-Philippe Bergeron Date: Thu, 23 Jan 2025 13:52:28 -0500 Subject: [PATCH 2/4] Update stream.go --- internal/stream/stream.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 7ac4a671f6e..9ec6e65b4e3 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -210,16 +210,18 @@ func (s *Stream) StartReader(reader Reader) { // We must update the PTS of the p-frames to have them played back real quick, but not instantly. // If we do not update the PTS, the client will pause by an amount equal to the time between the p-frames. // This is an issue because we want to send the p-frames as fast as possible. 
- playbackFPS := int64(100) + playbackFPS := 100 msPerFrame := 1000 / playbackFPS - ticksPerMs := int64(90000 / 1000) - pts := s.CachedUnits[len(s.CachedUnits)-1].GetPTS() - ticksPerMs*int64(framesWithAU)*msPerFrame + ticksPerMs := 90000 / 1000 + lastTimestamp := s.CachedUnits[len(s.CachedUnits)-1].GetRTPPackets()[0].Timestamp + lastPts := s.CachedUnits[len(s.CachedUnits)-1].GetPTS() + delta := -ticksPerMs * framesWithAU * msPerFrame start := time.Now() for _, u := range s.CachedUnits { if isEmptyAU(u) { continue } - pts += ticksPerMs * msPerFrame + delta += ticksPerMs * msPerFrame start = start.Add(time.Millisecond * time.Duration(msPerFrame)) var clonedU unit.Unit @@ -230,11 +232,11 @@ func (s *Stream) StartReader(reader Reader) { RTPPackets: []*rtp.Packet{ { Header: rtp.Header{ - Timestamp: uint32(pts), + Timestamp: lastTimestamp + uint32(delta), }, }, }, - PTS: pts, + PTS: lastPts + int64(delta), }, AU: tunit.AU, } @@ -244,11 +246,11 @@ func (s *Stream) StartReader(reader Reader) { RTPPackets: []*rtp.Packet{ { Header: rtp.Header{ - Timestamp: uint32(pts), + Timestamp: lastTimestamp + uint32(delta), }, }, }, - PTS: pts, + PTS: lastPts + int64(delta), }, AU: tunit.AU, } From 64801a166bf30429783e0071d3b2a89634600cda Mon Sep 17 00:00:00 2001 From: Jean-Philippe Bergeron Date: Wed, 29 Jan 2025 14:49:46 -0500 Subject: [PATCH 3/4] Restart on GOP setting change --- internal/core/core.go | 1 + internal/stream/stream_format.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/core/core.go b/internal/core/core.go index ef124c53df4..7369af5c007 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -702,6 +702,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || + newConf.GopCache != p.conf.GopCache || closeMetrics || closeAuthManager || closeLogger diff --git 
a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 7b22a794a60..81f5c0de3d3 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -144,6 +144,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni l := len(s.CachedUnits) if l > maxCachedGOPSize { s.CachedUnits = s.CachedUnits[l-maxCachedGOPSize:] + sf.decodeErrLogger.Log(logger.Warn, "GOP cache is full, dropping packets") } } From 61a8614df0dd1fc523d3f767febe25b25340be11 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Bergeron Date: Tue, 4 Feb 2025 14:04:14 -0500 Subject: [PATCH 4/4] Replay cached GOP over RTSP with the last cached RTP timestamp --- internal/servers/rtsp/session.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 8fa669c99d3..23bfcad916a 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -271,19 +271,22 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e s.transport = s.rsession.SetuppedTransport() s.mutex.Unlock() - for _, medi := range s.stream.Desc().Medias { - if medi.Type == description.MediaTypeVideo { - for _, u := range s.stream.CachedUnits { - for _, pkt := range u.GetRTPPackets() { - err := s.rsession.WritePacketRTP(medi, pkt) - if err != nil { - break + if len(s.stream.CachedUnits) > 0 { + lastTimestamp := s.stream.CachedUnits[len(s.stream.CachedUnits)-1].GetRTPPackets()[0].Timestamp + for _, medi := range s.stream.Desc().Medias { + if medi.Type == description.MediaTypeVideo { + for _, u := range s.stream.CachedUnits { + for _, pkt := range u.GetRTPPackets() { + pkt.Timestamp = lastTimestamp + err := s.rsession.WritePacketRTP(medi, pkt) + if err != nil { + break + } } } } } } - } return &base.Response{ StatusCode: base.StatusOK,