Skip to content

Commit

Permalink
feat: ToArrayStream 을 ReduceStream 을 상속받아 구현하도록 수정 (#36)
Browse files Browse the repository at this point in the history
Merge pull request #36 from daengdaengLee/issue-35
  • Loading branch information
daengdaengLee authored Aug 21, 2023
2 parents 64bac3f + 01c9201 commit 80ba8fe
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 117 deletions.
82 changes: 37 additions & 45 deletions src/reduce-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,70 +20,62 @@ const expected = [
[2, 4, 6, 8, 10],
];
const testCases = [
{ name: `sync data`, inputs: syncInputs },
{ name: `async data`, inputs: asyncInputs },
{
name: `sync data + emit latest (default option)`,
inputs: syncInputs,
emitLatest: true,
expected: [expected.at(-1)],
},
{
name: `sync data + emit each`,
inputs: syncInputs,
emitLatest: false,
expected: expected,
},
{
name: `async data + emit latest (default option)`,
inputs: asyncInputs,
emitLatest: true,
expected: [expected.at(-1)],
},
{
name: `async data + emit each`,
inputs: asyncInputs,
emitLatest: false,
expected: expected,
},
];

describe(`ReduceStream Test`, () => {
describe(`accumulate and emit latest acc. (default)`, () => {
for (const { name, inputs } of testCases) {
describe(`accumulate values`, () => {
for (const { name, inputs, emitLatest, expected } of testCases) {
it(name, async () => {
const emitted: Array<Array<number>> = [];
const reducer = (acc: Array<number>): void => {
emitted.push(acc);
};
const reduceStream = new ReduceStream(
{ f: f, acc: [] },
{ f: f, acc: [], emitLatest: emitLatest },
{ objectMode: true },
);
reduceStream.on(`data`, (acc) => {
emitted.push(acc);
});
reduceStream.on(`data`, reducer);
await pipeline(Readable.from(inputs()), reduceStream);
expect(emitted).toEqual([expected.at(-1)]);
expect(emitted).toEqual(expected);
reduceStream.off(`data`, reducer);
});
}
});

describe(`accumulate and emit each acc.`, () => {
for (const { name, inputs } of testCases) {
describe(`get accumulated object by getAcc()`, () => {
for (const { name, inputs, emitLatest, expected } of testCases) {
it(name, async () => {
const emitted: Array<Array<number>> = [];
const reduceStream = new ReduceStream(
{ f: f, acc: [], emitLatest: false },
{ f: f, acc: [], emitLatest: emitLatest },
{ objectMode: true },
);
reduceStream.on(`data`, (acc) => {
emitted.push(acc);
});
await pipeline(Readable.from(inputs()), reduceStream);
expect(emitted).toEqual(expected);
expect(reduceStream.getAcc()).toEqual(expected.at(-1));
});
}
});

describe(`get accumulated object by getAcc()`, () => {
describe(`emit latest (default)`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const reduceStream = new ReduceStream(
{ f: f, acc: [] },
{ objectMode: true },
);
await pipeline(Readable.from(inputs()), reduceStream);
expect(reduceStream.getAcc()).toEqual(expected.at(-1));
});
}
});

describe(`emit everytime`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const reduceStream = new ReduceStream(
{ f: f, acc: [], emitLatest: false },
{ objectMode: true },
);
await pipeline(Readable.from(inputs()), reduceStream);
expect(reduceStream.getAcc()).toEqual(expected.at(-1));
});
}
});
});
});
21 changes: 16 additions & 5 deletions src/take-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Readable } from "node:stream";
import { PassThrough, Readable, TransformCallback } from "node:stream";
import { pipeline } from "node:stream/promises";
import { TakeStream } from "./take-stream.js";
import { n, expected, testCases } from "./test-util/take-test-cases.js";
import { expected, n, testCases } from "./test-util/take-test-cases.js";
import { ToArrayStream } from "./to-array-stream.js";

