Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache H264/H265 GOPs in order to allow readers to decode frames immediately #4189

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ components:
type: boolean
runOnDisconnect:
type: string
gopCache:
type: boolean

# Authentication
authMethod:
Expand Down
1 change: 1 addition & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type Conf struct {
RunOnConnect string `json:"runOnConnect"`
RunOnConnectRestart bool `json:"runOnConnectRestart"`
RunOnDisconnect string `json:"runOnDisconnect"`
GopCache bool `json:"gopCache"`

// Authentication
AuthMethod AuthMethod `json:"authMethod"`
Expand Down
2 changes: 2 additions & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func (p *Core) createResources(initial bool) error {
pathConfs: p.conf.Paths,
externalCmdPool: p.externalCmdPool,
parent: p,
gopCache: p.conf.GopCache,
}
p.pathManager.initialize()

Expand Down Expand Up @@ -701,6 +702,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize ||
newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize ||
newConf.GopCache != p.conf.GopCache ||
closeMetrics ||
closeAuthManager ||
closeLogger
Expand Down
2 changes: 2 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type path struct {
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
parent pathParent
gopCache bool

ctx context.Context
ctxCancel func()
Expand Down Expand Up @@ -716,6 +717,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
desc,
allocateEncoder,
logger.NewLimitedLogger(pa.source),
pa.gopCache,
)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions internal/core/path_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type pathManager struct {
pathConfs map[string]*conf.Path
externalCmdPool *externalcmd.Pool
parent pathManagerParent
gopCache bool

ctx context.Context
ctxCancel func()
Expand Down Expand Up @@ -352,6 +353,7 @@ func (pm *pathManager) createPath(
wg: &pm.wg,
externalCmdPool: pm.externalCmdPool,
parent: pm,
gopCache: pm.gopCache,
}
pa.initialize()

Expand Down
2 changes: 2 additions & 0 deletions internal/protocols/hls/from_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) {
}}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -56,6 +57,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions internal/protocols/mpegts/from_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) {
}}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -49,6 +50,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions internal/protocols/rtmp/from_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) {
}}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -56,6 +57,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions internal/protocols/webrtc/from_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) {
}}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -49,6 +50,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
}},
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -84,6 +86,7 @@ func TestFromStream(t *testing.T) {
},
false,
test.NilLogger,
false,
)
require.NoError(t, err)
defer stream.Close()
Expand Down
4 changes: 4 additions & 0 deletions internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestRecorder(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)
defer stream.Close()
Expand Down Expand Up @@ -343,6 +344,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)
defer stream.Close()
Expand Down Expand Up @@ -430,6 +432,7 @@ func TestRecorderSkipTracksPartial(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)
defer stream.Close()
Expand Down Expand Up @@ -490,6 +493,7 @@ func TestRecorderSkipTracksFull(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)
defer stream.Close()
Expand Down
3 changes: 3 additions & 0 deletions internal/servers/hls/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestServerRead(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -259,6 +260,7 @@ func TestServerRead(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -363,6 +365,7 @@ func TestDirectory(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions internal/servers/rtmp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre
req.Desc,
true,
test.NilLogger,
false,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -200,6 +201,7 @@ func TestServerRead(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
2 changes: 2 additions & 0 deletions internal/servers/rtsp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre
req.Desc,
true,
test.NilLogger,
false,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -169,6 +170,7 @@ func TestServerRead(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
19 changes: 18 additions & 1 deletion internal/servers/rtsp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bluenviron/gortsplib/v4"
rtspauth "github.com/bluenviron/gortsplib/v4/pkg/auth"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/google/uuid"
"github.com/pion/rtp"

Expand Down Expand Up @@ -269,7 +270,23 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e
s.state = gortsplib.ServerSessionStatePlay
s.transport = s.rsession.SetuppedTransport()
s.mutex.Unlock()
}

if len(s.stream.CachedUnits) > 0 {
lastTimestamp := s.stream.CachedUnits[len(s.stream.CachedUnits)-1].GetRTPPackets()[0].Timestamp
for _, medi := range s.stream.Desc().Medias {
if medi.Type == description.MediaTypeVideo {
for _, u := range s.stream.CachedUnits {
for _, pkt := range u.GetRTPPackets() {
pkt.Timestamp = lastTimestamp
err := s.rsession.WritePacketRTP(medi, pkt)
if err != nil {
break
}
}
}
}
}
}

return &base.Response{
StatusCode: base.StatusOK,
Expand Down
2 changes: 2 additions & 0 deletions internal/servers/srt/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre
req.Desc,
true,
test.NilLogger,
false,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,6 +172,7 @@ func TestServerRead(t *testing.T) {
desc,
true,
test.NilLogger,
false,
)
require.NoError(t, err)

Expand Down
Loading