Skip to content

Commit

Permalink
Add requests error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Nov 15, 2023
1 parent 3247e01 commit 92021d3
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 162 deletions.
48 changes: 14 additions & 34 deletions packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Settings } from "./types";
import { Request } from "./request";
import { Request, RequestError, HttpRequestErrorType } from "./request";

export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick<Settings, "httpRequestTimeout">
settings: Pick<Settings, "httpDownloadTimeoutMs">
) {
const headers = new Headers();
const { segment } = request;
Expand All @@ -16,23 +16,19 @@ export async function fulfillHttpSegmentRequest(
}

const abortController = new AbortController();

const requestAbortTimeout = setTimeout(() => {
const errorType: HttpLoaderError["type"] = "request-timeout";
abortController.abort(errorType);
}, settings.httpRequestTimeout);

const abortManually = () => {
const abortErrorType: HttpLoaderError["type"] = "manual-abort";
abortController.abort(abortErrorType);
};

const requestControls = request.start("http", abortManually);
const requestControls = request.start(
{ type: "http" },
{
abort: (errorType) => abortController.abort(errorType),
fullLoadingTimeoutMs: settings.httpDownloadTimeoutMs,
}
);
try {
const fetchResponse = await window.fetch(url, {
headers,
signal: abortController.signal,
});
requestControls.firstBytesReceived();

if (fetchResponse.ok) {
if (!fetchResponse.body) return;
Expand All @@ -45,20 +41,13 @@ export async function fulfillHttpSegmentRequest(
requestControls.addLoadedChunk(chunk);
}
requestControls.completeOnSuccess();
clearTimeout(requestAbortTimeout);
}
throw new HttpLoaderError("fetch-error", fetchResponse.statusText);
throw new RequestError("fetch-error", fetchResponse.statusText);
} catch (error) {
if (error instanceof Error) {
let httpLoaderError: HttpLoaderError;
if ((error.name as HttpLoaderError["type"]) === "manual-abort") {
httpLoaderError = new HttpLoaderError("manual-abort");
} else if (
(error.name as HttpLoaderError["type"]) === "request-timeout"
) {
httpLoaderError = new HttpLoaderError("request-timeout");
} else if (!(error instanceof HttpLoaderError)) {
httpLoaderError = new HttpLoaderError("fetch-error", error.message);
let httpLoaderError: RequestError<HttpRequestErrorType>;
if (!(error instanceof RequestError)) {
httpLoaderError = new RequestError("fetch-error", error.message);
} else {
httpLoaderError = error;
}
Expand All @@ -76,12 +65,3 @@ async function* readStream(
yield value;
}
}

export class HttpLoaderError extends Error {
constructor(
readonly type: "request-timeout" | "fetch-error" | "manual-abort",
message?: string
) {
super(message);
}
}
31 changes: 6 additions & 25 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import { Settings, CoreEventHandlers } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { Playback, QueueItem } from "./internal-types";
import { RequestsContainer } from "./request-container";
import { Request, EngineCallbacks, HybridLoaderRequest } from "./request";
import { Request, EngineCallbacks, RequestError } from "./request";
import * as QueueUtils from "./utils/queue";
import * as LoggerUtils from "./utils/logger";
import * as StreamUtils from "./utils/stream";
import * as Utils from "./utils/utils";
import { P2PLoadersContainer } from "./p2p/loaders-container";
import { PeerRequestError } from "./p2p/peer";
import debug from "debug";

export class HybridLoader {
Expand Down Expand Up @@ -201,7 +200,7 @@ export class HybridLoader {
abortSegment(segment: Segment) {
const request = this.requests.get(segment);
if (!request) return;
request.abort();
request.abortEngineRequest();
this.createProcessQueueMicrotask();
this.logger.engine("abort: ", LoggerUtils.getSegmentString(segment));
}
Expand All @@ -210,7 +209,7 @@ export class HybridLoader {
const { segment } = item;

const request = this.requests.getOrCreateRequest(segment);
request.subscribe("onCompleted", this.onRequestCompleted);
request.subscribe("onSuccess", this.onRequestSucceed);
request.subscribe("onError", this.onRequestError);

void fulfillHttpSegmentRequest(request, this.settings);
Expand All @@ -226,21 +225,18 @@ export class HybridLoader {
const request = p2pLoader.downloadSegment(item);
if (request === undefined) return;

request.subscribe("onCompleted", this.onRequestCompleted);
request.subscribe("onSuccess", this.onRequestSucceed);
request.subscribe("onError", this.onRequestError);
}

private onRequestCompleted = (request: Request, data: ArrayBuffer) => {
private onRequestSucceed = (request: Request, data: ArrayBuffer) => {
const { segment } = request;
this.logger.loader(`http responses: ${segment.externalId}`);
this.eventHandlers?.onSegmentLoaded?.(data.byteLength, "http");
this.createProcessQueueMicrotask();
};

private onRequestError = (request: Request, error: Error) => {
if (!(error instanceof PeerRequestError) || error.type === "manual-abort") {
return;
}
private onRequestError = (request: Request, error: RequestError) => {
this.createProcessQueueMicrotask();
};

Expand Down Expand Up @@ -278,21 +274,6 @@ export class HybridLoader {
);
}

private onSegmentLoaded(
queueItem: QueueItem,
type: HybridLoaderRequest["type"],
data: ArrayBuffer
) {
const { segment, statuses } = queueItem;
const byteLength = data.byteLength;
if (type === "http" && statuses.isHighDemand) {
this.refreshLevelBandwidth(true);
}
void this.segmentStorage.storeSegment(segment, data);
this.eventHandlers?.onSegmentLoaded?.(byteLength, type);
this.createProcessQueueMicrotask();
}

private abortLastHttpLoadingAfter(queue: QueueItem[], segment: Segment) {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
if (itemSegment.localId === segment.localId) break;
Expand Down
12 changes: 9 additions & 3 deletions packages/p2p-media-loader-core/src/internal-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ export type JsonSegmentAnnouncement = {
};

export type PeerSegmentCommand = BasePeerCommand<
| PeerCommandType.SegmentRequest
| PeerCommandType.SegmentAbsent
| PeerCommandType.CancelSegmentRequest
PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest
> & {
i: string;
};

export type PeerSegmentRequestCommand =
BasePeerCommand<PeerCommandType.SegmentRequest> & {
i: string;
// start byte of range
b?: number;
};

export type PeerSegmentAnnouncementCommand =
BasePeerCommand<PeerCommandType.SegmentsAnnouncement> & {
a: JsonSegmentAnnouncement;
Expand All @@ -56,5 +61,6 @@ export type PeerSendSegmentCommand =

export type PeerCommand =
| PeerSegmentCommand
| PeerSegmentRequestCommand
| PeerSegmentAnnouncementCommand
| PeerSendSegmentCommand;
Loading

0 comments on commit 92021d3

Please sign in to comment.