From f8a2e33ee23a9711417a77d3ad61baf40236fa76 Mon Sep 17 00:00:00 2001 From: Hans-Joachim Krauch Date: Tue, 17 Sep 2024 17:46:12 +0200 Subject: [PATCH] comments --- typescript/core/src/McapStreamReader.test.ts | 12 ++++++++---- typescript/core/src/McapStreamReader.ts | 17 +++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/typescript/core/src/McapStreamReader.test.ts b/typescript/core/src/McapStreamReader.test.ts index f4df0fbaa..4b5bdde71 100644 --- a/typescript/core/src/McapStreamReader.test.ts +++ b/typescript/core/src/McapStreamReader.test.ts @@ -667,9 +667,12 @@ describe("McapStreamReader", () => { topic: "foo", metadata: new Map(), }; + const messageSize = 1_000; + const messageRecordBytes = 1 + 8 + 2 + 4 + 8 + 8 + messageSize; + const makeMessage = (fillNumber: number) => ({ channelId: 0, - data: new Uint8Array(1_000).fill(fillNumber), + data: new Uint8Array(messageSize).fill(fillNumber), logTime: 0n, publishTime: 0n, sequence: 0, @@ -686,25 +689,26 @@ describe("McapStreamReader", () => { recordBuilder.writeMessage(makeMessage(1)); recordBuilder.writeMessage(makeMessage(2)); streamReader.append(recordBuilder.buffer); - expect(streamReader.bytesRemaining()).toBe(recordBuilder.buffer.byteLength); - const remainingBytes = recordBuilder.buffer.byteLength; + expect(streamReader.bytesRemaining()).toBe(2 * messageRecordBytes); // Add one more message. Nothing has been consumed yet, but the internal buffer should be // large enough to simply append the new data. recordBuilder.reset(); recordBuilder.writeMessage(makeMessage(3)); streamReader.append(recordBuilder.buffer); - expect(streamReader.bytesRemaining()).toBe(remainingBytes + recordBuilder.buffer.byteLength); + expect(streamReader.bytesRemaining()).toBe(3 * messageRecordBytes); // Read some (but not all) messages to forward the reader's internal offset expect(streamReader.nextRecord()).toEqual({ ...makeMessage(1), type: "Message" }); expect(streamReader.nextRecord()).toEqual({ ...makeMessage(2), type: "Message" }); + expect(streamReader.bytesRemaining()).toBe(1 * messageRecordBytes); // Add more messages. This will cause existing data to be shifted to the beginning of the buffer. recordBuilder.reset(); recordBuilder.writeMessage(makeMessage(4)); recordBuilder.writeMessage(makeMessage(5)); streamReader.append(recordBuilder.buffer); + expect(streamReader.bytesRemaining()).toBe(3 * messageRecordBytes); expect(streamReader.nextRecord()).toEqual({ ...makeMessage(3), type: "Message" }); expect(streamReader.nextRecord()).toEqual({ ...makeMessage(4), type: "Message" }); diff --git a/typescript/core/src/McapStreamReader.ts b/typescript/core/src/McapStreamReader.ts index 0c10d325c..f04db4773 100644 --- a/typescript/core/src/McapStreamReader.ts +++ b/typescript/core/src/McapStreamReader.ts @@ -51,8 +51,8 @@ type McapReaderOptions = { */ export default class McapStreamReader { #buffer = new ArrayBuffer(MCAP_MAGIC.length * 2); - #view = new DataView(this.#buffer); - #reader = new Reader(this.#view, MCAP_MAGIC.length * 2); // Cursor starts at end of initial buffer + #view = new DataView(this.#buffer, 0, 0); + #reader = new Reader(this.#view); #decompressHandlers; #includeChunks; #validateCrcs; @@ -93,13 +93,14 @@ export default class McapStreamReader { } this.#appendOrShift(data); } + #appendOrShift(data: Uint8Array): void { /** Add data to the buffer, shifting existing data or reallocating if necessary. */ const consumedBytes = this.#reader.offset; - const remainingBytes = this.#view.byteLength - consumedBytes; - const totalNeededBytes = remainingBytes + data.byteLength; + const unconsumedBytes = this.#view.byteLength - consumedBytes; + const neededCapacity = unconsumedBytes + data.byteLength; - if (totalNeededBytes <= this.#buffer.byteLength) { + if (neededCapacity <= this.#buffer.byteLength) { // Data fits in the current buffer if ( this.#view.byteOffset + this.#view.byteLength + data.byteLength <= @@ -121,7 +122,7 @@ export default class McapStreamReader { const existingData = new Uint8Array( this.#buffer, this.#view.byteOffset + consumedBytes, - remainingBytes, + unconsumedBytes, ); const array = new Uint8Array(this.#buffer); array.set(existingData, 0); @@ -134,12 +135,12 @@ export default class McapStreamReader { // Currently, the new buffer size may be smaller than the old size. For future optimizations, // we could consider making the buffer size increase monotonically. - this.#buffer = new ArrayBuffer(totalNeededBytes * 2); + this.#buffer = new ArrayBuffer(neededCapacity * 2); const array = new Uint8Array(this.#buffer); const existingData = new Uint8Array( this.#view.buffer, this.#view.byteOffset + consumedBytes, - remainingBytes, + unconsumedBytes, ); array.set(existingData, 0); array.set(data, existingData.byteLength);