Skip to content

Commit

Permalink
Merge pull request #859 from thefrontside/stream-as-operation
Browse files Browse the repository at this point in the history
♻️ Return to Stream as Operation
  • Loading branch information
taras authored Dec 16, 2023
2 parents 6c1e43a + 8c04411 commit 78e0d59
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 71 deletions.
2 changes: 1 addition & 1 deletion lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function subscribe<T, R>(iter: AsyncIterator<T, R>): Subscription<T, R> {
*/
export function stream<T, R>(iterable: AsyncIterable<T, R>): Stream<T, R> {
return {
*subscribe() {
*[Symbol.iterator]() {
return subscribe(iterable[Symbol.asyncIterator]());
},
};
Expand Down
6 changes: 3 additions & 3 deletions lib/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export interface Channel<T, TClose> extends Stream<T, TClose> {
*
* yield* channel.send('too early'); // the channel has no subscribers yet!
*
* let subscription1 = yield* channel.subscribe();
* let subscription2 = yield* channel.subscribe();
* let subscription1 = yield* channel;
* let subscription2 = yield* channel;
*
* yield* channel.send('hello');
* yield* channel.send('world');
Expand All @@ -58,6 +58,6 @@ export function createChannel<T, TClose = void>(): Channel<T, TClose> {
return {
send: lift(signal.send),
close: lift(signal.close),
subscribe: signal.subscribe,
[Symbol.iterator]: signal[Symbol.iterator],
};
}
2 changes: 1 addition & 1 deletion lib/each.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { createContext } from "./context.ts";
export function each<T>(stream: Stream<T, unknown>): Operation<Iterable<T>> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream.subscribe();
let subscription = yield* stream;
let current = yield* subscription.next();
let stack = yield* EachStack.get();
if (!stack) {
Expand Down
32 changes: 14 additions & 18 deletions lib/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function once<
>(target: T, name: K): Operation<EventTypeFromEventTarget<T, K>> {
return {
*[Symbol.iterator]() {
let subscription = yield* on(target, name).subscribe();
let subscription = yield* on(target, name);
let next = yield* subscription.next();
return next.value;
},
Expand All @@ -55,23 +55,19 @@ export function on<
T extends EventTarget,
K extends EventList<T> | (string & {}),
>(target: T, name: K): Stream<EventTypeFromEventTarget<T, K>, never> {
return {
subscribe() {
return resource(function* (provide) {
let { send, subscribe } = createSignal<Event>();
return resource(function* (provide) {
let signal = createSignal<Event>();

target.addEventListener(name, send);
target.addEventListener(name, signal.send);

try {
yield* provide(
yield* subscribe() as Operation<
Subscription<EventTypeFromEventTarget<T, K>, never>
>,
);
} finally {
target.removeEventListener(name, send);
}
});
},
};
try {
yield* provide(
yield* signal as Operation<
Subscription<EventTypeFromEventTarget<T, K>, never>
>,
);
} finally {
target.removeEventListener(name, signal.send);
}
});
}
8 changes: 4 additions & 4 deletions lib/signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ export interface Signal<T, TClose> extends Stream<T, TClose> {
* ```javascript
* export function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
* return {
* *subscribe() {
* *[Symbol.iterator]() {
* const actions = yield* ActionContext;
* yield* QueueFactory.set(() => createFilterQueue(matcher(pattern));
* return yield* actions.subscribe();
* return yield* actions;
* }
* }
* }
Expand Down Expand Up @@ -116,7 +116,7 @@ export const SignalQueueFactory = createContext(
export function createSignal<T, TClose = never>(): Signal<T, TClose> {
let subscribers = new Set<Queue<T, TClose>>();

let useSubscription = resource<Subscription<T, TClose>>(function* (provide) {
let subscribe = resource<Subscription<T, TClose>>(function* (provide) {
let newQueue = yield* SignalQueueFactory;
let queue = newQueue<T, TClose>();
subscribers.add(queue);
Expand All @@ -140,5 +140,5 @@ export function createSignal<T, TClose = never>(): Signal<T, TClose> {
}
}

return { send, close, subscribe: () => useSubscription };
return { ...subscribe, send, close };
}
5 changes: 1 addition & 4 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ export interface Scope {
*
* @see https://frontside.com/effection/docs/collections#stream
*/
//export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;
export interface Stream<T, TReturn> {
subscribe(): Operation<Subscription<T, TReturn>>;
}
export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;

/**
* The Effection equivalent of an [`AsyncIterator`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator)
Expand Down
19 changes: 10 additions & 9 deletions test/channel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ describe("Channel", () => {
$afterEach(() => close());

it("does not use the same event twice when serially subscribed to a channel", function* () {
let { subscribe, ...input } = createChannel<string, void>();
let input = createChannel<string, void>();

let actual: string[] = [];
function* channel() {
yield* sleep(10);
Expand All @@ -29,11 +30,11 @@ describe("Channel", () => {
function* root() {
yield* spawn(channel);

let subscription = yield* subscribe();
let subscription = yield* input;
let result = yield* subscription.next();
actual.push(result.value as string);

subscription = yield* subscribe();
subscription = yield* input;
result = yield* subscription.next();
actual.push(result.value as string);
}
Expand All @@ -51,7 +52,7 @@ describe("Channel", () => {

describe("sending a message", () => {
it("receives message on subscription", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("hello");
let result = yield* subscription.next();
expect(result.done).toEqual(false);
Expand All @@ -61,7 +62,7 @@ describe("Channel", () => {

describe("blocking on next", () => {
it("receives message on subscription done", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
let result = yield* spawn(() => subscription.next());
yield* sleep(10);
yield* channel.send("hello");
Expand All @@ -71,7 +72,7 @@ describe("Channel", () => {

describe("sending multiple messages", () => {
it("receives messages in order", function* () {
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
let { send } = channel;
yield* send("hello");
yield* send("foo");
Expand All @@ -86,7 +87,7 @@ describe("Channel", () => {
it("receives message on subscribable end", function* () {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* channel.send("hello");

Expand All @@ -101,7 +102,7 @@ describe("Channel", () => {
describe("without argument", () => {
it("closes subscriptions", function* () {
let channel = createChannel();
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("foo");
yield* channel.close();
expect(yield* subscription.next()).toEqual({
Expand All @@ -118,7 +119,7 @@ describe("Channel", () => {
describe("with close argument", () => {
it("closes subscriptions with the argument", function* () {
let channel = createChannel<string, number>();
let subscription = yield* channel.subscribe();
let subscription = yield* channel;
yield* channel.send("foo");
yield* channel.close(12);

Expand Down
22 changes: 11 additions & 11 deletions test/each.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ import { createChannel, each, run, spawn, suspend } from "../mod.ts";
describe("each", () => {
it("can be used to iterate a stream", async () => {
await run(function* () {
let { subscribe, ...input } = createChannel<string, void>();
let channel = createChannel<string, void>();
let actual = [] as string[];
yield* spawn(function* () {
for (let value of yield* each({ subscribe })) {
for (let value of yield* each(channel)) {
actual.push(value);
yield* each.next();
}
});

yield* input.send("one");
yield* input.send("two");
yield* input.send("three");
yield* channel.send("one");
yield* channel.send("two");
yield* channel.send("three");

expect(actual).toEqual(["one", "two", "three"]);
});
Expand Down Expand Up @@ -53,30 +53,30 @@ describe("each", () => {

it("handles context correctly if you break out of a loop", async () => {
await expect(run(function* () {
let { subscribe, ...input } = createChannel<string>();
let channel = createChannel<string>();

yield* spawn(function* () {
for (let _ of yield* each({ subscribe })) {
for (let _ of yield* each(channel)) {
break;
}
// we're out of the loop, each.next() should be invalid.
yield* each.next();
});

yield* input.send("hello");
yield* channel.send("hello");
yield* suspend();
})).rejects.toHaveProperty("name", "IterationError");
});

it("throws an error if you forget to invoke each.next()", async () => {
await expect(run(function* () {
let { subscribe, ...input } = createChannel<string>();
let channel = createChannel<string>();
yield* spawn(function* () {
for (let _ of yield* each({ subscribe })) {
for (let _ of yield* each(channel)) {
_;
}
});
yield* input.send("hello");
yield* channel.send("hello");
yield* suspend();
})).rejects.toHaveProperty("name", "IterationError");
});
Expand Down
38 changes: 18 additions & 20 deletions www/docs/collections.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ await main(function*() {
// the channel has no subscribers yet!
yield* channel.send('too early');

let subscription1 = yield* channel.subscribe();
let subscription2 = yield* channel.subscribe();
let subscription1 = yield* channel;
let subscription2 = yield* channel;

yield* send('hello');
yield* send('world');
Expand Down Expand Up @@ -178,7 +178,7 @@ imchannel { main, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* spawn(function*() {
yield* sleep(1000);
Expand Down Expand Up @@ -216,7 +216,7 @@ import { main, sleep, spawn, createChannel } from "effection";
await main(function*() {
let channel = createChannel();

let subscription = yield* channel.subscribe();
let subscription = yield* channel;

yield* spawn(function*() {
yield* channel.send('hello');
Expand Down Expand Up @@ -344,8 +344,8 @@ function* logAndCancel(button) {
```
It turns out that [resources][resources] are just what we need to make
this happen. If you recall, a [`Stream`][stream] is just has a
subscribe() [`Operation`][operation] that returns a
this happen. If you recall, a [`Stream`][stream] is just an
[`Operation`][operation] that returns a
[`Subscription`][subscription]. So the simplest way to implement such
an operation is as a _resource that provides a subscription_.
Expand All @@ -358,20 +358,18 @@ Armed with resources, we can now implement our hypothetical `clicksOn` utility.
import { resource } from "effection"

export function clicksOn(button) {
return {
subscribe: () => resource(function*(provide) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);

let subscription = yield* clicks.subscribe();
yield* provide(subscription);

} finally {
button.removeEventListener("click", send);
}
}),
};
return resource(function*(provide) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);

let subscription = yield* clicks.subscribe();
yield* provide(subscription);

} finally {
button.removeEventListener("click", send);
}
});
}
```
Expand Down

0 comments on commit 78e0d59

Please sign in to comment.