Skip to content

Commit

Permalink
feat: takeWhileStreamFactory 개발 (#28)
Browse files Browse the repository at this point in the history
Merge pull request #28 from daengdaengLee/issue-14
  • Loading branch information
daengdaengLee authored Aug 16, 2023
2 parents 4f54f13 + c93e837 commit 08c6f68
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 96 deletions.
40 changes: 28 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,15 @@ await pipeline(

### takeStreamFactory

Create a wrapper stream that takes only at most n data from the source stream.
Create a wrapped stream that yields at most n data from the source stream.

- support curry style
- `takeStreamFactory({ n: 10 }, sourceStream)` -> `takeStreamFactory({ n: 10 })(sourceStream)`
- source stream will close automatically after yields n data.
- source stream will be closed automatically when wrapped stream is closed.
- it returns async generator that is compatible with readable stream. If you want an exact stream, wrap it with `Readable.from`.

```typescript
import { TakeStreamFactory } from "utilitystreams";
import { takeStreamFactory } from "utilitystreams";

await pipeline(
takeStreamFactory({ n: 10 }, readableStream),
Expand All @@ -180,9 +181,9 @@ await pipeline(

### TakeStream

Take only n data from the input data.
Yield at most n data from the input data.

- **If the source readable stream is large or infinite, you should prepare some end logic or use `TakeStreamFactory`.**
- **If the source readable stream is large or infinite, you should prepare some end logic or use `takeStreamFactory`.**
- It's very hard to "end" the stream "pipeline" in the middle.
- So, I prepare a callback function to do end the source readable stream.
- You have to prepare some error handling from destroy call or call some custom end logic.
Expand All @@ -198,11 +199,30 @@ await pipeline(
);
```

### takeWhileStreamFactory

Create a wrapped stream that yields data from the source stream while the predicate function returns true.

- support curry style
- `takeWhileStreamFactory({ f: predicate }, sourceStream)` -> `takeWhileStreamFactory({ f: predicate })(sourceStream)`
- source stream will be closed automatically when wrapped stream is closed.
- it returns async generator that is compatible with readable stream. If you want an exact stream, wrap it with `Readable.from`.

```typescript
import { takeWhileStreamFactory } from "utilitystreams";

await pipeline(
takeWhilStreamFactory({ f: predicate }, readableStream),
// ... other streams
process.stdout,
);
```

### TakeWhileStream

Take data while the predicate function returns true.
Yield data while the predicate function returns true.

- **If the source readable stream is large or infinite, you should prepare some end logic.**
- **If the source readable stream is large or infinite, you should prepare some end logic or use `takeWhileStreamFactory`. **
- It's very hard to "end" the stream "pipeline" in the middle.
- So, I prepare a callback function to do end the source readable stream.
- You have to prepare some error handling from destroy call or call some custom end logic.
Expand All @@ -213,11 +233,7 @@ import { TakeWhileStream } from "utilitystreams";
await pipeline(
readableStream,
// ... other streams
new TakeWhileStream({
f: (chunk) => {
return !chunk.signal;
},
}),
new TakeWhileStream({ f: predicate }),
process.stdout,
);
```
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ export { DelayStream } from "./delay-stream.js";
export { FilterStream } from "./filter-stream.js";
export { MapStream } from "./map-stream.js";
export { TakeStream } from "./take-stream.js";
export { takeStreamFactory } from "./take-stream-factory.js";
export { TakeUntilStream } from "./take-until-stream.js";
export { TakeWhileStream } from "./take-while-stream.js";
export { takeWhileStreamFactory } from "./take-while-stream-factory.js";
export { ThrottleStream } from "./throttle-stream.js";
export { ToArrayStream } from "./to-array-stream.js";
111 changes: 51 additions & 60 deletions src/take-stream-factory.test.ts
Original file line number Diff line number Diff line change
@@ -1,71 +1,62 @@
import { Readable, PassThrough } from "node:stream";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { takeStreamFactory } from "./take-stream-factory.js";
import { asyncInputs } from "./test-util/async-inputs.js";
import { sourceStreamCloseTestTemplate } from "./test-util/source-stream-close-test-template.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";

describe(`TakeStreamFactory Test`, () => {
const n = 2;
const inputs = [1, 2, 3, 4, 5, 6];
describe(`takeStreamFactory Test`, () => {
const n = 4;
const expected = [1, 2, 3, 4];
const testCases = [
{ name: `sync data`, inputs: syncInputs },
{ name: `async data`, inputs: asyncInputs },
];

describe(`outputs only the number you set and exits.`, () => {
it(`no currey version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeStreamFactory({ n: n }, Readable.from(inputs)),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);

expect(outputs).toEqual(inputs.slice(0, n));
});

