diff --git a/packages/p2p-media-loader-core/package.json b/packages/p2p-media-loader-core/package.json index b0619783..a9671dc5 100644 --- a/packages/p2p-media-loader-core/package.json +++ b/packages/p2p-media-loader-core/package.json @@ -28,9 +28,8 @@ }, "dependencies": { "bittorrent-tracker": "10.0.12", - "ripemd160": "^2.0.2" + "nano-md5": "^1.0.5" }, "devDependencies": { - "@types/ripemd160": "^2.0.2" } } diff --git a/packages/p2p-media-loader-core/src/declarations.d.ts b/packages/p2p-media-loader-core/src/declarations.d.ts index 65768074..003539ea 100644 --- a/packages/p2p-media-loader-core/src/declarations.d.ts +++ b/packages/p2p-media-loader-core/src/declarations.d.ts @@ -1,8 +1,8 @@ declare module "bittorrent-tracker" { export default class Client { constructor(options: { - infoHash: string; - peerId: string; + infoHash: Uint8Array; + peerId: Uint8Array; announce: string[]; port: number; rtcConfig?: RTCConfiguration; @@ -47,3 +47,13 @@ declare module "bittorrent-tracker" { destroy(): void; }; } + +declare module "nano-md5" { + type BinaryStringObject = string & { toHex: () => string }; + const md5: { + (utf8String: string): string; // returns hex string interpretation of binary data + fromUtf8(utf8String: string): BinaryStringObject; + }; + + export default md5; +} diff --git a/packages/p2p-media-loader-core/src/enums.ts b/packages/p2p-media-loader-core/src/enums.ts deleted file mode 100644 index 059691b4..00000000 --- a/packages/p2p-media-loader-core/src/enums.ts +++ /dev/null @@ -1,12 +0,0 @@ -export enum PeerCommandType { - SegmentsAnnouncement, - SegmentRequest, - SegmentData, - SegmentAbsent, - CancelSegmentRequest, -} - -export enum PeerSegmentStatus { - Loaded, - LoadingByHttp, -} diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 59c34ac5..ff2edc58 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -1,10 +1,9 @@ import { Segment, StreamWithSegments } from "./index"; import { fulfillHttpSegmentRequest } from "./http-loader"; import { SegmentsMemoryStorage } from "./segments-storage"; -import { Settings, CoreEventHandlers } from "./types"; +import { Settings, CoreEventHandlers, Playback } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; import { P2PLoadersContainer } from "./p2p/loaders-container"; -import { Playback, QueueItem } from "./internal-types"; import { RequestsContainer } from "./request-container"; import { EngineCallbacks } from "./request"; import * as QueueUtils from "./utils/queue"; @@ -283,7 +282,7 @@ export class HybridLoader { } private abortLastHttpLoadingInQueueAfterItem( - queue: QueueItem[], + queue: QueueUtils.QueueItem[], segment: Segment ): boolean { for (const { segment: itemSegment } of arrayBackwards(queue)) { @@ -298,7 +297,7 @@ export class HybridLoader { } private abortLastP2PLoadingInQueueAfterItem( - queue: QueueItem[], + queue: QueueUtils.QueueItem[], segment: Segment ): boolean { for (const { segment: itemSegment } of arrayBackwards(queue)) { diff --git a/packages/p2p-media-loader-core/src/internal-types.d.ts b/packages/p2p-media-loader-core/src/internal-types.d.ts deleted file mode 100644 index dd307504..00000000 --- a/packages/p2p-media-loader-core/src/internal-types.d.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { Segment } from "./types"; -import { PeerCommandType } from "./enums"; -import { SegmentPlaybackStatuses } from "./utils/stream"; - -export type Playback = { - position: number; - rate: number; -}; - -export type QueueItem = { segment: Segment; statuses: SegmentPlaybackStatuses }; - -export type BasePeerCommand = { - c: T; -}; - -// {l: loadedSegmentsExternalIds; p: loadingInProcessSegmentExternalIds} -export type JsonSegmentAnnouncement = { - l: string; - p: string; -}; - -export type PeerSegmentCommand = BasePeerCommand< - PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest -> & { - i: string; -}; - -export type PeerSegmentRequestCommand = - BasePeerCommand & { - i: string; - // start byte of range - b?: number; - }; - -export type PeerSegmentAnnouncementCommand = - BasePeerCommand & { - a: JsonSegmentAnnouncement; - }; - -export type PeerSendSegmentCommand = - BasePeerCommand & { - i: string; - s: number; - }; - -export type PeerCommand = - | PeerSegmentCommand - | PeerSegmentRequestCommand - | PeerSegmentAnnouncementCommand - | PeerSendSegmentCommand; diff --git a/packages/p2p-media-loader-core/src/p2p/commands/binary-command-creator.ts b/packages/p2p-media-loader-core/src/p2p/commands/binary-command-creator.ts new file mode 100644 index 00000000..4da5e551 --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/commands/binary-command-creator.ts @@ -0,0 +1,252 @@ +import * as Serialization from "./binary-serialization"; +import { PeerCommandType, PeerCommand } from "./types"; + +const FRAME_PART_LENGTH = 4; +const commandFrameStart = stringToUtf8CodesBuffer("cstr", FRAME_PART_LENGTH); +const commandFrameEnd = stringToUtf8CodesBuffer("cend", FRAME_PART_LENGTH); +const commandDivFrameStart = stringToUtf8CodesBuffer("dstr", FRAME_PART_LENGTH); +const commandDivFrameEnd = stringToUtf8CodesBuffer("dend", FRAME_PART_LENGTH); +const startFrames = [commandFrameStart, commandDivFrameStart]; +const endFrames = [commandFrameEnd, commandDivFrameEnd]; +const commandFramesLength = commandFrameStart.length + commandFrameEnd.length; + +export function isCommandChunk(buffer: Uint8Array) { + const length = commandFrameStart.length; + const bufferEndingToCompare = buffer.slice(-length); + return ( + startFrames.some((frame) => + areBuffersEqual(buffer, frame, FRAME_PART_LENGTH) + ) && + endFrames.some((frame) => + areBuffersEqual(bufferEndingToCompare, frame, FRAME_PART_LENGTH) + ) + ); +} + +function isFirstCommandChunk(buffer: Uint8Array) { + return areBuffersEqual(buffer, commandFrameStart, FRAME_PART_LENGTH); +} + +function isLastCommandChunk(buffer: Uint8Array) { + return areBuffersEqual( + buffer.slice(-FRAME_PART_LENGTH), + commandFrameEnd, + FRAME_PART_LENGTH + ); +} + +export class BinaryCommandJoiningError extends Error { + constructor(readonly type: "incomplete-joining" | "no-first-chunk") { + super(); + } +} + +export class BinaryCommandChunksJoiner { + private readonly chunks = new Serialization.ResizableUint8Array(); + private status: "joining" | "completed" = "joining"; + + constructor( + private readonly onComplete: (commandBuffer: Uint8Array) => void + ) {} + + addCommandChunk(chunk: Uint8Array) { + if (this.status === "completed") return; + + const isFirstChunk = isFirstCommandChunk(chunk); + if (!this.chunks.length && !isFirstChunk) { + throw new BinaryCommandJoiningError("no-first-chunk"); + } + if (this.chunks.length && isFirstChunk) { + throw new BinaryCommandJoiningError("incomplete-joining"); + } + this.chunks.push(this.unframeCommandChunk(chunk)); + + if (!isLastCommandChunk(chunk)) return; + this.status = "completed"; + this.onComplete(this.chunks.getBuffer()); + } + + private unframeCommandChunk(chunk: Uint8Array) { + return chunk.slice(FRAME_PART_LENGTH, chunk.length - FRAME_PART_LENGTH); + } +} + +export class BinaryCommandCreator { + private readonly bytes = new Serialization.ResizableUint8Array(); + private resultBuffers: Uint8Array[] = []; + private status: "creating" | "completed" = "creating"; + + constructor( + commandType: PeerCommandType, + private readonly maxChunkLength: number + ) { + this.bytes.push(commandType); + } + + addInteger(name: string, value: number) { + this.bytes.push(name.charCodeAt(0)); + const bytes = Serialization.serializeInt(BigInt(value)); + this.bytes.push(bytes); + } + + addSimilarIntArr(name: string, arr: number[]) { + this.bytes.push(name.charCodeAt(0)); + const bytes = Serialization.serializeSimilarIntArray( + arr.map((num) => BigInt(num)) + ); + this.bytes.push(bytes); + } + + addString(name: string, string: string) { + this.bytes.push(name.charCodeAt(0)); + const bytes = Serialization.serializeString(string); + this.bytes.push(bytes); + } + + complete() { + if (!this.bytes.length) throw new Error("Buffer is empty"); + if (this.status === "completed") return; + this.status = "completed"; + + const unframedBuffer = this.bytes.getBuffer(); + if (unframedBuffer.length + commandFramesLength <= this.maxChunkLength) { + this.resultBuffers.push( + frameBuffer(unframedBuffer, commandFrameStart, commandFrameEnd) + ); + return; + } + + let chunksAmount = Math.ceil(unframedBuffer.length / this.maxChunkLength); + if ( + Math.ceil(unframedBuffer.length / chunksAmount) + commandFramesLength > + this.maxChunkLength + ) { + chunksAmount++; + } + + for (const [i, chunk] of splitBufferToEqualChunks( + unframedBuffer, + chunksAmount + )) { + if (i === 0) { + this.resultBuffers.push( + frameBuffer(chunk, commandFrameStart, commandDivFrameEnd) + ); + } else if (i === chunksAmount - 1) { + this.resultBuffers.push( + frameBuffer(chunk, commandDivFrameStart, commandFrameEnd) + ); + } else { + this.resultBuffers.push( + frameBuffer(chunk, commandDivFrameStart, commandDivFrameEnd) + ); + } + } + } + + getResultBuffers(): Uint8Array[] { + if (this.status === "creating" || !this.resultBuffers.length) { + throw new Error("Command is not complete."); + } + return this.resultBuffers; + } +} + +export function deserializeCommand(bytes: Uint8Array): PeerCommand { + const [commandCode] = bytes; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const deserializedCommand: { [key: string]: any } = { + c: commandCode, + }; + + let offset = 1; + while (offset < bytes.length) { + const name = String.fromCharCode(bytes[offset]); + offset++; + const dataType = getDataTypeFromByte(bytes[offset]); + + switch (dataType) { + case Serialization.SerializedItem.Int: + { + const { number, byteLength } = Serialization.deserializeInt( + bytes.slice(offset) + ); + deserializedCommand[name] = Number(number); + offset += byteLength; + } + break; + case Serialization.SerializedItem.SimilarIntArray: + { + const { numbers, byteLength } = + Serialization.deserializeSimilarIntArray(bytes.slice(offset)); + deserializedCommand[name] = numbers.map((n) => Number(n)); + offset += byteLength; + } + break; + case Serialization.SerializedItem.String: + { + const { string, byteLength } = Serialization.deserializeString( + bytes.slice(offset) + ); + deserializedCommand[name] = string; + offset += byteLength; + } + break; + } + } + return deserializedCommand as unknown as PeerCommand; +} + +function getDataTypeFromByte(byte: number): Serialization.SerializedItem { + const typeCode = byte >> 4; + if (!Serialization.serializedItemTypes.includes(typeCode)) { + throw new Error("Not existing type"); + } + + return typeCode as Serialization.SerializedItem; +} + +function stringToUtf8CodesBuffer(string: string, length?: number): Uint8Array { + if (length && string.length !== length) { + throw new Error("Wrong string length"); + } + const buffer = new Uint8Array(length ?? string.length); + for (let i = 0; i < string.length; i++) buffer[i] = string.charCodeAt(i); + return buffer; +} + +function* splitBufferToEqualChunks( + buffer: Uint8Array, + chunksAmount: number +): Generator<[number, Uint8Array], void> { + const chunkLength = Math.ceil(buffer.length / chunksAmount); + for (let i = 0; i < chunksAmount; i++) { + yield [i, buffer.slice(i * chunkLength, (i + 1) * chunkLength)]; + } +} + +function frameBuffer( + buffer: Uint8Array, + frameStart: Uint8Array, + frameEnd: Uint8Array +) { + const result = new Uint8Array( + buffer.length + frameStart.length + frameEnd.length + ); + result.set(frameStart); + result.set(buffer, frameStart.length); + result.set(frameEnd, frameStart.length + buffer.length); + + return result; +} + +function areBuffersEqual( + buffer1: Uint8Array, + buffer2: Uint8Array, + length: number +) { + for (let i = 0; i < length; i++) { + if (buffer1[i] !== buffer2[i]) return false; + } + return true; +} diff --git a/packages/p2p-media-loader-core/src/p2p/commands/binary-serialization.ts b/packages/p2p-media-loader-core/src/p2p/commands/binary-serialization.ts new file mode 100644 index 00000000..794afd1b --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/commands/binary-serialization.ts @@ -0,0 +1,196 @@ +import { joinChunks } from "../../utils/utils"; + +// restricted up to 16 item types (4 bits to type definition) +export enum SerializedItem { + Int, + SimilarIntArray, + String, +} + +export const serializedItemTypes = Object.values(SerializedItem); + +function abs(num: bigint): bigint { + return num < 0 ? -num : num; +} + +function getRequiredBytesForInt(num: bigint): number { + const binaryString = num.toString(2); + const necessaryBits = num < 0 ? binaryString.length : binaryString.length + 1; + return Math.ceil(necessaryBits / 8); +} + +function intToBytes(num: bigint): Uint8Array { + const isNegative = num < 0; + const bytesAmountNumber = getRequiredBytesForInt(num); + const bytes = new Uint8Array(bytesAmountNumber); + const bytesAmount = BigInt(bytesAmountNumber); + + num = abs(num); + for (let i = 0; i < bytesAmountNumber; i++) { + const shift = 8n * (bytesAmount - 1n - BigInt(i)); + const byte = (num >> shift) & 0xffn; + bytes[i] = Number(byte); + } + + if (isNegative) bytes[0] = bytes[0] | 0b10000000; + return bytes; +} + +function bytesToInt(bytes: Uint8Array): bigint { + const byteLength = BigInt(bytes.length); + const getNumberPart = (byte: number, i: number): bigint => { + const shift = 8n * (byteLength - 1n - BigInt(i)); + return BigInt(byte) << shift; + }; + + // ignore first bit of first byte as it is sign bit + let number = getNumberPart(bytes[0] & 0b01111111, 0); + for (let i = 1; i < byteLength; i++) { + number = getNumberPart(bytes[i], i) | number; + } + if ((bytes[0] & 0b10000000) >> 7 !== 0) number = -number; + + return number; +} + +export function serializeInt(num: bigint): Uint8Array { + const numBytes = intToBytes(num); + const numberMetadata = (SerializedItem.Int << 4) | numBytes.length; + return new Uint8Array([numberMetadata, ...numBytes]); +} + +export function deserializeInt(bytes: Uint8Array) { + const metadata = bytes[0]; + const code = metadata >> 4; + if (code !== SerializedItem.Int) { + throw new Error( + "Trying to deserialize integer with invalid serialized item code" + ); + } + const numberBytesLength = metadata & 0b1111; + const start = 1; + const end = start + numberBytesLength; + return { + number: bytesToInt(bytes.slice(start, end)), + byteLength: numberBytesLength + 1, + }; +} + +export function serializeSimilarIntArray(numbers: bigint[]) { + const commonPartNumbersMap = new Map(); + + for (const number of numbers) { + const common = number & ~0xffn; + const diffByte = number & 0xffn; + const bytes = commonPartNumbersMap.get(common) ?? new ResizableUint8Array(); + if (!bytes.length) commonPartNumbersMap.set(common, bytes); + bytes.push(Number(diffByte)); + } + + const result = new ResizableUint8Array(); + result.push([SerializedItem.SimilarIntArray << 4, commonPartNumbersMap.size]); + + for (const [commonPart, binaryArray] of commonPartNumbersMap) { + const { length } = binaryArray.getBytesChunks(); + const commonPartWithLength = commonPart | (BigInt(length) & 0xffn); + binaryArray.unshift(serializeInt(commonPartWithLength)); + result.push(binaryArray.getBuffer()); + } + + return result.getBuffer(); +} + +export function deserializeSimilarIntArray(bytes: Uint8Array) { + const [codeByte, commonPartArraysAmount] = bytes; + const code = codeByte >> 4; + if (code !== SerializedItem.SimilarIntArray) { + throw new Error( + "Trying to deserialize similar int array with invalid serialized item code" + ); + } + + let offset = 2; + const originalIntArr: bigint[] = []; + for (let i = 0; i < commonPartArraysAmount; i++) { + const { number: commonPartWithLength, byteLength } = deserializeInt( + bytes.slice(offset) + ); + offset += byteLength; + const arrayLength = commonPartWithLength & 0xffn; + const commonPart = commonPartWithLength & ~0xffn; + + for (let j = 0; j < arrayLength; j++) { + const diffPart = BigInt(bytes[offset]); + originalIntArr.push(commonPart | diffPart); + offset++; + } + } + + return { numbers: originalIntArr, byteLength: offset }; +} + +export function serializeString(string: string) { + const { length } = string; + const bytes = new ResizableUint8Array(); + bytes.push([ + (SerializedItem.String << 4) | ((length >> 8) & 0x0f), + length & 0xff, + ]); + bytes.push(new TextEncoder().encode(string)); + return bytes.getBuffer(); +} + +export function deserializeString(bytes: Uint8Array) { + const [codeByte, lengthByte] = bytes; + const code = codeByte >> 4; + if (code !== SerializedItem.String) { + throw new Error( + "Trying to deserialize bytes (sting) with invalid serialized item code." + ); + } + const length = ((codeByte & 0x0f) << 8) | lengthByte; + const stringBytes = bytes.slice(2, length + 2); + const string = new TextDecoder("utf8").decode(stringBytes); + return { string, byteLength: length + 2 }; +} + +export class ResizableUint8Array { + private bytes: Uint8Array[] = []; + private _length = 0; + + push(bytes: Uint8Array | number | number[]) { + this.addBytes(bytes, "end"); + } + + unshift(bytes: Uint8Array | number | number[]) { + this.addBytes(bytes, "start"); + } + + private addBytes( + bytes: Uint8Array | number | number[], + position: "start" | "end" + ) { + let bytesToAdd: Uint8Array; + if (bytes instanceof Uint8Array) { + bytesToAdd = bytes; + } else if (Array.isArray(bytes)) { + bytesToAdd = new Uint8Array(bytes); + } else { + bytesToAdd = new Uint8Array([bytes]); + } + this._length += bytesToAdd.length; + this.bytes[position === "start" ? "unshift" : "push"](bytesToAdd); + } + + getBytesChunks(): ReadonlyArray { + return this.bytes; + } + + getBuffer(): Uint8Array { + return joinChunks(this.bytes, this._length); + } + + get length() { + return this._length; + } +} diff --git a/packages/p2p-media-loader-core/src/p2p/commands/commands.ts b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts new file mode 100644 index 00000000..30ddeca6 --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/commands/commands.ts @@ -0,0 +1,72 @@ +import { BinaryCommandCreator } from "./binary-command-creator"; +import { + PeerSegmentCommand, + PeerSendSegmentCommand, + PeerSegmentAnnouncementCommand, + PeerRequestSegmentCommand, + PeerCommand, + PeerCommandType, +} from "./types"; + +function serializeSegmentAnnouncementCommand( + command: PeerSegmentAnnouncementCommand, + maxChunkSize: number +) { + const { c: commandCode, p: loadingByHttp, l: loaded } = command; + const creator = new BinaryCommandCreator(commandCode, maxChunkSize); + if (loaded?.length) creator.addSimilarIntArr("l", loaded); + if (loadingByHttp?.length) { + creator.addSimilarIntArr("p", loadingByHttp); + } + creator.complete(); + return creator.getResultBuffers(); +} + +function serializePeerSegmentCommand( + command: PeerSegmentCommand, + maxChunkSize: number +) { + const creator = new BinaryCommandCreator(command.c, maxChunkSize); + creator.addInteger("i", command.i); + creator.complete(); + return creator.getResultBuffers(); +} + +function serializePeerSendSegmentCommand( + command: PeerSendSegmentCommand, + maxChunkSize: number +) { + const creator = new BinaryCommandCreator(command.c, maxChunkSize); + creator.addInteger("i", command.i); + creator.addInteger("s", command.s); + creator.complete(); + return creator.getResultBuffers(); +} + +function serializePeerSegmentRequestCommand( + command: PeerRequestSegmentCommand, + maxChunkSize: number +) { + const creator = new BinaryCommandCreator(command.c, maxChunkSize); + creator.addInteger("i", command.i); + if (command.b) creator.addInteger("b", command.b); + creator.complete(); + return creator.getResultBuffers(); +} + +export function serializePeerCommand( + command: PeerCommand, + maxChunkSize: number +) { + switch (command.c) { + case PeerCommandType.CancelSegmentRequest: + case PeerCommandType.SegmentAbsent: + return serializePeerSegmentCommand(command, maxChunkSize); + case PeerCommandType.SegmentRequest: + return serializePeerSegmentRequestCommand(command, maxChunkSize); + case PeerCommandType.SegmentsAnnouncement: + return serializeSegmentAnnouncementCommand(command, maxChunkSize); + case PeerCommandType.SegmentData: + return serializePeerSendSegmentCommand(command, maxChunkSize); + } +} diff --git a/packages/p2p-media-loader-core/src/p2p/commands/index.ts b/packages/p2p-media-loader-core/src/p2p/commands/index.ts new file mode 100644 index 00000000..248f8d7e --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/commands/index.ts @@ -0,0 +1,8 @@ +export * from "./types"; +export { serializePeerCommand } from "./commands"; +export { + deserializeCommand, + isCommandChunk, + BinaryCommandChunksJoiner, + BinaryCommandJoiningError, +} from "./binary-command-creator"; diff --git a/packages/p2p-media-loader-core/src/p2p/commands/types.ts b/packages/p2p-media-loader-core/src/p2p/commands/types.ts new file mode 100644 index 00000000..27e9b144 --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/commands/types.ts @@ -0,0 +1,41 @@ +type BasePeerCommand = { + c: T; +}; + +export enum PeerCommandType { + SegmentsAnnouncement, + SegmentRequest, + SegmentData, + SegmentAbsent, + CancelSegmentRequest, +} + +export type PeerSegmentCommand = BasePeerCommand< + PeerCommandType.SegmentAbsent | PeerCommandType.CancelSegmentRequest +> & { + i: number; // segment id +}; + +export type PeerRequestSegmentCommand = + BasePeerCommand & { + i: number; // segment id + b?: number; // byte from + }; + +export type PeerSegmentAnnouncementCommand = + BasePeerCommand & { + l?: number[]; // loaded segments + p?: number[]; // segments loading by http + }; + +export type PeerSendSegmentCommand = + BasePeerCommand & { + i: number; // segment id + s: number; // size in bytes + }; + +export type PeerCommand = + | PeerSegmentCommand + | PeerRequestSegmentCommand + | PeerSegmentAnnouncementCommand + | PeerSendSegmentCommand; diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index bd4d05fb..cfda5d49 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -1,16 +1,13 @@ import { Peer } from "./peer"; import { Segment, Settings, StreamWithSegments } from "../types"; import { SegmentsMemoryStorage } from "../segments-storage"; -import * as PeerUtil from "../utils/peer"; -import * as StreamUtils from "../utils/stream"; -import * as Utils from "../utils/utils"; -import { PeerSegmentStatus } from "../enums"; import { RequestsContainer } from "../request-container"; import { Request } from "../request"; import { P2PTrackerClient } from "./tracker-client"; +import * as StreamUtils from "../utils/stream"; +import * as Utils from "../utils/utils"; export class P2PLoader { - private readonly peerId: string; private readonly trackerClient: P2PTrackerClient; private isAnnounceMicrotaskCreated = false; @@ -21,13 +18,12 @@ export class P2PLoader { private readonly segmentStorage: SegmentsMemoryStorage, private readonly settings: Settings ) { - this.peerId = PeerUtil.generatePeerId(); const streamExternalId = StreamUtils.getStreamExternalId( this.streamManifestUrl, this.stream ); + this.trackerClient = new P2PTrackerClient( - this.peerId, streamExternalId, this.stream, { @@ -49,7 +45,7 @@ export class P2PLoader { for (const peer of this.trackerClient.peers()) { if ( !peer.downloadingSegment && - peer.getSegmentStatus(segment) === PeerSegmentStatus.Loaded + peer.getSegmentStatus(segment) === "loaded" ) { peersWithSegment.push(peer); } @@ -59,7 +55,7 @@ export class P2PLoader { if (!peer) return; const request = this.requests.getOrCreateRequest(segment); - peer.fulfillSegmentRequest(request); + peer.downloadSegment(request); } isLoadingOrLoadedBySomeone(segment: Segment): boolean { @@ -77,9 +73,9 @@ export class P2PLoader { } private getSegmentsAnnouncement() { - const loaded: string[] = + const loaded: number[] = this.segmentStorage.getStoredSegmentExternalIdsOfStream(this.stream); - const httpLoading: string[] = []; + const httpLoading: number[] = []; for (const request of this.requests.httpRequests()) { const segment = this.stream.segments.get(request.segment.localId); @@ -87,12 +83,12 @@ export class P2PLoader { httpLoading.push(segment.externalId); } - return PeerUtil.getJsonSegmentsAnnouncement(loaded, httpLoading); + return { loaded, httpLoading }; } private onPeerConnected = (peer: Peer) => { - const announcement = this.getSegmentsAnnouncement(); - peer.sendSegmentsAnnouncement(announcement); + const { httpLoading, loaded } = this.getSegmentsAnnouncement(); + peer.sendSegmentsAnnouncementCommand(loaded, httpLoading); }; broadcastAnnouncement = () => { @@ -100,9 +96,9 @@ export class P2PLoader { this.isAnnounceMicrotaskCreated = true; queueMicrotask(() => { - const announcement = this.getSegmentsAnnouncement(); + const { httpLoading, loaded } = this.getSegmentsAnnouncement(); for (const peer of this.trackerClient.peers()) { - peer.sendSegmentsAnnouncement(announcement); + peer.sendSegmentsAnnouncementCommand(loaded, httpLoading); } this.isAnnounceMicrotaskCreated = false; }); @@ -110,16 +106,23 @@ export class P2PLoader { private onSegmentRequested = async ( peer: Peer, - segmentExternalId: string + segmentExternalId: number, + byteFrom?: number ) => { const segment = StreamUtils.getSegmentFromStreamByExternalId( this.stream, segmentExternalId ); - const segmentData = - segment && (await this.segmentStorage.getSegmentData(segment)); - if (segmentData) void peer.sendSegmentData(segmentExternalId, segmentData); - else peer.sendSegmentAbsent(segmentExternalId); + if (!segment) return; + const segmentData = await this.segmentStorage.getSegmentData(segment); + if (!segmentData) { + peer.sendSegmentAbsentCommand(segmentExternalId); + return; + } + void peer.uploadSegmentData( + segmentExternalId, + byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData + ); }; destroy() { diff --git a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts index 623fb00f..f36a6263 100644 --- a/packages/p2p-media-loader-core/src/p2p/loaders-container.ts +++ b/packages/p2p-media-loader-core/src/p2p/loaders-container.ts @@ -49,7 +49,13 @@ export class P2PLoadersContainer { changeCurrentLoader(stream: StreamWithSegments) { const loaderItem = this.loaders.get(stream.localId); - const prev = this._currentLoaderItem; + if (this._currentLoaderItem) { + const ids = this.segmentStorage.getStoredSegmentExternalIdsOfStream( + this._currentLoaderItem.stream + ); + if (!ids.length) this.destroyAndRemoveLoader(this._currentLoaderItem); + else this.setLoaderDestroyTimeout(this._currentLoaderItem); + } if (loaderItem) { this._currentLoaderItem = loaderItem; clearTimeout(loaderItem.destroyTimeoutId); @@ -62,14 +68,6 @@ export class P2PLoadersContainer { this.logger( `change current p2p loader: ${LoggerUtils.getStreamString(stream)}` ); - - if (!prev) return; - - const ids = this.segmentStorage.getStoredSegmentExternalIdsOfStream( - prev.stream - ); - if (!ids.length) this.destroyAndRemoveLoader(prev); - else this.setLoaderDestroyTimeout(prev); } private setLoaderDestroyTimeout(item: P2PLoaderContainerItem) { diff --git a/packages/p2p-media-loader-core/src/p2p/peer-base.ts b/packages/p2p-media-loader-core/src/p2p/peer-base.ts new file mode 100644 index 00000000..202d4912 --- /dev/null +++ b/packages/p2p-media-loader-core/src/p2p/peer-base.ts @@ -0,0 +1,130 @@ +import { PeerConnection } from "bittorrent-tracker"; +import * as Command from "./commands"; +import * as Utils from "../utils/utils"; +import debug from "debug"; +import { Settings } from "../types"; + +export type PeerSettings = Pick< + Settings, + "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" +>; + +export abstract class PeerBase { + readonly id: string; + private isUploadingSegment = false; + private commandChunks?: Command.BinaryCommandChunksJoiner; + protected readonly logger = debug("core:peer"); + + protected constructor( + private readonly connection: PeerConnection, + protected readonly settings: PeerSettings + ) { + this.id = PeerBase.getPeerIdFromConnection(connection); + connection.on("data", this.onDataReceived); + connection.on("close", this.onPeerClosed); + connection.on("error", this.onConnectionError); + } + + private onDataReceived = (data: Uint8Array) => { + if (Command.isCommandChunk(data)) this.receivingCommandBytes(data); + else this.receiveSegmentChunk(data); + }; + + private onPeerClosed = () => { + this.logger(`connection with peer closed: ${this.id}`); + this.destroy(); + }; + + private onConnectionError = (error: { code: string }) => { + this.logger(`peer error: ${this.id} ${error.code}`); + this.destroy(); + }; + + protected sendCommand(command: Command.PeerCommand) { + const binaryCommandBuffers = Command.serializePeerCommand( + command, + this.settings.webRtcMaxMessageSize + ); + for (const buffer of binaryCommandBuffers) { + this.connection.send(buffer); + } + } + + protected async splitDataToChunksAndUploadAsync(data: Uint8Array) { + const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize); + const channel = this.connection._channel; + const { promise, resolve, reject } = Utils.getControlledPromise(); + + const sendChunk = () => { + while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) { + const chunk = chunks.next().value; + if (!chunk) { + resolve(); + break; + } + if (chunk && !this.isUploadingSegment) { + reject(); + break; + } + this.connection.send(chunk); + } + }; + try { + channel.addEventListener("bufferedamountlow", sendChunk); + this.isUploadingSegment = true; + sendChunk(); + await promise; + return promise; + } finally { + this.isUploadingSegment = false; + } + } + + protected cancelDataUploading() { + this.isUploadingSegment = false; + } + + private receivingCommandBytes(buffer: Uint8Array) { + if (!this.commandChunks) { + this.commandChunks = new Command.BinaryCommandChunksJoiner( + (commandBuffer) => { + this.commandChunks = undefined; + const command = Command.deserializeCommand(commandBuffer); + this.receiveCommand(command); + } + ); + } + try { + this.commandChunks.addCommandChunk(buffer); + } catch (err) { + if (!(err instanceof Command.BinaryCommandJoiningError)) return; + this.commandChunks = undefined; + } + } + + protected abstract receiveCommand(command: Command.PeerCommand): void; + + protected abstract receiveSegmentChunk(data: Uint8Array): void; + + protected destroy() { + this.connection.destroy(); + } + + static getPeerIdFromConnection(connection: PeerConnection) { + return Utils.hexToUtf8(connection.id); + } +} + +function* getBufferChunks( + data: ArrayBuffer, + maxChunkSize: number +): Generator { + let bytesLeft = data.byteLength; + while (bytesLeft > 0) { + const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft; + const from = data.byteLength - bytesLeft; + const buffer = data.slice(from, from + bytesToSend); + bytesLeft -= bytesToSend; + yield buffer; + } +} diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index d62f9bc4..98f14cd9 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,87 +1,56 @@ import { PeerConnection } from "bittorrent-tracker"; -import { - JsonSegmentAnnouncement, - PeerCommand, - PeerSegmentAnnouncementCommand, - PeerSegmentCommand, - PeerSegmentRequestCommand, - PeerSendSegmentCommand, -} from "../internal-types"; -import { PeerCommandType, PeerSegmentStatus } from "../enums"; +import { PeerBase, PeerSettings } from "./peer-base"; import { Request, RequestControls, RequestError, PeerRequestErrorType, } from "../request"; -import { Segment, Settings } from "../types"; -import * as PeerUtil from "../utils/peer"; -import * as Utils from "../utils/utils"; -import debug from "debug"; +import * as Command from "./commands"; +import { Segment } from "../types"; +const { PeerCommandType } = Command; type PeerEventHandlers = { onPeerClosed: (peer: Peer) => void; - onSegmentRequested: (peer: Peer, segmentId: string) => void; + onSegmentRequested: ( + peer: Peer, + segmentId: number, + byteFrom?: number + ) => void; }; -type PeerSettings = Pick< - Settings, - "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" ->; - -export class Peer { - readonly id: string; - private segments = new Map(); +export class Peer extends PeerBase { private requestContext?: { request: Request; controls: RequestControls }; - private readonly logger = debug("core:peer"); - private isUploadingSegment = false; + private loadedSegments = new Set(); + private httpLoadingSegments = new Set(); constructor( - private readonly connection: PeerConnection, + connection: PeerConnection, private readonly eventHandlers: PeerEventHandlers, - private readonly settings: PeerSettings + settings: PeerSettings ) { - this.id = Peer.getPeerIdFromHexString(connection.id); - this.eventHandlers = eventHandlers; - - connection.on("data", this.onReceiveData.bind(this)); - connection.on("close", () => { - this.logger(`connection with peer closed: ${this.id}`); - this.destroy(); - this.eventHandlers.onPeerClosed(this); - }); - connection.on("error", (error) => { - if (error.code === "ERR_DATA_CHANNEL") { - this.logger(`peer error: ${this.id} ${error.code}`); - this.destroy(); - this.eventHandlers.onPeerClosed(this); - } - }); + super(connection, settings); } get downloadingSegment(): Segment | undefined { return this.requestContext?.request.segment; } - getSegmentStatus(segment: Segment): PeerSegmentStatus | undefined { + getSegmentStatus(segment: Segment): "loaded" | "http-loading" | undefined { const { externalId } = segment; - return this.segments.get(externalId); + if (this.loadedSegments.has(externalId)) return "loaded"; + if (this.httpLoadingSegments.has(externalId)) return "http-loading"; } - private onReceiveData(data: Uint8Array) { - const command = PeerUtil.getPeerCommandFromArrayBuffer(data); - if (!command) { - this.receiveSegmentChunk(data); - return; - } - + protected receiveCommand(command: Command.PeerCommand) { switch (command.c) { case PeerCommandType.SegmentsAnnouncement: - this.segments = PeerUtil.getSegmentsFromPeerAnnouncement(command.a); + this.loadedSegments = new Set(command.l); + this.httpLoadingSegments = new Set(command.p); break; case PeerCommandType.SegmentRequest: - this.eventHandlers.onSegmentRequested(this, command.i); + this.eventHandlers.onSegmentRequested(this, command.i, command.b); break; case PeerCommandType.SegmentData: @@ -99,125 +68,80 @@ export class Peer { case PeerCommandType.SegmentAbsent: if (this.requestContext?.request.segment.externalId === command.i) { - this.cancelSegmentRequest("peer-segment-absent"); - this.segments.delete(command.i); + this.cancelSegmentDownloading("peer-segment-absent"); + this.loadedSegments.delete(command.i); } break; case PeerCommandType.CancelSegmentRequest: - this.isUploadingSegment = false; + this.cancelDataUploading(); break; } } - private sendCommand(command: PeerCommand) { - this.connection.send(JSON.stringify(command)); + protected receiveSegmentChunk(chunk: Uint8Array): void { + if (!this.requestContext) return; + const { request, controls } = this.requestContext; + controls.addLoadedChunk(chunk); + + if (request.loadedBytes === request.totalBytes) { + controls.completeOnSuccess(); + this.requestContext = undefined; + } else if ( + request.totalBytes !== undefined && + request.loadedBytes > request.totalBytes + ) { + this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + } } - fulfillSegmentRequest(request: Request) { + downloadSegment(segmentRequest: Request) { if (this.requestContext) { throw new Error("Segment already is downloading"); } this.requestContext = { - request, - controls: request.start( + request: segmentRequest, + controls: segmentRequest.start( { type: "p2p", peerId: this.id }, { - abort: this.abortRequest, + abort: this.abortSegmentDownloading, notReceivingBytesTimeoutMs: this.settings.p2pNotReceivingBytesTimeoutMs, } ), }; - const command: PeerSegmentRequestCommand = { + const command: Command.PeerRequestSegmentCommand = { c: PeerCommandType.SegmentRequest, - i: request.segment.externalId, + i: segmentRequest.segment.externalId, }; - if (request.loadedBytes) command.b = request.loadedBytes; + if (segmentRequest.loadedBytes) command.b = segmentRequest.loadedBytes; this.sendCommand(command); } - sendSegmentsAnnouncement(announcement: JsonSegmentAnnouncement) { - const command: PeerSegmentAnnouncementCommand = { - c: PeerCommandType.SegmentsAnnouncement, - a: announcement, - }; - this.sendCommand(command); - } + private abortSegmentDownloading = () => { + if (!this.requestContext) return; + const { request } = this.requestContext; + this.sendCancelSegmentRequestCommand(request.segment); + this.requestContext = undefined; + }; - async sendSegmentData(segmentExternalId: string, data: ArrayBuffer) { + async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) { this.logger(`send segment ${segmentExternalId} to ${this.id}`); - const command: PeerSendSegmentCommand = { + const command: Command.PeerSendSegmentCommand = { c: PeerCommandType.SegmentData, i: segmentExternalId, s: data.byteLength, }; this.sendCommand(command); - - const chunks = getBufferChunks(data, this.settings.webRtcMaxMessageSize); - const channel = this.connection._channel; - const { promise, resolve, reject } = Utils.getControlledPromise(); - - const sendChunk = () => { - while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) { - const chunk = chunks.next().value; - if (!chunk) { - resolve(); - break; - } - if (chunk && !this.isUploadingSegment) { - reject(); - break; - } - this.connection.send(chunk); - } - }; try { - channel.addEventListener("bufferedamountlow", sendChunk); - this.isUploadingSegment = true; - sendChunk(); - await promise; + await this.splitDataToChunksAndUploadAsync(data as Uint8Array); this.logger(`segment ${segmentExternalId} has been sent to ${this.id}`); } catch (err) { this.logger(`cancel segment uploading ${segmentExternalId}`); - } finally { - channel.removeEventListener("bufferedamountlow", sendChunk); - this.isUploadingSegment = false; - } - } - - sendSegmentAbsent(segmentExternalId: string) { - const command: PeerSegmentCommand = { - c: PeerCommandType.SegmentAbsent, - i: segmentExternalId, - }; - this.sendCommand(command); - } - - private receiveSegmentChunk(chunk: Uint8Array): void { - if (!this.requestContext) return; - const { request, controls } = this.requestContext; - controls.addLoadedChunk(chunk); - - if (request.loadedBytes === request.totalBytes) { - controls.completeOnSuccess(); - this.requestContext = undefined; - } else if ( - request.totalBytes !== undefined && - request.loadedBytes > request.totalBytes - ) { - this.cancelSegmentRequest("peer-response-bytes-mismatch"); } } - private abortRequest = () => { - if (!this.requestContext) return; - const { request } = this.requestContext; - this.sendCancelSegmentRequestCommand(request.segment); - this.requestContext = undefined; - }; - - private cancelSegmentRequest(type: PeerRequestErrorType) { + private cancelSegmentDownloading(type: PeerRequestErrorType) { if (!this.requestContext) return; const { request, controls } = this.requestContext; const { segment } = request; @@ -230,6 +154,25 @@ export class Peer { this.requestContext = undefined; } + sendSegmentsAnnouncementCommand( + loadedSegmentsIds: number[], + httpLoadingSegmentsIds: number[] + ) { + const command: Command.PeerSegmentAnnouncementCommand = { + c: PeerCommandType.SegmentsAnnouncement, + p: httpLoadingSegmentsIds, + l: loadedSegmentsIds, + }; + this.sendCommand(command); + } + + sendSegmentAbsentCommand(segmentExternalId: number) { + this.sendCommand({ + c: PeerCommandType.SegmentAbsent, + i: segmentExternalId, + }); + } + private sendCancelSegmentRequestCommand(segment: Segment) { this.sendCommand({ c: PeerCommandType.CancelSegmentRequest, @@ -238,35 +181,8 @@ export class Peer { } destroy() { - this.cancelSegmentRequest("peer-closed"); - this.connection.destroy(); - } - - static getPeerIdFromHexString(hex: string) { - return hexToUtf8(hex); - } -} - -function* getBufferChunks( - data: ArrayBuffer, - maxChunkSize: number -): Generator { - let bytesLeft = data.byteLength; - while (bytesLeft > 0) { - const bytesToSend = bytesLeft >= maxChunkSize ? maxChunkSize : bytesLeft; - const from = data.byteLength - bytesLeft; - const buffer = data.slice(from, from + bytesToSend); - bytesLeft -= bytesToSend; - yield buffer; - } -} - -function hexToUtf8(hexString: string) { - const bytes = new Uint8Array(hexString.length / 2); - - for (let i = 0; i < hexString.length; i += 2) { - bytes[i / 2] = parseInt(hexString.slice(i, i + 2), 16); + super.destroy(); + this.cancelSegmentDownloading("peer-closed"); + this.eventHandlers.onPeerClosed(this); } - const decoder = new TextDecoder(); - return decoder.decode(bytes); } diff --git a/packages/p2p-media-loader-core/src/p2p/tracker-client.ts b/packages/p2p-media-loader-core/src/p2p/tracker-client.ts index ab476f83..e9533398 100644 --- a/packages/p2p-media-loader-core/src/p2p/tracker-client.ts +++ b/packages/p2p-media-loader-core/src/p2p/tracker-client.ts @@ -14,26 +14,31 @@ type PeerItem = { }; type P2PTrackerClientEventHandlers = { onPeerConnected: (peer: Peer) => void; - onSegmentRequested: (peer: Peer, segmentExternalId: string) => void; + onSegmentRequested: (peer: Peer, segmentExternalId: number) => void; }; export class P2PTrackerClient { + private readonly peerId: string; + private readonly streamShortId: string; private readonly client: TrackerClient; private readonly _peers = new Map(); private readonly logger = debug("core:p2p-tracker-client"); constructor( - private readonly peerId: string, - streamExternalId: string, - private readonly stream: StreamWithSegments, + streamId: string, + stream: StreamWithSegments, private readonly eventHandlers: P2PTrackerClientEventHandlers, private readonly settings: Settings ) { - const streamHash = PeerUtil.getStreamHash(streamExternalId); - const streamShortId = LoggerUtils.getStreamString(stream); + const { string: peerId, bytes: peerIdBytes } = PeerUtil.generatePeerId(); + const { bytes: streamIdBytes, string: streamHash } = + PeerUtil.getStreamHash(streamId); + this.peerId = peerId; + this.streamShortId = LoggerUtils.getStreamString(stream); + this.client = new TrackerClient({ - infoHash: utf8ToHex(streamHash), - peerId: utf8ToHex(this.peerId), + infoHash: streamIdBytes, + peerId: peerIdBytes, port: 6881, announce: [ // "wss://tracker.novage.com.ua", @@ -54,7 +59,7 @@ export class P2PTrackerClient { this.client.on("warning", this.onTrackerClientWarning); this.client.on("error", this.onTrackerClientError); this.logger( - `create new client; \nstream: ${streamShortId}; hash: ${streamHash}; \npeer id: ${this.peerId}` + `create new client; \nstream: ${this.streamShortId}; hash: ${streamHash}; \npeer id: ${this.peerId}` ); } @@ -71,15 +76,13 @@ export class P2PTrackerClient { } } this._peers.clear(); - this.logger( - `destroy client; stream: ${LoggerUtils.getStreamString(this.stream)}` - ); + this.logger(`destroy client; stream: ${this.streamShortId}`); } private onReceivePeerConnection: TrackerClientEvents["peer"] = ( peerConnection ) => { - const itemId = Peer.getPeerIdFromHexString(peerConnection.id); + const itemId = Peer.getPeerIdFromConnection(peerConnection); let peerItem = this._peers.get(itemId); if (peerItem?.peer) { peerConnection.destroy(); @@ -113,17 +116,11 @@ export class P2PTrackerClient { private onTrackerClientWarning: TrackerClientEvents["warning"] = ( warning ) => { - this.logger( - `tracker warning (${LoggerUtils.getStreamString( - this.stream - )}: ${warning})` - ); + this.logger(`tracker warning (${this.streamShortId}: ${warning})`); }; private onTrackerClientError: TrackerClientEvents["error"] = (error) => { - this.logger( - `tracker error (${LoggerUtils.getStreamString(this.stream)}: ${error})` - ); + this.logger(`tracker error (${this.streamShortId}: ${error})`); }; *peers() { @@ -137,12 +134,3 @@ export class P2PTrackerClient { this._peers.delete(peer.id); }; } - -function utf8ToHex(utf8String: string) { - let result = ""; - for (let i = 0; i < utf8String.length; i++) { - result += utf8String.charCodeAt(i).toString(16); - } - - return result; -} diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index f44c4b84..f5f56667 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -1,7 +1,6 @@ -import { Segment, Settings } from "./types"; +import { Segment, Settings, Playback } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; import { Request } from "./request"; -import { Playback } from "./internal-types"; export class RequestsContainer { private readonly requests = new Map(); diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts index 77fd0b18..a36cbef9 100644 --- a/packages/p2p-media-loader-core/src/request.ts +++ b/packages/p2p-media-loader-core/src/request.ts @@ -1,10 +1,9 @@ -import { Segment, SegmentResponse } from "./types"; +import { Segment, SegmentResponse, Playback } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; import * as StreamUtils from "./utils/stream"; import * as Utils from "./utils/utils"; import * as LoggerUtils from "./utils/logger"; import debug from "debug"; -import { Playback } from "./internal-types"; export type EngineCallbacks = { onSuccess: (response: SegmentResponse) => void; diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts index 545ee965..bd8577fb 100644 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ b/packages/p2p-media-loader-core/src/segments-storage.ts @@ -85,7 +85,7 @@ export class SegmentsMemoryStorage { getStoredSegmentExternalIdsOfStream(stream: Stream) { const streamId = StreamUtils.getStreamShortId(stream); - const externalIds: string[] = []; + const externalIds: number[] = []; for (const { segment } of this.cache.values()) { const itemStreamId = StreamUtils.getStreamShortId(segment.stream); if (itemStreamId === streamId) externalIds.push(segment.externalId); diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index 85d65fd3..d555a881 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -7,7 +7,7 @@ export type ByteRange = { start: number; end: number }; export type SegmentBase = { readonly localId: string; - readonly externalId: string; + readonly externalId: number; readonly url: string; readonly byteRange?: ByteRange; readonly startTime: number; @@ -64,3 +64,8 @@ export type Settings = { export type CoreEventHandlers = { onSegmentLoaded?: (byteLength: number, type: RequestAttempt["type"]) => void; }; + +export type Playback = { + position: number; + rate: number; +}; diff --git a/packages/p2p-media-loader-core/src/utils/logger.ts b/packages/p2p-media-loader-core/src/utils/logger.ts index 3133c7fc..415909a2 100644 --- a/packages/p2p-media-loader-core/src/utils/logger.ts +++ b/packages/p2p-media-loader-core/src/utils/logger.ts @@ -1,5 +1,5 @@ import { Segment, Stream } from "../types"; -import { QueueItem } from "../internal-types"; +import { QueueItem } from "./queue"; import { SegmentPlaybackStatuses } from "./stream"; export function getStreamString(stream: Stream) { diff --git a/packages/p2p-media-loader-core/src/utils/peer.ts b/packages/p2p-media-loader-core/src/utils/peer.ts index 05c8cdd1..9501231a 100644 --- a/packages/p2p-media-loader-core/src/utils/peer.ts +++ b/packages/p2p-media-loader-core/src/utils/peer.ts @@ -1,24 +1,26 @@ -import { JsonSegmentAnnouncement, PeerCommand } from "../internal-types"; -import { PeerCommandType, PeerSegmentStatus } from "../enums"; -import RIPEMD160 from "ripemd160"; +import md5 from "nano-md5"; +import { utf8ToUintArray } from "./utils"; const HASH_SYMBOLS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; const PEER_ID_LENGTH = 20; -export function getStreamHash(streamId: string): string { - const symbolsCount = HASH_SYMBOLS.length; - const bytes = new RIPEMD160().update(streamId).digest(); - let hash = ""; +export function getStreamHash(streamId: string): { + string: string; + bytes: Uint8Array; +} { + // slice one byte to have 15 bytes binary string + const binary15BytesHashString = md5.fromUtf8(streamId).slice(1); + const base64Hash20BytesString = btoa(binary15BytesHashString); + const hashBytes = utf8ToUintArray(base64Hash20BytesString); - for (const byte of bytes) { - hash += HASH_SYMBOLS[byte % symbolsCount]; - } - - return hash; + return { string: btoa(binary15BytesHashString), bytes: hashBytes }; } -export function generatePeerId(): string { +export function generatePeerId(): { + string: string; + bytes: Uint8Array; +} { let peerId = "PEER:"; const randomCharsAmount = PEER_ID_LENGTH - peerId.length; @@ -28,57 +30,5 @@ export function generatePeerId(): string { ); } - return peerId; -} - -export function getPeerCommandFromArrayBuffer( - data: ArrayBuffer -): PeerCommand | undefined { - const bytes = new Uint8Array(data); - - // Serialized JSON string check by first, second and last characters: '{" .... }' - if ( - bytes[0] === 123 && - bytes[1] === 34 && - bytes[data.byteLength - 1] === 125 - ) { - try { - const decoded = new TextDecoder().decode(data); - const parsed = JSON.parse(decoded) as object; - if (isPeerCommand(parsed)) return parsed; - } catch { - return undefined; - } - } -} - -export function getSegmentsFromPeerAnnouncement( - announcement: JsonSegmentAnnouncement -): Map { - const segmentStatusMap = new Map(); - announcement.l - .split("|") - .forEach((id) => segmentStatusMap.set(id, PeerSegmentStatus.Loaded)); - - announcement.p - .split("|") - .forEach((id) => segmentStatusMap.set(id, PeerSegmentStatus.LoadingByHttp)); - return segmentStatusMap; -} - -export function getJsonSegmentsAnnouncement( - loadedSegmentExternalIds: string[], - loadingByHttpSegmentExternalIds: string[] -): JsonSegmentAnnouncement { - return { - l: loadedSegmentExternalIds.join("|"), - p: loadingByHttpSegmentExternalIds.join("|"), - }; -} - -function isPeerCommand(command: object): command is PeerCommand { - return ( - (command as PeerCommand).c !== undefined && - Object.values(PeerCommandType).includes((command as PeerCommand).c) - ); + return { string: peerId, bytes: utf8ToUintArray(peerId) }; } diff --git a/packages/p2p-media-loader-core/src/utils/queue.ts b/packages/p2p-media-loader-core/src/utils/queue.ts index 428e3250..bda5a1fe 100644 --- a/packages/p2p-media-loader-core/src/utils/queue.ts +++ b/packages/p2p-media-loader-core/src/utils/queue.ts @@ -1,11 +1,12 @@ -import { Segment } from "../types"; -import { Playback, QueueItem } from "../internal-types"; +import { Segment, Playback } from "../types"; import { getSegmentPlaybackStatuses, SegmentPlaybackStatuses, PlaybackTimeWindowsSettings, } from "./stream"; +export type QueueItem = { segment: Segment; statuses: SegmentPlaybackStatuses }; + export function generateQueue({ lastRequestedSegment, playback, diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts index 516e0501..4b008cad 100644 --- a/packages/p2p-media-loader-core/src/utils/stream.ts +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -1,5 +1,10 @@ -import { Segment, Settings, Stream, StreamWithSegments } from "../types"; -import { Playback } from "../internal-types"; +import { + Segment, + Settings, + Stream, + StreamWithSegments, + Playback, +} from "../types"; export type SegmentPlaybackStatuses = { isHighDemand: boolean; @@ -34,7 +39,7 @@ export function getSegmentFromStreamsMap( export function getSegmentFromStreamByExternalId( stream: StreamWithSegments, - segmentExternalId: string + segmentExternalId: number ): Segment | undefined { for (const segment of stream.segments.values()) { if (segment.externalId === segmentExternalId) return segment; diff --git a/packages/p2p-media-loader-core/src/utils/utils.ts b/packages/p2p-media-loader-core/src/utils/utils.ts index a47d2a18..d5ab0969 100644 --- a/packages/p2p-media-loader-core/src/utils/utils.ts +++ b/packages/p2p-media-loader-core/src/utils/utils.ts @@ -18,7 +18,7 @@ export function getControlledPromise() { export function joinChunks( chunks: Uint8Array[], totalBytes?: number -): ArrayBuffer { +): Uint8Array { if (totalBytes === undefined) { totalBytes = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0); } @@ -39,3 +39,20 @@ export function getPercent(numerator: number, denominator: number): number { export function getRandomItem(items: T[]): T { return items[Math.floor(Math.random() * items.length)]; } + +export function utf8ToUintArray(utf8String: string): Uint8Array { + const encoder = new TextEncoder(); + const hashBytes = new Uint8Array(utf8String.length); + encoder.encodeInto(utf8String, hashBytes); + return hashBytes; +} + +export function hexToUtf8(hexString: string) { + const bytes = new Uint8Array(hexString.length / 2); + + for (let i = 0; i < hexString.length; i += 2) { + bytes[i / 2] = parseInt(hexString.slice(i, i + 2), 16); + } + const decoder = new TextDecoder(); + return decoder.decode(bytes); +} diff --git a/packages/p2p-media-loader-hlsjs/src/segment-mananger.ts b/packages/p2p-media-loader-hlsjs/src/segment-mananger.ts index 97870b6a..5e0c42b4 100644 --- a/packages/p2p-media-loader-hlsjs/src/segment-mananger.ts +++ b/packages/p2p-media-loader-hlsjs/src/segment-mananger.ts @@ -69,7 +69,7 @@ export class SegmentManager { newSegments.push({ localId: segmentLocalId, url: responseUrl, - externalId: live ? sn.toString() : index.toString(), + externalId: live ? sn : index, byteRange, startTime, endTime, diff --git a/packages/p2p-media-loader-shaka/src/segment-manager.ts b/packages/p2p-media-loader-shaka/src/segment-manager.ts index 83d93828..2a3c2185 100644 --- a/packages/p2p-media-loader-shaka/src/segment-manager.ts +++ b/packages/p2p-media-loader-shaka/src/segment-manager.ts @@ -58,7 +58,7 @@ export class SegmentManager { const staleSegmentsIds = new Set(managerStream.segments.keys()); const newSegments: SegmentBase[] = []; for (const reference of segmentReferences) { - const externalId = Math.trunc(reference.getStartTime()).toString(); + const externalId = Math.trunc(reference.getStartTime() * 2); const segmentLocalId = Utils.getSegmentLocalIdFromReference(reference); if (!managerStream.segments.has(segmentLocalId)) { @@ -93,7 +93,7 @@ export class SegmentManager { segmentReferences.forEach((reference, index) => { const segment = Utils.createSegment({ segmentReference: reference, - externalId: (firstReferenceMediaSequence + index).toString(), + externalId: firstReferenceMediaSequence + index, }); newSegments.push(segment); }); @@ -111,7 +111,7 @@ export class SegmentManager { const segment = Utils.createSegment({ localId, segmentReference: reference, - externalId: index.toString(), + externalId: index, }); newSegments.push(segment); index--; diff --git a/packages/p2p-media-loader-shaka/src/stream-utils.ts b/packages/p2p-media-loader-shaka/src/stream-utils.ts index a5237101..a0a955a1 100644 --- a/packages/p2p-media-loader-shaka/src/stream-utils.ts +++ b/packages/p2p-media-loader-shaka/src/stream-utils.ts @@ -11,7 +11,7 @@ export function createSegment({ localId, }: { segmentReference: shaka.media.SegmentReference; - externalId: string; + externalId: number; localId?: string; }): SegmentBase { const { byteRange, url, startTime, endTime } = diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5f2d92da..6a4e1815 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,9 +1,5 @@ lockfileVersion: '6.0' -settings: - autoInstallPeers: true - excludeLinksFromLockfile: false - patchedDependencies: bittorrent-tracker@10.0.12: hash: 3bacck7ok4ioq2ztv47aeh7t7e @@ -96,13 +92,9 @@ importers: bittorrent-tracker: specifier: 10.0.12 version: 10.0.12(patch_hash=3bacck7ok4ioq2ztv47aeh7t7e) - ripemd160: - specifier: ^2.0.2 - version: 2.0.2 - devDependencies: - '@types/ripemd160': - specifier: ^2.0.2 - version: 2.0.2 + nano-md5: + specifier: ^1.0.5 + version: 1.0.5 packages/p2p-media-loader-hlsjs: dependencies: @@ -1020,12 +1012,6 @@ packages: resolution: {integrity: sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==} dev: true - /@types/node@20.8.8: - resolution: {integrity: sha512-YRsdVxq6OaLfmR9Hy816IMp33xOBjfyOgUd77ehqg96CFywxAPbDbXvAsuN2KVg2HOT8Eh6uAfU+l4WffwPVrQ==} - dependencies: - undici-types: 5.25.3 - dev: true - /@types/prop-types@15.7.5: resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==} dev: true @@ -1044,12 +1030,6 @@ packages: csstype: 3.1.2 dev: true - /@types/ripemd160@2.0.2: - resolution: {integrity: sha512-hv3Oh/+ldCqp1xBRGi/1G6y2fxV6wUiiwjPt2Q7fe4UgmbD52ChdmxJyjDsGCRb9yuTBS9281UhH4D9gO85k7A==} - dependencies: - '@types/node': 20.8.8 - dev: true - /@types/scheduler@0.16.3: resolution: {integrity: sha512-5cJ8CB4yAx7BH1oMvdU0Jh9lrEXyPkar6F9G/ERswkCuvP4KQZfZkSjcMbAICCpQTN4OuZn8tz0HiKv9TGZgrQ==} dev: true @@ -2391,6 +2371,7 @@ packages: inherits: 2.0.4 readable-stream: 3.6.2 safe-buffer: 5.2.1 + dev: true /hash.js@1.1.7: resolution: {integrity: sha512-taOaskGt4z4SOANNseOviYDvjEJinIkRgmp7LbKP2YTTmVxWBl87s/uzK9r+44BclBSp2X7K1hqeNfz9JbBeXA==} @@ -2732,6 +2713,10 @@ packages: global: 4.4.0 dev: false + /nano-md5@1.0.5: + resolution: {integrity: sha512-1VAOX0EiuwAdCMGpnglxp9r6ylm+gXwQi+UPAnc/Oj1tLLJ8D1I8rLZeiO4MWsUAqH8tuBAHweT1LYSrDfJlPg==} + dev: false + /nanoid@3.3.6: resolution: {integrity: sha512-BGcqMMJuToF7i1rt+2PWSNVnWIkGCU78jBG3RxO/bZlnZPK2Cmi2QaffxGO/2RvWi9sL+FAiRiXMgsyxQ1DIDA==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -3080,6 +3065,7 @@ packages: inherits: 2.0.4 string_decoder: 1.3.0 util-deprecate: 1.0.2 + dev: true /regenerator-runtime@0.13.11: resolution: {integrity: sha512-kY1AZVr2Ra+t+piVaJ4gxaFaReZVH40AKNo7UCX6W+dEwBo/2oZJzqfuN1qLq1oL45o56cPaTXELwrTh8Fpggg==} @@ -3124,6 +3110,7 @@ packages: dependencies: hash-base: 3.1.0 inherits: 2.0.4 + dev: true /rollup@3.25.1: resolution: {integrity: sha512-tywOR+rwIt5m2ZAWSe5AIJcTat8vGlnPFAv15ycCrw33t6iFsXZ6mzHVFh2psSjxQPmI+xgzMZZizUAukBI4aQ==} @@ -3152,6 +3139,7 @@ packages: /safe-buffer@5.2.1: resolution: {integrity: sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==} + dev: true /safer-buffer@2.1.2: resolution: {integrity: sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==} @@ -3308,6 +3296,7 @@ packages: resolution: {integrity: sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==} dependencies: safe-buffer: 5.2.1 + dev: true /strip-ansi@6.0.1: resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==} @@ -3412,10 +3401,6 @@ packages: base64-arraybuffer: 1.0.2 dev: false - /undici-types@5.25.3: - resolution: {integrity: sha512-Ga1jfYwRn7+cP9v8auvEXN1rX3sWqlayd4HP7OKk4mZWylEmu3KzXDUGrQUN6Ol7qo1gPvB2e5gX6udnyEPgdA==} - dev: true - /unordered-array-remove@1.0.2: resolution: {integrity: sha512-45YsfD6svkgaCBNyvD+dFHm4qFX9g3wRSIVgWVPtm2OCnphvPxzJoe20ATsiNpNJrmzHifnxm+BN5F7gFT/4gw==} dev: false @@ -3454,6 +3439,7 @@ packages: /util-deprecate@1.0.2: resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} + dev: true /util@0.12.5: resolution: {integrity: sha512-kZf/K6hEIrWHI6XqOFUiiMa+79wE/D8Q+NCNAWclkyg3b4d2k7s0QGepNjiABc+aR3N1PAyHL7p6UcLY6LmrnA==} @@ -3628,3 +3614,7 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} dev: true + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false