Skip to content

Commit

Permalink
Merge pull request #3 from TilsonJoji/main
Browse files Browse the repository at this point in the history
Support display of available video tracks as drop-down and to swap among them
  • Loading branch information
englishm authored Nov 20, 2024
2 parents 6b99350 + ac5dac5 commit f944367
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 20 deletions.
50 changes: 42 additions & 8 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ export class Player {

#connection: Connection
#catalog: Catalog.Root
#tracknum: number

// Running is a promise that resolves when the player is closed.
// #close is called with no error, while #abort is called with an error.
#running: Promise<void>
#close!: () => void
#abort!: (err: Error) => void

private constructor(connection: Connection, catalog: Catalog.Root, backend: Backend) {
private constructor(connection: Connection, catalog: Catalog.Root, backend: Backend, tracknum: number) {
this.#connection = connection
this.#catalog = catalog
this.#backend = backend
this.#tracknum = tracknum

const abort = new Promise<void>((resolve, reject) => {
this.#close = resolve
Expand All @@ -49,7 +51,7 @@ export class Player {
this.#running = Promise.race([this.#run(), abort]).catch(this.#close)
}

static async create(config: PlayerConfig): Promise<Player> {
static async create(config: PlayerConfig, tracknum: number): Promise<Player> {
const client = new Client({ url: config.url, fingerprint: config.fingerprint, role: "subscriber" })
const connection = await client.connect()

Expand All @@ -59,18 +61,20 @@ export class Player {
const canvas = config.canvas.transferControlToOffscreen()
const backend = new Backend({ canvas, catalog })

return new Player(connection, catalog, backend)
return new Player(connection, catalog, backend, tracknum)
}

async #run() {
const inits = new Set<[string, string]>()
const tracks = new Array<Catalog.Track>()

for (const track of this.#catalog.tracks) {
if (!track.namespace) throw new Error("track has no namespace")
if (track.initTrack) inits.add([track.namespace, track.initTrack])
tracks.push(track)
}
this.#catalog.tracks.forEach((track, index) => {
if (index == this.#tracknum || Catalog.isAudioTrack(track)) {
if (!track.namespace) throw new Error("track has no namespace")
if (track.initTrack) inits.add([track.namespace, track.initTrack])
tracks.push(track)
}
})

// Call #runInit on each unique init track
// TODO do this in parallel with #runTrack to remove a round trip
Expand Down Expand Up @@ -140,6 +144,36 @@ export class Player {
return this.#catalog
}

getCurrentTrack() {
if (this.#tracknum >= 0 && this.#tracknum < this.#catalog.tracks.length) {
return this.#catalog.tracks[this.#tracknum]
} else {
console.warn("Invalid track number:", this.#tracknum)
return null
}
}

getVideoTracks() {
return this.#catalog.tracks.filter(Catalog.isVideoTrack).map((track) => track.name)
}

async switchTrack(trackname: string) {
const currentTrack = this.getCurrentTrack()
if (currentTrack) {
console.log(`Unsubscribing from track: ${currentTrack.name} and Subscribing to track: ${trackname}`)
await this.unsubscribeFromTrack(currentTrack.name)
} else {
console.log(`Subscribing to track: ${trackname}`)
}
this.#tracknum = this.#catalog.tracks.findIndex((track) => track.name === trackname)
const tracksToStream = this.#catalog.tracks.filter((track) => track.name === trackname)
await Promise.all(tracksToStream.map((track) => this.#runTrack(track)))
}

async unsubscribeFromTrack(trackname: string) {
await this.#connection.unsubscribe(trackname)
}

#onMessage(msg: Message.FromWorker) {
if (msg.timeline) {
//this.#timeline.update(msg.timeline)
Expand Down
79 changes: 69 additions & 10 deletions lib/playback/worker/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,32 @@ import { Frame, Component } from "./timeline"
import * as MP4 from "../../media/mp4"
import * as Message from "./message"

interface DecoderConfig {
codec: string
description?: ArrayBuffer | Uint8Array | DataView
codedWidth?: number
codedHeight?: number
displayAspectWidth?: number
displayAspectHeight?: number
colorSpace?: {
primaries?: "bt709" | "bt470bg" | "smpte170m"
transfer?: "bt709" | "smpte170m" | "iec61966-2-1"
matrix?: "rgb" | "bt709" | "bt470bg" | "smpte170m"
}
hardwareAcceleration?: "no-preference" | "prefer-hardware" | "prefer-software"
optimizeForLatency?: boolean
}

export class Renderer {
#canvas: OffscreenCanvas
#timeline: Component

#decoder!: VideoDecoder
#queue: TransformStream<Frame, VideoFrame>

#decoderConfig?: DecoderConfig
#waitingForKeyframe: boolean = true

constructor(config: Message.ConfigVideo, timeline: Component) {
this.#canvas = config.canvas
this.#timeline = timeline
Expand Down Expand Up @@ -50,10 +69,30 @@ export class Renderer {
}

#transform(frame: Frame) {
if (this.#decoder.state === "closed") {
console.warn("Decoder is closed. Skipping frame.")
return
}

const { sample, track } = frame

// Reset the decoder on video track change
if (this.#decoderConfig && this.#decoder.state == "configured") {
if (MP4.isVideoTrack(track)) {
const configMismatch =
this.#decoderConfig.codec !== track.codec ||
this.#decoderConfig.codedWidth !== track.video.width ||
this.#decoderConfig.codedHeight !== track.video.height

if (configMismatch) {
this.#decoder.reset()
this.#decoderConfig = undefined
}
}
}

// Configure the decoder with the first frame
if (this.#decoder.state !== "configured") {
const { sample, track } = frame

const desc = sample.description
const box = desc.avcC ?? desc.hvcC ?? desc.vpcC ?? desc.av1C
if (!box) throw new Error(`unsupported codec: ${track.codec}`)
Expand All @@ -64,21 +103,41 @@ export class Renderer {

if (!MP4.isVideoTrack(track)) throw new Error("expected video track")

this.#decoder.configure({
this.#decoderConfig = {
codec: track.codec,
codedHeight: track.video.height,
codedWidth: track.video.width,
description,
// optimizeForLatency: true
})
}

this.#decoder.configure(this.#decoderConfig)
if (!frame.sample.is_sync) {
this.#waitingForKeyframe = true
} else {
this.#waitingForKeyframe = false
}
}

const chunk = new EncodedVideoChunk({
type: frame.sample.is_sync ? "key" : "delta",
data: frame.sample.data,
timestamp: frame.sample.dts / frame.track.timescale,
})
//At the start of decode , VideoDecoder seems to expect a key frame after configure() or flush()
if (this.#decoder.state == "configured") {
if (this.#waitingForKeyframe && !frame.sample.is_sync) {
console.warn("Skipping non-keyframe until a keyframe is found.")
return
}

// On arrival of a keyframe, allow decoding and stop waiting for a keyframe.
if (frame.sample.is_sync) {
this.#waitingForKeyframe = false
}

const chunk = new EncodedVideoChunk({
type: frame.sample.is_sync ? "key" : "delta",
data: frame.sample.data,
timestamp: frame.sample.dts / frame.track.timescale,
})

this.#decoder.decode(chunk)
this.#decoder.decode(chunk)
}
}
}
4 changes: 4 additions & 0 deletions lib/transport/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export class Connection {
return this.#subscriber.subscribe(namespace, track)
}

unsubscribe(track: string) {
return this.#subscriber.unsubscribe(track)
}

subscribed() {
return this.#publisher.subscribed()
}
Expand Down
22 changes: 22 additions & 0 deletions lib/transport/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export class Subscriber {
#subscribe = new Map<bigint, SubscribeSend>()
#subscribeNext = 0n

#trackToIDMap = new Map<string, bigint>()

constructor(control: Control.Stream, objects: Objects) {
this.#control = control
this.#objects = objects
Expand Down Expand Up @@ -66,6 +68,8 @@ export class Subscriber {
const subscribe = new SubscribeSend(this.#control, id, namespace, track)
this.#subscribe.set(id, subscribe)

this.#trackToIDMap.set(track, id)

await this.#control.send({
kind: Control.Msg.Subscribe,
id,
Expand All @@ -82,6 +86,24 @@ export class Subscriber {
return subscribe
}

async unsubscribe(track: string) {
if (this.#trackToIDMap.has(track)) {
const trackID = this.#trackToIDMap.get(track)
if (trackID === undefined) {
console.warn(`Exception track ${track} not found in trackToIDMap.`)
return
}
try {
await this.#control.send({ kind: Control.Msg.Unsubscribe, id: trackID })
this.#trackToIDMap.delete(track)
} catch (error) {
console.error(`Failed to unsubscribe from track ${track}:`, error)
}
} else {
console.warn(`During unsubscribe request initiation attempt track ${track} not found in trackToIDMap.`)
}
}

recvSubscribeOk(msg: Control.SubscribeOk) {
const subscribe = this.#subscribe.get(msg.id)
if (!subscribe) {
Expand Down
57 changes: 55 additions & 2 deletions web/src/components/watch.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export default function Watch(props: { name: string }) {
const urlSearchParams = new URLSearchParams(window.location.search)
const params = Object.fromEntries(urlSearchParams.entries())
const server = params.server ?? import.meta.env.PUBLIC_RELAY_HOST
let tracknum: number = params.track ?? 0

const [error, setError] = createSignal<Error | undefined>()

Expand All @@ -18,6 +19,9 @@ export default function Watch(props: { name: string }) {
const [usePlayer, setPlayer] = createSignal<Player | undefined>()
const [showCatalog, setShowCatalog] = createSignal(false)

const [options, setOptions] = createSignal([])
const [selectedOption, setSelectedOption] = createSignal<string | undefined>()

createEffect(() => {
const namespace = props.name
const url = `https://${server}`
Expand All @@ -26,7 +30,7 @@ export default function Watch(props: { name: string }) {
// TODO remove this when WebTransport correctly supports self-signed certificates
const fingerprint = server.startsWith("localhost") ? `https://${server}/fingerprint` : undefined

Player.create({ url, fingerprint, canvas, namespace }).then(setPlayer).catch(setError)
Player.create({ url, fingerprint, canvas, namespace }, tracknum).then(setPlayer).catch(setError)
})

createEffect(() => {
Expand All @@ -50,13 +54,62 @@ export default function Watch(props: { name: string }) {
return JSON.stringify(catalog, null, 2)
})

function updateURLWithTracknumber(trackIndex) {
const url = new URL(window.location.href)
url.searchParams.set('track', trackIndex.toString())
window.history.replaceState({}, '', decodeURIComponent(url.toString()))
}

createEffect(async () => {
const player = usePlayer()
if (!player) return

const videotracks = await player.getVideoTracks()
setOptions(videotracks)

if (tracknum >= 0 && tracknum < videotracks.length) {
const selectedTrack = videotracks[tracknum]
setSelectedOption(selectedTrack)
updateURLWithTracknumber(tracknum)
}

})

const handleOptionSelectChange = (event) => {
const selectedTrack = event.target.value
setSelectedOption(selectedTrack)
usePlayer()?.switchTrack(selectedTrack)

const videotracks = options()
const trackIndex = videotracks.indexOf(selectedTrack)
tracknum = trackIndex

if (trackIndex !== -1) {
updateURLWithTracknumber(trackIndex)
}

}


// NOTE: The canvas automatically has width/height set to the decoded video size.
// TODO shrink it if needed via CSS
return (
<>
<Fail error={error()} />
<canvas ref={canvas} onClick={play} class="aspect-video w-full rounded-lg" />

<div class="mt-2 flex">
<select value={selectedOption() ?? ''} onChange={handleOptionSelectChange}>
{options()?.length ? (
options().map((option, index) => (
<option key={index} value={option}>
{option}
</option>
))
) : (
<option disabled>No options available</option>
)}
</select>
</div>
<h3>Debug</h3>
<Show when={catalog()}>
<div class="mt-2 flex">
Expand Down

0 comments on commit f944367

Please sign in to comment.