it(`currey version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeStreamFactory({ n: n })(Readable.from(inputs)),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);

expect(outputs).toEqual(inputs.slice(0, n));
});
});

describe(`close source stream after at most n data yields`, () => {
it(`no curry version`, async () => {
const sourceOutputs: Array<number> = [];
let sourceDone = false;
const sourceStream = Readable.from(inputs);
sourceStream.on(`data`, (value) => {
sourceOutputs.push(value);
for (const { name, inputs } of testCases) {
describe(name, () => {
it(`no currey version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeStreamFactory({ n: n }, Readable.from(inputs())),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);
expect(outputs).toEqual(expected);
});

it(`currey version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeStreamFactory({ n: n })(Readable.from(inputs())),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);
expect(outputs).toEqual(expected);
});
});
sourceStream.on(`close`, () => {
sourceDone = true;
});
await pipeline(
takeStreamFactory({ n: n }, sourceStream),
new PassThrough({ objectMode: true }),
);

expect(sourceOutputs).toEqual(inputs.slice(0, n));
expect(sourceDone).toBeTruthy();
});
}
});

it(`curry version`, async () => {
const sourceOutputs: Array<number> = [];
let sourceDone = false;
const sourceStream = Readable.from(inputs);
sourceStream.on(`data`, (value) => {
sourceOutputs.push(value);
describe(`close source stream when close wrapped stream.`, () => {
for (const { name, inputs } of testCases) {
describe(name, () => {
it(`no curry version`, async () => {
const sourceStream = Readable.from(inputs());
const wrappedStream = Readable.from(
takeStreamFactory({ n: n }, sourceStream),
);
await sourceStreamCloseTestTemplate(sourceStream, wrappedStream);
});

it(`curry version`, async () => {
const sourceStream = Readable.from(inputs());
const wrappedStream = Readable.from(
takeStreamFactory({ n: n })(sourceStream),
);
await sourceStreamCloseTestTemplate(sourceStream, wrappedStream);
});
});
sourceStream.on(`close`, () => {
sourceDone = true;
});
await pipeline(
takeStreamFactory({ n: n })(sourceStream),
new PassThrough({ objectMode: true }),
);

expect(sourceOutputs).toEqual(inputs.slice(0, n));
expect(sourceDone).toBeTruthy();
});
}
});
});
14 changes: 2 additions & 12 deletions src/take-until-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { TakeUntilStream } from "./take-until-stream.js";
import { asyncInputs } from "./test-util/async-inputs.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";
import { delay } from "./util.js";

describe(`TakeUntilStream Test`, () => {
function* syncInputs() {
for (const n of [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
yield n;
}
}
async function* asyncInputs() {
for (const n of syncInputs()) {
await delay(50);
yield n;
await delay(50);
}
}
const syncF = (n: number): boolean => {
return n >= 5;
};
Expand Down
71 changes: 71 additions & 0 deletions src/take-while-stream-factory.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { takeWhileStreamFactory } from "./take-while-stream-factory.js";
import { asyncInputs } from "./test-util/async-inputs.js";
import { sourceStreamCloseTestTemplate } from "./test-util/source-stream-close-test-template.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";
import { delay } from "./util.js";

describe(`takeWhileStreamFactory Test`, () => {
const syncF = (n: number): boolean => {
return n < 5;
};
const asyncF = async (n: number): Promise<boolean> => {
await delay(100);
return syncF(n);
};
const expected: Array<number> = [1, 2, 3, 4];
const testCases = [
{ name: `sync data + sync predicate`, f: syncF, inputs: syncInputs },
{ name: `sync data + async predicate`, f: asyncF, inputs: syncInputs },
{ name: `async data + sync predicate`, f: syncF, inputs: asyncInputs },
{ name: `async data + async predicate`, f: asyncF, inputs: asyncInputs },
];

describe(`outputs while the predicate function returns true and exits.`, () => {
for (const { name, f, inputs } of testCases) {
describe(name, () => {
it(`no curry version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeWhileStreamFactory({ f: f }, Readable.from(inputs())),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);
expect(outputs).toEqual(expected);
});

