From 270a38a67e0d9d108876627ab2eab5f82e75dde2 Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 26 Jul 2024 13:27:38 +1000 Subject: [PATCH] typescript: McapIndexedReader: add method to only await when neccessary --- typescript/benchmarks/index.ts | 134 ++++++++++++++++++----- typescript/core/src/McapIndexedReader.ts | 66 +++++++++-- 2 files changed, 163 insertions(+), 37 deletions(-) diff --git a/typescript/benchmarks/index.ts b/typescript/benchmarks/index.ts index 818f8f6073..e5ba3ef3c5 100644 --- a/typescript/benchmarks/index.ts +++ b/typescript/benchmarks/index.ts @@ -1,9 +1,44 @@ -import { McapIndexedReader, McapStreamReader, McapWriter, TempBuffer } from "@mcap/core"; +import { McapIndexedReader, McapStreamReader, McapWriter, McapTypes, IWritable } from "@mcap/core"; import assert from "assert"; import { program } from "commander"; +import fs from "fs/promises"; +import os from "os"; +import path from "path"; import { runBenchmark } from "./bench"; +class ReadableFile implements McapTypes.IReadable { + #fd: fs.FileHandle; + constructor(fd: fs.FileHandle) { + this.#fd = fd; + } + async read(offset: bigint, size: bigint): Promise { + const res = new Uint8Array(Number(size)); + await this.#fd.read(res, 0, Number(size), Number(offset)); + return res; + } + async size(): Promise { + const stat = await this.#fd.stat(); + return BigInt(stat.size); + } +} + +class WritableFile implements IWritable { + #fd: fs.FileHandle; + #pos: bigint; + constructor(fd: fs.FileHandle) { + this.#fd = fd; + this.#pos = BigInt(0); + } + async write(buffer: Uint8Array): Promise { + await this.#fd.write(buffer); + this.#pos = this.#pos + BigInt(buffer.length); + } + position(): bigint { + return this.#pos; + } +} + /** * An IWritable that copies data to memory, but overwrites previous data. This allows benchmarking * the copies without actually allocating the full initial capacity. @@ -38,56 +73,95 @@ async function benchmarkReaders() { const chunkSize = 1024 * 1024 * 4; const numMessages = 1_000_000; const messageData = new Uint8Array(messageSize).fill(42); - const buf = new TempBuffer(); - const writer = new McapWriter({ writable: buf, chunkSize }); - await writer.start({ library: "", profile: "" }); - const channelId = await writer.registerChannel({ - schemaId: 0, - topic: "", - messageEncoding: "", - metadata: new Map([]), - }); - for (let i = 0; i < numMessages; i++) { - await writer.addMessage({ - channelId, - sequence: i, - logTime: BigInt(i), - publishTime: BigInt(i), - data: messageData, + const filepath = path.join(os.tmpdir(), "sample.mcap"); + { + const fd = await fs.open(filepath, "w"); + + const writer = new McapWriter({ writable: new WritableFile(fd), chunkSize }); + await writer.start({ library: "", profile: "" }); + const channelId = await writer.registerChannel({ + schemaId: 0, + topic: "", + messageEncoding: "", + metadata: new Map([]), }); + for (let i = 0; i < numMessages; i++) { + await writer.addMessage({ + channelId, + sequence: i, + logTime: BigInt(i), + publishTime: BigInt(i), + data: messageData, + }); + } + await writer.end(); + await fd.close(); } - await writer.end(); await runBenchmark(McapStreamReader.name, async () => { + const fd = await fs.open(filepath); + const stream = fd.createReadStream(); const reader = new McapStreamReader(); - reader.append(buf.get()); let messageCount = 0; - for (;;) { - const rec = reader.nextRecord(); - if (rec != undefined) { - if (rec.type === "Message") { + stream.on("data", (chunk) => { + reader.append(Buffer.from(chunk)); + for (let record; (record = reader.nextRecord()); ) { + if (record.type === "Message") { messageCount++; } - } else { - break; } - } + }); + await new Promise((resolve) => stream.on("end", resolve)); + stream.close(); assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`); + await fd.close(); }); - await runBenchmark(McapIndexedReader.name, async () => { - const reader = await McapIndexedReader.Initialize({ readable: buf }); + await runBenchmark("readMessages_async", async () => { + const fd = await fs.open(filepath); + const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) }); let messageCount = 0; for await (const _ of reader.readMessages()) { messageCount++; } assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`); + await fd.close(); }); - await runBenchmark(McapIndexedReader.name + "_reverse", async () => { - const reader = await McapIndexedReader.Initialize({ readable: buf }); + await runBenchmark("readMessages_async_reverse", async () => { + const fd = await fs.open(filepath); + const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) }); let messageCount = 0; for await (const _ of reader.readMessages({ reverse: true })) { messageCount++; } assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`); + await fd.close(); + }); + await runBenchmark("readMessages_sync", async () => { + const fd = await fs.open(filepath); + const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) }); + let messageCount = 0; + for (const { promise } of reader.readMessagesSync()) { + if (promise != undefined) { + await promise; + } else { + messageCount++; + } + } + assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`); + await fd.close(); + }); + await runBenchmark("readMessages_sync_reverse", async () => { + const fd = await fs.open(filepath); + const reader = await McapIndexedReader.Initialize({ readable: new ReadableFile(fd) }); + let messageCount = 0; + for (const { promise } of reader.readMessagesSync({ reverse: true })) { + if (promise != undefined) { + await promise; + } else { + messageCount++; + } + } + assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`); + await fd.close(); }); } diff --git a/typescript/core/src/McapIndexedReader.ts b/typescript/core/src/McapIndexedReader.ts index 5955300a40..cdc3650e7b 100644 --- a/typescript/core/src/McapIndexedReader.ts +++ b/typescript/core/src/McapIndexedReader.ts @@ -22,6 +22,16 @@ type McapIndexedReaderArgs = { dataSectionCrc?: number; }; +type ReadMessagesSyncResult = + | { + promise: Promise; + message: undefined; + } + | { + promise: undefined; + message: TypedMcapRecords["Message"]; + }; + export class McapIndexedReader { readonly chunkIndexes: readonly TypedMcapRecords["ChunkIndex"][]; readonly attachmentIndexes: readonly TypedMcapRecords["AttachmentIndex"][]; @@ -345,6 +355,13 @@ export class McapIndexedReader { }); } + /** Returns an async iterator that iterates over messages in the MCAP file in order of log time. + * @param args.topics: if defined, only messages from channels matching the topics will be yielded. + * @param args.startTime: if defined, only messages with log times on or after this time will be yielded. + * @param args.endTime: if defined, only messages with log times on or before this time will be yielded. + * @param args.reverse: if true, messages will be yielded in reverse log-time order. + * @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP. + */ async *readMessages( args: { topics?: readonly string[]; @@ -354,6 +371,33 @@ export class McapIndexedReader { validateCrcs?: boolean; } = {}, ): AsyncGenerator { + for (const { promise, message } of this.readMessagesSync(args)) { + if (promise != undefined) { + await promise; + } else { + yield message; + } + } + } + + /** Returns an iterator that iterates over messages in the MCAP file in order of log time. + * The returned object will have either the `message` or `promise` member defined. If `promise` + * is not undefined, the caller must wait for it to resolve before calling next(). + * @param args.topics: if defined, only messages from channels matching the topics will be yielded. + * @param args.startTime: if defined, only messages with log times on or after this time will be yielded. + * @param args.endTime: if defined, only messages with log times on or before this time will be yielded. + * @param args.reverse: if true, messages will be yielded in reverse log-time order. + * @param args.validateCrcs: if false, chunk CRCs will not be validated while reading the MCAP. + */ + *readMessagesSync( + args: { + topics?: readonly string[]; + startTime?: bigint; + endTime?: bigint; + reverse?: boolean; + validateCrcs?: boolean; + } = {}, + ): Generator { const { topics, startTime = this.#messageStartTime, @@ -398,7 +442,10 @@ export class McapIndexedReader { for (let cursor; (cursor = chunkCursors.peek()); ) { if (!cursor.hasMessageIndexes()) { // If we encounter a chunk whose message indexes have not been loaded yet, load them and re-organize the heap. - await cursor.loadMessageIndexes(this.#readable); + yield { + promise: cursor.loadMessageIndexes(this.#readable), + message: undefined, + }; if (cursor.hasMoreMessages()) { chunkCursors.replace(cursor); } else { @@ -409,12 +456,17 @@ export class McapIndexedReader { let chunkView = chunkViewCache.get(cursor.chunkIndex.chunkStartOffset); if (!chunkView) { - chunkView = await this.#loadChunkData(cursor.chunkIndex, { - validateCrcs: validateCrcs ?? true, - }); - chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView); + const promise = (async () => { + chunkView = await this.#loadChunkData(cursor.chunkIndex, { + validateCrcs: validateCrcs ?? true, + }); + chunkViewCache.set(cursor.chunkIndex.chunkStartOffset, chunkView); + })(); + yield { promise, message: undefined }; + } + if (chunkView == undefined) { + throw new Error("must wait on yielded promise before continuing"); } - const [logTime, offset] = cursor.popMessage(); if (offset >= BigInt(chunkView.byteLength)) { throw this.#errorWithLibrary( @@ -441,7 +493,7 @@ export class McapIndexedReader { `Message log time ${result.record.logTime} did not match message index entry (${logTime} at offset ${offset} in chunk at offset ${cursor.chunkIndex.chunkStartOffset})`, ); } - yield result.record; + yield { promise: undefined, message: result.record }; if (cursor.hasMoreMessages()) { // There is no need to reorganize the heap when chunks are ordered and not overlapping.