Skip to content

Commit

Permalink
refactor(redux): custom queue impl (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
neurosnap authored Nov 10, 2023
1 parent 544d852 commit a4c160f
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 150 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const task = main(function* () {
} else {
console.error(result.error);
}
yield* each.next;
yield* each.next();
}
});

Expand Down
70 changes: 35 additions & 35 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ export type {
Channel,
Instruction,
Operation,
Port,
Predicate,
Queue,
Reject,
Resolve,
Result,
Scope,
Signal,
Stream,
Subscription,
Task,
} from "https://deno.land/x/[email protected].0/mod.ts";
} from "https://deno.land/x/[email protected].2/mod.ts";
export {
action,
createChannel,
Expand All @@ -23,15 +25,15 @@ export {
expect,
filter,
getframe,
main,
Ok,
resource,
run,
SignalQueueFactory,
sleep,
spawn,
suspend,
useAbortSignal,
} from "https://deno.land/x/[email protected].0/mod.ts";
} from "https://deno.land/x/[email protected].2/mod.ts";

import React from "https://esm.sh/[email protected]?pin=v122";
export { React };
Expand Down
9 changes: 4 additions & 5 deletions fx/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ it(
]);

const res: Result<string>[] = [];
for (const val of yield* each(results.immediate.output)) {
for (const val of yield* each(results.immediate)) {
res.push(val);
yield* each.next;
yield* each.next();
}

yield* results;
Expand Down Expand Up @@ -70,10 +70,9 @@ it(
]);

