Skip to content

Commit

Permalink
Revert "perf(ts): reuse Reader whilst parsing records" (#1227)
Browse files Browse the repository at this point in the history
Reverts #1212

This broke streaming in the Foxglove app. Until we can investigate, we need
to put the library back into a working state.
  • Loading branch information
defunctzombie authored Aug 30, 2024
1 parent 462981d commit ed894e7
Show file tree
Hide file tree
Showing 8 changed files with 600 additions and 556 deletions.
25 changes: 15 additions & 10 deletions typescript/core/src/ChunkCursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import Reader from "./Reader";
import { parseRecord } from "./parse";
import { sortedIndexBy } from "./sortedIndexBy";
import { sortedLastIndexBy } from "./sortedLastIndex";
Expand Down Expand Up @@ -137,25 +136,31 @@ export class ChunkCursor {
messageIndexes.byteLength,
);

const reader = new Reader(messageIndexesView);
let offset = 0;
const arrayOfMessageOffsets: [logTime: bigint, offset: bigint][][] = [];
let record;
while ((record = parseRecord(reader, true))) {
if (record.type !== "MessageIndex") {
for (
let result;
(result = parseRecord({ view: messageIndexesView, startOffset: offset, validateCrcs: true })),
result.record;
offset += result.usedBytes
) {
if (result.record.type !== "MessageIndex") {
continue;
}
if (
record.records.length === 0 ||
(this.#relevantChannels && !this.#relevantChannels.has(record.channelId))
result.record.records.length === 0 ||
(this.#relevantChannels && !this.#relevantChannels.has(result.record.channelId))
) {
continue;
}

arrayOfMessageOffsets.push(record.records);
arrayOfMessageOffsets.push(result.record.records);
}

if (reader.bytesRemaining() !== 0) {
throw new Error(`${reader.bytesRemaining()} bytes remaining in message index section`);
if (offset !== messageIndexesView.byteLength) {
throw new Error(
`${messageIndexesView.byteLength - offset} bytes remaining in message index section`,
);
}

this.#orderedMessageOffsets = arrayOfMessageOffsets
Expand Down
159 changes: 89 additions & 70 deletions typescript/core/src/McapIndexedReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { crc32, crc32Final, crc32Init, crc32Update } from "@foxglove/crc";
import Heap from "heap-js";

import { ChunkCursor } from "./ChunkCursor";
import Reader from "./Reader";
import { MCAP_MAGIC } from "./constants";
import { parseMagic, parseRecord } from "./parse";
import { DecompressHandlers, IReadable, TypedMcapRecords } from "./types";
Expand Down Expand Up @@ -112,7 +111,7 @@ export class McapIndexedReader {
headerPrefix.byteOffset,
headerPrefix.byteLength,
);
void parseMagic(new Reader(headerPrefixView));
void parseMagic(headerPrefixView, 0);
const headerContentLength = headerPrefixView.getBigUint64(
MCAP_MAGIC.length + /* Opcode.HEADER */ 1,
true,
Expand All @@ -122,19 +121,26 @@ export class McapIndexedReader {

const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength);
headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength;
const headerReader = new Reader(
new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
);
const headerResult = parseRecord(headerReader, true);
if (headerResult?.type !== "Header") {
const headerResult = parseRecord({
view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
startOffset: 0,
validateCrcs: true,
});
if (headerResult.record?.type !== "Header") {
throw new Error(
`Unable to read header at beginning of file; found ${headerResult?.type ?? "nothing"}`,
`Unable to read header at beginning of file; found ${
headerResult.record?.type ?? "nothing"
}`,
);
}
if (headerReader.bytesRemaining() !== 0) {
throw new Error(`${headerReader.bytesRemaining()} bytes remaining after parsing header`);
if (headerResult.usedBytes !== headerRecord.byteLength) {
throw new Error(
`${
headerRecord.byteLength - headerResult.usedBytes
} bytes remaining after parsing header`,
);
}
header = headerResult;
header = headerResult.record;
}

function errorWithLibrary(message: string): Error {
Expand Down Expand Up @@ -173,32 +179,33 @@ export class McapIndexedReader {
}

try {
void parseMagic(
new Reader(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length),
);
void parseMagic(footerAndMagicView, footerAndMagicView.byteLength - MCAP_MAGIC.length);
} catch (error) {
throw errorWithLibrary((error as Error).message);
}

let footer: TypedMcapRecords["Footer"];
{
const footerReader = new Reader(footerAndMagicView);
const footerRecord = parseRecord(footerReader, true);
if (footerRecord?.type !== "Footer") {
const footerResult = parseRecord({
view: footerAndMagicView,
startOffset: 0,
validateCrcs: true,
});
if (footerResult.record?.type !== "Footer") {
throw errorWithLibrary(
`Unable to read footer from end of file (offset ${footerOffset}); found ${
footerRecord?.type ?? "nothing"
footerResult.record?.type ?? "nothing"
}`,
);
}
if (footerReader.bytesRemaining() !== MCAP_MAGIC.length) {
if (footerResult.usedBytes !== footerAndMagicView.byteLength - MCAP_MAGIC.length) {
throw errorWithLibrary(
`${
footerReader.bytesRemaining() - MCAP_MAGIC.length
footerAndMagicView.byteLength - MCAP_MAGIC.length - footerResult.usedBytes
} bytes remaining after parsing footer`,
);
}
footer = footerRecord;
footer = footerResult.record;
}
if (footer.summaryStart === 0n) {
throw errorWithLibrary("File is not indexed");
Expand Down Expand Up @@ -254,7 +261,6 @@ export class McapIndexedReader {
dataEndAndSummarySection.byteOffset,
dataEndAndSummarySection.byteLength,
);
const indexReader = new Reader(indexView);

const channelsById = new Map<number, TypedMcapRecords["Channel"]>();
const schemasById = new Map<number, TypedMcapRecords["Schema"]>();
Expand All @@ -265,42 +271,46 @@ export class McapIndexedReader {
let statistics: TypedMcapRecords["Statistics"] | undefined;
let dataSectionCrc: number | undefined;

let first = true;
let result;
while ((result = parseRecord(indexReader, true))) {
if (first && result.type !== "DataEnd") {
let offset = 0;
for (
let result;
(result = parseRecord({ view: indexView, startOffset: offset, validateCrcs: true })),
result.record;
offset += result.usedBytes
) {
if (offset === 0 && result.record.type !== "DataEnd") {
throw errorWithLibrary(
`Expected DataEnd record to precede summary section, but found ${result.type}`,
`Expected DataEnd record to precede summary section, but found ${result.record.type}`,
);
}
first = false;
switch (result.type) {
switch (result.record.type) {
case "Schema":
schemasById.set(result.id, result);
schemasById.set(result.record.id, result.record);
break;
case "Channel":
channelsById.set(result.id, result);
channelsById.set(result.record.id, result.record);
break;
case "ChunkIndex":
chunkIndexes.push(result);
chunkIndexes.push(result.record);
break;
case "AttachmentIndex":
attachmentIndexes.push(result);
attachmentIndexes.push(result.record);
break;
case "MetadataIndex":
metadataIndexes.push(result);
metadataIndexes.push(result.record);
break;
case "Statistics":
if (statistics) {
throw errorWithLibrary("Duplicate Statistics record");
}
statistics = result;
statistics = result.record;
break;
case "SummaryOffset":
summaryOffsetsByOpcode.set(result.groupOpcode, result);
summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record);
break;
case "DataEnd":
dataSectionCrc = result.dataSectionCrc === 0 ? undefined : result.dataSectionCrc;
dataSectionCrc =
result.record.dataSectionCrc === 0 ? undefined : result.record.dataSectionCrc;
break;
case "Header":
case "Footer":
Expand All @@ -309,13 +319,13 @@ export class McapIndexedReader {
case "MessageIndex":
case "Attachment":
case "Metadata":
throw errorWithLibrary(`${result.type} record not allowed in index section`);
throw errorWithLibrary(`${result.record.type} record not allowed in index section`);
case "Unknown":
break;
}
}
if (indexReader.bytesRemaining() !== 0) {
throw errorWithLibrary(`${indexReader.bytesRemaining()} bytes remaining in index section`);
if (offset !== indexView.byteLength) {
throw errorWithLibrary(`${indexView.byteLength - offset} bytes remaining in index section`);
}

return new McapIndexedReader({
Expand Down Expand Up @@ -385,7 +395,6 @@ export class McapIndexedReader {
// cursor becomes active (i.e. when we first need to access messages from the chunk) and removed
// when the cursor is removed from the heap.
const chunkViewCache = new Map<bigint, DataView>();
const chunkReader = new Reader(new DataView(new ArrayBuffer(0)));
for (let cursor; (cursor = chunkCursors.peek()); ) {
if (!cursor.hasMessageIndexes()) {
// If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap.
Expand All @@ -412,24 +421,27 @@ export class McapIndexedReader {
`Message offset beyond chunk bounds (log time ${logTime}, offset ${offset}, chunk data length ${chunkView.byteLength}) in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`,
);
}
chunkReader.reset(chunkView, Number(offset));
const record = parseRecord(chunkReader, validateCrcs ?? true);
if (!record) {
const result = parseRecord({
view: chunkView,
startOffset: Number(offset),
validateCrcs: validateCrcs ?? true,
});
if (!result.record) {
throw this.#errorWithLibrary(
`Unable to parse record at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset}`,
);
}
if (record.type !== "Message") {
if (result.record.type !== "Message") {
throw this.#errorWithLibrary(
`Unexpected record type ${record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
`Unexpected record type ${result.record.type} in message index (time ${logTime}, offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
);
}
if (record.logTime !== logTime) {
if (result.record.logTime !== logTime) {
throw this.#errorWithLibrary(
`Message log time ${record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
`Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
);
}
yield record;
yield result.record;

if (cursor.hasMoreMessages()) {
// There is no need to reorganize the heap when chunks are ordered and not overlapping.
Expand All @@ -456,18 +468,19 @@ export class McapIndexedReader {
continue;
}
const metadataData = await this.#readable.read(metadataIndex.offset, metadataIndex.length);
const metadataReader = new Reader(
new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength),
);
const metadataRecord = parseRecord(metadataReader, false);
if (metadataRecord?.type !== "Metadata") {
const metadataResult = parseRecord({
view: new DataView(metadataData.buffer, metadataData.byteOffset, metadataData.byteLength),
startOffset: 0,
validateCrcs: false,
});
if (metadataResult.record?.type !== "Metadata") {
throw this.#errorWithLibrary(
`Metadata data at offset ${
metadataIndex.offset
} does not point to metadata record (found ${String(metadataRecord?.type)})`,
} does not point to metadata record (found ${String(metadataResult.record?.type)})`,
);
}
yield metadataRecord;
yield metadataResult.record;
}
}

Expand Down Expand Up @@ -506,18 +519,23 @@ export class McapIndexedReader {
attachmentIndex.offset,
attachmentIndex.length,
);
const attachmentReader = new Reader(
new DataView(attachmentData.buffer, attachmentData.byteOffset, attachmentData.byteLength),
);
const attachmentRecord = parseRecord(attachmentReader, validateCrcs ?? true);
if (attachmentRecord?.type !== "Attachment") {
const attachmentResult = parseRecord({
view: new DataView(
attachmentData.buffer,
attachmentData.byteOffset,
attachmentData.byteLength,
),
startOffset: 0,
validateCrcs: validateCrcs ?? true,
});
if (attachmentResult.record?.type !== "Attachment") {
throw this.#errorWithLibrary(
`Attachment data at offset ${
attachmentIndex.offset
} does not point to attachment record (found ${String(attachmentRecord?.type)})`,
} does not point to attachment record (found ${String(attachmentResult.record?.type)})`,
);
}
yield attachmentRecord;
yield attachmentResult.record;
}
}

Expand All @@ -529,19 +547,20 @@ export class McapIndexedReader {
chunkIndex.chunkStartOffset,
chunkIndex.chunkLength,
);
const chunkReader = new Reader(
new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength),
);
const chunkRecord = parseRecord(chunkReader, options?.validateCrcs ?? true);
if (chunkRecord?.type !== "Chunk") {
const chunkResult = parseRecord({
view: new DataView(chunkData.buffer, chunkData.byteOffset, chunkData.byteLength),
startOffset: 0,
validateCrcs: options?.validateCrcs ?? true,
});
if (chunkResult.record?.type !== "Chunk") {
throw this.#errorWithLibrary(
`Chunk start offset ${
chunkIndex.chunkStartOffset
} does not point to chunk record (found ${String(chunkRecord?.type)})`,
} does not point to chunk record (found ${String(chunkResult.record?.type)})`,
);
}

const chunk = chunkRecord;
const chunk = chunkResult.record;
let buffer = chunk.records;
if (chunk.compression !== "" && buffer.byteLength > 0) {
const decompress = this.#decompressHandlers?.[chunk.compression];
Expand Down
Loading

0 comments on commit ed894e7

Please sign in to comment.