it(`curry version`, async () => {
const outputs: Array<number> = [];
await pipeline(
takeWhileStreamFactory({ f: f })(Readable.from(inputs())),
new ToArrayStream({ target: outputs }, { objectMode: true }),
);
expect(outputs).toEqual(expected);
});
});
}
});

describe(`close source stream when close wrapped stream.`, () => {
for (const { name, f, inputs } of testCases) {
describe(name, () => {
it(`no curry version`, async () => {
const sourceStream = Readable.from(inputs());
const wrappedStream = Readable.from(
takeWhileStreamFactory({ f: f }, sourceStream),
);
await sourceStreamCloseTestTemplate(sourceStream, wrappedStream);
});

it(`curry version`, async () => {
const sourceStream = Readable.from(inputs());
const wrappedStream = Readable.from(
takeWhileStreamFactory({ f: f })(sourceStream),
);
await sourceStreamCloseTestTemplate(sourceStream, wrappedStream);
});
});
}
});
});
37 changes: 37 additions & 0 deletions src/take-while-stream-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
type Predicate<T> = (value: T) => boolean | Promise<boolean>;
type Options<T> = { f: Predicate<T> };
type Stream<T> = Iterable<T> | AsyncIterable<T>;
type Result<T> = AsyncGenerator<T>;

export function takeWhileStreamFactory<T>(
options: Options<T>,
): (stream: Stream<T>) => Result<T>;
export function takeWhileStreamFactory<T>(
options: Options<T>,
stream: Stream<T>,
): Result<T>;
export function takeWhileStreamFactory<T>(
options: Options<T>,
stream?: Stream<T>,
): ((stream: Stream<T>) => Result<T>) | Result<T> {
if (stream != null) {
return _takeWhileStreamFactory(options, stream);
}

return (stream: Stream<T>): Result<T> => {
return _takeWhileStreamFactory(options, stream);
};
}

async function* _takeWhileStreamFactory<T>(
options: Options<T>,
stream: Stream<T>,
): Result<T> {
for await (const value of stream) {
const isEnd = !(await options.f(value));
if (isEnd) {
return;
}
yield value;
}
}
14 changes: 2 additions & 12 deletions src/take-while-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { TakeWhileStream } from "./take-while-stream.js";
import { asyncInputs } from "./test-util/async-inputs.js";
import { syncInputs } from "./test-util/sync-inputs.js";
import { ToArrayStream } from "./to-array-stream.js";
import { delay } from "./util.js";

describe(`TakeWhileStream Test`, () => {
function* syncInputs() {
for (const n of [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
yield n;
}
}
async function* asyncInputs() {
for (const n of syncInputs()) {
await delay(50);
yield n;
await delay(50);
}
}
const syncF = (n: number): boolean => {
return n < 5;
};
Expand Down
10 changes: 10 additions & 0 deletions src/test-util/async-inputs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { delay } from "../util.js";
import { syncInputs } from "./sync-inputs.js";

export async function* asyncInputs() {
for (const n of syncInputs()) {
await delay(50);
yield n;
await delay(50);
}
}
Loading

0 comments on commit 08c6f68

Please sign in to comment.