Skip to content

Commit

Permalink
add runOnRecordSegmentCreate hook (#2451)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Oct 14, 2023
1 parent 9de6423 commit 2ae211d
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 56 deletions.
99 changes: 54 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1321,71 +1321,80 @@ paths:
`runOnReady` allows to run a command when a stream is ready to be read:

```yml
paths:
mypath:
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
# Restart the command if it exits.
runOnReadyRestart: no
pathDefaults:
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
# Restart the command if it exits.
runOnReadyRestart: no
```

`runOnNotReady` allows to run a command when a stream is not available anymore:

```yml
paths:
mypath:
# Environment variables are the same of runOnReady.
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
pathDefaults:
# Environment variables are the same of runOnReady.
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
```

`runOnRead` allows to run a command when a client starts reading:

```yml
paths:
mypath:
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRead: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
# Restart the command if it exits.
runOnReadRestart: no
pathDefaults:
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRead: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
# Restart the command if it exits.
runOnReadRestart: no
```

`runOnUnread` allows to run a command when a client stops reading:

```yml
paths:
mypath:
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
pathDefaults:
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
```

`runOnRecordSegmentCreate` allows to run a command when a recording segment is created:

```yml
pathDefaults:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```

`runOnRecordSegmentComplete` allows to run a command when a recording segment is complete:

```yml
paths:
mypath:
# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
pathDefaults:
# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```

### API
Expand Down
2 changes: 2 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ components:
type: boolean
runOnUnread:
type: string
runOnRecordSegmentCreate:
type: string
runOnRecordSegmentComplete:
type: string

Expand Down
1 change: 1 addition & 0 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type Path struct {
RunOnRead string `json:"runOnRead"`
RunOnReadRestart bool `json:"runOnReadRestart"`
RunOnUnread string `json:"runOnUnread"`
RunOnRecordSegmentCreate string `json:"runOnRecordSegmentCreate"`
RunOnRecordSegmentComplete string `json:"runOnRecordSegmentComplete"`
}

Expand Down
14 changes: 14 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,20 @@ func (pa *path) startRecording() {
time.Duration(pa.conf.RecordSegmentDuration),
pa.name,
pa.stream,
func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath

pa.Log(logger.Info, "runOnRecordSegmentCreate command launched")
externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnRecordSegmentCreate,
false,
env,
nil)
}

Check warning on line 972 in internal/core/path.go

View check run for this annotation

Codecov / codecov/patch

internal/core/path.go#L962-L972

Added lines #L962 - L972 were not covered by tests
},
func(segmentPath string) {
if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.externalCmdEnv()
Expand Down
16 changes: 9 additions & 7 deletions internal/record/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"github.com/bluenviron/mediamtx/internal/unit"
)

// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)

func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale)
secs := v / time.Second
Expand Down Expand Up @@ -111,7 +114,8 @@ type Agent struct {
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
onSegmentComplete func(string)
onSegmentCreate OnSegmentFunc
onSegmentComplete OnSegmentFunc
parent logger.Writer

ctx context.Context
Expand All @@ -125,31 +129,29 @@ type Agent struct {
done chan struct{}
}

// NewAgent allocates a nAgent.
// NewAgent allocates an Agent.
func NewAgent(
writeQueueSize int,
recordPath string,
partDuration time.Duration,
segmentDuration time.Duration,
pathName string,
stream *stream.Stream,
onSegmentComplete func(string),
onSegmentCreate OnSegmentFunc,
onSegmentComplete OnSegmentFunc,
parent logger.Writer,
) *Agent {
recordPath = strings.ReplaceAll(recordPath, "%path", pathName)
recordPath += ".mp4"

if onSegmentComplete == nil {
onSegmentComplete = func(_ string) {}
}

ctx, ctxCancel := context.WithCancel(context.Background())

r := &Agent{
path: recordPath,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
onSegmentCreate: onSegmentCreate,
onSegmentComplete: onSegmentComplete,
parent: parent,
ctx: ctx,
Expand Down
11 changes: 9 additions & 2 deletions internal/record/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestAgent(t *testing.T) {

recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")

segCreated := make(chan struct{}, 2)
segDone := make(chan struct{}, 2)

a := NewAgent(
Expand All @@ -86,6 +87,9 @@ func TestAgent(t *testing.T) {
1*time.Second,
"mypath",
stream,
func(fpath string) {
segCreated <- struct{}{}
},
func(fpath string) {
segDone <- struct{}{}
},
Expand Down Expand Up @@ -145,8 +149,11 @@ func TestAgent(t *testing.T) {
})
}

<-segDone
<-segDone
for i := 0; i < 2; i++ {
<-segCreated
<-segDone
}

a.Close()

_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))
Expand Down
4 changes: 3 additions & 1 deletion internal/record/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newPart(
func (p *part) close() error {
if p.s.f == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path)
p.s.r.Log(logger.Debug, "opening segment %s", p.s.fpath)
p.s.r.Log(logger.Debug, "creating segment %s", p.s.fpath)

err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
if err != nil {
Expand All @@ -72,6 +72,8 @@ func (p *part) close() error {
return err
}

p.s.r.onSegmentCreate(p.s.fpath)

err = writeInit(f, p.s.r.tracks)
if err != nil {
f.Close()
Expand Down
12 changes: 11 additions & 1 deletion mediamtx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,15 @@ pathDefaults:
# Environment variables are the same of runOnRead.
runOnUnread:

# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate:

# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
Expand All @@ -531,7 +540,8 @@ pathDefaults:
# Path settings

# Settings in "paths" are applied to specific paths, and the map key
# is the name of the path. Any setting in "pathDefaults" can be overridden.
# is the name of the path.
# Any setting in "pathDefaults" can be overridden here.
# It's possible to use regular expressions by using a tilde as prefix,
# for example "~^(test1|test2)$" will match both "test1" and "test2",
# for example "~^prefix" will match all paths that start with "prefix".
Expand Down

0 comments on commit 2ae211d

Please sign in to comment.