const res: Result<string>[] = [];
const { output } = results.sequence;
for (const val of yield* each(output)) {
for (const val of yield* each(results.sequence)) {
res.push(val);
yield* each.next;
yield* each.next();
}

yield* results;
Expand Down
8 changes: 4 additions & 4 deletions fx/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export function parallel<T>(operations: OpFn<T>[]) {
tasks.push(
yield* spawn(function* () {
const result = yield* safe(op);
yield* immediate.input.send(result);
yield* immediate.send(result);
return result;
}),
);
Expand All @@ -30,11 +30,11 @@ export function parallel<T>(operations: OpFn<T>[]) {
for (const tsk of tasks) {
const res = yield* tsk;
results.push(res);
yield* sequence.input.send(res);
yield* sequence.send(res);
}

yield* sequence.input.close();
yield* immediate.input.close();
yield* sequence.close();
yield* immediate.close();
});

function* wait(): Operation<Result<T>[]> {
Expand Down
19 changes: 9 additions & 10 deletions matcher.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,50 @@
import type { AnyAction } from "./types.ts";
import type { Predicate } from "./deps.ts";

type ActionType = string;
type GuardPredicate<G extends T, T = unknown> = (arg: T) => arg is G;
type APredicate = (action: AnyAction) => boolean;
type Predicate = (action: AnyAction) => boolean;
type StringableActionCreator<A extends AnyAction = AnyAction> = {
(...args: unknown[]): A;
toString(): string;
};
type SubPattern = APredicate | StringableActionCreator | ActionType;
type SubPattern = Predicate | StringableActionCreator | ActionType;
export type Pattern = SubPattern | SubPattern[];
type ActionSubPattern<Guard extends AnyAction = AnyAction> =
| GuardPredicate<Guard, AnyAction>
| StringableActionCreator<Guard>
| APredicate
| Predicate
| ActionType;
export type ActionPattern<Guard extends AnyAction = AnyAction> =
| ActionSubPattern<Guard>
| ActionSubPattern<Guard>[];

export function matcher(pattern: ActionPattern): Predicate<AnyAction> {
export function matcher(pattern: ActionPattern): Predicate {
if (pattern === "*") {
return function* (input) {
return function (input) {
return !!input;
};
}

if (typeof pattern === "string") {
return function* (input) {
return function (input) {
return pattern === input.type;
};
}

if (Array.isArray(pattern)) {
return function* (input) {
return function (input) {
return pattern.some((p) => matcher(p)(input));
};
}

if (typeof pattern === "function" && Object.hasOwn(pattern, "toString")) {
return function* (input) {
return function (input) {
return pattern.toString() === input.type;
};
}

if (typeof pattern === "function") {
return function* (input) {
return function (input) {
return pattern(input) as boolean;
};
}
Expand Down
2 changes: 0 additions & 2 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export {
each,
Err,
getframe,
main,
Ok,
resource,
run,
Expand All @@ -25,7 +24,6 @@ export type {
Channel,
Instruction,
Operation,
Port,
Result,
Scope,
Stream,
Expand Down
4 changes: 2 additions & 2 deletions npm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ async function main() {
},
],
mappings: {
"https://deno.land/x/[email protected].0/mod.ts": {
"https://deno.land/x/[email protected].2/mod.ts": {
name: "effection",
version: "3.0.0-beta.0",
version: "3.0.0-beta.2",
},
"https://esm.sh/[email protected]?pin=v122": {
name: "react",
Expand Down
14 changes: 14 additions & 0 deletions queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { createQueue } from "./deps.ts";

export function createFilterQueue<T, TClose>(predicate: (v: T) => boolean) {
const queue = createQueue<T, TClose>();

return {
...queue,
add(value: T) {
if (predicate(value)) {
queue.add(value);
}
},
};
}
57 changes: 41 additions & 16 deletions redux/fx.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
import type { Action, Operation, Signal } from "../deps.ts";
import { createContext, each, filter, spawn } from "../deps.ts";
import type { Action, Operation, Queue, Signal, Stream } from "../deps.ts";
import {
createContext,
createQueue,
each,
SignalQueueFactory,
spawn,
} from "../deps.ts";
import { call } from "../fx/mod.ts";
import { ActionPattern, matcher } from "../matcher.ts";
import type { ActionWPayload, AnyAction } from "../types.ts";

import type { StoreLike } from "./types.ts";

export const ActionContext = createContext<Signal<Action, void>>(
"redux:action",
);
export const StoreContext = createContext<StoreLike>("redux:store");

function createFilterQueue<T, TClose>(
predicate: (a: T) => boolean,
): Queue<T, TClose> {
const queue = createQueue<T, TClose>();

return {
...queue,
add(value: T) {
if (predicate(value)) {
queue.add(value);
}
},
};
}

export function* put(action: AnyAction | AnyAction[]) {
const store = yield* StoreContext;
if (Array.isArray(action)) {
Expand Down Expand Up @@ -42,16 +62,22 @@ export function* select<S, R>(selectorFn: (s: S) => R) {
return selectorFn(store.getState() as S);
}

function* createPatternStream(pattern: ActionPattern) {
const signal = yield* ActionContext;
const match = matcher(pattern);
const fd = filter(match)(signal.stream);
return fd;
function useActions(pattern: ActionPattern): Stream<AnyAction, void> {
return {
*subscribe() {
const actions = yield* ActionContext;
const match = matcher(pattern);
yield* SignalQueueFactory.set(() =>
createFilterQueue<AnyAction, void>(match) as any
);
return yield* actions.subscribe();
},
};
}

export function take<P>(pattern: ActionPattern): Operation<ActionWPayload<P>>;
export function* take(pattern: ActionPattern): Operation<Action> {
const fd = yield* createPatternStream(pattern);
const fd = useActions(pattern);
for (const action of yield* each(fd)) {
return action;
}
Expand All @@ -64,10 +90,10 @@ export function* takeEvery<T>(
op: (action: Action) => Operation<T>,
) {
return yield* spawn(function* (): Operation<void> {
const fd = yield* createPatternStream(pattern);
const fd = useActions(pattern);
for (const action of yield* each(fd)) {
yield* spawn(() => op(action));
yield* each.next;
yield* each.next();
}
});
}
Expand All @@ -77,15 +103,15 @@ export function* takeLatest<T>(
op: (action: Action) => Operation<T>,
) {
return yield* spawn(function* (): Operation<void> {
const fd = yield* createPatternStream(pattern);
const fd = useActions(pattern);
let lastTask;

for (const action of yield* each(fd)) {
if (lastTask) {
yield* lastTask.halt();
}
lastTask = yield* spawn(() => op(action));
yield* each.next;
yield* each.next();
}
});
}
Expand All @@ -96,10 +122,9 @@ export function* takeLeading<T>(
op: (action: Action) => Operation<T>,
) {
return yield* spawn(function* (): Operation<void> {
const fd = yield* createPatternStream(pattern);
for (const action of yield* each(fd)) {
while (true) {
const action = yield* take(pattern);
yield* call(() => op(action));
yield* each.next;
}
});
}
Expand Down
Loading

0 comments on commit a4c160f

Please sign in to comment.