From 2ed7112b97501a23355f0ec85153ab2a2f3c63d6 Mon Sep 17 00:00:00 2001 From: daengdaengLee Date: Mon, 21 Aug 2023 17:16:10 +0900 Subject: [PATCH 1/4] =?UTF-8?q?test:=20ReduceStream=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reduce-stream.test.ts | 74 +++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/src/reduce-stream.test.ts b/src/reduce-stream.test.ts index c3cf565..f823f74 100644 --- a/src/reduce-stream.test.ts +++ b/src/reduce-stream.test.ts @@ -20,70 +20,60 @@ const expected = [ [2, 4, 6, 8, 10], ]; const testCases = [ - { name: `sync data`, inputs: syncInputs }, - { name: `async data`, inputs: asyncInputs }, + { + name: `sync data + emit latest (default option)`, + inputs: syncInputs, + emitLatest: true, + expected: [expected.at(-1)], + }, + { + name: `sync data + emit each`, + inputs: syncInputs, + emitLatest: false, + expected: expected, + }, + { + name: `async data + emit latest (default option)`, + inputs: asyncInputs, + emitLatest: true, + expected: [expected.at(-1)], + }, + { + name: `async data + emit each`, + inputs: asyncInputs, + emitLatest: false, + expected: expected, + }, ]; describe(`ReduceStream Test`, () => { - describe(`accumulate and emit latest acc. (default)`, () => { - for (const { name, inputs } of testCases) { + describe(`accumulate values`, () => { + for (const { name, inputs, emitLatest, expected } of testCases) { it(name, async () => { const emitted: Array> = []; const reduceStream = new ReduceStream( - { f: f, acc: [] }, + { f: f, acc: [], emitLatest: emitLatest }, { objectMode: true }, ); reduceStream.on(`data`, (acc) => { emitted.push(acc); }); await pipeline(Readable.from(inputs()), reduceStream); - expect(emitted).toEqual([expected.at(-1)]); + expect(emitted).toEqual(expected); }); } }); - describe(`accumulate and emit each acc.`, () => { - for (const { name, inputs } of testCases) { + describe(`get accumulated object by getAcc()`, () => { + for (const { name, inputs, emitLatest, expected } of testCases) { it(name, async () => { - const emitted: Array> = []; const reduceStream = new ReduceStream( - { f: f, acc: [], emitLatest: false }, + { f: f, acc: [], emitLatest: emitLatest }, { objectMode: true }, ); - reduceStream.on(`data`, (acc) => { - emitted.push(acc); - }); await pipeline(Readable.from(inputs()), reduceStream); - expect(emitted).toEqual(expected); + expect(reduceStream.getAcc()).toEqual(expected.at(-1)); }); } }); - - describe(`get accumulated object by getAcc()`, () => { - describe(`emit latest (default)`, () => { - for (const { name, inputs } of testCases) { - it(name, async () => { - const reduceStream = new ReduceStream( - { f: f, acc: [] }, - { objectMode: true }, - ); - await pipeline(Readable.from(inputs()), reduceStream); - expect(reduceStream.getAcc()).toEqual(expected.at(-1)); - }); - } - }); - - describe(`emit everytime`, () => { - for (const { name, inputs } of testCases) { - it(name, async () => { - const reduceStream = new ReduceStream( - { f: f, acc: [], emitLatest: false }, - { objectMode: true }, - ); - await pipeline(Readable.from(inputs()), reduceStream); - expect(reduceStream.getAcc()).toEqual(expected.at(-1)); - }); - } - }); - }); }); From a3e587940a7d5a9b4c1ae5a95db26415a5293ce8 Mon Sep 17 00:00:00 2001 From: daengdaengLee Date: Mon, 21 Aug 2023 17:24:06 +0900 Subject: [PATCH 2/4] =?UTF-8?q?test:=20ReduceStream=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reduce-stream.test.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/reduce-stream.test.ts b/src/reduce-stream.test.ts index f823f74..5e6445c 100644 --- a/src/reduce-stream.test.ts +++ b/src/reduce-stream.test.ts @@ -51,15 +51,17 @@ describe(`ReduceStream Test`, () => { for (const { name, inputs, emitLatest, expected } of testCases) { it(name, async () => { const emitted: Array> = []; + const reducer = (acc: Array): void => { + emitted.push(acc); + }; const reduceStream = new ReduceStream( { f: f, acc: [], emitLatest: emitLatest }, { objectMode: true }, ); - reduceStream.on(`data`, (acc) => { - emitted.push(acc); - }); + reduceStream.on(`data`, reducer); await pipeline(Readable.from(inputs()), reduceStream); expect(emitted).toEqual(expected); + reduceStream.off(`data`, reducer); }); } }); From caabff11a5ec4d0cfb43aef70a6dd2c98982c641 Mon Sep 17 00:00:00 2001 From: daengdaengLee Date: Mon, 21 Aug 2023 17:24:54 +0900 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20ToArrayStream=20=EC=9D=84=20ReduceS?= =?UTF-8?q?tream=20=EA=B8=B0=EB=B0=98=EC=9C=BC=EB=A1=9C=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/to-array-stream.test.ts | 81 +++++++++++++++++++------------------ src/to-array-stream.ts | 43 +++++++------------- 2 files changed, 57 insertions(+), 67 deletions(-) diff --git a/src/to-array-stream.test.ts b/src/to-array-stream.test.ts index bfe587b..f59f4b4 100644 --- a/src/to-array-stream.test.ts +++ b/src/to-array-stream.test.ts @@ -1,49 +1,52 @@ import { Readable } from "node:stream"; import { pipeline } from "node:stream/promises"; +import { asyncInputs } from "./test-util/async-inputs.js"; +import { syncInputs } from "./test-util/sync-inputs.js"; import { ToArrayStream } from "./to-array-stream.js"; -describe(`ToArrayStream Test`, () => { - it(`put chunks in an array in order.`, async () => { - const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])]; - const outputData: Array = []; - - await pipeline( - Readable.from(inputData), - new ToArrayStream({ target: outputData }), - ); - - expect(outputData).toEqual(inputData); - }); - - it(`put chunks with its encoding in an array in order.`, async () => { - const inputData = [{ message: "1" }, { message: "2" }, { message: "3" }]; - const outputData: Array<{ chunk: Buffer; encoding: string }> = []; +const expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]; +const testCases = [ + { + name: `sync data`, + inputs: syncInputs, + }, + { + name: `async data`, + inputs: asyncInputs, + }, +]; - await pipeline( - Readable.from(inputData), - new ToArrayStream( - { target: outputData, includeEncoding: true }, - { objectMode: true }, - ), - ); - - expect( - outputData.map((data) => { - return data.chunk; - }), - ).toEqual(inputData); - for (const data of outputData) { - expect(typeof data.encoding === `string`).toBeTruthy(); +describe(`ToArrayStream Test`, () => { + describe(`put chunks in an array in order.`, () => { + for (const { name, inputs } of testCases) { + it(name, async () => { + const emitted: Array> = []; + const reducer = (acc: Array): void => { + emitted.push(acc); + }; + const toArrayStream = new ToArrayStream( + { target: [] }, + { objectMode: true }, + ); + toArrayStream.on(`data`, reducer); + await pipeline(Readable.from(inputs()), toArrayStream); + expect(emitted).toEqual(expected); + toArrayStream.off(`data`, reducer); + }); } }); - it(`the array passed to constructor is the same as the array returned by toArray method.`, async () => { - const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])]; - const outputData: Array = []; - - const toArrayStream = new ToArrayStream({ target: outputData }); - await pipeline(Readable.from(inputData), toArrayStream); - - expect(outputData).toBe(toArrayStream.toArray()); + describe(`the array passed to constructor is the same as the array returned by toArray method.`, () => { + for (const { name, inputs } of testCases) { + it(name, async () => { + const target: Array = []; + const toArrayStream = new ToArrayStream( + { target: target }, + { objectMode: true }, + ); + await pipeline(Readable.from(inputs()), toArrayStream); + expect(target).toBe(toArrayStream.toArray()); + }); + } }); }); diff --git a/src/to-array-stream.ts b/src/to-array-stream.ts index 2d0d4eb..6f0a69c 100644 --- a/src/to-array-stream.ts +++ b/src/to-array-stream.ts @@ -1,33 +1,20 @@ -import { Writable, WritableOptions } from "node:stream"; +import { TransformOptions } from "node:stream"; +import { ReduceStream } from "./reduce-stream.js"; -export class ToArrayStream extends Writable { - private readonly target: Array; - private readonly includeEncoding: boolean; - - constructor( - options1?: { target: Array; includeEncoding?: boolean }, - options2?: WritableOptions, - ) { - super(options2); - - this.target = options1?.target ?? []; - this.includeEncoding = options1?.includeEncoding ?? false; - } - - _write( - chunk: unknown, - encoding: BufferEncoding, - callback: (error?: Error | null) => void, - ) { - if (this.includeEncoding) { - this.target.push({ chunk: chunk, encoding: encoding }); - } else { - this.target.push(chunk); - } - callback(); +export class ToArrayStream extends ReduceStream, T> { + constructor(options1?: { target?: Array }, options2?: TransformOptions) { + const _options1 = { + acc: options1?.target ?? [], + f: (acc: Array, cur: T): Array => { + acc.push(cur); + return acc; + }, + emitLatest: true, + }; + super(_options1, options2); } - toArray(): Array { - return this.target; + toArray(): Array { + return this.getAcc(); } } From 01c92017622cd1e53fcbeffaff10d5124a5c9922 Mon Sep 17 00:00:00 2001 From: daengdaengLee Date: Mon, 21 Aug 2023 17:43:28 +0900 Subject: [PATCH 4/4] =?UTF-8?q?test:=20=EA=B9=A8=EC=A7=84=20=ED=85=8C?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/take-stream.test.ts | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/take-stream.test.ts b/src/take-stream.test.ts index 3eb1ed0..8a25bca 100644 --- a/src/take-stream.test.ts +++ b/src/take-stream.test.ts @@ -1,7 +1,7 @@ -import { Readable } from "node:stream"; +import { PassThrough, Readable, TransformCallback } from "node:stream"; import { pipeline } from "node:stream/promises"; import { TakeStream } from "./take-stream.js"; -import { n, expected, testCases } from "./test-util/take-test-cases.js"; +import { expected, n, testCases } from "./test-util/take-test-cases.js"; import { ToArrayStream } from "./to-array-stream.js"; describe(`TakeStream Test`, () => { @@ -22,15 +22,26 @@ describe(`TakeStream Test`, () => { describe(`done() when n-th data transformed.`, () => { for (const { name, inputs } of testCases) { it(name, async () => { - const outputs: Array = []; + let emittedData: any = null; let dataWhenDone = null; const done = (): void => { - dataWhenDone = outputs.at(-1); + dataWhenDone = emittedData; }; await pipeline( Readable.from(inputs()), new TakeStream({ n: n, done: done }, { objectMode: true }), - new ToArrayStream({ target: outputs }, { objectMode: true }), + new PassThrough({ + transform( + chunk: any, + encoding: BufferEncoding, + callback: TransformCallback, + ) { + emittedData = chunk; + this.push(chunk, encoding); + callback(); + }, + objectMode: true, + }), ); expect(dataWhenDone).toEqual(expected.at(-1)); });