diff --git a/src/promise_combinator_tracker.ts b/src/promise_combinator_tracker.ts index d2396f0c..2773711d 100644 --- a/src/promise_combinator_tracker.ts +++ b/src/promise_combinator_tracker.ts @@ -23,53 +23,92 @@ export function newJournalEntryPromiseId(entryIndex: number): PromiseId { } /** - * Replay a Promise combinator + * Prepare a Promise combinator * * @param combinatorIndex the index of this combinator * @param combinatorConstructor the function that creates the combinator promise, e.g. Promise.all/any/race/allSettled * @param promises the promises given by the user, and the respective ids * @param readReplayOrder the function to read the replay order + * @param onNewCompleted callback when a child entry is resolved + * @param onCombinatorResolved callback when the combinator is resolved + * @param onCombinatorReplayed callback when the combinator is replayed */ -function prepareReplayedPromiseCombinator( +function preparePromiseCombinator( combinatorIndex: number, combinatorConstructor: (promises: PromiseLike[]) => Promise, promises: Array<{ id: PromiseId; promise: Promise }>, - readReplayOrder: (combinatorIndex: number) => PromiseId[] + readReplayOrder: (combinatorIndex: number) => PromiseId[] | undefined, + onNewCompleted: (combinatorIndex: number, promiseId: PromiseId) => void, + onCombinatorResolved: (combinatorIndex: number) => void, + onCombinatorReplayed: (combinatorIndex: number) => void ): WrappedPromise { // Create the proxy promises and index them const promisesWithProxyPromise = promises.map((v) => ({ id: v.id, - promise: v.promise, + originalPromise: v.promise, proxyPromise: new CompletablePromise(), })); const promisesMap = new Map( promisesWithProxyPromise.map((v) => [ // We need to define a key format for this map... v.id.type.toString() + "-" + v.id.id.toString(), - { promise: v.promise, proxyPromise: v.proxyPromise }, + { originalPromise: v.originalPromise, proxyPromise: v.proxyPromise }, ]) ); - // Create the combinator + // Create the combinator using the proxy promises const combinator = combinatorConstructor( promisesWithProxyPromise.map((v) => v.proxyPromise.promise) + ).finally(() => + // Once the combinator is resolved, notify back. + onCombinatorResolved(combinatorIndex) ); return wrapDeeply(combinator, () => { - // We read the replay order on the await point. - // This is important because when writing the entry, we write it on the await point! const replayOrder = readReplayOrder(combinatorIndex); - // Now follow the replayIndexes order + + if (replayOrder === undefined) { + // We're in processing mode! We need to wire up original promises with proxy promises + for (const { + originalPromise, + proxyPromise, + id, + } of promisesWithProxyPromise) { + originalPromise + // This code works deterministically because the javascript runtime will enqueue + // the listeners of the proxy promise (which are mounted in Promise.all/any) in a single FIFO queue, + // so a subsequent resolve on another proxy promise can't overtake this one. + // + // Some resources: + // * https://stackoverflow.com/questions/38059284/why-does-javascript-promise-then-handler-run-after-other-code + // * https://262.ecma-international.org/6.0/#sec-jobs-and-job-queues + // * https://tr.javascript.info/microtask-queue + .then( + (v) => { + onNewCompleted(combinatorIndex, id); + proxyPromise.resolve(v); + }, + (e) => { + onNewCompleted(combinatorIndex, id); + proxyPromise.reject(e); + } + ); + } + return; + } + + // We're in replay mode, Now follow the replayIndexes order. + onCombinatorReplayed(combinatorIndex); for (const promiseId of replayOrder) { // These are already completed, so once we set the then callback they will be immediately resolved. - const { promise, proxyPromise } = promisesMap.get( + const { originalPromise, proxyPromise } = promisesMap.get( promiseId.type.toString() + "-" + promiseId.id.toString() )!!; // Because this promise is already completed, promise.then will immediately enqueue in the promise microtask queue // the handlers to execute. // See the comment below for more details. - promise.then( + originalPromise.then( (v) => proxyPromise.resolve(v), (e) => proxyPromise.reject(e) ); @@ -77,44 +116,6 @@ function prepareReplayedPromiseCombinator( }); } -/** - * Create a pending promise combinator - * - * @param combinatorIndex this is an index given by the state machine to this combinator. This is passed through to onCombinatorResolved, and can be used to establish an order between combinators (e.g. to make sure order entries are written in order) - * @param combinatorConstructor the function that creates the combinator promise, e.g. Promise.all/any/race/allSettled - * @param promisesWithIds the promises given by the user, and the respective entry indexes - * @param onNewCompleted callback when a child entry is resolved - * @param onCombinatorResolved callback when the combinator is resolved - */ -function createPromiseCombinator( - combinatorIndex: number, - combinatorConstructor: (promises: PromiseLike[]) => Promise, - promisesWithIds: Array<{ id: PromiseId; promise: Promise }>, - onNewCompleted: (combinatorIndex: number, promiseId: PromiseId) => void, - onCombinatorResolved: (combinatorIndex: number) => void -): Promise { - // We still create a proxy promise as then of the child promises, - // because we MUST make sure that onNewCompleted is executed before the promise registered in the combinator gets fulfilled. - const proxyPromises = promisesWithIds.map((promiseWithId) => - // This code works deterministically because the javascript runtime will enqueue - // the listeners of the proxy promise (which are mounted in Promise.all/any) in a single FIFO queue, - // so a subsequent resolve on another proxy promise can't overtake this one. - // - // Some resources: - // * https://stackoverflow.com/questions/38059284/why-does-javascript-promise-then-handler-run-after-other-code - // * https://262.ecma-international.org/6.0/#sec-jobs-and-job-queues - // * https://tr.javascript.info/microtask-queue - promiseWithId.promise.finally(() => - onNewCompleted(combinatorIndex, promiseWithId.id) - ) - ); - - // Create the combinator - return combinatorConstructor(proxyPromises).finally(() => - onCombinatorResolved(combinatorIndex) - ); -} - /** * This class takes care of creating and managing deterministic promise combinators. * @@ -125,44 +126,33 @@ export class PromiseCombinatorTracker { private pendingCombinators: Map = new Map(); constructor( - private readonly readReplayOrder: (combinatorIndex: number) => PromiseId[], + private readonly readReplayOrder: ( + combinatorIndex: number + ) => PromiseId[] | undefined, private readonly onWriteCombinatorOrder: ( combinatorIndex: number, order: PromiseId[] ) => void ) {} - public createCombinatorInReplayMode( + public createCombinator( combinatorConstructor: (promises: PromiseLike[]) => Promise, promises: Array<{ id: PromiseId; promise: Promise }> ): WrappedPromise { const combinatorIndex = this.nextCombinatorIndex; this.nextCombinatorIndex++; - return prepareReplayedPromiseCombinator( - combinatorIndex, - combinatorConstructor, - promises, - this.readReplayOrder - ); - } - - public createCombinatorInProcessingMode( - combinatorConstructor: (promises: PromiseLike[]) => Promise, - promises: Array<{ id: PromiseId; promise: Promise }> - ): Promise { - const combinatorIndex = this.nextCombinatorIndex; - this.nextCombinatorIndex++; - // Prepare combinator order this.pendingCombinators.set(combinatorIndex, []); - return createPromiseCombinator( + return preparePromiseCombinator( combinatorIndex, combinatorConstructor, promises, + this.readReplayOrder, this.appendOrder.bind(this), - this.onCombinatorResolved.bind(this) + this.onCombinatorResolved.bind(this), + this.onCombinatorReplayed.bind(this) ); } @@ -176,12 +166,16 @@ export class PromiseCombinatorTracker { order.push(promiseId); } + private onCombinatorReplayed(idx: number) { + // This avoids republishing the order + this.pendingCombinators.delete(idx); + } + private onCombinatorResolved(idx: number) { const order = this.pendingCombinators.get(idx); if (order === undefined) { - throw new Error( - "Unexpected onCombinatorResolved called with a combinator identifier not registered. This sounds like an implementation bug." - ); + // It was already published + return; } // We don't need this list anymore. diff --git a/src/state_machine.ts b/src/state_machine.ts index 58d519e6..e6798dc0 100644 --- a/src/state_machine.ts +++ b/src/state_machine.ts @@ -202,30 +202,26 @@ export class StateMachine implements RestateStreamConsumer { return WRAPPED_PROMISE_PENDING as WrappedPromise; } - if (this.journal.isProcessing()) { - return wrapDeeply( - this.promiseCombinatorTracker.createCombinatorInProcessingMode( - combinatorConstructor, - promises - ), - () => { - this.scheduleSuspension(); - } - ); - } else { - return this.promiseCombinatorTracker.createCombinatorInReplayMode( + return wrapDeeply( + this.promiseCombinatorTracker.createCombinator( combinatorConstructor, promises - ); - } + ), + () => { + // We must check the next entry, as wrapDeeply executes the outer wrap first and the inner afterward + // Because in the inner wrap increments the journal index, we must check for that. + if (this.journal.isUnResolved(this.getUserCodeJournalIndex() + 1)) { + this.scheduleSuspension(); + } + } + ); } - readCombinatorOrderEntry(combinatorId: number): PromiseId[] { + readCombinatorOrderEntry(combinatorId: number): PromiseId[] | undefined { const wannabeCombinatorEntry = this.journal.readNextReplayEntry(); if (wannabeCombinatorEntry === undefined) { - throw RetryableError.internal( - `Illegal state: expected replay message for combinator, but none found for ${this.journal.getUserCodeJournalIndex()}` - ); + // We're in processing mode + return undefined; } if (wannabeCombinatorEntry.messageType !== COMBINATOR_ENTRY_MESSAGE) { throw RetryableError.internal( @@ -256,8 +252,6 @@ export class StateMachine implements RestateStreamConsumer { writeCombinatorOrderEntry(combinatorId: number, order: PromiseId[]) { if (this.journal.isProcessing()) { - this.journal.incrementUserCodeIndex(); - const combinatorMessage: CombinatorEntryMessage = { combinatorId, journalEntriesOrder: order.map((pid) => pid.id), diff --git a/test/promise_combinator_tracker.test.ts b/test/promise_combinator_tracker.test.ts index 50091cc6..0448fa77 100644 --- a/test/promise_combinator_tracker.test.ts +++ b/test/promise_combinator_tracker.test.ts @@ -26,9 +26,11 @@ describe("PromiseCombinatorTracker with Promise.any", () => { promises ); - // Any doesn't return on first reject - completers[0].reject("bla"); - completers[2].resolve("my value"); + setImmediate(() => { + // Any doesn't return on first reject + completers[0].reject("bla"); + completers[2].resolve("my value"); + }); const { order, result } = await testResultPromise; expect(result).toStrictEqual("my value"); @@ -45,7 +47,9 @@ describe("PromiseCombinatorTracker with Promise.any", () => { promises ); - completers[2].resolve("my value"); + setImmediate(() => { + completers[2].resolve("my value"); + }); const { order, result } = await testResultPromise; expect(result).toStrictEqual("my value"); @@ -94,8 +98,10 @@ describe("PromiseCombinatorTracker with Promise.all", () => { promises ); - completers[2].resolve("my value"); - completers[0].reject("my error"); + setImmediate(() => { + completers[2].resolve("my value"); + completers[0].reject("my error"); + }); const { order, result } = await testResultPromise; expect(result).toStrictEqual("my error"); @@ -110,9 +116,11 @@ describe("PromiseCombinatorTracker with Promise.all", () => { promises ); - completers[2].resolve("my value 2"); - completers[0].resolve("my value 0"); - completers[1].resolve("my value 1"); + setImmediate(() => { + completers[2].resolve("my value 2"); + completers[0].resolve("my value 0"); + completers[1].resolve("my value 1"); + }); const { order, result } = await testResultPromise; expect(result).toStrictEqual(["my value 0", "my value 1", "my value 2"]); @@ -183,23 +191,21 @@ async function testCombinatorInProcessingMode( const resultMap = new Map(); const tracker = new PromiseCombinatorTracker( () => { - throw new Error("Unexpected call"); + return undefined; }, (combinatorIndex, order) => resultMap.set(combinatorIndex, order) ); - return tracker - .createCombinatorInProcessingMode(combinatorConstructor, promises) - .then( - (result) => ({ - order: resultMap.get(0), - result, - }), - (result) => ({ - order: resultMap.get(0), - result, - }) - ); + return tracker.createCombinator(combinatorConstructor, promises).transform( + (result) => ({ + order: resultMap.get(0), + result, + }), + (result) => ({ + order: resultMap.get(0), + result, + }) + ); } async function testCombinatorInReplayMode( @@ -219,7 +225,7 @@ async function testCombinatorInReplayMode( return ( tracker - .createCombinatorInReplayMode(combinatorConstructor, promises) + .createCombinator(combinatorConstructor, promises) // To make sure it behaves like testCombinatorInProcessingMode and always succeeds .transform( (v) => v,