Skip to content

Commit

Permalink
Merge pull request #3341 from livepeer/pawel/eng-2342-add-stream-star…
Browse files Browse the repository at this point in the history
…tup-time-metric

[ENG-2342] ai_first_segment_delay metric added
  • Loading branch information
pwilczynskiclearcode authored Jan 8, 2025
2 parents 17832e7 + ef06672 commit 8d6e7ec
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
55 changes: 30 additions & 25 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type (
mAIResultUploadTime *stats.Float64Measure
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -375,6 +376,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultUploadTime = stats.Float64("ai_result_upload_time_seconds", "Upload (to Orchestrator) time", "sec")
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -982,6 +984,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kOrchestratorURI, census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_first_segment_delay_ms",
Measure: census.mAIFirstSegmentDelay,
Description: "Delay of the first live AI segment being processed",
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
}

// Register the views
Expand Down Expand Up @@ -1034,20 +1043,21 @@ func manifestIDTag(ctx context.Context, others ...tag.Mutator) []tag.Mutator {
return others
}

func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
others = manifestIDTag(ctx, others...)

others = append(
others,
tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()),
tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
)
func orchInfoTags(orchInfo *lpnet.OrchestratorInfo) []tag.Mutator {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}
tags := []tag.Mutator{tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
others = append(others, tag.Insert(census.kOrchestratorVersion, capabilities.Version))
tags = append(tags, tag.Insert(census.kOrchestratorVersion, capabilities.GetVersion()))
}
return tags
}

return others
func manifestIDTagAndOrchInfo(orchInfo *lpnet.OrchestratorInfo, ctx context.Context, others ...tag.Mutator) []tag.Mutator {
return append(manifestIDTag(ctx, others...), orchInfoTags(orchInfo)...)
}

func manifestIDTagStr(manifestID string, others ...tag.Mutator) []tag.Mutator {
Expand Down Expand Up @@ -1870,11 +1880,8 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(pipeline string, Mo
cen.lock.Lock()
defer cen.lock.Unlock()

tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(cen.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, Model)}
tags = append(tags, orchInfoTags(orchInfo)...)

if err := stats.RecordWithTags(cen.ctx, tags, cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand All @@ -1895,16 +1902,8 @@ func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(pipeline string, Mo

// AIRequestError logs an error in a gateway AI job request.
func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}
capabilities := orchInfo.GetCapabilities()
if capabilities != nil {
tags = append(tags, tag.Insert(census.kOrchestratorVersion, orchInfo.GetCapabilities().GetVersion()))
}
tags := []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model)}
tags = append(tags, orchInfoTags(orchInfo)...)

if err := stats.RecordWithTags(census.ctx, tags, census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
Expand Down Expand Up @@ -1987,6 +1986,12 @@ func AIResultDownloaded(ctx context.Context, pipeline string, model string, down
}
}

func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
if err := stats.RecordWithTags(census.ctx, orchInfoTags(orchInfo), census.mAIFirstSegmentDelay.M(delayMs)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
9 changes: 8 additions & 1 deletion server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/lpms/ffmpeg"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -123,7 +124,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
clog.Infof(ctx, "trickle pub")
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams) {
func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFistSegment func()) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
r, w, err := os.Pipe()
Expand All @@ -137,6 +138,8 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
// read segments from trickle subscription
go func() {
var err error
firstSegment := true

defer w.Close()
retries := 0
// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
Expand Down Expand Up @@ -181,6 +184,10 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
}
if firstSegment {
firstSegment = false
onFistSegment()
}
clog.V(8).Infof(ctx, "trickle subscribe read data completed seq=%d bytes=%s", seq, humanize.Bytes(uint64(n)))
}
}()
Expand Down
7 changes: 6 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
const initPixelsToPay = 30 * 30 * 3200 * 1800 // 30 seconds, 30fps, 1800p

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLiveVideoToVideoJSONRequestBody) (any, error) {
startTime := time.Now()
// Live Video should not reuse the existing session balance, because it could lead to not sending the init
// payment, which in turns may cause "Insufficient Balance" on the Orchestrator's side.
// It works differently than other AI Jobs, because Live Video is accounted by mid on the Orchestrator's side.
Expand Down Expand Up @@ -1077,7 +1078,11 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A

startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params)
startTrickleSubscribe(ctx, sub, params, func() {
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
}
})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
}
Expand Down

0 comments on commit 8d6e7ec

Please sign in to comment.