Skip to content

Commit

Permalink
observable support
Browse files Browse the repository at this point in the history
  • Loading branch information
LoaderB0T committed Sep 5, 2023
1 parent 5a71cd7 commit 5fc349c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
8 changes: 5 additions & 3 deletions docs/docs/miscellaneous/batching.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ In this case, subscribers will only receive **one** emission instead of two.

## emitOnceAsync

In some cases, you might need to use `emitOnce` with async functions. To do so, you can use `emitOnceAsync`:
In some cases, you might need to use `emitOnce` with async functions or observables. To do so, you can use `emitOnceAsync`:

```ts title=todos.repository.ts
export async function updateCount() {
Expand Down Expand Up @@ -140,7 +140,9 @@ await emitOnceAsync(async () => {
});
```

You can also provide an observable to `emitOnceAsync`, in this case, the store will only update when the observable emits its **first** value.

Using `emitOnceAsync` inside `emitOnce` will not work as expected because `emitOnce` will not wait for the async function to finish.

Use `emitOnceAsync` with caution, the store will not update until the async function finishes.
If your async function takes too long to finish, the app might appear unresponsive.
Use `emitOnceAsync` with caution, the store will not update until the async function finishes or the observable emits its first value.
If your async function or observable takes too long to finish, the app might appear unresponsive.
42 changes: 42 additions & 0 deletions packages/store/src/lib/batch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Subject, map } from 'rxjs';
import {
emitOnce,
batchInProgress,
Expand Down Expand Up @@ -363,6 +364,47 @@ test('nested batch in async batch', async () => {
expect(spy).toHaveBeenCalledWith(21);
});

test('async batch with observable', async () => {
const store = createStore(
{
name: 'todos',
},
withProps<{ name: string; count: number }>({ name: 'foo', count: 1 })
);

const spy = jest.fn();
store.pipe(select((s) => s.count)).subscribe(spy);

expect(batchInProgress.getValue()).toBeFalsy();
expect(asyncBatchesInProgress).toBe(0);

const obs$ = new Subject<void>();

const v = emitOnceAsync(() => obs$.pipe(map(() => {
for (let i = 0; i < 10; i++) {
store.update((s) => ({
...s,
count: s.count + 1,
}));
}
})));


expect(batchInProgress.getValue()).toBeTruthy();
expect(asyncBatchesInProgress).toBe(1);

obs$.next();

await v;

expect(batchInProgress.getValue()).toBeFalsy();
expect(asyncBatchesInProgress).toBe(0);

expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledWith(1);
expect(spy).toHaveBeenCalledWith(11);
});

class Deferred {
public promise: Promise<unknown>;
public resolve!: (value: unknown) => void;
Expand Down
8 changes: 5 additions & 3 deletions packages/store/src/lib/batch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BehaviorSubject } from 'rxjs';
import { BehaviorSubject, Observable, firstValueFrom, isObservable } from 'rxjs';
import { filter, take } from 'rxjs/operators';

export let asyncBatchesInProgress = 0;
Expand All @@ -23,12 +23,14 @@ export function emitOnce<T>(cb: () => T) {
return cb();
}

export async function emitOnceAsync<T>(cb: () => Promise<T>) {
export async function emitOnceAsync<T>(cb: () => Promise<T> | Observable<T>) {
asyncBatchesInProgress++;
if (!batchInProgress.getValue()) {
batchInProgress.next(true);
}
const value = await cb();

const callbackReturnValue = cb();
const value = await (isObservable(callbackReturnValue) ? firstValueFrom(callbackReturnValue) : callbackReturnValue);
if (--asyncBatchesInProgress === 0) {
batchInProgress.next(false);
}
Expand Down

0 comments on commit 5fc349c

Please sign in to comment.