diff --git a/src/to-array-stream.test.ts b/src/to-array-stream.test.ts index bfe587b..f59f4b4 100644 --- a/src/to-array-stream.test.ts +++ b/src/to-array-stream.test.ts @@ -1,49 +1,52 @@ import { Readable } from "node:stream"; import { pipeline } from "node:stream/promises"; +import { asyncInputs } from "./test-util/async-inputs.js"; +import { syncInputs } from "./test-util/sync-inputs.js"; import { ToArrayStream } from "./to-array-stream.js"; -describe(`ToArrayStream Test`, () => { - it(`put chunks in an array in order.`, async () => { - const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])]; - const outputData: Array = []; - - await pipeline( - Readable.from(inputData), - new ToArrayStream({ target: outputData }), - ); - - expect(outputData).toEqual(inputData); - }); - - it(`put chunks with its encoding in an array in order.`, async () => { - const inputData = [{ message: "1" }, { message: "2" }, { message: "3" }]; - const outputData: Array<{ chunk: Buffer; encoding: string }> = []; +const expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]; +const testCases = [ + { + name: `sync data`, + inputs: syncInputs, + }, + { + name: `async data`, + inputs: asyncInputs, + }, +]; - await pipeline( - Readable.from(inputData), - new ToArrayStream( - { target: outputData, includeEncoding: true }, - { objectMode: true }, - ), - ); - - expect( - outputData.map((data) => { - return data.chunk; - }), - ).toEqual(inputData); - for (const data of outputData) { - expect(typeof data.encoding === `string`).toBeTruthy(); +describe(`ToArrayStream Test`, () => { + describe(`put chunks in an array in order.`, () => { + for (const { name, inputs } of testCases) { + it(name, async () => { + const emitted: Array> = []; + const reducer = (acc: Array): void => { + emitted.push(acc); + }; + const toArrayStream = new ToArrayStream( + { target: [] }, + { objectMode: true }, + ); + toArrayStream.on(`data`, reducer); + await pipeline(Readable.from(inputs()), toArrayStream); + expect(emitted).toEqual(expected); + toArrayStream.off(`data`, reducer); + }); } }); - it(`the array passed to constructor is the same as the array returned by toArray method.`, async () => { - const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])]; - const outputData: Array = []; - - const toArrayStream = new ToArrayStream({ target: outputData }); - await pipeline(Readable.from(inputData), toArrayStream); - - expect(outputData).toBe(toArrayStream.toArray()); + describe(`the array passed to constructor is the same as the array returned by toArray method.`, () => { + for (const { name, inputs } of testCases) { + it(name, async () => { + const target: Array = []; + const toArrayStream = new ToArrayStream( + { target: target }, + { objectMode: true }, + ); + await pipeline(Readable.from(inputs()), toArrayStream); + expect(target).toBe(toArrayStream.toArray()); + }); + } }); }); diff --git a/src/to-array-stream.ts b/src/to-array-stream.ts index 2d0d4eb..6f0a69c 100644 --- a/src/to-array-stream.ts +++ b/src/to-array-stream.ts @@ -1,33 +1,20 @@ -import { Writable, WritableOptions } from "node:stream"; +import { TransformOptions } from "node:stream"; +import { ReduceStream } from "./reduce-stream.js"; -export class ToArrayStream extends Writable { - private readonly target: Array; - private readonly includeEncoding: boolean; - - constructor( - options1?: { target: Array; includeEncoding?: boolean }, - options2?: WritableOptions, - ) { - super(options2); - - this.target = options1?.target ?? []; - this.includeEncoding = options1?.includeEncoding ?? false; - } - - _write( - chunk: unknown, - encoding: BufferEncoding, - callback: (error?: Error | null) => void, - ) { - if (this.includeEncoding) { - this.target.push({ chunk: chunk, encoding: encoding }); - } else { - this.target.push(chunk); - } - callback(); +export class ToArrayStream extends ReduceStream, T> { + constructor(options1?: { target?: Array }, options2?: TransformOptions) { + const _options1 = { + acc: options1?.target ?? [], + f: (acc: Array, cur: T): Array => { + acc.push(cur); + return acc; + }, + emitLatest: true, + }; + super(_options1, options2); } - toArray(): Array { - return this.target; + toArray(): Array { + return this.getAcc(); } }