Skip to content

Commit

Permalink
Delegate control to process queue. (#313)
Browse files Browse the repository at this point in the history
* Reduce LoadProgress object type.

* Create microtask for process queue.

* Add event dispatcher class.

* Create Request class.

* Move Request file to separate file.

* Change style of bittorrent-tracker declarations event handlers types.

* Create P2PTrackerClient class to encapsulate peer connection logic.

* Add TODO task.

* Add requests error handling.

* Fix types errors.

* P2P announce segments on http requests state change.

* Fix bugs.

* Make broadcastAnnouncement method as function expression.

* Refactor Request code. Create separate flows for each type of abort.

* Remove event subscriptions to request instance.

* Fix type errors.

* Fix bugs.

* Fix lint warnings.

* Move P2P tracker client to separate file.

* Fix more bugs. Rewrite loggers.

* Remove unused code.

* Use function declaration instead of expression.

---------

Co-authored-by: Igor Zolotarenko <[email protected]>
  • Loading branch information
i-zolotarenko and i-zolotarenko authored Dec 12, 2023
1 parent 3b0b69a commit 4a72e36
Show file tree
Hide file tree
Showing 26 changed files with 1,175 additions and 1,160 deletions.
8 changes: 3 additions & 5 deletions p2p-media-loader-demo/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,11 @@ function useLocalStorageItem<T>(

const loggers = [
"core:hybrid-loader-main",
"core:hybrid-loader-main-engine",
"core:hybrid-loader-secondary",
"core:hybrid-loader-secondary-engine",
"core:p2p-loader",
"core:p2p-tracker-client",
"core:peer",
"core:p2p-loaders-container",
"core:requests-container-main",
"core:requests-container-secondary",
"core:request-main",
"core:request-secondary",
"core:segment-memory-storage",
] as const;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LoadProgress } from "./request-container";
import { LoadProgress } from "./request";

export class BandwidthApproximator {
private readonly loadings: LoadProgress[] = [];
Expand Down
26 changes: 14 additions & 12 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,27 @@ import {
SegmentBase,
CoreEventHandlers,
} from "./types";
import * as Utils from "./utils/utils";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { EngineCallbacks } from "./request-container";
import { EngineCallbacks } from "./request";
import { SegmentsMemoryStorage } from "./segments-storage";

export class Core<TStream extends Stream = Stream> {
private manifestResponseUrl?: string;
private readonly streams = new Map<string, StreamWithSegments<TStream>>();
private readonly settings: Settings = {
simultaneousHttpDownloads: 2,
simultaneousHttpDownloads: 3,
simultaneousP2PDownloads: 3,
highDemandTimeWindow: 15,
httpDownloadTimeWindow: 45,
p2pDownloadTimeWindow: 45,
cachedSegmentExpiration: 120 * 1000,
cachedSegmentsCount: 50,
webRtcMaxMessageSize: 64 * 1024 - 1,
p2pSegmentDownloadTimeout: 5000,
p2pLoaderDestroyTimeout: 30 * 1000,
p2pNotReceivingBytesTimeoutMs: 1000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpNotReceivingBytesTimeoutMs: 1000,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
Expand All @@ -40,7 +41,7 @@ export class Core<TStream extends Stream = Stream> {
}

hasSegment(segmentLocalId: string): boolean {
const segment = Utils.getSegmentFromStreamsMap(
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentLocalId
);
Expand Down Expand Up @@ -92,11 +93,9 @@ export class Core<TStream extends Stream = Stream> {
void loader.loadSegment(segment, callbacks);
}

abortSegmentLoading(segmentId: string): void {
const segment = this.identifySegment(segmentId);
const streamType = segment.stream.type;
if (streamType === "main") this.mainStreamLoader?.abortSegment(segment);
else this.secondaryStreamLoader?.abortSegment(segment);
abortSegmentLoading(segmentLocalId: string): void {
this.mainStreamLoader?.abortSegmentRequest(segmentLocalId);
this.secondaryStreamLoader?.abortSegmentRequest(segmentLocalId);
}

updatePlayback(position: number, rate: number): void {
Expand All @@ -116,7 +115,10 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is undefined");
}

const segment = Utils.getSegmentFromStreamsMap(this.streams, segmentId);
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentId
);
if (!segment) {
throw new Error(`Not found segment with id: ${segmentId}`);
}
Expand Down
47 changes: 18 additions & 29 deletions packages/p2p-media-loader-core/src/declarations.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ declare module "bittorrent-tracker" {
getAnnounceOpts?: () => object;
});

on<E extends TrackerEvent>(event: E, handler: TrackerEventHandler<E>): void;
on<E extends keyof TrackerClientEvents>(
event: E,
handler: TrackerClientEvents[E]
): void;

start(): void;

Expand All @@ -20,39 +23,25 @@ declare module "bittorrent-tracker" {
destroy(): void;
}

export type TrackerEvent = "update" | "peer" | "warning" | "error";

export type TrackerEventHandler<E extends TrackerEvent> = E extends "update"
? (data: object) => void
: E extends "peer"
? (peer: PeerConnection) => void
: E extends "warning"
? (warning: unknown) => void
: E extends "error"
? (error: unknown) => void
: never;

type PeerEvent = "connect" | "data" | "close" | "error";

export type PeerConnectionEventHandler<E extends PeerEvent> =
E extends "connect"
? () => void
: E extends "data"
? (data: ArrayBuffer) => void
: E extends "close"
? () => void
: E extends "error"
? (error: { code: string }) => void
: never;
export type TrackerClientEvents = {
update: (data: object) => void;
peer: (peer: PeerConnection) => void;
warning: (warning: unknown) => void;
error: (error: unknown) => void;
};

export type PeerEvents = {
connect: () => void;
data: (data: Uint8Array) => void;
close: () => void;
error: (error: { code: string }) => void;
};

export type PeerConnection = {
id: string;
initiator: boolean;
_channel: RTCDataChannel;
on<E extends PeerEvent>(
event: E,
handler: PeerConnectionEventHandler<E>
): void;
on<E extends keyof PeerEvents>(event: E, handler: PeerEvents[E]): void;
send(data: string | ArrayBuffer): void;
write(data: string | ArrayBuffer): void;
destroy(): void;
Expand Down
16 changes: 0 additions & 16 deletions packages/p2p-media-loader-core/src/errors.ts

This file was deleted.

31 changes: 31 additions & 0 deletions packages/p2p-media-loader-core/src/event-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export class EventDispatcher<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
T extends { [key: string]: (...args: any[]) => void | Promise<void> },
K extends keyof T = keyof T
> {
private readonly listeners = new Map<keyof T, Set<T[K]>>();

subscribe(eventType: K, ...listeners: T[K][]) {
let eventListeners = this.listeners.get(eventType);
if (!eventListeners) {
eventListeners = new Set();
this.listeners.set(eventType, eventListeners);
}
for (const listener of listeners) eventListeners.add(listener);
}

unsubscribe(eventType: K, listener: T[K]) {
const eventListeners = this.listeners.get(eventType);
if (!eventListeners) return;
eventListeners.delete(listener);
if (!eventListeners.size) this.listeners.delete(eventType);
}

dispatch(eventType: K, ...args: Parameters<T[K]>) {
const eventListeners = this.listeners.get(eventType);
if (!eventListeners) return;
for (const listener of eventListeners) {
listener(...args);
}
}
}
131 changes: 40 additions & 91 deletions packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
@@ -1,111 +1,60 @@
import { RequestAbortError, FetchError } from "./errors";
import { Segment } from "./types";
import { HttpRequest, LoadProgress } from "./request-container";
import { Settings } from "./types";
import { Request, RequestError, HttpRequestErrorType } from "./request";

export function getHttpSegmentRequest(segment: Segment): Readonly<HttpRequest> {
const { promise, abortController, progress } = fetchSegmentData(segment);
return {
type: "http",
promise,
progress,
abort: () => abortController.abort(),
};
}

function fetchSegmentData(segment: Segment) {
export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick<Settings, "httpNotReceivingBytesTimeoutMs">
) {
const headers = new Headers();
const { url, byteRange, localId: segmentId } = segment;
const { segment } = request;
const { url, byteRange } = segment;

if (byteRange) {
const { start, end } = byteRange;
const byteRangeString = `bytes=${start}-${end}`;
headers.set("Range", byteRangeString);
}
const abortController = new AbortController();

const progress: LoadProgress = {
canBeTracked: false,
totalBytes: 0,
loadedBytes: 0,
percent: 0,
startTimestamp: performance.now(),
};
const loadSegmentData = async () => {
try {
const response = await window.fetch(url, {
headers,
signal: abortController.signal,
});

if (response.ok) {
return await getDataPromiseAndMonitorProgress(response, progress);
}
throw new FetchError(
response.statusText ?? `Network response was not for ${segmentId}`,
response.status,
response
);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
throw new RequestAbortError(`Segment fetch was aborted ${segmentId}`);
}
throw error;
const abortController = new AbortController();
const requestControls = request.start(
{ type: "http" },
{
abort: () => abortController.abort("abort"),
notReceivingBytesTimeoutMs: settings.httpNotReceivingBytesTimeoutMs,
}
};

return {
promise: loadSegmentData(),
abortController,
progress,
};
}

async function getDataPromiseAndMonitorProgress(
response: Response,
progress: LoadProgress
): Promise<ArrayBuffer> {
const totalBytesString = response.headers.get("Content-Length");
if (!response.body) {
return response.arrayBuffer().then((data) => {
progress.loadedBytes = data.byteLength;
progress.totalBytes = data.byteLength;
progress.lastLoadedChunkTimestamp = performance.now();
progress.percent = 100;
return data;
);
try {
const fetchResponse = await window.fetch(url, {
headers,
signal: abortController.signal,
});
}
requestControls.firstBytesReceived();

if (totalBytesString) {
progress.totalBytes = +totalBytesString;
progress.canBeTracked = true;
}
if (!fetchResponse.ok) {
throw new RequestError("fetch-error", fetchResponse.statusText);
}

const reader = response.body.getReader();
if (!fetchResponse.body) return;
const totalBytesString = fetchResponse.headers.get("Content-Length");
if (totalBytesString) request.setTotalBytes(+totalBytesString);

progress.startTimestamp = performance.now();
const chunks: Uint8Array[] = [];
for await (const chunk of readStream(reader)) {
chunks.push(chunk);
progress.loadedBytes += chunk.length;
progress.lastLoadedChunkTimestamp = performance.now();
if (progress.canBeTracked) {
progress.percent = (progress.loadedBytes / progress.totalBytes) * 100;
const reader = fetchResponse.body.getReader();
for await (const chunk of readStream(reader)) {
requestControls.addLoadedChunk(chunk);
}
}
requestControls.completeOnSuccess();
} catch (error) {
if (error instanceof Error) {
if (error.name !== "abort") return;

if (!progress.canBeTracked) {
progress.totalBytes = progress.loadedBytes;
progress.percent = 100;
}
const resultBuffer = new ArrayBuffer(progress.loadedBytes);
const view = new Uint8Array(resultBuffer);

let offset = 0;
for (const chunk of chunks) {
view.set(chunk, offset);
offset += chunk.length;
const httpLoaderError: RequestError<HttpRequestErrorType> = !(
error instanceof RequestError
)
? new RequestError("fetch-error", error.message)
: error;
requestControls.abortOnError(httpLoaderError);
}
}
return resultBuffer;
}

async function* readStream(
Expand Down
Loading

0 comments on commit 4a72e36

Please sign in to comment.