diff --git a/typescript/core/src/ChunkCursor.ts b/typescript/core/src/ChunkCursor.ts index 2113cc2b42..3a33303010 100644 --- a/typescript/core/src/ChunkCursor.ts +++ b/typescript/core/src/ChunkCursor.ts @@ -1,4 +1,3 @@ -import Reader from "./Reader"; import { parseRecord } from "./parse"; import { sortedIndexBy } from "./sortedIndexBy"; import { sortedLastIndexBy } from "./sortedLastIndex"; @@ -137,25 +136,31 @@ export class ChunkCursor { messageIndexes.byteLength, ); - const reader = new Reader(messageIndexesView); + let offset = 0; const arrayOfMessageOffsets: [logTime: bigint, offset: bigint][][] = []; - let record; - while ((record = parseRecord(reader, true))) { - if (record.type !== "MessageIndex") { + for ( + let result; + (result = parseRecord({ view: messageIndexesView, startOffset: offset, validateCrcs: true })), + result.record; + offset += result.usedBytes + ) { + if (result.record.type !== "MessageIndex") { continue; } if ( - record.records.length === 0 || - (this.#relevantChannels && !this.#relevantChannels.has(record.channelId)) + result.record.records.length === 0 || + (this.#relevantChannels && !this.#relevantChannels.has(result.record.channelId)) ) { continue; } - arrayOfMessageOffsets.push(record.records); + arrayOfMessageOffsets.push(result.record.records); } - if (reader.bytesRemaining() !== 0) { - throw new Error(`${reader.bytesRemaining()} bytes remaining in message index section`); + if (offset !== messageIndexesView.byteLength) { + throw new Error( + `${messageIndexesView.byteLength - offset} bytes remaining in message index section`, + ); } this.#orderedMessageOffsets = arrayOfMessageOffsets diff --git a/typescript/core/src/McapIndexedReader.ts b/typescript/core/src/McapIndexedReader.ts index 51ec00044e..5955300a40 100644 --- a/typescript/core/src/McapIndexedReader.ts +++ b/typescript/core/src/McapIndexedReader.ts @@ -2,7 +2,6 @@ import { crc32, crc32Final, crc32Init, crc32Update } from "@foxglove/crc"; import Heap from "heap-js"; import { ChunkCursor } from "./ChunkCursor"; -import Reader from "./Reader"; import { MCAP_MAGIC } from "./constants"; import { parseMagic, parseRecord } from "./parse"; import { DecompressHandlers, IReadable, TypedMcapRecords } from "./types"; @@ -112,7 +111,7 @@ export class McapIndexedReader { headerPrefix.byteOffset, headerPrefix.byteLength, ); - void parseMagic(new Reader(headerPrefixView)); + void parseMagic(headerPrefixView, 0); const headerContentLength = headerPrefixView.getBigUint64( MCAP_MAGIC.length + /* Opcode.HEADER */ 1, true, @@ -122,19 +121,26 @@ export class McapIndexedReader { const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength); headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength; - const headerReader = new Reader( - new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength), - ); - const headerResult = parseRecord(headerReader, true); - if (headerResult?.type !== "Header") { + const headerResult = parseRecord({ + view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength), + startOffset: 0, + validateCrcs: true, + }); + if (headerResult.record?.type !== "Header") { throw new Error( - `Unable to read header at beginning of file; found ${headerResult?.type ?? "nothing"}`, + `Unable to read header at beginning of file; found ${ + headerResult.record?.type ?? 
"nothing" + }`, ); } - if (headerReader.bytesRemaining() !== 0) { - throw new Error(`${headerReader.bytesRemaining()} bytes remaining after parsing header`); + if (headerResult.usedBytes !== headerRecord.byteLength) { + throw new Error( + `${ + headerRecord.byteLength - headerResult.usedBytes + } bytes remaining after parsing header`, + ); } - header = headerResult; + header = headerResult.record; } function errorWithLibrary(message: string): Error { @@ -173,32 +179,33 @@ export class McapIndexedReader { } try { - void parseMagic( - new Reader(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length), - ); + void parseMagic(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length); } catch (error) { throw errorWithLibrary((error as Error).message); } let footer: TypedMcapRecords["Footer"]; { - const footerReader = new Reader(footerAndMagicView); - const footerRecord = parseRecord(footerReader, true); - if (footerRecord?.type !== "Footer") { + const footerResult = parseRecord({ + view: footerAndMagicView, + startOffset: 0, + validateCrcs: true, + }); + if (footerResult.record?.type !== "Footer") { throw errorWithLibrary( `Unable to read footer from end of file (offset ${footerOffset}); found ${ - footerRecord?.type ?? "nothing" + footerResult.record?.type ?? "nothing" }`, ); } - if (footerReader.bytesRemaining() !== MCAP_MAGIC.length) { + if (footerResult.usedBytes !== footerAndMagicView.byteLength - MCAP_MAGIC.length) { throw errorWithLibrary( `${ - footerReader.bytesRemaining() - MCAP_MAGIC.length + footerAndMagicView.byteLength - MCAP_MAGIC.length - footerResult.usedBytes } bytes remaining after parsing footer`, ); } - footer = footerRecord; + footer = footerResult.record; } if (footer.summaryStart === 0n) { throw errorWithLibrary("File is not indexed"); @@ -254,7 +261,6 @@ export class McapIndexedReader { dataEndAndSummarySection.byteOffset, dataEndAndSummarySection.byteLength, ); - const indexReader = new Reader(indexView); const channelsById = new Map(); const schemasById = new Map(); @@ -265,42 +271,46 @@ export class McapIndexedReader { let statistics: TypedMcapRecords["Statistics"] | undefined; let dataSectionCrc: number | undefined; - let first = true; - let result; - while ((result = parseRecord(indexReader, true))) { - if (first && result.type !== "DataEnd") { + let offset = 0; + for ( + let result; + (result = parseRecord({ view: indexView, startOffset: offset, validateCrcs: true })), + result.record; + offset += result.usedBytes + ) { + if (offset === 0 && result.record.type !== "DataEnd") { throw errorWithLibrary( - `Expected DataEnd record to precede summary section, but found ${result.type}`, + `Expected DataEnd record to precede summary section, but found ${result.record.type}`, ); } - first = false; - switch (result.type) { + switch (result.record.type) { case "Schema": - schemasById.set(result.id, result); + schemasById.set(result.record.id, result.record); break; case "Channel": - channelsById.set(result.id, result); + channelsById.set(result.record.id, result.record); break; case "ChunkIndex": - chunkIndexes.push(result); + chunkIndexes.push(result.record); break; case "AttachmentIndex": - attachmentIndexes.push(result); + attachmentIndexes.push(result.record); break; case "MetadataIndex": - metadataIndexes.push(result); + metadataIndexes.push(result.record); break; case "Statistics": if (statistics) { throw errorWithLibrary("Duplicate Statistics record"); } - statistics = result; + statistics = result.record; break; case "SummaryOffset": - 
summaryOffsetsByOpcode.set(result.groupOpcode, result); + summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record); break; case "DataEnd": - dataSectionCrc = result.dataSectionCrc === 0 ? undefined : result.dataSectionCrc; + dataSectionCrc = + result.record.dataSectionCrc === 0 ? undefined : result.record.dataSectionCrc; break; case "Header": case "Footer": @@ -309,13 +319,13 @@ export class McapIndexedReader { case "MessageIndex": case "Attachment": case "Metadata": - throw errorWithLibrary(`${result.type} record not allowed in index section`); + throw errorWithLibrary(`${result.record.type} record not allowed in index section`); case "Unknown": break; } } - if (indexReader.bytesRemaining() !== 0) { - throw errorWithLibrary(`${indexReader.bytesRemaining()} bytes remaining in index section`); + if (offset !== indexView.byteLength) { + throw errorWithLibrary(`${indexView.byteLength - offset} bytes remaining in index section`); } return new McapIndexedReader({ @@ -385,7 +395,6 @@ export class McapIndexedReader { // cursor becomes active (i.e. when we first need to access messages from the chunk) and removed // when the cursor is removed from the heap. const chunkViewCache = new Map(); - const chunkReader = new Reader(new DataView(new ArrayBuffer(0))); for (let cursor; (cursor = chunkCursors.peek()); ) { if (!cursor.hasMessageIndexes()) { // If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap. @@ -412,24 +421,27 @@ export class McapIndexedReader { `Message offset beyond chunk bounds (log time ${logTime}, offset ${offset}, chunk data length ${chunkView.byteLength}) in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`, ); } - chunkReader.reset(chunkView, Number(offset)); - const record = parseRecord(chunkReader, validateCrcs ?? true); - if (!record) { + const result = parseRecord({ + view: chunkView, + startOffset: Number(offset), + validateCrcs: validateCrcs ?? true, + }); + if (!result.record) { throw this.#errorWithLibrary( `Unable to parse record at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`, ); } - if (record.type !== "Message") { + if (result.record.type !== "Message") { throw this.#errorWithLibrary( - `Unexpected record type ${record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, + `Unexpected record type ${result.record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, ); } - if (record.logTime !== logTime) { + if (result.record.logTime !== logTime) { throw this.#errorWithLibrary( - `Message log time ${record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, + `Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, ); } - yield record; + yield result.record; if (cursor.hasMoreMessages()) { // There is no need to reorganize the heap when chunks are ordered and not overlapping. 
@@ -456,18 +468,19 @@ export class McapIndexedReader { continue; } const metadataData = await this.#readable.read(metadataIndex.offset, metadataIndex.length); - const metadataReader = new Reader( - new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength), - ); - const metadataRecord = parseRecord(metadataReader, false); - if (metadataRecord?.type !== "Metadata") { + const metadataResult = parseRecord({ + view: new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength), + startOffset: 0, + validateCrcs: false, + }); + if (metadataResult.record?.type !== "Metadata") { throw this.#errorWithLibrary( `Metadata data at offset ${ metadataIndex.offset - } does not point to metadata record (found ${String(metadataRecord?.type)})`, + } does not point to metadata record (found ${String(metadataResult.record?.type)})`, ); } - yield metadataRecord; + yield metadataResult.record; } } @@ -506,18 +519,23 @@ export class McapIndexedReader { attachmentIndex.offset, attachmentIndex.length, ); - const attachmentReader = new Reader( - new DataView(attachmentData.buffer, attachmentData.byteOffset, attachmentData.byteLength), - ); - const attachmentRecord = parseRecord(attachmentReader, validateCrcs ?? true); - if (attachmentRecord?.type !== "Attachment") { + const attachmentResult = parseRecord({ + view: new DataView( + attachmentData.buffer, + attachmentData.byteOffset, + attachmentData.byteLength, + ), + startOffset: 0, + validateCrcs: validateCrcs ?? true, + }); + if (attachmentResult.record?.type !== "Attachment") { throw this.#errorWithLibrary( `Attachment data at offset ${ attachmentIndex.offset - } does not point to attachment record (found ${String(attachmentRecord?.type)})`, + } does not point to attachment record (found ${String(attachmentResult.record?.type)})`, ); } - yield attachmentRecord; + yield attachmentResult.record; } } @@ -529,19 +547,20 @@ export class McapIndexedReader { chunkIndex.chunkStartOffset, chunkIndex.chunkLength, ); - const chunkReader = new Reader( - new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength), - ); - const chunkRecord = parseRecord(chunkReader, options?.validateCrcs ?? true); - if (chunkRecord?.type !== "Chunk") { + const chunkResult = parseRecord({ + view: new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength), + startOffset: 0, + validateCrcs: options?.validateCrcs ?? 
true, + }); + if (chunkResult.record?.type !== "Chunk") { throw this.#errorWithLibrary( `Chunk start offset ${ chunkIndex.chunkStartOffset - } does not point to chunk record (found ${String(chunkRecord?.type)})`, + } does not point to chunk record (found ${String(chunkResult.record?.type)})`, ); } - const chunk = chunkRecord; + const chunk = chunkResult.record; let buffer = chunk.records; if (chunk.compression !== "" && buffer.byteLength > 0) { const decompress = this.#decompressHandlers?.[chunk.compression]; diff --git a/typescript/core/src/McapStreamReader.ts b/typescript/core/src/McapStreamReader.ts index 6ed8292ce9..ddd412ef38 100644 --- a/typescript/core/src/McapStreamReader.ts +++ b/typescript/core/src/McapStreamReader.ts @@ -1,6 +1,6 @@ import { crc32 } from "@foxglove/crc"; -import Reader from "./Reader"; +import StreamBuffer from "./StreamBuffer"; import { MCAP_MAGIC } from "./constants"; import { parseMagic, parseRecord } from "./parse"; import { Channel, DecompressHandlers, McapMagic, TypedMcapRecord, TypedMcapRecords } from "./types"; @@ -50,9 +50,7 @@ type McapReaderOptions = { * ``` */ export default class McapStreamReader { - #buffer = new ArrayBuffer(MCAP_MAGIC.length * 2); - #view = new DataView(this.#buffer); - #reader = new Reader(this.#view, MCAP_MAGIC.length * 2); // Cursor starts at end of initial buffer + #buffer = new StreamBuffer(MCAP_MAGIC.length * 2); #decompressHandlers; #includeChunks; #validateCrcs; @@ -80,7 +78,7 @@ export default class McapStreamReader { /** @returns The number of bytes that have been received by `append()` but not yet parsed. */ bytesRemaining(): number { - return this.#reader.bytesRemaining(); + return this.#buffer.bytesRemaining(); } /** @@ -91,41 +89,7 @@ export default class McapStreamReader { if (this.#doneReading) { throw new Error("Already done reading"); } - this.#appendOrShift(data); - this.#reader.reset(this.#view); - } - #appendOrShift(data: Uint8Array): void { - /** Add data to the buffer, shifting existing data or reallocating if necessary. */ - const remainingBytes = this.#reader.bytesRemaining(); - const totalNeededBytes = remainingBytes + data.byteLength; - - if (totalNeededBytes <= this.#buffer.byteLength) { - // Data fits in the current buffer - if (this.#view.byteOffset + totalNeededBytes <= this.#buffer.byteLength) { - // Data fits by appending only - const array = new Uint8Array(this.#buffer, this.#view.byteOffset); - array.set(data, remainingBytes); - this.#view = new DataView(this.#buffer, this.#view.byteOffset, totalNeededBytes); - } else { - // Data fits but requires moving existing data to start of buffer - const existingData = new Uint8Array(this.#buffer, this.#view.byteOffset, remainingBytes); - const array = new Uint8Array(this.#buffer); - array.set(existingData, 0); - array.set(data, existingData.byteLength); - this.#view = new DataView(this.#buffer, 0, totalNeededBytes); - } - } else { - // New data doesn't fit, copy to a new buffer - - // Currently, the new buffer size may be smaller than the old size. For future optimizations, - // we could consider making the buffer size increase monotonically. 
- this.#buffer = new ArrayBuffer(totalNeededBytes * 2); - const array = new Uint8Array(this.#buffer); - const existingData = new Uint8Array(this.#view.buffer, this.#view.byteOffset, remainingBytes); - array.set(existingData, 0); - array.set(data, existingData.byteLength); - this.#view = new DataView(this.#buffer, 0, totalNeededBytes); - } + this.#buffer.append(data); } /** @@ -165,10 +129,11 @@ export default class McapStreamReader { *#read(): Generator { if (!this.#noMagicPrefix) { - let magic: McapMagic | undefined; - while (((magic = parseMagic(this.#reader)), !magic)) { + let magic: McapMagic | undefined, usedBytes: number | undefined; + while ((({ magic, usedBytes } = parseMagic(this.#buffer.view, 0)), !magic)) { yield; } + this.#buffer.consume(usedBytes); } let header: TypedMcapRecords["Header"] | undefined; @@ -179,10 +144,20 @@ export default class McapStreamReader { for (;;) { let record; - while (((record = parseRecord(this.#reader, this.#validateCrcs)), !record)) { - yield; + { + let usedBytes; + while ( + (({ record, usedBytes } = parseRecord({ + view: this.#buffer.view, + startOffset: 0, + validateCrcs: this.#validateCrcs, + })), + !record) + ) { + yield; + } + this.#buffer.consume(usedBytes); } - switch (record.type) { case "Unknown": break; @@ -231,10 +206,18 @@ export default class McapStreamReader { } } const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength); - const chunkReader = new Reader(view); - let chunkRecord; - while ((chunkRecord = parseRecord(chunkReader, this.#validateCrcs))) { - switch (chunkRecord.type) { + let chunkOffset = 0; + for ( + let chunkResult; + (chunkResult = parseRecord({ + view, + startOffset: chunkOffset, + validateCrcs: this.#validateCrcs, + })), + chunkResult.record; + chunkOffset += chunkResult.usedBytes + ) { + switch (chunkResult.record.type) { case "Unknown": break; case "Header": @@ -249,31 +232,34 @@ export default class McapStreamReader { case "MetadataIndex": case "SummaryOffset": case "DataEnd": - throw errorWithLibrary(`${chunkRecord.type} record not allowed inside a chunk`); + throw errorWithLibrary( + `${chunkResult.record.type} record not allowed inside a chunk`, + ); case "Schema": case "Channel": case "Message": - yield chunkRecord; + yield chunkResult.record; break; } } - if (chunkReader.bytesRemaining() !== 0) { - throw errorWithLibrary(`${chunkReader.bytesRemaining()} bytes remaining in chunk`); + if (chunkOffset !== buffer.byteLength) { + throw errorWithLibrary(`${buffer.byteLength - chunkOffset} bytes remaining in chunk`); } break; } case "Footer": try { - let magic; - while (((magic = parseMagic(this.#reader)), !magic)) { + let magic, usedBytes; + while ((({ magic, usedBytes } = parseMagic(this.#buffer.view, 0)), !magic)) { yield; } + this.#buffer.consume(usedBytes); } catch (error) { throw errorWithLibrary((error as Error).message); } - if (this.#reader.bytesRemaining() !== 0) { + if (this.#buffer.bytesRemaining() !== 0) { throw errorWithLibrary( - `${this.#reader.bytesRemaining()} bytes remaining after MCAP footer and trailing magic`, + `${this.#buffer.bytesRemaining()} bytes remaining after MCAP footer and trailing magic`, ); } return record; diff --git a/typescript/core/src/McapWriter.test.ts b/typescript/core/src/McapWriter.test.ts index e16bc515d5..57bdcd82fc 100644 --- a/typescript/core/src/McapWriter.test.ts +++ b/typescript/core/src/McapWriter.test.ts @@ -3,7 +3,6 @@ import { crc32 } from "@foxglove/crc"; import { McapIndexedReader } from "./McapIndexedReader"; import McapStreamReader from 
"./McapStreamReader"; import { McapWriter } from "./McapWriter"; -import Reader from "./Reader"; import { TempBuffer } from "./TempBuffer"; import { MCAP_MAGIC, Opcode } from "./constants"; import { parseMagic, parseRecord } from "./parse"; @@ -279,12 +278,13 @@ describe("McapWriter", () => { const array = tempBuffer.get(); const view = new DataView(array.buffer, array.byteOffset, array.byteLength); - const reader = new Reader(view); const records: TypedMcapRecord[] = []; - parseMagic(reader); - let result; - while ((result = parseRecord(reader, true))) { - records.push(result); + for ( + let offset = parseMagic(view, 0).usedBytes, result; + (result = parseRecord({ view, startOffset: offset, validateCrcs: true })), result.record; + offset += result.usedBytes + ) { + records.push(result.record); } const expectedChunkData = new Uint8Array([ diff --git a/typescript/core/src/Reader.ts b/typescript/core/src/Reader.ts index d0136c648b..fcc2887237 100644 --- a/typescript/core/src/Reader.ts +++ b/typescript/core/src/Reader.ts @@ -7,27 +7,13 @@ const textDecoder = new TextDecoder(); export default class Reader { #view: DataView; - #viewU8: Uint8Array; offset: number; constructor(view: DataView, offset = 0) { this.#view = view; - this.#viewU8 = new Uint8Array(view.buffer, view.byteOffset, view.byteLength); this.offset = offset; } - // Should be ~identical to the constructor, it allows us to reinitialize the reader when - // the view changes, without creating a new instance, avoiding allocation / GC overhead - reset(view: DataView, offset = 0): void { - this.#view = view; - this.#viewU8 = new Uint8Array(view.buffer, view.byteOffset, view.byteLength); - this.offset = offset; - } - - bytesRemaining(): number { - return this.#viewU8.length - this.offset; - } - uint8(): number { const value = this.#view.getUint8(this.offset); this.offset += 1; @@ -54,12 +40,14 @@ export default class Reader { string(): string { const length = this.uint32(); - if (length === 0) { - return ""; - } else if (length > this.bytesRemaining()) { + if (this.offset + length > this.#view.byteLength) { throw new Error(`String length ${length} exceeds bounds of buffer`); } - return textDecoder.decode(this.u8ArrayBorrow(length)); + const value = textDecoder.decode( + new Uint8Array(this.#view.buffer, this.#view.byteOffset + this.offset, length), + ); + this.offset += length; + return value; } keyValuePairs(readKey: (reader: Reader) => K, readValue: (reader: Reader) => V): [K, V][] { @@ -115,18 +103,4 @@ export default class Reader { } return result; } - - // Read a borrowed Uint8Array, useful temp references or borrow semantics - u8ArrayBorrow(length: number): Uint8Array { - const result = this.#viewU8.subarray(this.offset, this.offset + length); - this.offset += length; - return result; - } - - // Read a copied Uint8Array from the underlying buffer, use when you need to keep the data around - u8ArrayCopy(length: number): Uint8Array { - const result = this.#viewU8.slice(this.offset, this.offset + length); - this.offset += length; - return result; - } } diff --git a/typescript/core/src/StreamBuffer.test.ts b/typescript/core/src/StreamBuffer.test.ts new file mode 100644 index 0000000000..a45175b3e3 --- /dev/null +++ b/typescript/core/src/StreamBuffer.test.ts @@ -0,0 +1,47 @@ +import StreamBuffer from "./StreamBuffer"; + +function toArray(view: DataView) { + return new Uint8Array(view.buffer, view.byteOffset, view.byteLength); +} + +describe("ByteStorage", () => { + it("handles basic append and consume", () => { + const buffer = new 
StreamBuffer(); + expect(buffer.bytesRemaining()).toBe(0); + + buffer.append(new Uint8Array([1, 2, 3])); + expect(buffer.bytesRemaining()).toBe(3); + expect(() => { + buffer.consume(4); + }).toThrow(); + + expect(toArray(buffer.view)).toEqual(new Uint8Array([1, 2, 3])); + buffer.consume(3); + expect(buffer.bytesRemaining()).toBe(0); + }); + + it("handles partial consume", () => { + const buffer = new StreamBuffer(); + + buffer.append(new Uint8Array([1, 2, 3, 4, 5])); + expect(buffer.bytesRemaining()).toBe(5); + buffer.consume(2); + expect(buffer.bytesRemaining()).toBe(3); + + expect(toArray(buffer.view)).toEqual(new Uint8Array([3, 4, 5])); + buffer.consume(3); + expect(buffer.bytesRemaining()).toBe(0); + }); + + it("reuses buffer within allocated capacity", () => { + const buffer = new StreamBuffer(5); + const rawBuffer = buffer.view.buffer; + buffer.append(new Uint8Array([1, 2])); + expect(buffer.view.buffer).toBe(rawBuffer); + buffer.append(new Uint8Array([3, 4, 5])); + expect(buffer.view.buffer).toBe(rawBuffer); + buffer.append(new Uint8Array([6, 7])); + expect(buffer.view.buffer).not.toBe(rawBuffer); + expect(toArray(buffer.view)).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7])); + }); +}); diff --git a/typescript/core/src/StreamBuffer.ts b/typescript/core/src/StreamBuffer.ts new file mode 100644 index 0000000000..98eaa785d5 --- /dev/null +++ b/typescript/core/src/StreamBuffer.ts @@ -0,0 +1,58 @@ +/** + * A growable buffer for use when processing a stream of data. + */ +export default class StreamBuffer { + #buffer: ArrayBuffer; + public view: DataView; + + constructor(initialCapacity = 0) { + this.#buffer = new ArrayBuffer(initialCapacity); + this.view = new DataView(this.#buffer, 0, 0); + } + + bytesRemaining(): number { + return this.view.byteLength; + } + + /** Mark some data as consumed, so the memory can be reused when new data is appended. */ + consume(count: number): void { + this.view = new DataView( + this.#buffer, + this.view.byteOffset + count, + this.view.byteLength - count, + ); + } + + /** Add data to the buffer, shifting existing data or reallocating if necessary. */ + append(data: Uint8Array): void { + if (this.view.byteOffset + this.view.byteLength + data.byteLength <= this.#buffer.byteLength) { + // Data fits by appending only + const array = new Uint8Array(this.view.buffer, this.view.byteOffset); + array.set(data, this.view.byteLength); + this.view = new DataView( + this.#buffer, + this.view.byteOffset, + this.view.byteLength + data.byteLength, + ); + } else if (this.view.byteLength + data.byteLength <= this.#buffer.byteLength) { + // Data fits in allocated buffer but requires moving existing data to start of buffer + const oldData = new Uint8Array(this.#buffer, this.view.byteOffset, this.view.byteLength); + const array = new Uint8Array(this.#buffer); + array.set(oldData, 0); + array.set(data, oldData.byteLength); + this.view = new DataView(this.#buffer, 0, this.view.byteLength + data.byteLength); + } else { + // New data doesn't fit, copy to a new buffer + + // Currently, the new buffer size may be smaller than the old size. For future optimizations, + // we could consider making the buffer size increase monotonically. 
+ + const oldData = new Uint8Array(this.#buffer, this.view.byteOffset, this.view.byteLength); + this.#buffer = new ArrayBuffer((this.view.byteLength + data.byteLength) * 2); + const array = new Uint8Array(this.#buffer); + array.set(oldData, 0); + array.set(data, oldData.byteLength); + this.view = new DataView(this.#buffer, 0, this.view.byteLength + data.byteLength); + } + } +} diff --git a/typescript/core/src/parse.ts b/typescript/core/src/parse.ts index 95d0105750..7f2fe80285 100644 --- a/typescript/core/src/parse.ts +++ b/typescript/core/src/parse.ts @@ -1,419 +1,374 @@ import { crc32 } from "@foxglove/crc"; import Reader from "./Reader"; -import { MCAP_MAGIC, Opcode } from "./constants"; +import { isKnownOpcode, MCAP_MAGIC, Opcode } from "./constants"; import { McapMagic, TypedMcapRecord } from "./types"; /** * Parse a MCAP magic string at `startOffset` in `view`. */ -export function parseMagic(reader: Reader): McapMagic | undefined { - if (reader.bytesRemaining() < MCAP_MAGIC.length) { - return undefined; +export function parseMagic( + view: DataView, + startOffset: number, +): { magic: McapMagic; usedBytes: number } | { magic?: undefined; usedBytes: 0 } { + if (startOffset + MCAP_MAGIC.length > view.byteLength) { + return { usedBytes: 0 }; } - const magic = reader.u8ArrayBorrow(MCAP_MAGIC.length); - if (!MCAP_MAGIC.every((val, i) => val === magic[i])) { + if (!MCAP_MAGIC.every((val, i) => val === view.getUint8(startOffset + i))) { throw new Error( `Expected MCAP magic '${MCAP_MAGIC.map((val) => val.toString(16).padStart(2, "0")).join( " ", - )}', found '${Array.from(magic, (_, i) => magic[i]!.toString(16).padStart(2, "0")).join( - " ", - )}'`, + )}', found '${Array.from(MCAP_MAGIC, (_, i) => + view + .getUint8(startOffset + i) + .toString(16) + .padStart(2, "0"), + ).join(" ")}'`, ); } - return { specVersion: "0" }; + return { + magic: { specVersion: "0" }, + usedBytes: MCAP_MAGIC.length, + }; } /** - * Parse a MCAP record from the given reader + * Parse a MCAP record beginning at `startOffset` in `view`. 
*/ -// NOTE: internal function in the hot path, (de)structuring args would be wasteful, acceptable perf/clarity tradeoff -// eslint-disable-next-line @foxglove/no-boolean-parameters -export function parseRecord(reader: Reader, validateCrcs = false): TypedMcapRecord | undefined { - const RECORD_HEADER_SIZE = 1 /*opcode*/ + 8; /*record content length*/ - if (reader.bytesRemaining() < RECORD_HEADER_SIZE) { - return undefined; +export function parseRecord({ + view, + startOffset, + validateCrcs, +}: { + view: DataView; + startOffset: number; + validateCrcs: boolean; +}): { record: TypedMcapRecord; usedBytes: number } | { record?: undefined; usedBytes: 0 } { + if (startOffset + /*opcode*/ 1 + /*record content length*/ 8 >= view.byteLength) { + return { usedBytes: 0 }; } - const start = reader.offset; - const opcode = reader.uint8(); - const recordLength = reader.uint64(); + const headerReader = new Reader(view, startOffset); + const opcode = headerReader.uint8(); + + const recordLength = headerReader.uint64(); if (recordLength > Number.MAX_SAFE_INTEGER) { throw new Error(`Record content length ${recordLength} is too large`); } - const recordLengthNum = Number(recordLength); - - if (reader.bytesRemaining() < recordLengthNum) { - reader.offset = start; // Rewind to the start of the record - return undefined; + const recordEndOffset = headerReader.offset + recordLengthNum; + if (recordEndOffset > view.byteLength) { + return { usedBytes: 0 }; } - let result: TypedMcapRecord; - switch (opcode as Opcode) { - case Opcode.HEADER: - result = parseHeader(reader, recordLengthNum); - break; - case Opcode.FOOTER: - result = parseFooter(reader, recordLengthNum); - break; - case Opcode.SCHEMA: - result = parseSchema(reader, recordLengthNum); - break; - case Opcode.CHANNEL: - result = parseChannel(reader, recordLengthNum); - break; - case Opcode.MESSAGE: - result = parseMessage(reader, recordLengthNum); - break; - case Opcode.CHUNK: - result = parseChunk(reader, recordLengthNum); - break; - case Opcode.MESSAGE_INDEX: - result = parseMessageIndex(reader, recordLengthNum); - break; - case Opcode.CHUNK_INDEX: - result = parseChunkIndex(reader, recordLengthNum); - break; - case Opcode.ATTACHMENT: - result = parseAttachment(reader, recordLengthNum, validateCrcs); - break; - case Opcode.ATTACHMENT_INDEX: - result = parseAttachmentIndex(reader, recordLengthNum); - break; - case Opcode.STATISTICS: - result = parseStatistics(reader, recordLengthNum); - break; - case Opcode.METADATA: - result = parseMetadata(reader, recordLengthNum); - break; - case Opcode.METADATA_INDEX: - result = parseMetadataIndex(reader, recordLengthNum); - break; - case Opcode.SUMMARY_OFFSET: - result = parseSummaryOffset(reader, recordLengthNum); - break; - case Opcode.DATA_END: - result = parseDataEnd(reader, recordLengthNum); - break; - default: - result = parseUnknown(reader, recordLengthNum, opcode); - break; - } - - // NOTE: a bit redundant, but ensures we've advanced by the full record length - // TODO: simplify this when we explore monomorphic paths - reader.offset = start + RECORD_HEADER_SIZE + recordLengthNum; - - return result; -} - -function parseUnknown(reader: Reader, recordLength: number, opcode: number): TypedMcapRecord { - const data = reader.u8ArrayBorrow(recordLength); - return { - type: "Unknown", - opcode, - data, - }; -} - -function parseHeader(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const profile = reader.string(); - const library = reader.string(); - reader.offset = 
startOffset + recordLength; - return { type: "Header", profile, library }; -} - -function parseFooter(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const summaryStart = reader.uint64(); - const summaryOffsetStart = reader.uint64(); - const summaryCrc = reader.uint32(); - reader.offset = startOffset + recordLength; - return { - type: "Footer", - summaryStart, - summaryOffsetStart, - summaryCrc, - }; -} - -function parseSchema(reader: Reader, recordLength: number): TypedMcapRecord { - const start = reader.offset; - const id = reader.uint16(); - const name = reader.string(); - const encoding = reader.string(); - const dataLen = reader.uint32(); - const end = reader.offset; - if (recordLength - (end - start) < dataLen) { - throw new Error(`Schema data length ${dataLen} exceeds bounds of record`); + if (!isKnownOpcode(opcode)) { + const data = new Uint8Array( + view.buffer, + view.byteOffset + headerReader.offset, + recordLengthNum, + ); + const record: TypedMcapRecord = { + type: "Unknown", + opcode, + data, + }; + return { record, usedBytes: recordEndOffset - startOffset }; } - const data = reader.u8ArrayCopy(dataLen); - reader.offset = start + recordLength; - - return { - type: "Schema", - id, - encoding, - name, - data, - }; -} -function parseChannel(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const channelId = reader.uint16(); - const schemaId = reader.uint16(); - const topicName = reader.string(); - const messageEncoding = reader.string(); - const metadata = reader.map( - (r) => r.string(), - (r) => r.string(), + const recordView = new DataView( + view.buffer, + view.byteOffset + headerReader.offset, + recordLengthNum, ); - reader.offset = startOffset + recordLength; + const reader = new Reader(recordView); + + switch (opcode) { + case Opcode.HEADER: { + const profile = reader.string(); + const library = reader.string(); + const record: TypedMcapRecord = { type: "Header", profile, library }; + return { record, usedBytes: recordEndOffset - startOffset }; + } - return { - type: "Channel", - id: channelId, - schemaId, - topic: topicName, - messageEncoding, - metadata, - }; -} + case Opcode.FOOTER: { + const summaryStart = reader.uint64(); + const summaryOffsetStart = reader.uint64(); + const summaryCrc = reader.uint32(); + const record: TypedMcapRecord = { + type: "Footer", + summaryStart, + summaryOffsetStart, + summaryCrc, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } -function parseMessage(reader: Reader, recordLength: number): TypedMcapRecord { - const MESSAGE_PREFIX_SIZE = 2 + 4 + 8 + 8; // channelId, sequence, logTime, publishTime - const channelId = reader.uint16(); - const sequence = reader.uint32(); - const logTime = reader.uint64(); - const publishTime = reader.uint64(); - const data = reader.u8ArrayCopy(recordLength - MESSAGE_PREFIX_SIZE); - return { - type: "Message", - channelId, - sequence, - logTime, - publishTime, - data, - }; -} + case Opcode.SCHEMA: { + const id = reader.uint16(); + const name = reader.string(); + const encoding = reader.string(); + const dataLen = reader.uint32(); + if (reader.offset + dataLen > recordView.byteLength) { + throw new Error(`Schema data length ${dataLen} exceeds bounds of record`); + } + const data = new Uint8Array( + recordView.buffer, + recordView.byteOffset + reader.offset, + dataLen, + ).slice(); + reader.offset += dataLen; + + const record: TypedMcapRecord = { + type: "Schema", + id, + encoding, + name, + data, 
+ }; + + return { record, usedBytes: recordEndOffset - startOffset }; + } -function parseChunk(reader: Reader, recordLength: number): TypedMcapRecord { - const start = reader.offset; - const startTime = reader.uint64(); - const endTime = reader.uint64(); - const uncompressedSize = reader.uint64(); - const uncompressedCrc = reader.uint32(); - const compression = reader.string(); - const recordsByteLength = Number(reader.uint64()); - const end = reader.offset; - const prefixSize = end - start; - if (recordsByteLength + prefixSize > recordLength) { - throw new Error("Chunk records length exceeds remaining record size"); - } - const records = reader.u8ArrayCopy(recordsByteLength); - reader.offset = start + recordLength; - return { - type: "Chunk", - messageStartTime: startTime, - messageEndTime: endTime, - compression, - uncompressedSize, - uncompressedCrc, - records, - }; -} + case Opcode.CHANNEL: { + const channelId = reader.uint16(); + const schemaId = reader.uint16(); + const topicName = reader.string(); + const messageEncoding = reader.string(); + const metadata = reader.map( + (r) => r.string(), + (r) => r.string(), + ); + + const record: TypedMcapRecord = { + type: "Channel", + id: channelId, + schemaId, + topic: topicName, + messageEncoding, + metadata, + }; + + return { record, usedBytes: recordEndOffset - startOffset }; + } -function parseMessageIndex(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const channelId = reader.uint16(); - const records = reader.keyValuePairs( - (r) => r.uint64(), - (r) => r.uint64(), - ); - reader.offset = startOffset + recordLength; - return { - type: "MessageIndex", - channelId, - records, - }; -} + case Opcode.MESSAGE: { + const channelId = reader.uint16(); + const sequence = reader.uint32(); + const logTime = reader.uint64(); + const publishTime = reader.uint64(); + const data = new Uint8Array( + recordView.buffer, + recordView.byteOffset + reader.offset, + recordView.byteLength - reader.offset, + ).slice(); + const record: TypedMcapRecord = { + type: "Message", + channelId, + sequence, + logTime, + publishTime, + data, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } -function parseChunkIndex(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const messageStartTime = reader.uint64(); - const messageEndTime = reader.uint64(); - const chunkStartOffset = reader.uint64(); - const chunkLength = reader.uint64(); - const messageIndexOffsets = reader.map( - (r) => r.uint16(), - (r) => r.uint64(), - ); - const messageIndexLength = reader.uint64(); - const compression = reader.string(); - const compressedSize = reader.uint64(); - const uncompressedSize = reader.uint64(); - reader.offset = startOffset + recordLength; - return { - type: "ChunkIndex", - messageStartTime, - messageEndTime, - chunkStartOffset, - chunkLength, - messageIndexOffsets, - messageIndexLength, - compression, - compressedSize, - uncompressedSize, - }; -} + case Opcode.CHUNK: { + const startTime = reader.uint64(); + const endTime = reader.uint64(); + const uncompressedSize = reader.uint64(); + const uncompressedCrc = reader.uint32(); + const compression = reader.string(); + const recordByteLength = Number(reader.uint64()); + if (recordByteLength + reader.offset > recordView.byteLength) { + throw new Error("Chunk records length exceeds remaining record size"); + } + const records = new Uint8Array( + recordView.buffer, + recordView.byteOffset + reader.offset, + recordByteLength, 
+ ).slice(); + const record: TypedMcapRecord = { + type: "Chunk", + messageStartTime: startTime, + messageEndTime: endTime, + compression, + uncompressedSize, + uncompressedCrc, + records, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } -function parseAttachment( - reader: Reader, - recordLength: number, - // NOTE: internal function in the hot path, (de)structuring args would be wasteful, acceptable perf/clarity tradeoff - // eslint-disable-next-line @foxglove/no-boolean-parameters - validateCrcs: boolean, -): TypedMcapRecord { - const startOffset = reader.offset; - const logTime = reader.uint64(); - const createTime = reader.uint64(); - const name = reader.string(); - const mediaType = reader.string(); - const dataLen = reader.uint64(); - // NOTE: probably not necessary, but just in case - if (BigInt(reader.offset) + dataLen > Number.MAX_SAFE_INTEGER) { - throw new Error(`Attachment too large: ${dataLen}`); - } - if (reader.offset + Number(dataLen) + 4 /*crc*/ > startOffset + recordLength) { - throw new Error(`Attachment data length ${dataLen} exceeds bounds of record`); - } - const data = reader.u8ArrayCopy(Number(dataLen)); - const crcLength = reader.offset - startOffset; - const expectedCrc = reader.uint32(); - if (validateCrcs && expectedCrc !== 0) { - reader.offset = startOffset; - const fullData = reader.u8ArrayBorrow(crcLength); - const actualCrc = crc32(fullData); - reader.offset = startOffset + crcLength + 4; - if (actualCrc !== expectedCrc) { - throw new Error(`Attachment CRC32 mismatch: expected ${expectedCrc}, actual ${actualCrc}`); + case Opcode.MESSAGE_INDEX: { + const channelId = reader.uint16(); + const records = reader.keyValuePairs( + (r) => r.uint64(), + (r) => r.uint64(), + ); + const record: TypedMcapRecord = { + type: "MessageIndex", + channelId, + records, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.CHUNK_INDEX: { + const messageStartTime = reader.uint64(); + const messageEndTime = reader.uint64(); + const chunkStartOffset = reader.uint64(); + const chunkLength = reader.uint64(); + const messageIndexOffsets = reader.map( + (r) => r.uint16(), + (r) => r.uint64(), + ); + const messageIndexLength = reader.uint64(); + const compression = reader.string(); + const compressedSize = reader.uint64(); + const uncompressedSize = reader.uint64(); + const record: TypedMcapRecord = { + type: "ChunkIndex", + messageStartTime, + messageEndTime, + chunkStartOffset, + chunkLength, + messageIndexOffsets, + messageIndexLength, + compression, + compressedSize, + uncompressedSize, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.ATTACHMENT: { + const logTime = reader.uint64(); + const createTime = reader.uint64(); + const name = reader.string(); + const mediaType = reader.string(); + const dataLen = reader.uint64(); + if (BigInt(recordView.byteOffset + reader.offset) + dataLen > Number.MAX_SAFE_INTEGER) { + throw new Error(`Attachment too large: ${dataLen}`); + } + if (reader.offset + Number(dataLen) + 4 /*crc*/ > recordView.byteLength) { + throw new Error(`Attachment data length ${dataLen} exceeds bounds of record`); + } + const data = new Uint8Array( + recordView.buffer, + recordView.byteOffset + reader.offset, + Number(dataLen), + ).slice(); + reader.offset += Number(dataLen); + const crcLength = reader.offset; + const expectedCrc = reader.uint32(); + if (validateCrcs && expectedCrc !== 0) { + const actualCrc = crc32(new DataView(recordView.buffer, recordView.byteOffset, crcLength)); 
+ if (actualCrc !== expectedCrc) { + throw new Error( + `Attachment CRC32 mismatch: expected ${expectedCrc}, actual ${actualCrc}`, + ); + } + } + + const record: TypedMcapRecord = { + type: "Attachment", + logTime, + createTime, + name, + mediaType, + data, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.ATTACHMENT_INDEX: { + const offset = reader.uint64(); + const length = reader.uint64(); + const logTime = reader.uint64(); + const createTime = reader.uint64(); + const dataSize = reader.uint64(); + const name = reader.string(); + const mediaType = reader.string(); + + const record: TypedMcapRecord = { + type: "AttachmentIndex", + offset, + length, + logTime, + createTime, + dataSize, + name, + mediaType, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.STATISTICS: { + const messageCount = reader.uint64(); + const schemaCount = reader.uint16(); + const channelCount = reader.uint32(); + const attachmentCount = reader.uint32(); + const metadataCount = reader.uint32(); + const chunkCount = reader.uint32(); + const messageStartTime = reader.uint64(); + const messageEndTime = reader.uint64(); + const channelMessageCounts = reader.map( + (r) => r.uint16(), + (r) => r.uint64(), + ); + + const record: TypedMcapRecord = { + type: "Statistics", + messageCount, + schemaCount, + channelCount, + attachmentCount, + metadataCount, + chunkCount, + messageStartTime, + messageEndTime, + channelMessageCounts, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.METADATA: { + const name = reader.string(); + const metadata = reader.map( + (r) => r.string(), + (r) => r.string(), + ); + const record: TypedMcapRecord = { type: "Metadata", metadata, name }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.METADATA_INDEX: { + const offset = reader.uint64(); + const length = reader.uint64(); + const name = reader.string(); + + const record: TypedMcapRecord = { + type: "MetadataIndex", + offset, + length, + name, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.SUMMARY_OFFSET: { + const groupOpcode = reader.uint8(); + const groupStart = reader.uint64(); + const groupLength = reader.uint64(); + + const record: TypedMcapRecord = { + type: "SummaryOffset", + groupOpcode, + groupStart, + groupLength, + }; + return { record, usedBytes: recordEndOffset - startOffset }; + } + case Opcode.DATA_END: { + const dataSectionCrc = reader.uint32(); + const record: TypedMcapRecord = { + type: "DataEnd", + dataSectionCrc, + }; + return { record, usedBytes: recordEndOffset - startOffset }; } } - reader.offset = startOffset + recordLength; - - return { - type: "Attachment", - logTime, - createTime, - name, - mediaType, - data, - }; -} - -function parseAttachmentIndex(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const offset = reader.uint64(); - const length = reader.uint64(); - const logTime = reader.uint64(); - const createTime = reader.uint64(); - const dataSize = reader.uint64(); - const name = reader.string(); - const mediaType = reader.string(); - reader.offset = startOffset + recordLength; - - return { - type: "AttachmentIndex", - offset, - length, - logTime, - createTime, - dataSize, - name, - mediaType, - }; -} - -function parseStatistics(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const messageCount = reader.uint64(); - const schemaCount = 
reader.uint16(); - const channelCount = reader.uint32(); - const attachmentCount = reader.uint32(); - const metadataCount = reader.uint32(); - const chunkCount = reader.uint32(); - const messageStartTime = reader.uint64(); - const messageEndTime = reader.uint64(); - const channelMessageCounts = reader.map( - (r) => r.uint16(), - (r) => r.uint64(), - ); - reader.offset = startOffset + recordLength; - - return { - type: "Statistics", - messageCount, - schemaCount, - channelCount, - attachmentCount, - metadataCount, - chunkCount, - messageStartTime, - messageEndTime, - channelMessageCounts, - }; -} - -function parseMetadata(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const name = reader.string(); - const metadata = reader.map( - (r) => r.string(), - (r) => r.string(), - ); - reader.offset = startOffset + recordLength; - return { type: "Metadata", metadata, name }; -} - -function parseMetadataIndex(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const offset = reader.uint64(); - const length = reader.uint64(); - const name = reader.string(); - reader.offset = startOffset + recordLength; - - return { - type: "MetadataIndex", - offset, - length, - name, - }; -} - -function parseSummaryOffset(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const groupOpcode = reader.uint8(); - const groupStart = reader.uint64(); - const groupLength = reader.uint64(); - reader.offset = startOffset + recordLength; - - return { - type: "SummaryOffset", - groupOpcode, - groupStart, - groupLength, - }; -} - -function parseDataEnd(reader: Reader, recordLength: number): TypedMcapRecord { - const startOffset = reader.offset; - const dataSectionCrc = reader.uint32(); - reader.offset = startOffset + recordLength; - return { - type: "DataEnd", - dataSectionCrc, - }; }
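
Note (not part of the patch): below is a minimal sketch of how a caller can drive the offset-based `parseRecord` API that appears on the `+` side of this diff, mirroring the loop used in `McapWriter.test.ts` above. The `readAllRecords` helper name and the assumption of a complete MCAP byte buffer are illustrative only.

import { parseMagic, parseRecord } from "./parse";
import { TypedMcapRecord } from "./types";

/**
 * Collect every top-level record from a complete MCAP byte buffer.
 * Illustrative helper only; not part of this change.
 */
function readAllRecords(bytes: Uint8Array): TypedMcapRecord[] {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const records: TypedMcapRecord[] = [];
  // Consume the leading magic first; parseMagic reports how many bytes it used.
  let offset = parseMagic(view, 0).usedBytes;
  for (;;) {
    // parseRecord does not advance any cursor itself; the caller owns the offset
    // and advances it by the returned usedBytes.
    const result = parseRecord({ view, startOffset: offset, validateCrcs: true });
    if (!result.record) {
      break; // not enough bytes remain for another complete record (e.g. only trailing magic left)
    }
    records.push(result.record);
    offset += result.usedBytes;
  }
  return records;
}

Because parseRecord leaves consumption to the caller, the same pattern composes with StreamBuffer in McapStreamReader: parse against `#buffer.view` at offset 0, then call `#buffer.consume(usedBytes)` once a record is produced.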