typescript: McapIndexedReader: add method to only await when necessary
james-rms committed Jul 26, 2024
1 parent 05505c7 commit 270a38a
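
This change adds McapIndexedReader.readMessagesSync(), a synchronous generator that yields a result holding either the next message or a promise, and reimplements readMessages() on top of it. A caller awaits only when a promise is yielded, i.e. when the reader actually has to load message indexes or chunk data; messages from an already-loaded chunk are yielded without awaiting. A minimal consumption sketch, mirroring the readMessages_sync benchmark below (the function name countMessages and the standalone setup are illustrative):

import { McapIndexedReader, McapTypes } from "@mcap/core";

// Count messages in an indexed MCAP, awaiting only when the reader needs I/O.
async function countMessages(readable: McapTypes.IReadable): Promise<number> {
  const reader = await McapIndexedReader.Initialize({ readable });
  let messageCount = 0;
  for (const { promise } of reader.readMessagesSync()) {
    if (promise != undefined) {
      // The reader must load message indexes or chunk data before it can yield
      // more messages; the promise has to be awaited before calling next().
      await promise;
    } else {
      // When `promise` is undefined, the result's `message` field holds the
      // next message and can be handled synchronously.
      messageCount++;
    }
  }
  return messageCount;
}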
Showing 2 changed files with 163 additions and 37 deletions.
134 changes: 104 additions & 30 deletions typescript/benchmarks/index.ts
@@ -1,9 +1,44 @@
import { McapIndexedReader, McapStreamReader, McapWriter, TempBuffer } from "@mcap/core";
import { McapIndexedReader, McapStreamReader, McapWriter, McapTypes, IWritable } from "@mcap/core";
import assert from "assert";
import { program } from "commander";
import fs from "fs/promises";
import os from "os";
import path from "path";

import { runBenchmark } from "./bench";

class ReadableFile implements McapTypes.IReadable {
#fd: fs.FileHandle;
constructor(fd: fs.FileHandle) {
this.#fd = fd;
}
async read(offset: bigint, size: bigint): Promise<Uint8Array> {
const res = new Uint8Array(Number(size));
await this.#fd.read(res, 0, Number(size), Number(offset));
return res;
}
async size(): Promise<bigint> {
const stat = await this.#fd.stat();
return BigInt(stat.size);
}
}

class WritableFile implements IWritable {
#fd: fs.FileHandle;
#pos: bigint;
constructor(fd: fs.FileHandle) {
this.#fd = fd;
this.#pos = BigInt(0);
}
async write(buffer: Uint8Array): Promise<void> {
await this.#fd.write(buffer);
this.#pos = this.#pos + BigInt(buffer.length);
}
position(): bigint {
return this.#pos;
}
}

/**
* An IWritable that copies data to memory, but overwrites previous data. This allows benchmarking
* the copies without actually allocating the full initial capacity.
@@ -38,56 +73,95 @@ async function benchmarkReaders()
const chunkSize = 1024 * 1024 * 4;
const numMessages = 1_000_000;
const messageData = new Uint8Array(messageSize).fill(42);
const buf = new TempBuffer();
const writer = new McapWriter({ writable: buf, chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
const filepath = path.join(os.tmpdir(), "sample.mcap");
{
const fd = await fs.open(filepath, "w");

const writer = new McapWriter({ writable: new WritableFile(fd), chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
});
}
await writer.end();
await fd.close();
}
await writer.end();
await runBenchmark(McapStreamReader.name, async () => {
const fd = await fs.open(filepath);
const stream = fd.createReadStream();
const reader = new McapStreamReader();
reader.append(buf.get());
let messageCount = 0;
for (;;) {
const rec = reader.nextRecord();
if (rec != undefined) {
if (rec.type === "Message") {
stream.on("data", (chunk) => {
reader.append(Buffer.from(chunk));
for (let record; (record = reader.nextRecord()); ) {
if (record.type === "Message") {
messageCount++;
}
} else {
break;
}
}
});
await new Promise((resolve) => stream.on("end", resolve));
stream.close();
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark(McapIndexedReader.name, async () => {
const reader = await McapIndexedReader.Initialize({ readable: buf });
await runBenchmark("readMessages_async", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for await (const _ of reader.readMessages()) {
messageCount++;
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark(McapIndexedReader.name + "_reverse", async () => {
const reader = await McapIndexedReader.Initialize({ readable: buf });
await runBenchmark("readMessages_async_reverse", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for await (const _ of reader.readMessages({ reverse: true })) {
messageCount++;
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark("readMessages_sync", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for (const { promise } of reader.readMessagesSync()) {
if (promise != undefined) {
await promise;
} else {
messageCount++;
}
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
await runBenchmark("readMessages_sync_reverse", async () => {
const fd = await fs.open(filepath);
const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) });
let messageCount = 0;
for (const { promise } of reader.readMessagesSync({ reverse: true })) {
if (promise != undefined) {
await promise;
} else {
messageCount++;
}
}
assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
await fd.close();
});
}

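For reference, the unchanged doc comment near the top of benchmarks/index.ts describes an IWritable that copies data into memory but overwrites previous data, so write copies can be benchmarked without allocating the file's full capacity. Its body is collapsed in the diff above; a hypothetical sketch of such a writable, using only the IWritable members the benchmark relies on (write and position) — the actual class in the repository may differ:

import { IWritable } from "@mcap/core";

// Hypothetical: copy each buffer into a reusable scratch allocation so the
// benchmark pays for the copy without keeping the whole MCAP in memory.
class OverwritingWritable implements IWritable {
  #scratch = new Uint8Array(0);
  #position = BigInt(0);

  async write(buffer: Uint8Array): Promise<void> {
    if (this.#scratch.byteLength < buffer.byteLength) {
      this.#scratch = new Uint8Array(buffer.byteLength);
    }
    this.#scratch.set(buffer); // overwrite whatever was written before
    this.#position += BigInt(buffer.byteLength);
  }

  position(): bigint {
    return this.#position;
  }
}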
66 changes: 59 additions & 7 deletions typescript/core/src/McapIndexedReader.ts
@@ -22,6 +22,16 @@ type McapIndexedReaderArgs = {
dataSectionCrc?: number;
};

type ReadMessagesSyncResult =
| {
promise: Promise<void>;
message: undefined;
}
| {
promise: undefined;
message: TypedMcapRecords["Message"];
};

export class McapIndexedReader {
readonly chunkIndexes: readonly TypedMcapRecords["ChunkIndex"][];
readonly attachmentIndexes: readonly TypedMcapRecords["AttachmentIndex"][];
@@ -345,6 +355,13 @@ export class McapIndexedReader {
});
}

/** Returns an async iterator that iterates over messages in the MCAP file in order of log time.
* @param args.topics: if defined, only messages from channels matching the topics will be yielded.
* @param args.startTime: if defined, only messages with log times on or after this time will be yielded.
* @param args.endTime: if defined, only messages with log times on or before this time will be yielded.
* @param args.reverse: if true, messages will be yielded in reverse log-time order.
* @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP.
*/
async *readMessages(
args: {
topics?: readonly string[];
@@ -354,6 +371,33 @@
validateCrcs?: boolean;
} = {},
): AsyncGenerator<TypedMcapRecords["Message"], void, void> {
for (const { promise, message } of this.readMessagesSync(args)) {
if (promise != undefined) {
await promise;
} else {
yield message;
}
}
}

/** Returns an iterator that iterates over messages in the MCAP file in order of log time.
* The returned object will have either the `message` or `promise` member defined. If `promise`
* is not undefined, the caller must wait for it to resolve before calling next().
* @param args.topics: if defined, only messages from channels matching the topics will be yielded.
* @param args.startTime: if defined, only messages with log times on or after this time will be yielded.
* @param args.endTime: if defined, only messages with log times on or before this time will be yielded.
* @param args.reverse: if true, messages will be yielded in reverse log-time order.
* @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP.
*/
*readMessagesSync(
args: {
topics?: readonly string[];
startTime?: bigint;
endTime?: bigint;
reverse?: boolean;
validateCrcs?: boolean;
} = {},
): Generator<ReadMessagesSyncResult, void, void> {
const {
topics,
startTime = this.#messageStartTime,
@@ -398,7 +442,10 @@
for (let cursor; (cursor = chunkCursors.peek()); ) {
if (!cursor.hasMessageIndexes()) {
// If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap.
await cursor.loadMessageIndexes(this.#readable);
yield {
promise: cursor.loadMessageIndexes(this.#readable),
message: undefined,
};
if (cursor.hasMoreMessages()) {
chunkCursors.replace(cursor);
} else {
@@ -409,12 +456,17 @@

let chunkView = chunkViewCache.get(cursor.chunkIndex.chunkStartOffset);
if (!chunkView) {
chunkView = await this.#loadChunkData(cursor.chunkIndex, {
validateCrcs: validateCrcs ?? true,
});
chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView);
const promise = (async () => {
chunkView = await this.#loadChunkData(cursor.chunkIndex, {
validateCrcs: validateCrcs ?? true,
});
chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView);
})();
yield { promise, message: undefined };
}
if (chunkView == undefined) {
throw new Error("must wait on yielded promise before continuing");
}

const [logTime, offset] = cursor.popMessage();
if (offset >= BigInt(chunkView.byteLength)) {
throw this.#errorWithLibrary(
@@ -441,7 +493,7 @@
`Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`,
);
}
yield result.record;
yield { promise: undefined, message: result.record };

if (cursor.hasMoreMessages()) {
// There is no need to reorganize the heap when chunks are ordered and not overlapping.
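The doc comments above list the same filtering options for readMessages() and readMessagesSync(). A short usage sketch of those options on the async iterator, assuming an already-initialized reader; the topic name and time bounds are illustrative:

import { McapIndexedReader } from "@mcap/core";

// Iterate a topic over a bounded log-time range, oldest first.
async function printMessageSizes(reader: McapIndexedReader): Promise<void> {
  for await (const message of reader.readMessages({
    topics: ["/camera/image"], // only messages whose channel topic matches
    startTime: 1_000_000_000n, // only log times on or after this value
    endTime: 2_000_000_000n, // only log times on or before this value
    reverse: false, // forward log-time order
    validateCrcs: true, // validate chunk CRCs while reading
  })) {
    console.log(message.logTime, message.data.byteLength);
  }
}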
