diff --git a/monitor/census.go b/monitor/census.go index e5587c0e3..5c8fd9bdc 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -206,6 +206,7 @@ type ( mAIResultSaveFailed *stats.Int64Measure mAICurrentLivePipelines *stats.Int64Measure mAIFirstSegmentDelay *stats.Int64Measure + mAILiveAttempts *stats.Int64Measure lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo @@ -377,6 +378,7 @@ func InitCensus(nodeType NodeType, version string) { census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot") census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot") census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms") + census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot") glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) @@ -991,6 +993,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: baseTagsWithOrchInfo, Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000), }, + { + Name: "ai_live_attempt", + Measure: census.mAILiveAttempts, + Description: "AI Live stream attempted", + TagKeys: baseTags, + Aggregation: view.Count(), + }, } // Register the views @@ -1992,6 +2001,10 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) { } } +func AILiveVideoAttempt() { + stats.Record(census.ctx, census.mAILiveAttempts.M(1)) +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_http.go b/server/ai_http.go index f738f3df0..6715cc4c6 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -95,6 +95,9 @@ func aiHttpHandle[I any](h *lphttp, decoderFunc func(*I, *http.Request) error) h func (h *lphttp) StartLiveVideoToVideo() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if monitor.Enabled { + monitor.AILiveVideoAttempt() + } remoteAddr := getRemoteAddr(r) ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr) diff --git a/server/ai_process.go b/server/ai_process.go index 384cadf20..bb2839de8 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -1079,9 +1079,12 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A startControlPublish(control, params) startTricklePublish(ctx, pub, params, sess) startTrickleSubscribe(ctx, sub, params, func() { + delayMs := time.Since(startTime).Milliseconds() if monitor.Enabled { - monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo) + monitor.AIFirstSegmentDelay(delayMs, sess.OrchestratorInfo) } + clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID) + }) startEventsSubscribe(ctx, events, params, sess) return resp, nil