Skip to content

Commit

Permalink
Merge branch 'master' into rafal/debug-sigsegv
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Jan 13, 2025
2 parents 2df074b + 4d09ba7 commit 3a2885c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
13 changes: 13 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type (
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure
mAILiveAttempts *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -377,6 +378,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")
census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -991,6 +993,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
{
Name: "ai_live_attempt",
Measure: census.mAILiveAttempts,
Description: "AI Live stream attempted",
TagKeys: baseTags,
Aggregation: view.Count(),
},
}

// Register the views
Expand Down Expand Up @@ -1992,6 +2001,10 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
}
}

func AILiveVideoAttempt() {
stats.Record(census.ctx, census.mAILiveAttempts.M(1))
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
3 changes: 3 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func aiHttpHandle[I any](h *lphttp, decoderFunc func(*I, *http.Request) error) h

func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if monitor.Enabled {
monitor.AILiveVideoAttempt()
}

remoteAddr := getRemoteAddr(r)
ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)
Expand Down
5 changes: 4 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,12 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params, func() {
delayMs := time.Since(startTime).Milliseconds()
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
monitor.AIFirstSegmentDelay(delayMs, sess.OrchestratorInfo)
}
clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID)

})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
Expand Down

0 comments on commit 3a2885c

Please sign in to comment.