Skip to content

Commit

Permalink
feat: ToArrayStream 을 ReduceStream 기반으로 수정
Browse files Browse the repository at this point in the history
  • Loading branch information
daengdaengLee committed Aug 21, 2023
1 parent a3e5879 commit caabff1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 67 deletions.
81 changes: 42 additions & 39 deletions src/to-array-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,52 @@
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { asyncInputs } from "./test-util/async-inputs.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";

describe(`ToArrayStream Test`, () => {
it(`put chunks in an array in order.`, async () => {
const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])];
const outputData: Array<Buffer> = [];

await pipeline(
Readable.from(inputData),
new ToArrayStream({ target: outputData }),
);

expect(outputData).toEqual(inputData);
});

it(`put chunks with its encoding in an array in order.`, async () => {
const inputData = [{ message: "1" }, { message: "2" }, { message: "3" }];
const outputData: Array<{ chunk: Buffer; encoding: string }> = [];
const expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]];
const testCases = [
{
name: `sync data`,
inputs: syncInputs,
},
{
name: `async data`,
inputs: asyncInputs,
},
];

await pipeline(
Readable.from(inputData),
new ToArrayStream(
{ target: outputData, includeEncoding: true },
{ objectMode: true },
),
);

expect(
outputData.map((data) => {
return data.chunk;
}),
).toEqual(inputData);
for (const data of outputData) {
expect(typeof data.encoding === `string`).toBeTruthy();
describe(`ToArrayStream Test`, () => {
describe(`put chunks in an array in order.`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const emitted: Array<Array<number>> = [];
const reducer = (acc: Array<number>): void => {
emitted.push(acc);
};
const toArrayStream = new ToArrayStream(
{ target: [] },
{ objectMode: true },
);
toArrayStream.on(`data`, reducer);
await pipeline(Readable.from(inputs()), toArrayStream);
expect(emitted).toEqual(expected);
toArrayStream.off(`data`, reducer);
});
}
});

it(`the array passed to constructor is the same as the array returned by toArray method.`, async () => {
const inputData = [Buffer.from([1]), Buffer.from([2]), Buffer.from([3])];
const outputData: Array<Buffer> = [];

const toArrayStream = new ToArrayStream({ target: outputData });
await pipeline(Readable.from(inputData), toArrayStream);

expect(outputData).toBe(toArrayStream.toArray());
describe(`the array passed to constructor is the same as the array returned by toArray method.`, () => {
for (const { name, inputs } of testCases) {
it(name, async () => {
const target: Array<number> = [];
const toArrayStream = new ToArrayStream(
{ target: target },
{ objectMode: true },
);
await pipeline(Readable.from(inputs()), toArrayStream);
expect(target).toBe(toArrayStream.toArray());
});
}
});
});
43 changes: 15 additions & 28 deletions src/to-array-stream.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
import { Writable, WritableOptions } from "node:stream";
import { TransformOptions } from "node:stream";
import { ReduceStream } from "./reduce-stream.js";

export class ToArrayStream extends Writable {
private readonly target: Array<any>;
private readonly includeEncoding: boolean;

constructor(
options1?: { target: Array<any>; includeEncoding?: boolean },
options2?: WritableOptions,
) {
super(options2);

this.target = options1?.target ?? [];
this.includeEncoding = options1?.includeEncoding ?? false;
}

_write(
chunk: unknown,
encoding: BufferEncoding,
callback: (error?: Error | null) => void,
) {
if (this.includeEncoding) {
this.target.push({ chunk: chunk, encoding: encoding });
} else {
this.target.push(chunk);
}
callback();
export class ToArrayStream<T> extends ReduceStream<Array<T>, T> {
constructor(options1?: { target?: Array<T> }, options2?: TransformOptions) {
const _options1 = {
acc: options1?.target ?? [],
f: (acc: Array<T>, cur: T): Array<T> => {
acc.push(cur);
return acc;
},
emitLatest: true,
};
super(_options1, options2);
}

toArray(): Array<any> {
return this.target;
toArray(): Array<T> {
return this.getAcc();
}
}

0 comments on commit caabff1

Please sign in to comment.