describe(`TakeStream Test`, () => {
Expand All @@ -22,15 +22,26 @@ describe(`TakeStream Test`, () => {
describe(`done() when n-th data transformed.`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const outputs: Array<number> = [];
let emittedData: any = null;
let dataWhenDone = null;
const done = (): void => {
dataWhenDone = outputs.at(-1);
dataWhenDone = emittedData;
};
await pipeline(
Readable.from(inputs()),
new TakeStream({ n: n, done: done }, { objectMode: true }),
new ToArrayStream({ target: outputs }, { objectMode: true }),
new PassThrough({
transform(
chunk: any,
encoding: BufferEncoding,
callback: TransformCallback,
) {
emittedData = chunk;
this.push(chunk, encoding);
callback();
},
objectMode: true,
}),
);
expect(dataWhenDone).toEqual(expected.at(-1));
});
Expand Down
81 changes: 42 additions & 39 deletions src/to-array-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,52 @@
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { asyncInputs } from "./test-util/async-inputs.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";

describe(`ToArrayStream Test`, () => {
it(`put chunks in an array in order.`, async () => {
const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])];
const outputData: Array<Buffer> = [];

await pipeline(
Readable.from(inputData),
new ToArrayStream({ target: outputData }),
);

expect(outputData).toEqual(inputData);
});

it(`put chunks with its encoding in an array in order.`, async () => {
const inputData = [{ message: "1" }, { message: "2" }, { message: "3" }];
const outputData: Array<{ chunk: Buffer; encoding: string }> = [];
const expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]];
const testCases = [
{
name: `sync data`,
inputs: syncInputs,
},
{
name: `async data`,
inputs: asyncInputs,
},
];

await pipeline(
Readable.from(inputData),
new ToArrayStream(
{ target: outputData, includeEncoding: true },
{ objectMode: true },
),
);

expect(
outputData.map((data) => {
return data.chunk;
}),
).toEqual(inputData);
for (const data of outputData) {
expect(typeof data.encoding === `string`).toBeTruthy();
describe(`ToArrayStream Test`, () => {
describe(`put chunks in an array in order.`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const emitted: Array<Array<number>> = [];
const reducer = (acc: Array<number>): void => {
emitted.push(acc);
};
const toArrayStream = new ToArrayStream(
{ target: [] },
{ objectMode: true },
);
toArrayStream.on(`data`, reducer);
await pipeline(Readable.from(inputs()), toArrayStream);
expect(emitted).toEqual(expected);
toArrayStream.off(`data`, reducer);
});
}
});

it(`the array passed to constructor is the same as the array returned by toArray method.`, async () => {
const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])];
const outputData: Array<Buffer> = [];

const toArrayStream = new ToArrayStream({ target: outputData });
await pipeline(Readable.from(inputData), toArrayStream);

expect(outputData).toBe(toArrayStream.toArray());
describe(`the array passed to constructor is the same as the array returned by toArray method.`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const target: Array<number> = [];
const toArrayStream = new ToArrayStream(
{ target: target },
{ objectMode: true },
);
await pipeline(Readable.from(inputs()), toArrayStream);
expect(target).toBe(toArrayStream.toArray());
});
}
});
});
43 changes: 15 additions & 28 deletions src/to-array-stream.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
import { Writable, WritableOptions } from "node:stream";
import { TransformOptions } from "node:stream";
import { ReduceStream } from "./reduce-stream.js";

export class ToArrayStream extends Writable {
private readonly target: Array<any>;
private readonly includeEncoding: boolean;

constructor(
options1?: { target: Array<any>; includeEncoding?: boolean },
options2?: WritableOptions,
) {
super(options2);

this.target = options1?.target ?? [];
this.includeEncoding = options1?.includeEncoding ?? false;
}

_write(
chunk: unknown,
encoding: BufferEncoding,
callback: (error?: Error | null) => void,
) {
if (this.includeEncoding) {
this.target.push({ chunk: chunk, encoding: encoding });
} else {
this.target.push(chunk);
}
callback();
export class ToArrayStream<T> extends ReduceStream<Array<T>, T> {
constructor(options1?: { target?: Array<T> }, options2?: TransformOptions) {
const _options1 = {
acc: options1?.target ?? [],
f: (acc: Array<T>, cur: T): Array<T> => {
acc.push(cur);
return acc;
},
emitLatest: true,
};
super(_options1, options2);
}

toArray(): Array<any> {
return this.target;
toArray(): Array<T> {
return this.getAcc();
}
}

0 comments on commit 80ba8fe

Please sign in to comment.