Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Sep 17, 2024
1 parent 9e63342 commit f8a2e33
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
12 changes: 8 additions & 4 deletions typescript/core/src/McapStreamReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,12 @@ describe("McapStreamReader", () => {
topic: "foo",
metadata: new Map(),
};
const messageSize = 1_000;
const messageRecordBytes = 1 + 8 + 2 + 4 + 8 + 8 + messageSize;

const makeMessage = (fillNumber: number) => ({
channelId: 0,
data: new Uint8Array(1_000).fill(fillNumber),
data: new Uint8Array(messageSize).fill(fillNumber),
logTime: 0n,
publishTime: 0n,
sequence: 0,
Expand All @@ -686,25 +689,26 @@ describe("McapStreamReader", () => {
recordBuilder.writeMessage(makeMessage(1));
recordBuilder.writeMessage(makeMessage(2));
streamReader.append(recordBuilder.buffer);
expect(streamReader.bytesRemaining()).toBe(recordBuilder.buffer.byteLength);
const remainingBytes = recordBuilder.buffer.byteLength;
expect(streamReader.bytesRemaining()).toBe(2 * messageRecordBytes);

// Add one more message. Nothing has been consumed yet, but the internal buffer should be
// large enough to simply append the new data.
recordBuilder.reset();
recordBuilder.writeMessage(makeMessage(3));
streamReader.append(recordBuilder.buffer);
expect(streamReader.bytesRemaining()).toBe(remainingBytes + recordBuilder.buffer.byteLength);
expect(streamReader.bytesRemaining()).toBe(3 * messageRecordBytes);

// Read some (but not all) messages to forward the reader's internal offset
expect(streamReader.nextRecord()).toEqual({ ...makeMessage(1), type: "Message" });
expect(streamReader.nextRecord()).toEqual({ ...makeMessage(2), type: "Message" });
expect(streamReader.bytesRemaining()).toBe(1 * messageRecordBytes);

// Add more messages. This will cause existing data to be shifted to the beginning of the buffer.
recordBuilder.reset();
recordBuilder.writeMessage(makeMessage(4));
recordBuilder.writeMessage(makeMessage(5));
streamReader.append(recordBuilder.buffer);
expect(streamReader.bytesRemaining()).toBe(3 * messageRecordBytes);

expect(streamReader.nextRecord()).toEqual({ ...makeMessage(3), type: "Message" });
expect(streamReader.nextRecord()).toEqual({ ...makeMessage(4), type: "Message" });
Expand Down
17 changes: 9 additions & 8 deletions typescript/core/src/McapStreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type McapReaderOptions = {
*/
export default class McapStreamReader {
#buffer = new ArrayBuffer(MCAP_MAGIC.length * 2);
#view = new DataView(this.#buffer);
#reader = new Reader(this.#view, MCAP_MAGIC.length * 2); // Cursor starts at end of initial buffer
#view = new DataView(this.#buffer, 0, 0);
#reader = new Reader(this.#view);
#decompressHandlers;
#includeChunks;
#validateCrcs;
Expand Down Expand Up @@ -93,13 +93,14 @@ export default class McapStreamReader {
}
this.#appendOrShift(data);
}

#appendOrShift(data: Uint8Array): void {
/** Add data to the buffer, shifting existing data or reallocating if necessary. */
const consumedBytes = this.#reader.offset;
const remainingBytes = this.#view.byteLength - consumedBytes;
const totalNeededBytes = remainingBytes + data.byteLength;
const unconsumedBytes = this.#view.byteLength - consumedBytes;
const neededCapacity = unconsumedBytes + data.byteLength;

if (totalNeededBytes <= this.#buffer.byteLength) {
if (neededCapacity <= this.#buffer.byteLength) {
// Data fits in the current buffer
if (
this.#view.byteOffset + this.#view.byteLength + data.byteLength <=
Expand All @@ -121,7 +122,7 @@ export default class McapStreamReader {
const existingData = new Uint8Array(
this.#buffer,
this.#view.byteOffset + consumedBytes,
remainingBytes,
unconsumedBytes,
);
const array = new Uint8Array(this.#buffer);
array.set(existingData, 0);
Expand All @@ -134,12 +135,12 @@ export default class McapStreamReader {

// Currently, the new buffer size may be smaller than the old size. For future optimizations,
// we could consider making the buffer size increase monotonically.
this.#buffer = new ArrayBuffer(totalNeededBytes * 2);
this.#buffer = new ArrayBuffer(neededCapacity * 2);
const array = new Uint8Array(this.#buffer);
const existingData = new Uint8Array(
this.#view.buffer,
this.#view.byteOffset + consumedBytes,
remainingBytes,
unconsumedBytes,
);
array.set(existingData, 0);
array.set(data, existingData.byteLength);
Expand Down

0 comments on commit f8a2e33

Please sign in to comment.