From 4a72e36f05fc1dbcd06762f9277ae04d88593acd Mon Sep 17 00:00:00 2001 From: i-zolotarenko <86921321+i-zolotarenko@users.noreply.github.com> Date: Tue, 12 Dec 2023 17:58:24 +0200 Subject: [PATCH] Delegate control to process queue. (#313) * Reduce LoadProgress object type. * Create microtask for process queue. * Add event dispatcher class. * Create Request class. * Move Request file to separate file. * Change style of bittorrent-tracker declarations event handlers types. * Create P2PTrackerClient class to encapsulate peer connection logic. * Add TODO task. * Add requests error handling. * Fix types errors. * P2P announce segments on http requests state change. * Fix bugs. * Make broadcastAnnouncement method as function expression. * Refactor Request code. Create separate flows for each type of abort. * Remove event subscriptions to request instance. * Fix type errors. * Fix bugs. * Fix lint warnings. * Move P2P tracker client to separate file. * Fix more bugs. Rewrite loggers. * Remove unused code. * Use function declaration instead of expression. --------- Co-authored-by: Igor Zolotarenko --- p2p-media-loader-demo/src/App.tsx | 8 +- .../src/bandwidth-approximator.ts | 2 +- packages/p2p-media-loader-core/src/core.ts | 26 +- .../src/declarations.d.ts | 47 +-- packages/p2p-media-loader-core/src/errors.ts | 16 - .../src/event-dispatcher.ts | 31 ++ .../p2p-media-loader-core/src/http-loader.ts | 131 ++----- .../src/hybrid-loader.ts | 351 ++++++++---------- packages/p2p-media-loader-core/src/index.ts | 3 +- .../src/internal-types.d.ts | 32 +- .../p2p-media-loader-core/src/p2p/loader.ts | 191 ++-------- .../src/p2p/loaders-container.ts | 2 +- .../p2p-media-loader-core/src/p2p/peer.ts | 303 +++++---------- .../src/p2p/tracker-client.ts | 148 ++++++++ .../src/request-container.ts | 234 +++--------- packages/p2p-media-loader-core/src/request.ts | 341 +++++++++++++++++ .../src/segments-storage.ts | 101 ++--- packages/p2p-media-loader-core/src/types.d.ts | 14 +- .../p2p-media-loader-core/src/utils/logger.ts | 9 +- .../p2p-media-loader-core/src/utils/queue.ts | 92 +---- .../p2p-media-loader-core/src/utils/stream.ts | 108 ++++++ .../p2p-media-loader-core/src/utils/utils.ts | 56 ++- .../src/fragment-loader.ts | 26 +- packages/p2p-media-loader-shaka/src/engine.ts | 6 +- .../src/loading-handler.ts | 55 ++- .../src/segment-manager.ts | 2 +- 26 files changed, 1175 insertions(+), 1160 deletions(-) delete mode 100644 packages/p2p-media-loader-core/src/errors.ts create mode 100644 packages/p2p-media-loader-core/src/event-dispatcher.ts create mode 100644 packages/p2p-media-loader-core/src/p2p/tracker-client.ts create mode 100644 packages/p2p-media-loader-core/src/request.ts create mode 100644 packages/p2p-media-loader-core/src/utils/stream.ts diff --git a/p2p-media-loader-demo/src/App.tsx b/p2p-media-loader-demo/src/App.tsx index f6b1434c..18b3c620 100644 --- a/p2p-media-loader-demo/src/App.tsx +++ b/p2p-media-loader-demo/src/App.tsx @@ -443,13 +443,11 @@ function useLocalStorageItem( const loggers = [ "core:hybrid-loader-main", - "core:hybrid-loader-main-engine", "core:hybrid-loader-secondary", - "core:hybrid-loader-secondary-engine", - "core:p2p-loader", + "core:p2p-tracker-client", "core:peer", "core:p2p-loaders-container", - "core:requests-container-main", - "core:requests-container-secondary", + "core:request-main", + "core:request-secondary", "core:segment-memory-storage", ] as const; diff --git a/packages/p2p-media-loader-core/src/bandwidth-approximator.ts b/packages/p2p-media-loader-core/src/bandwidth-approximator.ts index 79216e53..2f60626f 100644 --- a/packages/p2p-media-loader-core/src/bandwidth-approximator.ts +++ b/packages/p2p-media-loader-core/src/bandwidth-approximator.ts @@ -1,4 +1,4 @@ -import { LoadProgress } from "./request-container"; +import { LoadProgress } from "./request"; export class BandwidthApproximator { private readonly loadings: LoadProgress[] = []; diff --git a/packages/p2p-media-loader-core/src/core.ts b/packages/p2p-media-loader-core/src/core.ts index 69700d8b..8ee51cb7 100644 --- a/packages/p2p-media-loader-core/src/core.ts +++ b/packages/p2p-media-loader-core/src/core.ts @@ -7,17 +7,17 @@ import { SegmentBase, CoreEventHandlers, } from "./types"; -import * as Utils from "./utils/utils"; +import * as StreamUtils from "./utils/stream"; import { LinkedMap } from "./linked-map"; import { BandwidthApproximator } from "./bandwidth-approximator"; -import { EngineCallbacks } from "./request-container"; +import { EngineCallbacks } from "./request"; import { SegmentsMemoryStorage } from "./segments-storage"; export class Core { private manifestResponseUrl?: string; private readonly streams = new Map>(); private readonly settings: Settings = { - simultaneousHttpDownloads: 2, + simultaneousHttpDownloads: 3, simultaneousP2PDownloads: 3, highDemandTimeWindow: 15, httpDownloadTimeWindow: 45, @@ -25,8 +25,9 @@ export class Core { cachedSegmentExpiration: 120 * 1000, cachedSegmentsCount: 50, webRtcMaxMessageSize: 64 * 1024 - 1, - p2pSegmentDownloadTimeout: 5000, - p2pLoaderDestroyTimeout: 30 * 1000, + p2pNotReceivingBytesTimeoutMs: 1000, + p2pLoaderDestroyTimeoutMs: 30 * 1000, + httpNotReceivingBytesTimeoutMs: 1000, }; private readonly bandwidthApproximator = new BandwidthApproximator(); private segmentStorage?: SegmentsMemoryStorage; @@ -40,7 +41,7 @@ export class Core { } hasSegment(segmentLocalId: string): boolean { - const segment = Utils.getSegmentFromStreamsMap( + const segment = StreamUtils.getSegmentFromStreamsMap( this.streams, segmentLocalId ); @@ -92,11 +93,9 @@ export class Core { void loader.loadSegment(segment, callbacks); } - abortSegmentLoading(segmentId: string): void { - const segment = this.identifySegment(segmentId); - const streamType = segment.stream.type; - if (streamType === "main") this.mainStreamLoader?.abortSegment(segment); - else this.secondaryStreamLoader?.abortSegment(segment); + abortSegmentLoading(segmentLocalId: string): void { + this.mainStreamLoader?.abortSegmentRequest(segmentLocalId); + this.secondaryStreamLoader?.abortSegmentRequest(segmentLocalId); } updatePlayback(position: number, rate: number): void { @@ -116,7 +115,10 @@ export class Core { throw new Error("Manifest response url is undefined"); } - const segment = Utils.getSegmentFromStreamsMap(this.streams, segmentId); + const segment = StreamUtils.getSegmentFromStreamsMap( + this.streams, + segmentId + ); if (!segment) { throw new Error(`Not found segment with id: ${segmentId}`); } diff --git a/packages/p2p-media-loader-core/src/declarations.d.ts b/packages/p2p-media-loader-core/src/declarations.d.ts index f85dacec..65768074 100644 --- a/packages/p2p-media-loader-core/src/declarations.d.ts +++ b/packages/p2p-media-loader-core/src/declarations.d.ts @@ -9,7 +9,10 @@ declare module "bittorrent-tracker" { getAnnounceOpts?: () => object; }); - on(event: E, handler: TrackerEventHandler): void; + on( + event: E, + handler: TrackerClientEvents[E] + ): void; start(): void; @@ -20,39 +23,25 @@ declare module "bittorrent-tracker" { destroy(): void; } - export type TrackerEvent = "update" | "peer" | "warning" | "error"; - - export type TrackerEventHandler = E extends "update" - ? (data: object) => void - : E extends "peer" - ? (peer: PeerConnection) => void - : E extends "warning" - ? (warning: unknown) => void - : E extends "error" - ? (error: unknown) => void - : never; - - type PeerEvent = "connect" | "data" | "close" | "error"; - - export type PeerConnectionEventHandler = - E extends "connect" - ? () => void - : E extends "data" - ? (data: ArrayBuffer) => void - : E extends "close" - ? () => void - : E extends "error" - ? (error: { code: string }) => void - : never; + export type TrackerClientEvents = { + update: (data: object) => void; + peer: (peer: PeerConnection) => void; + warning: (warning: unknown) => void; + error: (error: unknown) => void; + }; + + export type PeerEvents = { + connect: () => void; + data: (data: Uint8Array) => void; + close: () => void; + error: (error: { code: string }) => void; + }; export type PeerConnection = { id: string; initiator: boolean; _channel: RTCDataChannel; - on( - event: E, - handler: PeerConnectionEventHandler - ): void; + on(event: E, handler: PeerEvents[E]): void; send(data: string | ArrayBuffer): void; write(data: string | ArrayBuffer): void; destroy(): void; diff --git a/packages/p2p-media-loader-core/src/errors.ts b/packages/p2p-media-loader-core/src/errors.ts deleted file mode 100644 index 421932e9..00000000 --- a/packages/p2p-media-loader-core/src/errors.ts +++ /dev/null @@ -1,16 +0,0 @@ -export class FetchError extends Error { - public code: number; - public details: object; - - constructor(message: string, code: number, details: object) { - super(message); - this.code = code; - this.details = details; - } -} - -export class RequestAbortError extends Error { - constructor(message = "AbortError") { - super(message); - } -} diff --git a/packages/p2p-media-loader-core/src/event-dispatcher.ts b/packages/p2p-media-loader-core/src/event-dispatcher.ts new file mode 100644 index 00000000..b37e621c --- /dev/null +++ b/packages/p2p-media-loader-core/src/event-dispatcher.ts @@ -0,0 +1,31 @@ +export class EventDispatcher< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + T extends { [key: string]: (...args: any[]) => void | Promise }, + K extends keyof T = keyof T +> { + private readonly listeners = new Map>(); + + subscribe(eventType: K, ...listeners: T[K][]) { + let eventListeners = this.listeners.get(eventType); + if (!eventListeners) { + eventListeners = new Set(); + this.listeners.set(eventType, eventListeners); + } + for (const listener of listeners) eventListeners.add(listener); + } + + unsubscribe(eventType: K, listener: T[K]) { + const eventListeners = this.listeners.get(eventType); + if (!eventListeners) return; + eventListeners.delete(listener); + if (!eventListeners.size) this.listeners.delete(eventType); + } + + dispatch(eventType: K, ...args: Parameters) { + const eventListeners = this.listeners.get(eventType); + if (!eventListeners) return; + for (const listener of eventListeners) { + listener(...args); + } + } +} diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index efd00a9c..d81d9d3b 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -1,111 +1,60 @@ -import { RequestAbortError, FetchError } from "./errors"; -import { Segment } from "./types"; -import { HttpRequest, LoadProgress } from "./request-container"; +import { Settings } from "./types"; +import { Request, RequestError, HttpRequestErrorType } from "./request"; -export function getHttpSegmentRequest(segment: Segment): Readonly { - const { promise, abortController, progress } = fetchSegmentData(segment); - return { - type: "http", - promise, - progress, - abort: () => abortController.abort(), - }; -} - -function fetchSegmentData(segment: Segment) { +export async function fulfillHttpSegmentRequest( + request: Request, + settings: Pick +) { const headers = new Headers(); - const { url, byteRange, localId: segmentId } = segment; + const { segment } = request; + const { url, byteRange } = segment; if (byteRange) { const { start, end } = byteRange; const byteRangeString = `bytes=${start}-${end}`; headers.set("Range", byteRangeString); } - const abortController = new AbortController(); - - const progress: LoadProgress = { - canBeTracked: false, - totalBytes: 0, - loadedBytes: 0, - percent: 0, - startTimestamp: performance.now(), - }; - const loadSegmentData = async () => { - try { - const response = await window.fetch(url, { - headers, - signal: abortController.signal, - }); - if (response.ok) { - return await getDataPromiseAndMonitorProgress(response, progress); - } - throw new FetchError( - response.statusText ?? `Network response was not for ${segmentId}`, - response.status, - response - ); - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - throw new RequestAbortError(`Segment fetch was aborted ${segmentId}`); - } - throw error; + const abortController = new AbortController(); + const requestControls = request.start( + { type: "http" }, + { + abort: () => abortController.abort("abort"), + notReceivingBytesTimeoutMs: settings.httpNotReceivingBytesTimeoutMs, } - }; - - return { - promise: loadSegmentData(), - abortController, - progress, - }; -} - -async function getDataPromiseAndMonitorProgress( - response: Response, - progress: LoadProgress -): Promise { - const totalBytesString = response.headers.get("Content-Length"); - if (!response.body) { - return response.arrayBuffer().then((data) => { - progress.loadedBytes = data.byteLength; - progress.totalBytes = data.byteLength; - progress.lastLoadedChunkTimestamp = performance.now(); - progress.percent = 100; - return data; + ); + try { + const fetchResponse = await window.fetch(url, { + headers, + signal: abortController.signal, }); - } + requestControls.firstBytesReceived(); - if (totalBytesString) { - progress.totalBytes = +totalBytesString; - progress.canBeTracked = true; - } + if (!fetchResponse.ok) { + throw new RequestError("fetch-error", fetchResponse.statusText); + } - const reader = response.body.getReader(); + if (!fetchResponse.body) return; + const totalBytesString = fetchResponse.headers.get("Content-Length"); + if (totalBytesString) request.setTotalBytes(+totalBytesString); - progress.startTimestamp = performance.now(); - const chunks: Uint8Array[] = []; - for await (const chunk of readStream(reader)) { - chunks.push(chunk); - progress.loadedBytes += chunk.length; - progress.lastLoadedChunkTimestamp = performance.now(); - if (progress.canBeTracked) { - progress.percent = (progress.loadedBytes / progress.totalBytes) * 100; + const reader = fetchResponse.body.getReader(); + for await (const chunk of readStream(reader)) { + requestControls.addLoadedChunk(chunk); } - } + requestControls.completeOnSuccess(); + } catch (error) { + if (error instanceof Error) { + if (error.name !== "abort") return; - if (!progress.canBeTracked) { - progress.totalBytes = progress.loadedBytes; - progress.percent = 100; - } - const resultBuffer = new ArrayBuffer(progress.loadedBytes); - const view = new Uint8Array(resultBuffer); - - let offset = 0; - for (const chunk of chunks) { - view.set(chunk, offset); - offset += chunk.length; + const httpLoaderError: RequestError = !( + error instanceof RequestError + ) + ? new RequestError("fetch-error", error.message) + : error; + requestControls.abortOnError(httpLoaderError); + } } - return resultBuffer; } async function* readStream( diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 41b0c00c..4e84023f 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -1,18 +1,16 @@ import { Segment, StreamWithSegments } from "./index"; -import { getHttpSegmentRequest } from "./http-loader"; +import { fulfillHttpSegmentRequest } from "./http-loader"; import { SegmentsMemoryStorage } from "./segments-storage"; import { Settings, CoreEventHandlers } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; +import { P2PLoadersContainer } from "./p2p/loaders-container"; import { Playback, QueueItem } from "./internal-types"; -import { - RequestsContainer, - EngineCallbacks, - HybridLoaderRequest, -} from "./request-container"; +import { RequestsContainer } from "./request-container"; +import { EngineCallbacks } from "./request"; import * as QueueUtils from "./utils/queue"; import * as LoggerUtils from "./utils/logger"; -import { FetchError } from "./errors"; -import { P2PLoadersContainer } from "./p2p/loaders-container"; +import * as StreamUtils from "./utils/stream"; +import * as Utils from "./utils/utils"; import debug from "debug"; export class HybridLoader { @@ -24,8 +22,8 @@ export class HybridLoader { private lastQueueProcessingTimeStamp?: number; private readonly segmentAvgDuration: number; private randomHttpDownloadInterval!: number; - private readonly logger: { engine: debug.Debugger; loader: debug.Debugger }; - private readonly levelBandwidth = { value: 0, refreshCount: 0 }; + private readonly logger: debug.Debugger; + private isProcessQueueMicrotaskCreated = false; constructor( private streamManifestUrl: string, @@ -38,19 +36,24 @@ export class HybridLoader { this.lastRequestedSegment = requestedSegment; const activeStream = requestedSegment.stream; this.playback = { position: requestedSegment.startTime, rate: 1 }; - this.segmentAvgDuration = getSegmentAvgDuration(activeStream); - this.requests = new RequestsContainer(requestedSegment.stream.type); + this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream); + this.requests = new RequestsContainer( + this.requestProcessQueueMicrotask, + this.bandwidthApproximator, + this.playback, + this.settings + ); if (!this.segmentStorage.isInitialized) { throw new Error("Segment storage is not initialized."); } this.segmentStorage.addIsSegmentLockedPredicate((segment) => { if (segment.stream !== activeStream) return false; - const bufferRanges = QueueUtils.getLoadBufferRanges( + return StreamUtils.isSegmentActualInPlayback( + segment, this.playback, this.settings ); - return QueueUtils.isSegmentActual(segment, bufferRanges); }); this.p2pLoaders = new P2PLoadersContainer( this.streamManifestUrl, @@ -60,11 +63,8 @@ export class HybridLoader { this.settings ); - const loader = debug(`core:hybrid-loader-${activeStream.type}`); - const engine = debug(`core:hybrid-loader-${activeStream.type}-engine`); - loader.color = "coral"; - engine.color = "orange"; - this.logger = { loader, engine }; + this.logger = debug(`core:hybrid-loader-${activeStream.type}`); + this.logger.color = "coral"; this.setIntervalLoading(); } @@ -79,14 +79,11 @@ export class HybridLoader { // api method for engines async loadSegment(segment: Readonly, callbacks: EngineCallbacks) { - this.logger.engine(`requests: ${LoggerUtils.getSegmentString(segment)}`); + this.logger(`requests: ${LoggerUtils.getSegmentString(segment)}`); const { stream } = segment; if (stream !== this.lastRequestedSegment.stream) { - this.logger.engine( - `stream changed to ${LoggerUtils.getStreamString(stream)}` - ); + this.logger(`stream changed to ${LoggerUtils.getStreamString(stream)}`); this.p2pLoaders.changeCurrentLoader(stream); - this.refreshLevelBandwidth(true); } this.lastRequestedSegment = segment; @@ -96,37 +93,90 @@ export class HybridLoader { if (data) { callbacks.onSuccess({ data, - bandwidth: this.levelBandwidth.value, + bandwidth: this.bandwidthApproximator.getBandwidth(), }); } } else { - this.requests.addEngineCallbacks(segment, callbacks); + const request = this.requests.getOrCreateRequest(segment); + request.setOrResolveEngineCallbacks(callbacks); } - this.processQueue(); + this.requestProcessQueueMicrotask(); } - private processQueue(force = true) { + private requestProcessQueueMicrotask = (force = true) => { const now = performance.now(); if ( - !force && - this.lastQueueProcessingTimeStamp !== undefined && - now - this.lastQueueProcessingTimeStamp <= 1000 + (!force && + this.lastQueueProcessingTimeStamp !== undefined && + now - this.lastQueueProcessingTimeStamp <= 1000) || + this.isProcessQueueMicrotaskCreated ) { return; } - this.lastQueueProcessingTimeStamp = now; + this.isProcessQueueMicrotaskCreated = true; + queueMicrotask(() => { + try { + this.processQueue(); + this.lastQueueProcessingTimeStamp = now; + } finally { + this.isProcessQueueMicrotaskCreated = false; + } + }); + }; + + private processQueue() { const { queue, queueSegmentIds } = QueueUtils.generateQueue({ lastRequestedSegment: this.lastRequestedSegment, playback: this.playback, settings: this.settings, - skipSegment: (segment) => this.segmentStorage.hasSegment(segment), + skipSegment: (segment) => { + return ( + this.requests.get(segment)?.status === "succeed" || + this.segmentStorage.hasSegment(segment) + ); + }, }); - this.requests.abortAllNotRequestedByEngine((segment) => - queueSegmentIds.has(segment.localId) - ); + for (const request of this.requests.items()) { + if (request.status === "loading") { + if ( + !request.isSegmentRequestedByEngine && + !queueSegmentIds.has(request.segment.localId) + ) { + request.abortFromProcessQueue(); + this.requests.remove(request); + } + continue; + } + + if (request.status === "succeed") { + const { type, data } = request; + if (!type || !data) continue; + if (type === "http") { + this.p2pLoaders.currentLoader.broadcastAnnouncement(); + } + void this.segmentStorage.storeSegment(request.segment, data); + this.eventHandlers?.onSegmentLoaded?.(data.byteLength, type); + this.requests.remove(request); + continue; + } + + if (request.status === "failed") { + if (request.type === "http") { + this.p2pLoaders.currentLoader.broadcastAnnouncement(); + } + continue; + } + + if ( + (request.status === "not-started" || request.status === "aborted") && + !request.isSegmentRequestedByEngine + ) { + this.requests.remove(request); + } + } const { simultaneousHttpDownloads, simultaneousP2PDownloads } = this.settings; @@ -136,58 +186,48 @@ export class HybridLoader { const request = this.requests.get(segment); if (statuses.isHighDemand) { - if (request?.type === "http") continue; - - if (request?.type === "p2p") { - const timeToPlayback = getTimeToSegmentPlayback( - segment, - this.playback - ); - const remainingDownloadTime = - getPredictedRemainingDownloadTime(request); - if ( - remainingDownloadTime === undefined || - remainingDownloadTime > timeToPlayback - ) { - request.abort(); - } else { - continue; - } - } - if (this.requests.httpRequestsCount < simultaneousHttpDownloads) { - void this.loadThroughHttp(item); + if (request?.type === "http" && request.status === "loading") continue; + + if (this.requests.executingHttpCount < simultaneousHttpDownloads) { + void this.loadThroughHttp(segment); continue; } - this.abortLastHttpLoadingAfter(queue, segment); - if (this.requests.httpRequestsCount < simultaneousHttpDownloads) { - void this.loadThroughHttp(item); + if ( + this.abortLastHttpLoadingInQueueAfterItem(queue, segment) && + this.requests.executingHttpCount < simultaneousHttpDownloads + ) { + void this.loadThroughHttp(segment); continue; } - if (this.requests.isP2PRequested(segment)) continue; + if (request?.type === "p2p" && request.status === "loading") continue; - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { - void this.loadThroughP2P(item); + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { + void this.loadThroughP2P(segment); continue; } - this.abortLastP2PLoadingAfter(queue, segment); - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { - void this.loadThroughP2P(item); + if ( + this.abortLastP2PLoadingInQueueAfterItem(queue, segment) && + this.requests.executingP2PCount < simultaneousP2PDownloads + ) { + void this.loadThroughP2P(segment); } break; } if (statuses.isP2PDownloadable) { - if (request) continue; - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { - void this.loadThroughP2P(item); + if (request?.status === "loading") continue; + if (this.requests.executingP2PCount < simultaneousP2PDownloads) { + void this.loadThroughP2P(segment); continue; } - this.abortLastP2PLoadingAfter(queue, segment); - if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) { - void this.loadThroughP2P(item); + if ( + this.abortLastP2PLoadingInQueueAfterItem(queue, segment) && + this.requests.executingP2PCount < simultaneousP2PDownloads + ) { + void this.loadThroughP2P(segment); } } break; @@ -195,46 +235,21 @@ export class HybridLoader { } // api method for engines - abortSegment(segment: Segment) { - this.logger.engine("abort: ", LoggerUtils.getSegmentString(segment)); - this.requests.abortEngineRequest(segment); + abortSegmentRequest(segmentLocalId: string) { + const request = this.requests.getBySegmentLocalId(segmentLocalId); + if (!request) return; + request.abortFromEngine(); + this.logger("abort: ", LoggerUtils.getSegmentString(request.segment)); } - private async loadThroughHttp(item: QueueItem, isRandom = false) { - const { segment } = item; - let data: ArrayBuffer | undefined; - try { - const httpRequest = getHttpSegmentRequest(segment); - - if (!isRandom) { - this.logger.loader( - `http request: ${LoggerUtils.getQueueItemString(item)}` - ); - } - - this.requests.addLoaderRequest(segment, httpRequest); - this.bandwidthApproximator.addLoading(httpRequest.progress); - data = await httpRequest.promise; - if (!data) return; - this.logger.loader(`http responses: ${segment.externalId}`); - this.onSegmentLoaded(item, "http", data); - } catch (err) { - if (err instanceof FetchError) { - this.processQueue(); - } - } + private async loadThroughHttp(segment: Segment) { + const request = this.requests.getOrCreateRequest(segment); + void fulfillHttpSegmentRequest(request, this.settings); + this.p2pLoaders.currentLoader.broadcastAnnouncement(); } - private async loadThroughP2P(item: QueueItem) { - const p2pLoader = this.p2pLoaders.currentLoader; - try { - const downloadPromise = p2pLoader.downloadSegment(item); - if (downloadPromise === undefined) return; - const data = await downloadPromise; - this.onSegmentLoaded(item, "p2p", data); - } catch (error) { - this.processQueue(); - } + private async loadThroughP2P(segment: Segment) { + this.p2pLoaders.currentLoader.downloadSegment(segment); } private loadRandomThroughHttp() { @@ -242,7 +257,7 @@ export class HybridLoader { const p2pLoader = this.p2pLoaders.currentLoader; const connectedPeersAmount = p2pLoader.connectedPeersAmount; if ( - this.requests.httpRequestsCount >= simultaneousHttpDownloads || + this.requests.executingHttpCount >= simultaneousHttpDownloads || !connectedPeersAmount ) { return; @@ -263,70 +278,38 @@ export class HybridLoader { const shouldLoad = Math.random() < probability; if (!shouldLoad) return; - const item = queue[Math.floor(Math.random() * queue.length)]; - void this.loadThroughHttp(item, true); - - this.logger.loader( - `http random request: ${LoggerUtils.getQueueItemString(item)}` - ); + const item = Utils.getRandomItem(queue); + void this.loadThroughHttp(item.segment); } - private onSegmentLoaded( - queueItem: QueueItem, - type: HybridLoaderRequest["type"], - data: ArrayBuffer - ) { - const { segment, statuses } = queueItem; - const byteLength = data.byteLength; - if (type === "http" && statuses.isHighDemand) { - this.refreshLevelBandwidth(true); - } - void this.segmentStorage.storeSegment(segment, data); - - const bandwidth = statuses.isHighDemand - ? this.bandwidthApproximator.getBandwidth() - : this.levelBandwidth.value; - - this.requests.resolveEngineRequest(segment, { data, bandwidth }); - this.eventHandlers?.onSegmentLoaded?.(byteLength, type); - this.processQueue(); - } - - private abortLastHttpLoadingAfter(queue: QueueItem[], segment: Segment) { + private abortLastHttpLoadingInQueueAfterItem( + queue: QueueItem[], + segment: Segment + ): boolean { for (const { segment: itemSegment } of arrayBackwards(queue)) { - if (itemSegment.localId === segment.localId) break; - if (this.requests.isHttpRequested(segment)) { - this.requests.abortLoaderRequest(segment); - this.logger.loader( - "http aborted: ", - LoggerUtils.getSegmentString(segment) - ); - break; + if (itemSegment === segment) break; + const request = this.requests.get(itemSegment); + if (request?.type === "http" && request.status === "loading") { + request.abortFromProcessQueue(); + return true; } } + return false; } - private abortLastP2PLoadingAfter(queue: QueueItem[], segment: Segment) { + private abortLastP2PLoadingInQueueAfterItem( + queue: QueueItem[], + segment: Segment + ): boolean { for (const { segment: itemSegment } of arrayBackwards(queue)) { - if (itemSegment.localId === segment.localId) break; - if (this.requests.isP2PRequested(segment)) { - this.requests.abortLoaderRequest(segment); - this.logger.loader( - "p2p aborted: ", - LoggerUtils.getSegmentString(segment) - ); - break; + if (itemSegment === segment) break; + const request = this.requests.get(itemSegment); + if (request?.type === "p2p" && request.status === "loading") { + request.abortFromProcessQueue(); + return true; } } - } - - private refreshLevelBandwidth(levelChanged = false) { - if (levelChanged) this.levelBandwidth.refreshCount = 0; - if (this.levelBandwidth.refreshCount < 3) { - const currentBandwidth = this.bandwidthApproximator.getBandwidth(); - this.levelBandwidth.value = currentBandwidth ?? 0; - this.levelBandwidth.refreshCount++; - } + return false; } updatePlayback(position: number, rate: number) { @@ -342,15 +325,15 @@ export class HybridLoader { if (isPositionChanged) this.playback.position = position; if (isRateChanged && rate !== 0) this.playback.rate = rate; if (isPositionSignificantlyChanged) { - this.logger.engine("position significantly changed"); + this.logger("position significantly changed"); } - void this.processQueue(isPositionSignificantlyChanged); + void this.requestProcessQueueMicrotask(isPositionSignificantlyChanged); } updateStream(stream: StreamWithSegments) { if (stream !== this.lastRequestedSegment.stream) return; - this.logger.engine(`update stream: ${LoggerUtils.getStreamString(stream)}`); - this.processQueue(); + this.logger(`update stream: ${LoggerUtils.getStreamString(stream)}`); + this.requestProcessQueueMicrotask(); } destroy() { @@ -360,8 +343,7 @@ export class HybridLoader { void this.segmentStorage.destroy(); this.requests.destroy(); this.p2pLoaders.destroy(); - this.logger.loader.destroy(); - this.logger.engine.destroy(); + this.logger.destroy(); } } @@ -370,36 +352,3 @@ function* arrayBackwards(arr: T[]) { yield arr[i]; } } - -function getTimeToSegmentPlayback(segment: Segment, playback: Playback) { - return Math.max(segment.startTime - playback.position, 0) / playback.rate; -} - -function getPredictedRemainingDownloadTime(request: HybridLoaderRequest) { - const { progress } = request; - if (!progress || progress.lastLoadedChunkTimestamp === undefined) { - return undefined; - } - - const now = performance.now(); - const bandwidth = - progress.percent / - (progress.lastLoadedChunkTimestamp - progress.startTimestamp); - const remainingDownloadPercent = 100 - progress.percent; - const predictedRemainingTimeFromLastDownload = - remainingDownloadPercent / bandwidth; - const timeFromLastDownload = now - progress.lastLoadedChunkTimestamp; - return (predictedRemainingTimeFromLastDownload - timeFromLastDownload) / 1000; -} - -function getSegmentAvgDuration(stream: StreamWithSegments) { - const { segments } = stream; - let sumDuration = 0; - const size = segments.size; - for (const segment of segments.values()) { - const duration = segment.endTime - segment.startTime; - sumDuration += duration; - } - - return sumDuration / size; -} diff --git a/packages/p2p-media-loader-core/src/index.ts b/packages/p2p-media-loader-core/src/index.ts index 036287ad..08df54ac 100644 --- a/packages/p2p-media-loader-core/src/index.ts +++ b/packages/p2p-media-loader-core/src/index.ts @@ -1,5 +1,6 @@ /// export { Core } from "./core"; -export * from "./errors"; export type * from "./types"; +export { CoreRequestError } from "./request"; +export type { EngineCallbacks } from "./request"; diff --git a/packages/p2p-media-loader-core/src/internal-types.d.ts b/packages/p2p-media-loader-core/src/internal-types.d.ts index f11f12dd..dd307504 100644 --- a/packages/p2p-media-loader-core/src/internal-types.d.ts +++ b/packages/p2p-media-loader-core/src/internal-types.d.ts @@ -1,29 +1,13 @@ import { Segment } from "./types"; import { PeerCommandType } from "./enums"; +import { SegmentPlaybackStatuses } from "./utils/stream"; export type Playback = { position: number; rate: number; }; -export type NumberRange = { - from: number; - to: number; -}; - -export type LoadBufferRanges = { - highDemand: NumberRange; - http: NumberRange; - p2p: NumberRange; -}; - -export type QueueItemStatuses = { - isHighDemand: boolean; - isHttpDownloadable: boolean; - isP2PDownloadable: boolean; -}; - -export type QueueItem = { segment: Segment; statuses: QueueItemStatuses }; +export type QueueItem = { segment: Segment; statuses: SegmentPlaybackStatuses }; export type BasePeerCommand = { c: T; @@ -36,13 +20,18 @@ export type JsonSegmentAnnouncement = { }; export type PeerSegmentCommand = BasePeerCommand< - | PeerCommandType.SegmentRequest - | PeerCommandType.SegmentAbsent - | PeerCommandType.CancelSegmentRequest + PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest > & { i: string; }; +export type PeerSegmentRequestCommand = + BasePeerCommand & { + i: string; + // start byte of range + b?: number; + }; + export type PeerSegmentAnnouncementCommand = BasePeerCommand & { a: JsonSegmentAnnouncement; @@ -56,5 +45,6 @@ export type PeerSendSegmentCommand = export type PeerCommand = | PeerSegmentCommand + | PeerSegmentRequestCommand | PeerSegmentAnnouncementCommand | PeerSendSegmentCommand; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index da25f186..bd4d05fb 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -1,21 +1,17 @@ -import TrackerClient, { PeerConnection } from "bittorrent-tracker"; import { Peer } from "./peer"; -import * as PeerUtil from "../utils/peer"; import { Segment, Settings, StreamWithSegments } from "../types"; -import { QueueItem } from "../internal-types"; import { SegmentsMemoryStorage } from "../segments-storage"; +import * as PeerUtil from "../utils/peer"; +import * as StreamUtils from "../utils/stream"; import * as Utils from "../utils/utils"; -import * as LoggerUtils from "../utils/logger"; import { PeerSegmentStatus } from "../enums"; import { RequestsContainer } from "../request-container"; -import debug from "debug"; +import { Request } from "../request"; +import { P2PTrackerClient } from "./tracker-client"; export class P2PLoader { - private readonly streamHash: string; private readonly peerId: string; - private readonly trackerClient: TrackerClient; - private readonly peers = new Map(); - private readonly logger = debug("core:p2p-loader"); + private readonly trackerClient: P2PTrackerClient; private isAnnounceMicrotaskCreated = false; constructor( @@ -26,109 +22,48 @@ export class P2PLoader { private readonly settings: Settings ) { this.peerId = PeerUtil.generatePeerId(); - const streamExternalId = Utils.getStreamExternalId( + const streamExternalId = StreamUtils.getStreamExternalId( this.streamManifestUrl, this.stream ); - this.streamHash = PeerUtil.getStreamHash(streamExternalId); - - this.trackerClient = createTrackerClient({ - streamHash: utf8ToHex(this.streamHash), - peerHash: utf8ToHex(this.peerId), - }); - this.logger( - `create tracker client: ${LoggerUtils.getStreamString(stream)}; ${ - this.peerId - }` + this.trackerClient = new P2PTrackerClient( + this.peerId, + streamExternalId, + this.stream, + { + onPeerConnected: this.onPeerConnected, + onSegmentRequested: this.onSegmentRequested, + }, + this.settings ); - this.subscribeOnTrackerEvents(this.trackerClient); + this.segmentStorage.subscribeOnUpdate( this.stream, this.broadcastAnnouncement ); - this.requests.subscribeOnHttpRequestsUpdate(this.broadcastAnnouncement); this.trackerClient.start(); } - private subscribeOnTrackerEvents(trackerClient: TrackerClient) { - // eslint-disable-next-line @typescript-eslint/no-empty-function - trackerClient.on("update", () => {}); - trackerClient.on("peer", (peerConnection) => { - const peer = this.peers.get(peerConnection.id); - if (peer) peer.addConnection(peerConnection); - else this.createPeer(peerConnection); - }); - trackerClient.on("warning", (warning) => { - this.logger( - `tracker warning (${LoggerUtils.getStreamString( - this.stream - )}: ${warning})` - ); - }); - trackerClient.on("error", (error) => { - this.logger( - `tracker error (${LoggerUtils.getStreamString(this.stream)}: ${error})` - ); - }); - } - - private createPeer(connection: PeerConnection) { - const peer = new Peer( - connection, - { - onPeerConnected: this.onPeerConnected.bind(this), - onPeerClosed: this.onPeerClosed.bind(this), - onSegmentRequested: this.onSegmentRequested.bind(this), - }, - this.settings - ); - this.logger(`create new peer: ${peer.id}`); - this.peers.set(connection.id, peer); - } - - downloadSegment(item: QueueItem): Promise | undefined { - const { segment, statuses } = item; - const untestedPeers: Peer[] = []; - let fastestPeer: Peer | undefined; - let fastestPeerBandwidth = 0; - - for (const peer of this.peers.values()) { + downloadSegment(segment: Segment): Request | undefined { + const peersWithSegment: Peer[] = []; + for (const peer of this.trackerClient.peers()) { if ( !peer.downloadingSegment && peer.getSegmentStatus(segment) === PeerSegmentStatus.Loaded ) { - const { bandwidth } = peer; - if (bandwidth === undefined) { - untestedPeers.push(peer); - } else if (bandwidth > fastestPeerBandwidth) { - fastestPeerBandwidth = bandwidth; - fastestPeer = peer; - } + peersWithSegment.push(peer); } } - const peer = untestedPeers.length - ? getRandomItem(untestedPeers) - : fastestPeer; - + const peer = Utils.getRandomItem(peersWithSegment); if (!peer) return; - const request = peer.requestSegment(segment); - this.requests.addLoaderRequest(segment, request); - this.logger( - `p2p request ${segment.externalId} | ${LoggerUtils.getStatusesString( - statuses - )}` - ); - request.promise.then(() => { - this.logger(`p2p loaded: ${segment.externalId}`); - }); - - return request.promise; + const request = this.requests.getOrCreateRequest(segment); + peer.fulfillSegmentRequest(request); } isLoadingOrLoadedBySomeone(segment: Segment): boolean { - for (const peer of this.peers.values()) { + for (const peer of this.trackerClient.peers()) { if (peer.getSegmentStatus(segment)) return true; } return false; @@ -136,9 +71,8 @@ export class P2PLoader { get connectedPeersAmount() { let count = 0; - for (const peer of this.peers.values()) { - if (peer.isConnected) count++; - } + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for (const peer of this.trackerClient.peers()) count++; return count; } @@ -156,33 +90,29 @@ export class P2PLoader { return PeerUtil.getJsonSegmentsAnnouncement(loaded, httpLoading); } - private onPeerConnected(peer: Peer) { - this.logger(`connected with peer: ${peer.id}`); + private onPeerConnected = (peer: Peer) => { const announcement = this.getSegmentsAnnouncement(); peer.sendSegmentsAnnouncement(announcement); - } - - private onPeerClosed(peer: Peer) { - this.logger(`peer closed: ${peer.id}`); - this.peers.delete(peer.id); - } + }; - private broadcastAnnouncement = () => { + broadcastAnnouncement = () => { if (this.isAnnounceMicrotaskCreated) return; this.isAnnounceMicrotaskCreated = true; queueMicrotask(() => { const announcement = this.getSegmentsAnnouncement(); - for (const peer of this.peers.values()) { - if (!peer.isConnected) continue; + for (const peer of this.trackerClient.peers()) { peer.sendSegmentsAnnouncement(announcement); } this.isAnnounceMicrotaskCreated = false; }); }; - private async onSegmentRequested(peer: Peer, segmentExternalId: string) { - const segment = Utils.getSegmentFromStreamByExternalId( + private onSegmentRequested = async ( + peer: Peer, + segmentExternalId: string + ) => { + const segment = StreamUtils.getSegmentFromStreamByExternalId( this.stream, segmentExternalId ); @@ -190,62 +120,13 @@ export class P2PLoader { segment && (await this.segmentStorage.getSegmentData(segment)); if (segmentData) void peer.sendSegmentData(segmentExternalId, segmentData); else peer.sendSegmentAbsent(segmentExternalId); - } + }; destroy() { - this.logger( - `destroy tracker client: ${LoggerUtils.getStreamString(this.stream)}` - ); this.segmentStorage.unsubscribeFromUpdate( this.stream, this.broadcastAnnouncement ); - this.requests.unsubscribeFromHttpRequestsUpdate(this.broadcastAnnouncement); - for (const peer of this.peers.values()) { - peer.destroy(); - } - this.peers.clear(); this.trackerClient.destroy(); } } - -function createTrackerClient({ - streamHash, - peerHash, -}: { - streamHash: string; - peerHash: string; -}) { - return new TrackerClient({ - infoHash: streamHash, - peerId: peerHash, - port: 6881, - announce: [ - // "wss://tracker.novage.com.ua", - "wss://tracker.openwebtorrent.com", - ], - rtcConfig: { - iceServers: [ - { - urls: [ - "stun:stun.l.google.com:19302", - "stun:global.stun.twilio.com:3478", - ], - }, - ], - }, - }); -} - -function utf8ToHex(utf8String: string) { - let result = ""; - for (let i = 0; i < utf8String.length; i++) { - result += utf8String.charCodeAt(i).toString(16); - } - - return result; -} - -function getRandomItem(items: T[]): T { - return items[Math.floor(Math.random() * items.length)]; -} diff --git a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts index cbc8ced9..623fb00f 100644 --- a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts +++ b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts @@ -75,7 +75,7 @@ export class P2PLoadersContainer { private setLoaderDestroyTimeout(item: P2PLoaderContainerItem) { item.destroyTimeoutId = window.setTimeout( () => this.destroyAndRemoveLoader(item), - this.settings.p2pLoaderDestroyTimeout + this.settings.p2pLoaderDestroyTimeoutMs ); } diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 9fb4d43f..d62f9bc4 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -4,117 +4,63 @@ import { PeerCommand, PeerSegmentAnnouncementCommand, PeerSegmentCommand, + PeerSegmentRequestCommand, PeerSendSegmentCommand, } from "../internal-types"; import { PeerCommandType, PeerSegmentStatus } from "../enums"; -import * as PeerUtil from "../utils/peer"; -import { P2PRequest } from "../request-container"; +import { + Request, + RequestControls, + RequestError, + PeerRequestErrorType, +} from "../request"; import { Segment, Settings } from "../types"; +import * as PeerUtil from "../utils/peer"; import * as Utils from "../utils/utils"; import debug from "debug"; -export class PeerRequestError extends Error { - constructor( - readonly type: - | "abort" - | "request-timeout" - | "response-bytes-mismatch" - | "segment-absent" - | "peer-closed" - | "destroy" - ) { - super(); - } -} - type PeerEventHandlers = { - onPeerConnected: (peer: Peer) => void; onPeerClosed: (peer: Peer) => void; onSegmentRequested: (peer: Peer, segmentId: string) => void; }; -type PeerRequest = { - segment: Segment; - p2pRequest: P2PRequest; - resolve: (data: ArrayBuffer) => void; - reject: (error: PeerRequestError) => void; - chunks: ArrayBuffer[]; - responseTimeoutId: number; -}; - type PeerSettings = Pick< Settings, - "p2pSegmentDownloadTimeout" | "webRtcMaxMessageSize" + "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" >; export class Peer { readonly id: string; - private connection?: PeerConnection; - private connections = new Set(); private segments = new Map(); - private request?: PeerRequest; + private requestContext?: { request: Request; controls: RequestControls }; private readonly logger = debug("core:peer"); - private readonly bandwidthMeasurer = new BandwidthMeasurer(); private isUploadingSegment = false; constructor( - connection: PeerConnection, + private readonly connection: PeerConnection, private readonly eventHandlers: PeerEventHandlers, private readonly settings: PeerSettings ) { - this.id = hexToUtf8(connection.id); + this.id = Peer.getPeerIdFromHexString(connection.id); this.eventHandlers = eventHandlers; - this.addConnection(connection); - } - addConnection(connection: PeerConnection) { - if (this.connection && connection !== this.connection) { - connection.destroy(); - return; - } - this.connections.add(connection); - - connection.on("connect", () => { - if (this.connection) return; - - this.connection = connection; - for (const item of this.connections) { - if (item !== connection) { - this.connections.delete(item); - item.destroy(); - } - } - this.eventHandlers.onPeerConnected(this); - this.logger(`connected with peer: ${this.id}`); - - connection.on("data", this.onReceiveData.bind(this)); - connection.on("close", () => { - this.connection = undefined; - this.cancelSegmentRequest("peer-closed"); - this.logger(`connection with peer closed: ${this.id}`); + connection.on("data", this.onReceiveData.bind(this)); + connection.on("close", () => { + this.logger(`connection with peer closed: ${this.id}`); + this.destroy(); + this.eventHandlers.onPeerClosed(this); + }); + connection.on("error", (error) => { + if (error.code === "ERR_DATA_CHANNEL") { + this.logger(`peer error: ${this.id} ${error.code}`); this.destroy(); this.eventHandlers.onPeerClosed(this); - }); - connection.on("error", (error) => { - if (error.code === "ERR_DATA_CHANNEL") { - this.logger(`peer error: ${this.id} ${error.code}`); - this.destroy(); - this.eventHandlers.onPeerClosed(this); - } - }); + } }); } - get isConnected() { - return !!this.connection; - } - get downloadingSegment(): Segment | undefined { - return this.request?.segment; - } - - get bandwidth(): number | undefined { - return this.bandwidthMeasurer.getBandwidth(); + return this.requestContext?.request.segment; } getSegmentStatus(segment: Segment): PeerSegmentStatus | undefined { @@ -122,7 +68,7 @@ export class Peer { return this.segments.get(externalId); } - private onReceiveData(data: ArrayBuffer) { + private onReceiveData(data: Uint8Array) { const command = PeerUtil.getPeerCommandFromArrayBuffer(data); if (!command) { this.receiveSegmentChunk(data); @@ -139,16 +85,21 @@ export class Peer { break; case PeerCommandType.SegmentData: - if (this.request?.segment.externalId === command.i) { - const { progress } = this.request.p2pRequest; - progress.totalBytes = command.s; - progress.canBeTracked = true; + { + const request = this.requestContext?.request; + this.requestContext?.controls.firstBytesReceived(); + if ( + request?.segment.externalId === command.i && + request.totalBytes === undefined + ) { + request.setTotalBytes(command.s); + } } break; case PeerCommandType.SegmentAbsent: - if (this.request?.segment.externalId === command.i) { - this.cancelSegmentRequest("segment-absent"); + if (this.requestContext?.request.segment.externalId === command.i) { + this.cancelSegmentRequest("peer-segment-absent"); this.segments.delete(command.i); } break; @@ -160,22 +111,30 @@ export class Peer { } private sendCommand(command: PeerCommand) { - if (!this.connection) return; this.connection.send(JSON.stringify(command)); } - requestSegment(segment: Segment) { - if (this.request) { + fulfillSegmentRequest(request: Request) { + if (this.requestContext) { throw new Error("Segment already is downloading"); } - const { externalId } = segment; - const command: PeerSegmentCommand = { + this.requestContext = { + request, + controls: request.start( + { type: "p2p", peerId: this.id }, + { + abort: this.abortRequest, + notReceivingBytesTimeoutMs: + this.settings.p2pNotReceivingBytesTimeoutMs, + } + ), + }; + const command: PeerSegmentRequestCommand = { c: PeerCommandType.SegmentRequest, - i: externalId, + i: request.segment.externalId, }; + if (request.loadedBytes) command.b = request.loadedBytes; this.sendCommand(command); - this.request = this.createPeerRequest(segment); - return this.request.p2pRequest; } sendSegmentsAnnouncement(announcement: JsonSegmentAnnouncement) { @@ -187,7 +146,6 @@ export class Peer { } async sendSegmentData(segmentExternalId: string, data: ArrayBuffer) { - if (!this.connection) return; this.logger(`send segment ${segmentExternalId} to ${this.id}`); const command: PeerSendSegmentCommand = { c: PeerCommandType.SegmentData, @@ -197,8 +155,7 @@ export class Peer { this.sendCommand(command); const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize); - const connection = this.connection; - const channel = connection._channel; + const channel = this.connection._channel; const { promise, resolve, reject } = Utils.getControlledPromise(); const sendChunk = () => { @@ -212,7 +169,7 @@ export class Peer { reject(); break; } - connection.send(chunk); + this.connection.send(chunk); } }; try { @@ -237,120 +194,56 @@ export class Peer { this.sendCommand(command); } - private createPeerRequest(segment: Segment): PeerRequest { - const { promise, resolve, reject } = - Utils.getControlledPromise(); - return { - segment, - resolve, - reject, - responseTimeoutId: this.setRequestTimeout(), - chunks: [], - p2pRequest: { - type: "p2p", - progress: { - canBeTracked: false, - totalBytes: 0, - loadedBytes: 0, - percent: 0, - startTimestamp: performance.now(), - }, - promise, - abort: () => this.cancelSegmentRequest("abort"), - }, - }; - } - - private receiveSegmentChunk(chunk: ArrayBuffer): void { - const { request } = this; - const progress = request?.p2pRequest?.progress; - if (!request || !progress) return; - - progress.loadedBytes += chunk.byteLength; - progress.percent = (progress.loadedBytes / progress.loadedBytes) * 100; - progress.lastLoadedChunkTimestamp = performance.now(); - request.chunks.push(chunk); - - if (progress.loadedBytes === progress.totalBytes) { - const segmentData = joinChunks(request.chunks); - const { lastLoadedChunkTimestamp, startTimestamp, loadedBytes } = - progress; - const loadingDuration = lastLoadedChunkTimestamp - startTimestamp; - this.bandwidthMeasurer.addMeasurement(loadedBytes, loadingDuration); - this.approveRequest(segmentData); - } else if (progress.loadedBytes > progress.totalBytes) { - this.cancelSegmentRequest("response-bytes-mismatch"); + private receiveSegmentChunk(chunk: Uint8Array): void { + if (!this.requestContext) return; + const { request, controls } = this.requestContext; + controls.addLoadedChunk(chunk); + + if (request.loadedBytes === request.totalBytes) { + controls.completeOnSuccess(); + this.requestContext = undefined; + } else if ( + request.totalBytes !== undefined && + request.loadedBytes > request.totalBytes + ) { + this.cancelSegmentRequest("peer-response-bytes-mismatch"); } } - private approveRequest(data: ArrayBuffer) { - this.request?.resolve(data); - this.clearRequest(); - } - - private cancelSegmentRequest(type: PeerRequestError["type"]) { - if (!this.request) return; - this.logger( - `cancel segment request ${this.request?.segment.externalId} (${type})` - ); - const error = new PeerRequestError(type); - const sendCancelCommandTypes: PeerRequestError["type"][] = [ - "destroy", - "abort", - "request-timeout", - "response-bytes-mismatch", - ]; - if (sendCancelCommandTypes.includes(type)) { - this.sendCommand({ - c: PeerCommandType.CancelSegmentRequest, - i: this.request.segment.externalId, - }); + private abortRequest = () => { + if (!this.requestContext) return; + const { request } = this.requestContext; + this.sendCancelSegmentRequestCommand(request.segment); + this.requestContext = undefined; + }; + + private cancelSegmentRequest(type: PeerRequestErrorType) { + if (!this.requestContext) return; + const { request, controls } = this.requestContext; + const { segment } = request; + this.logger(`cancel segment request ${segment.externalId} (${type})`); + const error = new RequestError(type); + if (type === "peer-response-bytes-mismatch") { + this.sendCancelSegmentRequestCommand(request.segment); } - this.request.reject(error); - this.clearRequest(); - } - - private setRequestTimeout(): number { - return window.setTimeout( - () => this.cancelSegmentRequest("request-timeout"), - this.settings.p2pSegmentDownloadTimeout - ); + controls.abortOnError(error); + this.requestContext = undefined; } - private clearRequest() { - clearTimeout(this.request?.responseTimeoutId); - this.request = undefined; + private sendCancelSegmentRequestCommand(segment: Segment) { + this.sendCommand({ + c: PeerCommandType.CancelSegmentRequest, + i: segment.externalId, + }); } destroy() { - this.cancelSegmentRequest("destroy"); - this.connection?.destroy(); - this.connection = undefined; - for (const connection of this.connections) { - connection.destroy(); - } - this.connections.clear(); - } -} - -const SMOOTHING_COEF = 0.5; - -class BandwidthMeasurer { - private bandwidth?: number; - - addMeasurement(bytes: number, loadingDurationMs: number) { - const bits = bytes * 8; - const currentBandwidth = (bits * 1000) / loadingDurationMs; - - this.bandwidth = - this.bandwidth !== undefined - ? currentBandwidth * SMOOTHING_COEF + - (1 - SMOOTHING_COEF) * this.bandwidth - : currentBandwidth; + this.cancelSegmentRequest("peer-closed"); + this.connection.destroy(); } - getBandwidth() { - return this.bandwidth; + static getPeerIdFromHexString(hex: string) { + return hexToUtf8(hex); } } @@ -368,18 +261,6 @@ function* getBufferChunks( } } -function joinChunks(chunks: ArrayBuffer[]): ArrayBuffer { - const bytesSum = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0); - const buffer = new Uint8Array(bytesSum); - let offset = 0; - for (const chunk of chunks) { - buffer.set(new Uint8Array(chunk), offset); - offset += chunk.byteLength; - } - - return buffer; -} - function hexToUtf8(hexString: string) { const bytes = new Uint8Array(hexString.length / 2); diff --git a/packages/p2p-media-loader-core/src/p2p/tracker-client.ts b/packages/p2p-media-loader-core/src/p2p/tracker-client.ts new file mode 100644 index 00000000..ab476f83 --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/tracker-client.ts @@ -0,0 +1,148 @@ +import TrackerClient, { + PeerConnection, + TrackerClientEvents, +} from "bittorrent-tracker"; +import { Settings, StreamWithSegments } from "../types"; +import debug from "debug"; +import * as PeerUtil from "../utils/peer"; +import * as LoggerUtils from "../utils/logger"; +import { Peer } from "./peer"; + +type PeerItem = { + peer?: Peer; + potentialConnections: Set; +}; +type P2PTrackerClientEventHandlers = { + onPeerConnected: (peer: Peer) => void; + onSegmentRequested: (peer: Peer, segmentExternalId: string) => void; +}; + +export class P2PTrackerClient { + private readonly client: TrackerClient; + private readonly _peers = new Map(); + private readonly logger = debug("core:p2p-tracker-client"); + + constructor( + private readonly peerId: string, + streamExternalId: string, + private readonly stream: StreamWithSegments, + private readonly eventHandlers: P2PTrackerClientEventHandlers, + private readonly settings: Settings + ) { + const streamHash = PeerUtil.getStreamHash(streamExternalId); + const streamShortId = LoggerUtils.getStreamString(stream); + this.client = new TrackerClient({ + infoHash: utf8ToHex(streamHash), + peerId: utf8ToHex(this.peerId), + port: 6881, + announce: [ + // "wss://tracker.novage.com.ua", + "wss://tracker.openwebtorrent.com", + ], + rtcConfig: { + iceServers: [ + { + urls: [ + "stun:stun.l.google.com:19302", + "stun:global.stun.twilio.com:3478", + ], + }, + ], + }, + }); + this.client.on("peer", this.onReceivePeerConnection); + this.client.on("warning", this.onTrackerClientWarning); + this.client.on("error", this.onTrackerClientError); + this.logger( + `create new client; \nstream: ${streamShortId}; hash: ${streamHash}; \npeer id: ${this.peerId}` + ); + } + + start() { + this.client.start(); + } + + destroy() { + this.client.destroy(); + for (const { peer, potentialConnections } of this._peers.values()) { + peer?.destroy(); + for (const connection of potentialConnections) { + connection.destroy(); + } + } + this._peers.clear(); + this.logger( + `destroy client; stream: ${LoggerUtils.getStreamString(this.stream)}` + ); + } + + private onReceivePeerConnection: TrackerClientEvents["peer"] = ( + peerConnection + ) => { + const itemId = Peer.getPeerIdFromHexString(peerConnection.id); + let peerItem = this._peers.get(itemId); + if (peerItem?.peer) { + peerConnection.destroy(); + return; + } else if (!peerItem) { + peerItem = { potentialConnections: new Set() }; + peerItem.potentialConnections.add(peerConnection); + this._peers.set(itemId, peerItem); + } + + peerConnection.on("connect", () => { + if (!peerItem || peerItem.peer) return; + + for (const connection of peerItem.potentialConnections) { + if (connection !== peerConnection) connection.destroy(); + } + peerItem.potentialConnections.clear(); + peerItem.peer = new Peer( + peerConnection, + { + onPeerClosed: this.onPeerClosed, + onSegmentRequested: this.eventHandlers.onSegmentRequested, + }, + this.settings + ); + this.logger(`connected with peer: ${peerItem.peer.id}`); + this.eventHandlers.onPeerConnected(peerItem.peer); + }); + }; + + private onTrackerClientWarning: TrackerClientEvents["warning"] = ( + warning + ) => { + this.logger( + `tracker warning (${LoggerUtils.getStreamString( + this.stream + )}: ${warning})` + ); + }; + + private onTrackerClientError: TrackerClientEvents["error"] = (error) => { + this.logger( + `tracker error (${LoggerUtils.getStreamString(this.stream)}: ${error})` + ); + }; + + *peers() { + for (const peerItem of this._peers.values()) { + if (peerItem?.peer) yield peerItem.peer; + } + } + + private onPeerClosed = (peer: Peer) => { + this.logger(`peer closed: ${peer.id}`); + this._peers.delete(peer.id); + }; +} + +function utf8ToHex(utf8String: string) { + let result = ""; + for (let i = 0; i < utf8String.length; i++) { + result += utf8String.charCodeAt(i).toString(16); + } + + return result; +} diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index 78140899..f44c4b84 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -1,232 +1,86 @@ -import { Segment, SegmentResponse, StreamType } from "./types"; -import { RequestAbortError } from "./errors"; -import { Subscriptions } from "./segments-storage"; -import Debug from "debug"; - -export type EngineCallbacks = { - onSuccess: (response: SegmentResponse) => void; - onError: (reason?: unknown) => void; -}; - -export type LoadProgress = { - startTimestamp: number; - lastLoadedChunkTimestamp?: number; - percent: number; - loadedBytes: number; - totalBytes: number; - canBeTracked: boolean; -}; - -type RequestBase = { - promise: Promise; - abort: () => void; - progress: LoadProgress; -}; - -export type HttpRequest = RequestBase & { - type: "http"; -}; - -export type P2PRequest = RequestBase & { - type: "p2p"; -}; - -export type HybridLoaderRequest = HttpRequest | P2PRequest; - -type Request = { - segment: Readonly; - loaderRequest?: Readonly; - engineCallbacks?: Readonly; -}; - -function getRequestItemId(segment: Segment) { - return segment.localId; -} +import { Segment, Settings } from "./types"; +import { BandwidthApproximator } from "./bandwidth-approximator"; +import { Request } from "./request"; +import { Playback } from "./internal-types"; export class RequestsContainer { - private readonly requests = new Map(); - private readonly onHttpRequestsHandlers = new Subscriptions(); - private readonly logger: Debug.Debugger; + private readonly requests = new Map(); - constructor(streamType: StreamType) { - this.logger = Debug(`core:requests-container-${streamType}`); - this.logger.color = "LightSeaGreen"; - } + constructor( + private readonly requestProcessQueueCallback: () => void, + private readonly bandwidthApproximator: BandwidthApproximator, + private readonly playback: Playback, + private readonly settings: Settings + ) {} - get httpRequestsCount() { + get executingHttpCount() { let count = 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for (const request of this.httpRequests()) count++; + for (const request of this.httpRequests()) { + if (request.status === "loading") count++; + } return count; } - get p2pRequestsCount() { + get executingP2PCount() { let count = 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for (const request of this.p2pRequests()) count++; + for (const request of this.p2pRequests()) { + if (request.status === "loading") count++; + } return count; } get(segment: Segment) { - const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest; + return this.requests.get(segment); } - addLoaderRequest(segment: Segment, loaderRequest: HybridLoaderRequest) { - const segmentId = getRequestItemId(segment); - const existingRequest = this.requests.get(segmentId); - if (existingRequest) { - existingRequest.loaderRequest = loaderRequest; - } else { - this.requests.set(segmentId, { - segment, - loaderRequest, - }); + getBySegmentLocalId(id: string) { + for (const request of this.requests.values()) { + if (request.segment.localId === id) return request; } - this.logger( - `add loader request: ${loaderRequest.type} ${segment.externalId}` - ); - - const clearRequestItem = () => this.clearRequestItem(segmentId, "loader"); - loaderRequest.promise - .then(() => clearRequestItem()) - .catch((err) => { - if (err instanceof RequestAbortError) clearRequestItem(); - }); - if (loaderRequest.type === "http") this.onHttpRequestsHandlers.fire(); } - addEngineCallbacks(segment: Segment, engineCallbacks: EngineCallbacks) { - const segmentId = getRequestItemId(segment); - const requestItem = this.requests.get(segmentId); - - const { onSuccess, onError } = engineCallbacks; - engineCallbacks.onSuccess = (response) => { - this.clearRequestItem(segmentId, "engine"); - return onSuccess(response); - }; - - engineCallbacks.onError = (error) => { - if (error instanceof RequestAbortError) { - this.clearRequestItem(segmentId, "engine"); - } - return onError(error); - }; - - if (requestItem) { - requestItem.engineCallbacks = engineCallbacks; - } else { - this.requests.set(segmentId, { + getOrCreateRequest(segment: Segment) { + let request = this.requests.get(segment); + if (!request) { + request = new Request( segment, - engineCallbacks, - }); + this.requestProcessQueueCallback, + this.bandwidthApproximator, + this.playback, + this.settings + ); + this.requests.set(segment, request); } - this.logger(`add engine request ${segment.externalId}`); + return request; + } + + remove(request: Request) { + this.requests.delete(request.segment); } - values() { + items() { return this.requests.values(); } *httpRequests(): Generator { for (const request of this.requests.values()) { - if (request.loaderRequest?.type === "http") yield request; + if (request.type === "http") yield request; } } *p2pRequests(): Generator { for (const request of this.requests.values()) { - if (request.loaderRequest?.type === "p2p") yield request; + if (request.type === "p2p") yield request; } } - - resolveEngineRequest(segment: Segment, response: SegmentResponse) { - const id = getRequestItemId(segment); - this.requests.get(id)?.engineCallbacks?.onSuccess(response); - } - - isHttpRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest?.type === "http"; - } - - isP2PRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); - return this.requests.get(id)?.loaderRequest?.type === "p2p"; - } - isHybridLoaderRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); - return !!this.requests.get(id)?.loaderRequest; - } - - abortEngineRequest(segment: Segment) { - const id = getRequestItemId(segment); - const request = this.requests.get(id); - if (!request) return; - - request.engineCallbacks?.onError(new RequestAbortError()); - request.loaderRequest?.abort(); - } - - abortLoaderRequest(segment: Segment) { - const id = getRequestItemId(segment); - this.requests.get(id)?.loaderRequest?.abort(); - } - - private clearRequestItem( - requestItemId: string, - type: "loader" | "engine" - ): void { - const requestItem = this.requests.get(requestItemId); - if (!requestItem) return; - const { segment, loaderRequest } = requestItem; - const segmentExternalId = segment.externalId; - - if (type === "engine") { - this.logger(`remove engine callbacks: ${segmentExternalId}`); - delete requestItem.engineCallbacks; - } - if (type === "loader" && loaderRequest) { - this.logger( - `remove loader request: ${loaderRequest.type} ${segmentExternalId}` - ); - if (loaderRequest.type === "http") { - this.onHttpRequestsHandlers.fire(); - } - delete requestItem.loaderRequest; - } - if (!requestItem.engineCallbacks && !requestItem.loaderRequest) { - this.logger(`remove request item ${segmentExternalId}`); - const segmentId = getRequestItemId(segment); - this.requests.delete(segmentId); - } - } - - abortAllNotRequestedByEngine(isLocked?: (segment: Segment) => boolean) { - const isSegmentLocked = isLocked ? isLocked : () => false; - for (const { - loaderRequest, - engineCallbacks, - segment, - } of this.requests.values()) { - if (engineCallbacks || !loaderRequest) continue; - if (!isSegmentLocked(segment)) loaderRequest.abort(); - } - } - - subscribeOnHttpRequestsUpdate(handler: () => void) { - this.onHttpRequestsHandlers.add(handler); - } - - unsubscribeFromHttpRequestsUpdate(handler: () => void) { - this.onHttpRequestsHandlers.remove(handler); + return !!this.requests.get(segment)?.type; } destroy() { for (const request of this.requests.values()) { - request.loaderRequest?.abort(); - request.engineCallbacks?.onError(); + request.abortFromProcessQueue(); + request.abortFromEngine(); } this.requests.clear(); } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts new file mode 100644 index 00000000..77fd0b18 --- /dev/null +++ b/packages/p2p-media-loader-core/src/request.ts @@ -0,0 +1,341 @@ +import { Segment, SegmentResponse } from "./types"; +import { BandwidthApproximator } from "./bandwidth-approximator"; +import * as StreamUtils from "./utils/stream"; +import * as Utils from "./utils/utils"; +import * as LoggerUtils from "./utils/logger"; +import debug from "debug"; +import { Playback } from "./internal-types"; + +export type EngineCallbacks = { + onSuccess: (response: SegmentResponse) => void; + onError: (reason: CoreRequestError) => void; +}; + +export type LoadProgress = { + startTimestamp: number; + lastLoadedChunkTimestamp?: number; + startFromByte?: number; + loadedBytes: number; +}; + +type HttpRequestAttempt = { + type: "http"; + error?: RequestError; +}; + +type P2PRequestAttempt = { + type: "p2p"; + peerId: string; + error?: RequestError; +}; + +export type RequestAttempt = HttpRequestAttempt | P2PRequestAttempt; + +export type RequestControls = Readonly<{ + firstBytesReceived: Request["firstBytesReceived"]; + addLoadedChunk: Request["addLoadedChunk"]; + completeOnSuccess: Request["completeOnSuccess"]; + abortOnError: Request["abortOnError"]; +}>; + +type OmitEncapsulated = Omit; +type StartRequestParameters = + | OmitEncapsulated + | OmitEncapsulated; + +export type RequestStatus = + | "not-started" + | "loading" + | "succeed" + | "failed" + | "aborted"; + +export class Request { + readonly id: string; + private _engineCallbacks?: EngineCallbacks; + private currentAttempt?: RequestAttempt; + private _failedAttempts: RequestAttempt[] = []; + private finalData?: ArrayBuffer; + private bytes: Uint8Array[] = []; + private _loadedBytes = 0; + private _totalBytes?: number; + private _status: RequestStatus = "not-started"; + private progress?: LoadProgress; + private notReceivingBytesTimeout: Timeout; + private _abortRequestCallback?: (errorType: RequestInnerErrorType) => void; + private readonly _logger: debug.Debugger; + + constructor( + readonly segment: Segment, + private readonly requestProcessQueueCallback: () => void, + private readonly bandwidthApproximator: BandwidthApproximator, + private readonly playback: Playback, + private readonly settings: StreamUtils.PlaybackTimeWindowsSettings + ) { + this.id = Request.getRequestItemId(segment); + this.notReceivingBytesTimeout = new Timeout(this.abortOnTimeout); + + const { type } = this.segment.stream; + this._logger = debug(`core:request-${type}`); + } + + get status() { + return this._status; + } + + get isSegmentRequestedByEngine(): boolean { + return !!this._engineCallbacks; + } + + get type() { + return this.currentAttempt?.type; + } + + get loadedBytes() { + return this._loadedBytes; + } + + get totalBytes(): number | undefined { + return this._totalBytes; + } + + get data(): ArrayBuffer | undefined { + if (this.status !== "succeed") return; + if (!this.finalData) this.finalData = Utils.joinChunks(this.bytes); + return this.finalData; + } + + get loadedPercent() { + if (!this._totalBytes) return; + return Utils.getPercent(this.loadedBytes, this._totalBytes); + } + + get failedAttempts(): ReadonlyArray> { + return this._failedAttempts; + } + + setOrResolveEngineCallbacks(callbacks: EngineCallbacks) { + if (this._engineCallbacks) { + throw new Error("Segment is already requested by engine"); + } + this._engineCallbacks = callbacks; + if (this.finalData) this.resolveEngineCallbacksSuccessfully(this.finalData); + } + + setTotalBytes(value: number) { + if (this._totalBytes !== undefined) { + throw new Error("Request total bytes value is already set"); + } + this._totalBytes = value; + } + + start( + requestData: StartRequestParameters, + controls: { + notReceivingBytesTimeoutMs?: number; + abort: (errorType: RequestInnerErrorType) => void; + } + ): RequestControls { + if (this._status === "succeed") { + throw new Error("Request has been already succeed."); + } + if (this._status === "loading") { + throw new Error("Request has been already started."); + } + + this._status = "loading"; + this.currentAttempt = { ...requestData }; + this.progress = { + startFromByte: this._loadedBytes, + loadedBytes: 0, + startTimestamp: performance.now(), + }; + this.bandwidthApproximator.addLoading(this.progress); + const { notReceivingBytesTimeoutMs, abort } = controls; + this._abortRequestCallback = abort; + + if (notReceivingBytesTimeoutMs !== undefined) { + this.notReceivingBytesTimeout.start(notReceivingBytesTimeoutMs); + } + + const statuses = StreamUtils.getSegmentPlaybackStatuses( + this.segment, + this.playback, + this.settings + ); + const statusString = LoggerUtils.getSegmentPlaybackStatusesString(statuses); + this.logger( + `${requestData.type} ${this.segment.externalId} ${statusString} started` + ); + + return { + firstBytesReceived: this.firstBytesReceived, + addLoadedChunk: this.addLoadedChunk, + completeOnSuccess: this.completeOnSuccess, + abortOnError: this.abortOnError, + }; + } + + private resolveEngineCallbacksSuccessfully(data: ArrayBuffer) { + this._engineCallbacks?.onSuccess({ + data, + bandwidth: this.bandwidthApproximator.getBandwidth(), + }); + this._engineCallbacks = undefined; + } + + abortFromEngine() { + this._engineCallbacks?.onError(new CoreRequestError("aborted")); + this._engineCallbacks = undefined; + this.requestProcessQueueCallback(); + } + + abortFromProcessQueue() { + this.throwErrorIfNotLoadingStatus(); + this._status = "aborted"; + this._abortRequestCallback?.("abort"); + this._abortRequestCallback = undefined; + this.currentAttempt = undefined; + this.notReceivingBytesTimeout.clear(); + } + + private abortOnTimeout = () => { + this.throwErrorIfNotLoadingStatus(); + if (!this.currentAttempt) return; + + this._status = "failed"; + const error = new RequestError("bytes-receiving-timeout"); + this._abortRequestCallback?.(error.type); + + this.currentAttempt.error = error; + this._failedAttempts.push(this.currentAttempt); + this.notReceivingBytesTimeout.clear(); + this.requestProcessQueueCallback(); + }; + + private abortOnError = (error: RequestError) => { + this.throwErrorIfNotLoadingStatus(); + if (!this.currentAttempt) return; + + this._status = "failed"; + this.currentAttempt.error = error; + this._failedAttempts.push(this.currentAttempt); + this.notReceivingBytesTimeout.clear(); + this.requestProcessQueueCallback(); + }; + + private completeOnSuccess = () => { + this.throwErrorIfNotLoadingStatus(); + if (!this.currentAttempt) return; + + this.notReceivingBytesTimeout.clear(); + this.finalData = Utils.joinChunks(this.bytes); + this._status = "succeed"; + this._totalBytes = this._loadedBytes; + + this.resolveEngineCallbacksSuccessfully(this.finalData); + this.logger( + `${this.currentAttempt.type} ${this.segment.externalId} succeed` + ); + this.requestProcessQueueCallback(); + }; + + private addLoadedChunk = (chunk: Uint8Array) => { + this.throwErrorIfNotLoadingStatus(); + if (!this.currentAttempt || !this.progress) return; + this.notReceivingBytesTimeout.restart(); + + this.bytes.push(chunk); + this.progress.lastLoadedChunkTimestamp = performance.now(); + this.progress.loadedBytes += chunk.length; + this._loadedBytes += chunk.length; + }; + + private firstBytesReceived = () => { + this.throwErrorIfNotLoadingStatus(); + this.notReceivingBytesTimeout.restart(); + }; + + private throwErrorIfNotLoadingStatus() { + if (this._status !== "loading") { + throw new Error(`Request has been already ${this.status}.`); + } + } + + private logger(message: string) { + this._logger.color = this.currentAttempt?.type === "http" ? "green" : "red"; + this._logger(message); + this._logger.color = ""; + } + + static getRequestItemId(segment: Segment) { + return segment.localId; + } +} + +const requestInnerErrorTypes = ["abort", "bytes-receiving-timeout"] as const; + +const httpRequestErrorTypes = ["fetch-error"] as const; + +const peerRequestErrorTypes = [ + "peer-response-bytes-mismatch", + "peer-segment-absent", + "peer-closed", +] as const; + +export type RequestInnerErrorType = (typeof requestInnerErrorTypes)[number]; +export type HttpRequestErrorType = (typeof httpRequestErrorTypes)[number]; +export type PeerRequestErrorType = (typeof peerRequestErrorTypes)[number]; + +type RequestErrorType = + | RequestInnerErrorType + | PeerRequestErrorType + | HttpRequestErrorType; + +export class RequestError< + T extends RequestErrorType = RequestErrorType +> extends Error { + constructor(readonly type: T, message?: string) { + super(message); + } + + static isRequestInnerErrorType( + error: RequestError + ): error is RequestError { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return requestInnerErrorTypes.includes(error.type as any); + } +} + +export class CoreRequestError extends Error { + constructor(readonly type: "failed" | "aborted") { + super(); + } +} + +export class Timeout { + private timeoutId?: number; + private ms?: number; + + constructor(private readonly action: () => void) {} + + start(ms: number) { + if (this.timeoutId) { + throw new Error("Timeout is already started."); + } + this.ms = ms; + this.timeoutId = window.setTimeout(this.action, this.ms); + } + + restart(ms?: number) { + if (this.timeoutId) clearTimeout(this.timeoutId); + if (ms) this.ms = ms; + if (!this.ms) return; + this.timeoutId = window.setTimeout(this.action, this.ms); + } + + clear() { + clearTimeout(this.timeoutId); + this.timeoutId = undefined; + } +} diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts index e8b1a1db..545ee965 100644 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ b/packages/p2p-media-loader-core/src/segments-storage.ts @@ -1,4 +1,6 @@ import { Segment, Settings, Stream } from "./types"; +import { EventDispatcher } from "./event-dispatcher"; +import * as StreamUtils from "./utils/stream"; import Debug from "debug"; type StorageSettings = Pick< @@ -6,62 +8,29 @@ type StorageSettings = Pick< "cachedSegmentExpiration" | "cachedSegmentsCount" >; -function getStreamShortExternalId(stream: Readonly) { - const { type, index } = stream; - return `${type}-${index}`; -} - function getStorageItemId(segment: Segment) { - const streamExternalId = getStreamShortExternalId(segment.stream); + const streamExternalId = StreamUtils.getStreamShortId(segment.stream); return `${streamExternalId}|${segment.externalId}`; } -export class Subscriptions< - T extends (...args: unknown[]) => void = () => void -> { - private readonly list: Set; - - constructor(handlers?: T | T[]) { - if (handlers) { - this.list = new Set(Array.isArray(handlers) ? handlers : [handlers]); - } else { - this.list = new Set(); - } - } - - add(handler: T) { - this.list.add(handler); - } - - remove(handler: T) { - this.list.delete(handler); - } - - fire(...args: Parameters) { - for (const handler of this.list) { - handler(...args); - } - } - - get isEmpty() { - return this.list.size === 0; - } -} - type StorageItem = { segment: Segment; data: ArrayBuffer; lastAccessed: number; }; +type StorageEventHandlers = { + [key in `onStorageUpdated${string}`]: (steam: Stream) => void; +}; + export class SegmentsMemoryStorage { private cache = new Map(); private _isInitialized = false; private readonly isSegmentLockedPredicates: (( segment: Segment ) => boolean)[] = []; - private onUpdateHandlers = new Map(); private readonly logger: Debug.Debugger; + private readonly events = new EventDispatcher(); constructor( private readonly masterManifestUrl: string, @@ -90,14 +59,13 @@ export class SegmentsMemoryStorage { async storeSegment(segment: Segment, data: ArrayBuffer) { const id = getStorageItemId(segment); - const streamId = getStreamShortExternalId(segment.stream); this.cache.set(id, { segment, data, lastAccessed: performance.now(), }); this.logger(`add segment: ${id}`); - this.fireOnUpdateSubscriptions(streamId); + this.dispatchStorageUpdatedEvent(segment.stream); void this.clear(); } @@ -116,10 +84,10 @@ export class SegmentsMemoryStorage { } getStoredSegmentExternalIdsOfStream(stream: Stream) { - const streamId = getStreamShortExternalId(stream); + const streamId = StreamUtils.getStreamShortId(stream); const externalIds: string[] = []; for (const { segment } of this.cache.values()) { - const itemStreamId = getStreamShortExternalId(segment.stream); + const itemStreamId = StreamUtils.getStreamShortId(segment.stream); if (itemStreamId === streamId) externalIds.push(segment.externalId); } return externalIds; @@ -128,7 +96,7 @@ export class SegmentsMemoryStorage { private async clear(): Promise { const itemsToDelete: string[] = []; const remainingItems: [string, StorageItem][] = []; - const streamIdsOfChangedItems = new Set(); + const streamsOfChangedItems = new Set(); // Delete old segments const now = performance.now(); @@ -138,9 +106,8 @@ export class SegmentsMemoryStorage { const { lastAccessed, segment } = item; if (now - lastAccessed > this.settings.cachedSegmentExpiration) { if (!this.isSegmentLocked(segment)) { - const streamId = getStreamShortExternalId(segment.stream); itemsToDelete.push(itemId); - streamIdsOfChangedItems.add(streamId); + streamsOfChangedItems.add(segment.stream); } } else { remainingItems.push(entry); @@ -155,9 +122,8 @@ export class SegmentsMemoryStorage { for (const [itemId, { segment }] of remainingItems) { if (!this.isSegmentLocked(segment)) { - const streamId = getStreamShortExternalId(segment.stream); itemsToDelete.push(itemId); - streamIdsOfChangedItems.add(streamId); + streamsOfChangedItems.add(segment.stream); countOverhead--; if (countOverhead === 0) break; } @@ -167,40 +133,39 @@ export class SegmentsMemoryStorage { if (itemsToDelete.length) { this.logger(`cleared ${itemsToDelete.length} segments`); itemsToDelete.forEach((id) => this.cache.delete(id)); - for (const streamId of streamIdsOfChangedItems) { - this.fireOnUpdateSubscriptions(streamId); + for (const stream of streamsOfChangedItems) { + this.dispatchStorageUpdatedEvent(stream); } } return itemsToDelete.length > 0; } - subscribeOnUpdate(stream: Stream, handler: () => void) { - const streamId = getStreamShortExternalId(stream); - const handlers = this.onUpdateHandlers.get(streamId); - if (!handlers) { - this.onUpdateHandlers.set(streamId, new Subscriptions(handler)); - } else { - handlers.add(handler); - } + subscribeOnUpdate( + stream: Stream, + listener: StorageEventHandlers["onStorageUpdated"] + ) { + const localId = StreamUtils.getStreamShortId(stream); + this.events.subscribe(`onStorageUpdated-${localId}`, listener); } - unsubscribeFromUpdate(stream: Stream, handler: () => void) { - const streamId = getStreamShortExternalId(stream); - const handlers = this.onUpdateHandlers.get(streamId); - if (handlers) { - handlers.remove(handler); - if (handlers.isEmpty) this.onUpdateHandlers.delete(streamId); - } + unsubscribeFromUpdate( + stream: Stream, + listener: StorageEventHandlers["onStorageUpdated"] + ) { + const localId = StreamUtils.getStreamShortId(stream); + this.events.unsubscribe(`onStorageUpdated-${localId}`, listener); } - private fireOnUpdateSubscriptions(streamId: string) { - this.onUpdateHandlers.get(streamId)?.fire(); + private dispatchStorageUpdatedEvent(stream: Stream) { + this.events.dispatch( + `onStorageUpdated${StreamUtils.getStreamShortId(stream)}`, + stream + ); } public async destroy() { this.cache.clear(); - this.onUpdateHandlers.clear(); this._isInitialized = false; } } diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index a894961a..85d65fd3 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -1,7 +1,5 @@ import { LinkedMap } from "./linked-map"; -import { HybridLoaderRequest } from "./request-container"; - -export type { EngineCallbacks } from "./request-container"; +import { RequestAttempt } from "./request"; export type StreamType = "main" | "secondary"; @@ -58,13 +56,11 @@ export type Settings = { cachedSegmentExpiration: number; cachedSegmentsCount: number; webRtcMaxMessageSize: number; - p2pSegmentDownloadTimeout: number; - p2pLoaderDestroyTimeout: number; + p2pNotReceivingBytesTimeoutMs: number; + p2pLoaderDestroyTimeoutMs: number; + httpNotReceivingBytesTimeoutMs: number; }; export type CoreEventHandlers = { - onSegmentLoaded?: ( - byteLength: number, - type: HybridLoaderRequest["type"] - ) => void; + onSegmentLoaded?: (byteLength: number, type: RequestAttempt["type"]) => void; }; diff --git a/packages/p2p-media-loader-core/src/utils/logger.ts b/packages/p2p-media-loader-core/src/utils/logger.ts index c479f828..3133c7fc 100644 --- a/packages/p2p-media-loader-core/src/utils/logger.ts +++ b/packages/p2p-media-loader-core/src/utils/logger.ts @@ -1,5 +1,6 @@ import { Segment, Stream } from "../types"; -import { QueueItem, QueueItemStatuses } from "../internal-types"; +import { QueueItem } from "../internal-types"; +import { SegmentPlaybackStatuses } from "./stream"; export function getStreamString(stream: Stream) { return `${stream.type}-${stream.index}`; @@ -10,7 +11,9 @@ export function getSegmentString(segment: Segment) { return `(${getStreamString(segment.stream)} | ${externalId})`; } -export function getStatusesString(statuses: QueueItemStatuses): string { +export function getSegmentPlaybackStatusesString( + statuses: SegmentPlaybackStatuses +): string { const { isHighDemand, isHttpDownloadable, isP2PDownloadable } = statuses; if (isHighDemand) return "high-demand"; if (isHttpDownloadable && isP2PDownloadable) return "http-p2p-window"; @@ -21,6 +24,6 @@ export function getStatusesString(statuses: QueueItemStatuses): string { export function getQueueItemString(item: QueueItem) { const { segment, statuses } = item; - const statusString = getStatusesString(statuses); + const statusString = getSegmentPlaybackStatusesString(statuses); return `${segment.externalId} ${statusString}`; } diff --git a/packages/p2p-media-loader-core/src/utils/queue.ts b/packages/p2p-media-loader-core/src/utils/queue.ts index 9b629596..428e3250 100644 --- a/packages/p2p-media-loader-core/src/utils/queue.ts +++ b/packages/p2p-media-loader-core/src/utils/queue.ts @@ -1,11 +1,10 @@ -import { Segment, Settings } from "../types"; +import { Segment } from "../types"; +import { Playback, QueueItem } from "../internal-types"; import { - LoadBufferRanges, - NumberRange, - Playback, - QueueItem, - QueueItemStatuses, -} from "../internal-types"; + getSegmentPlaybackStatuses, + SegmentPlaybackStatuses, + PlaybackTimeWindowsSettings, +} from "./stream"; export function generateQueue({ lastRequestedSegment, @@ -15,13 +14,9 @@ export function generateQueue({ }: { lastRequestedSegment: Readonly; playback: Readonly; - skipSegment: (segment: Segment, statuses: QueueItemStatuses) => boolean; - settings: Pick< - Settings, - "highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow" - >; + skipSegment: (segment: Segment, statuses: SegmentPlaybackStatuses) => boolean; + settings: PlaybackTimeWindowsSettings; }): { queue: QueueItem[]; queueSegmentIds: Set } { - const bufferRanges = getLoadBufferRanges(playback, settings); const { localId: requestedSegmentId, stream } = lastRequestedSegment; const queue: QueueItem[] = []; @@ -31,13 +26,13 @@ export function generateQueue({ const isNextNotActual = (segmentId: string) => { const next = segments.getNextTo(segmentId)?.[1]; if (!next) return true; - const statuses = getSegmentLoadStatuses(next, bufferRanges); + const statuses = getSegmentPlaybackStatuses(next, playback, settings); return isNotActualStatuses(statuses); }; let i = 0; for (const segment of segments.values(requestedSegmentId)) { - const statuses = getSegmentLoadStatuses(segment, bufferRanges); + const statuses = getSegmentPlaybackStatuses(segment, playback, settings); const isNotActual = isNotActualStatuses(statuses); if (isNotActual && (i !== 0 || isNextNotActual(requestedSegmentId))) break; i++; @@ -51,72 +46,7 @@ export function generateQueue({ return { queue, queueSegmentIds }; } -export function getLoadBufferRanges( - playback: Readonly, - settings: Pick< - Settings, - "highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow" - > -): LoadBufferRanges { - const { position, rate } = playback; - const { - highDemandTimeWindow, - httpDownloadTimeWindow, - p2pDownloadTimeWindow, - } = settings; - - const getRange = (position: number, rate: number, bufferLength: number) => { - return { - from: position, - to: position + rate * bufferLength, - }; - }; - return { - highDemand: getRange(position, rate, highDemandTimeWindow), - http: getRange(position, rate, httpDownloadTimeWindow), - p2p: getRange(position, rate, p2pDownloadTimeWindow), - }; -} - -export function getSegmentLoadStatuses( - segment: Readonly, - loadBufferRanges: LoadBufferRanges -): QueueItemStatuses { - const { highDemand, http, p2p } = loadBufferRanges; - const { startTime, endTime } = segment; - - const isValueInRange = (value: number, range: NumberRange) => - value >= range.from && value < range.to; - - return { - isHighDemand: - isValueInRange(startTime, highDemand) || - isValueInRange(endTime, highDemand), - isHttpDownloadable: - isValueInRange(startTime, http) || isValueInRange(endTime, http), - isP2PDownloadable: - isValueInRange(startTime, p2p) || isValueInRange(endTime, p2p), - }; -} - -function isNotActualStatuses(statuses: QueueItemStatuses) { +function isNotActualStatuses(statuses: SegmentPlaybackStatuses) { const { isHighDemand, isHttpDownloadable, isP2PDownloadable } = statuses; return !isHighDemand && !isHttpDownloadable && !isP2PDownloadable; } - -export function isSegmentActual( - segment: Readonly, - bufferRanges: LoadBufferRanges -) { - const { startTime, endTime } = segment; - const { highDemand, p2p, http } = bufferRanges; - - const isInRange = (value: number) => { - return ( - value > highDemand.from && - (value < highDemand.to || value < http.to || value < p2p.to) - ); - }; - - return isInRange(startTime) || isInRange(endTime); -} diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts new file mode 100644 index 00000000..516e0501 --- /dev/null +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -0,0 +1,108 @@ +import { Segment, Settings, Stream, StreamWithSegments } from "../types"; +import { Playback } from "../internal-types"; + +export type SegmentPlaybackStatuses = { + isHighDemand: boolean; + isHttpDownloadable: boolean; + isP2PDownloadable: boolean; +}; + +export type PlaybackTimeWindowsSettings = Pick< + Settings, + "highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow" +>; + +const PEER_PROTOCOL_VERSION = "V1"; + +export function getStreamExternalId( + manifestResponseUrl: string, + stream: Readonly +): string { + const { type, index } = stream; + return `${PEER_PROTOCOL_VERSION}:${manifestResponseUrl}-${type}-${index}`; +} + +export function getSegmentFromStreamsMap( + streams: Map, + segmentId: string +): Segment | undefined { + for (const stream of streams.values()) { + const segment = stream.segments.get(segmentId); + if (segment) return segment; + } +} + +export function getSegmentFromStreamByExternalId( + stream: StreamWithSegments, + segmentExternalId: string +): Segment | undefined { + for (const segment of stream.segments.values()) { + if (segment.externalId === segmentExternalId) return segment; + } +} + +export function getStreamShortId(stream: Stream) { + return `${stream.type}-${stream.index}`; +} + +export function getSegmentAvgDuration(stream: StreamWithSegments) { + const { segments } = stream; + let sumDuration = 0; + const size = segments.size; + for (const segment of segments.values()) { + const duration = segment.endTime - segment.startTime; + sumDuration += duration; + } + + return sumDuration / size; +} + +export function isSegmentActualInPlayback( + segment: Readonly, + playback: Playback, + timeWindowsSettings: PlaybackTimeWindowsSettings +) { + const statuses = getSegmentPlaybackStatuses( + segment, + playback, + timeWindowsSettings + ); + return ( + statuses.isHighDemand || + statuses.isHttpDownloadable || + statuses.isP2PDownloadable + ); +} + +export function getSegmentPlaybackStatuses( + segment: Segment, + playback: Playback, + timeWindowsSettings: PlaybackTimeWindowsSettings +): SegmentPlaybackStatuses { + const { + highDemandTimeWindow, + httpDownloadTimeWindow, + p2pDownloadTimeWindow, + } = timeWindowsSettings; + + return { + isHighDemand: isInTimeWindow(segment, playback, highDemandTimeWindow), + isHttpDownloadable: isInTimeWindow( + segment, + playback, + httpDownloadTimeWindow + ), + isP2PDownloadable: isInTimeWindow(segment, playback, p2pDownloadTimeWindow), + }; +} + +function isInTimeWindow( + segment: Segment, + playback: Playback, + timeWindowLength: number +) { + const { startTime, endTime } = segment; + const { position, rate } = playback; + const rightMargin = position + timeWindowLength * rate; + return !(rightMargin < startTime || position > endTime); +} diff --git a/packages/p2p-media-loader-core/src/utils/utils.ts b/packages/p2p-media-loader-core/src/utils/utils.ts index d9fb72fd..a47d2a18 100644 --- a/packages/p2p-media-loader-core/src/utils/utils.ts +++ b/packages/p2p-media-loader-core/src/utils/utils.ts @@ -1,34 +1,3 @@ -import { Segment, Stream, StreamWithSegments } from "../index"; - -const PEER_PROTOCOL_VERSION = "V1"; - -export function getStreamExternalId( - manifestResponseUrl: string, - stream: Readonly -): string { - const { type, index } = stream; - return `${PEER_PROTOCOL_VERSION}:${manifestResponseUrl}-${type}-${index}`; -} - -export function getSegmentFromStreamsMap( - streams: Map, - segmentId: string -): Segment | undefined { - for (const stream of streams.values()) { - const segment = stream.segments.get(segmentId); - if (segment) return segment; - } -} - -export function getSegmentFromStreamByExternalId( - stream: StreamWithSegments, - segmentExternalId: string -): Segment | undefined { - for (const segment of stream.segments.values()) { - if (segment.externalId === segmentExternalId) return segment; - } -} - export function getControlledPromise() { let resolve: (value: T) => void; let reject: (reason?: unknown) => void; @@ -45,3 +14,28 @@ export function getControlledPromise() { reject: reject!, }; } + +export function joinChunks( + chunks: Uint8Array[], + totalBytes?: number +): ArrayBuffer { + if (totalBytes === undefined) { + totalBytes = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0); + } + const buffer = new Uint8Array(totalBytes); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.byteLength; + } + + return buffer; +} + +export function getPercent(numerator: number, denominator: number): number { + return (numerator / denominator) * 100; +} + +export function getRandomItem(items: T[]): T { + return items[Math.floor(Math.random() * items.length)]; +} diff --git a/packages/p2p-media-loader-hlsjs/src/fragment-loader.ts b/packages/p2p-media-loader-hlsjs/src/fragment-loader.ts index 8ed41e65..0bc75e7a 100644 --- a/packages/p2p-media-loader-hlsjs/src/fragment-loader.ts +++ b/packages/p2p-media-loader-hlsjs/src/fragment-loader.ts @@ -8,12 +8,7 @@ import type { LoaderStats, } from "hls.js"; import * as Utils from "./utils"; -import { - RequestAbortError, - Core, - FetchError, - SegmentResponse, -} from "p2p-media-loader-core"; +import { Core, SegmentResponse, CoreRequestError } from "p2p-media-loader-core"; const DEFAULT_DOWNLOAD_LATENCY = 10; @@ -89,7 +84,13 @@ export class FragmentLoaderBase implements Loader { }; const onError = (error: unknown) => { - if (error instanceof RequestAbortError && this.stats.aborted) return; + if ( + error instanceof CoreRequestError && + error.type === "aborted" && + this.stats.aborted + ) { + return; + } this.handleError(error); }; @@ -98,15 +99,16 @@ export class FragmentLoaderBase implements Loader { private handleError(thrownError: unknown) { const error = { code: 0, text: "" }; - let details: object | null = null; - if (thrownError instanceof FetchError) { - error.code = thrownError.code; + if ( + thrownError instanceof CoreRequestError && + thrownError.type === "failed" + ) { + // error.code = thrownError.code; error.text = thrownError.message; - details = thrownError.details; } else if (thrownError instanceof Error) { error.text = thrownError.message; } - this.callbacks?.onError(error, this.context, details, this.stats); + this.callbacks?.onError(error, this.context, null, this.stats); } private abortInternal() { diff --git a/packages/p2p-media-loader-shaka/src/engine.ts b/packages/p2p-media-loader-shaka/src/engine.ts index 8ccb40d4..626fe8de 100644 --- a/packages/p2p-media-loader-shaka/src/engine.ts +++ b/packages/p2p-media-loader-shaka/src/engine.ts @@ -13,7 +13,7 @@ import { HookedRequest, P2PMLShakaData, } from "./types"; -import { LoadingHandler } from "./loading-handler"; +import { Loader } from "./loading-handler"; import { decorateMethod } from "./utils"; import { Core, CoreEventHandlers } from "p2p-media-loader-core"; @@ -113,13 +113,13 @@ export class Engine { const { p2pml } = request; if (!p2pml) return this.shaka.net.HttpFetchPlugin.parse(...args); - const loadingHandler = new LoadingHandler( + const loadingHandler = new Loader( p2pml.shaka, p2pml.core, p2pml.streamInfo, p2pml.segmentManager ); - return loadingHandler.handleLoading(...args); + return loadingHandler.load(...args); }; this.shaka.net.NetworkingEngine.registerScheme("http", handleLoading); diff --git a/packages/p2p-media-loader-shaka/src/loading-handler.ts b/packages/p2p-media-loader-shaka/src/loading-handler.ts index 99773cfd..eed55ddd 100644 --- a/packages/p2p-media-loader-shaka/src/loading-handler.ts +++ b/packages/p2p-media-loader-shaka/src/loading-handler.ts @@ -2,17 +2,18 @@ import * as Utils from "./stream-utils"; import { SegmentManager } from "./segment-manager"; import { StreamInfo } from "./types"; import { Shaka, Stream } from "./types"; -import { Core, EngineCallbacks, SegmentResponse } from "p2p-media-loader-core"; - -interface LoadingHandlerInterface { - handleLoading: shaka.extern.SchemePlugin; -} +import { + Core, + CoreRequestError, + SegmentResponse, + EngineCallbacks, +} from "p2p-media-loader-core"; type LoadingHandlerParams = Parameters; type Response = shaka.extern.Response; type LoadingHandlerResult = shaka.extern.IAbortableOperation; -export class LoadingHandler implements LoadingHandlerInterface { +export class Loader { private loadArgs!: LoadingHandlerParams; constructor( @@ -27,12 +28,12 @@ export class LoadingHandler implements LoadingHandlerInterface { return fetchPlugin.parse(...this.loadArgs); } - handleLoading(...args: LoadingHandlerParams): LoadingHandlerResult { + load(...args: LoadingHandlerParams): LoadingHandlerResult { this.loadArgs = args; const { RequestType } = this.shaka.net.NetworkingEngine; const [url, request, requestType] = args; if (requestType === RequestType.SEGMENT) { - return this.handleSegmentLoading(url, request.headers.Range); + return this.loadSegment(url, request.headers.Range); } const loading = this.defaultLoad(); @@ -61,7 +62,7 @@ export class LoadingHandler implements LoadingHandlerInterface { } } - private handleSegmentLoading( + private loadSegment( segmentUrl: string, byteRangeString: string ): LoadingHandlerResult { @@ -70,15 +71,33 @@ export class LoadingHandler implements LoadingHandlerInterface { const loadSegment = async (): Promise => { const { request, callbacks } = getSegmentRequest(); - await this.core.loadSegment(segmentId, callbacks); - const { data, bandwidth } = await request; - return { - data, - headers: {}, - uri: segmentUrl, - originalUri: segmentUrl, - timeMs: getLoadingDurationBasedOnBandwidth(bandwidth, data.byteLength), - }; + void this.core.loadSegment(segmentId, callbacks); + try { + const { data, bandwidth } = await request; + return { + data, + headers: {}, + uri: segmentUrl, + originalUri: segmentUrl, + timeMs: getLoadingDurationBasedOnBandwidth( + bandwidth, + data.byteLength + ), + }; + } catch (error) { + // TODO: throw Shaka Errors + if (error instanceof CoreRequestError) { + const { Error: ShakaError } = this.shaka.util; + if (error.type === "aborted") { + throw new ShakaError( + ShakaError.Severity.RECOVERABLE, + ShakaError.Category.NETWORK, + this.shaka.util.Error.Code.OPERATION_ABORTED + ); + } + } + throw error; + } }; return new this.shaka.util.AbortableOperation(loadSegment(), async () => diff --git a/packages/p2p-media-loader-shaka/src/segment-manager.ts b/packages/p2p-media-loader-shaka/src/segment-manager.ts index 856610eb..beed778e 100644 --- a/packages/p2p-media-loader-shaka/src/segment-manager.ts +++ b/packages/p2p-media-loader-shaka/src/segment-manager.ts @@ -63,7 +63,7 @@ export class SegmentManager { const staleSegmentsIds = new Set(managerStream.segments.keys()); const newSegments: SegmentBase[] = []; for (const reference of segmentReferences) { - const externalId = (+reference.getStartTime().toFixed(3)).toString(); + const externalId = Math.trunc(reference.getStartTime()).toString(); const segmentLocalId = Utils.getSegmentLocalIdFromReference(reference); if (!managerStream.segments.has(segmentLocalId)) {