Skip to content

Commit

Permalink
Merged prepareReplayedPromiseCombinator with createPromiseCombinator.…
Browse files Browse the repository at this point in the history
… Now on `then` we either replay deterministically, or wire up the proxy promises with listeners to record the order.

We need this change because we cannot establish before the await point whether we're in processing mode or not. The construction of the promise combinator might happen before it's awaited, and its await might be interleaved with other entry writes.
  • Loading branch information
slinkydeveloper committed Jan 17, 2024
1 parent b14b6f5 commit 36752ff
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 114 deletions.
136 changes: 65 additions & 71 deletions src/promise_combinator_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,98 +23,99 @@ export function newJournalEntryPromiseId(entryIndex: number): PromiseId {
}

/**
* Replay a Promise combinator
* Prepare a Promise combinator
*
* @param combinatorIndex the index of this combinator
* @param combinatorConstructor the function that creates the combinator promise, e.g. Promise.all/any/race/allSettled
* @param promises the promises given by the user, and the respective ids
* @param readReplayOrder the function to read the replay order
* @param onNewCompleted callback when a child entry is resolved
* @param onCombinatorResolved callback when the combinator is resolved
* @param onCombinatorReplayed callback when the combinator is replayed
*/
function prepareReplayedPromiseCombinator(
function preparePromiseCombinator(
combinatorIndex: number,
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>,
readReplayOrder: (combinatorIndex: number) => PromiseId[]
readReplayOrder: (combinatorIndex: number) => PromiseId[] | undefined,
onNewCompleted: (combinatorIndex: number, promiseId: PromiseId) => void,
onCombinatorResolved: (combinatorIndex: number) => void,
onCombinatorReplayed: (combinatorIndex: number) => void
): WrappedPromise<any> {
// Create the proxy promises and index them
const promisesWithProxyPromise = promises.map((v) => ({
id: v.id,
promise: v.promise,
originalPromise: v.promise,
proxyPromise: new CompletablePromise<any>(),
}));
const promisesMap = new Map(
promisesWithProxyPromise.map((v) => [
// We need to define a key format for this map...
v.id.type.toString() + "-" + v.id.id.toString(),
{ promise: v.promise, proxyPromise: v.proxyPromise },
{ originalPromise: v.originalPromise, proxyPromise: v.proxyPromise },
])
);

// Create the combinator
// Create the combinator using the proxy promises
const combinator = combinatorConstructor(
promisesWithProxyPromise.map((v) => v.proxyPromise.promise)
).finally(() =>
// Once the combinator is resolved, notify back.
onCombinatorResolved(combinatorIndex)
);

return wrapDeeply(combinator, () => {
// We read the replay order on the await point.
// This is important because when writing the entry, we write it on the await point!
const replayOrder = readReplayOrder(combinatorIndex);
// Now follow the replayIndexes order

if (replayOrder === undefined) {
// We're in processing mode! We need to wire up original promises with proxy promises
for (const {
originalPromise,
proxyPromise,
id,
} of promisesWithProxyPromise) {
originalPromise
// This code works deterministically because the javascript runtime will enqueue
// the listeners of the proxy promise (which are mounted in Promise.all/any) in a single FIFO queue,
// so a subsequent resolve on another proxy promise can't overtake this one.
//
// Some resources:
// * https://stackoverflow.com/questions/38059284/why-does-javascript-promise-then-handler-run-after-other-code
// * https://262.ecma-international.org/6.0/#sec-jobs-and-job-queues
// * https://tr.javascript.info/microtask-queue
.then(
(v) => {
onNewCompleted(combinatorIndex, id);
proxyPromise.resolve(v);
},
(e) => {
onNewCompleted(combinatorIndex, id);
proxyPromise.reject(e);
}
);
}
return;
}

// We're in replay mode, Now follow the replayIndexes order.
onCombinatorReplayed(combinatorIndex);
for (const promiseId of replayOrder) {
// These are already completed, so once we set the then callback they will be immediately resolved.
const { promise, proxyPromise } = promisesMap.get(
const { originalPromise, proxyPromise } = promisesMap.get(
promiseId.type.toString() + "-" + promiseId.id.toString()
)!!;

// Because this promise is already completed, promise.then will immediately enqueue in the promise microtask queue
// the handlers to execute.
// See the comment below for more details.
promise.then(
originalPromise.then(
(v) => proxyPromise.resolve(v),
(e) => proxyPromise.reject(e)
);
}
});
}

/**
* Create a pending promise combinator
*
* @param combinatorIndex this is an index given by the state machine to this combinator. This is passed through to onCombinatorResolved, and can be used to establish an order between combinators (e.g. to make sure order entries are written in order)
* @param combinatorConstructor the function that creates the combinator promise, e.g. Promise.all/any/race/allSettled
* @param promisesWithIds the promises given by the user, and the respective entry indexes
* @param onNewCompleted callback when a child entry is resolved
* @param onCombinatorResolved callback when the combinator is resolved
*/
function createPromiseCombinator(
combinatorIndex: number,
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promisesWithIds: Array<{ id: PromiseId; promise: Promise<any> }>,
onNewCompleted: (combinatorIndex: number, promiseId: PromiseId) => void,
onCombinatorResolved: (combinatorIndex: number) => void
): Promise<any> {
// We still create a proxy promise as then of the child promises,
// because we MUST make sure that onNewCompleted is executed before the promise registered in the combinator gets fulfilled.
const proxyPromises = promisesWithIds.map((promiseWithId) =>
// This code works deterministically because the javascript runtime will enqueue
// the listeners of the proxy promise (which are mounted in Promise.all/any) in a single FIFO queue,
// so a subsequent resolve on another proxy promise can't overtake this one.
//
// Some resources:
// * https://stackoverflow.com/questions/38059284/why-does-javascript-promise-then-handler-run-after-other-code
// * https://262.ecma-international.org/6.0/#sec-jobs-and-job-queues
// * https://tr.javascript.info/microtask-queue
promiseWithId.promise.finally(() =>
onNewCompleted(combinatorIndex, promiseWithId.id)
)
);

// Create the combinator
return combinatorConstructor(proxyPromises).finally(() =>
onCombinatorResolved(combinatorIndex)
);
}

/**
* This class takes care of creating and managing deterministic promise combinators.
*
Expand All @@ -125,44 +126,33 @@ export class PromiseCombinatorTracker {
private pendingCombinators: Map<number, PromiseId[]> = new Map();

constructor(
private readonly readReplayOrder: (combinatorIndex: number) => PromiseId[],
private readonly readReplayOrder: (
combinatorIndex: number
) => PromiseId[] | undefined,
private readonly onWriteCombinatorOrder: (
combinatorIndex: number,
order: PromiseId[]
) => void
) {}

public createCombinatorInReplayMode(
public createCombinator(
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>
): WrappedPromise<any> {
const combinatorIndex = this.nextCombinatorIndex;
this.nextCombinatorIndex++;

return prepareReplayedPromiseCombinator(
combinatorIndex,
combinatorConstructor,
promises,
this.readReplayOrder
);
}

public createCombinatorInProcessingMode(
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>
): Promise<any> {
const combinatorIndex = this.nextCombinatorIndex;
this.nextCombinatorIndex++;

// Prepare combinator order
this.pendingCombinators.set(combinatorIndex, []);

return createPromiseCombinator(
return preparePromiseCombinator(
combinatorIndex,
combinatorConstructor,
promises,
this.readReplayOrder,
this.appendOrder.bind(this),
this.onCombinatorResolved.bind(this)
this.onCombinatorResolved.bind(this),
this.onCombinatorReplayed.bind(this)
);
}

Expand All @@ -176,12 +166,16 @@ export class PromiseCombinatorTracker {
order.push(promiseId);
}

private onCombinatorReplayed(idx: number) {
// This avoids republishing the order
this.pendingCombinators.delete(idx);
}

private onCombinatorResolved(idx: number) {
const order = this.pendingCombinators.get(idx);
if (order === undefined) {
throw new Error(
"Unexpected onCombinatorResolved called with a combinator identifier not registered. This sounds like an implementation bug."
);
// It was already published
return;
}

// We don't need this list anymore.
Expand Down
34 changes: 14 additions & 20 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,30 +202,26 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
return WRAPPED_PROMISE_PENDING as WrappedPromise<any>;
}

if (this.journal.isProcessing()) {
return wrapDeeply(
this.promiseCombinatorTracker.createCombinatorInProcessingMode(
combinatorConstructor,
promises
),
() => {
this.scheduleSuspension();
}
);
} else {
return this.promiseCombinatorTracker.createCombinatorInReplayMode(
return wrapDeeply(
this.promiseCombinatorTracker.createCombinator(
combinatorConstructor,
promises
);
}
),
() => {
// We must check the next entry, as wrapDeeply executes the outer wrap first and the inner afterward
// Because in the inner wrap increments the journal index, we must check for that.
if (this.journal.isUnResolved(this.getUserCodeJournalIndex() + 1)) {
this.scheduleSuspension();
}
}
);
}

readCombinatorOrderEntry(combinatorId: number): PromiseId[] {
readCombinatorOrderEntry(combinatorId: number): PromiseId[] | undefined {
const wannabeCombinatorEntry = this.journal.readNextReplayEntry();
if (wannabeCombinatorEntry === undefined) {
throw RetryableError.internal(
`Illegal state: expected replay message for combinator, but none found for ${this.journal.getUserCodeJournalIndex()}`
);
// We're in processing mode
return undefined;
}
if (wannabeCombinatorEntry.messageType !== COMBINATOR_ENTRY_MESSAGE) {
throw RetryableError.internal(
Expand Down Expand Up @@ -256,8 +252,6 @@ export class StateMachine<I, O> implements RestateStreamConsumer {

writeCombinatorOrderEntry(combinatorId: number, order: PromiseId[]) {
if (this.journal.isProcessing()) {
this.journal.incrementUserCodeIndex();

const combinatorMessage: CombinatorEntryMessage = {
combinatorId,
journalEntriesOrder: order.map((pid) => pid.id),
Expand Down
52 changes: 29 additions & 23 deletions test/promise_combinator_tracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ describe("PromiseCombinatorTracker with Promise.any", () => {
promises
);

// Any doesn't return on first reject
completers[0].reject("bla");
completers[2].resolve("my value");
setImmediate(() => {
// Any doesn't return on first reject
completers[0].reject("bla");
completers[2].resolve("my value");
});

const { order, result } = await testResultPromise;
expect(result).toStrictEqual("my value");
Expand All @@ -45,7 +47,9 @@ describe("PromiseCombinatorTracker with Promise.any", () => {
promises
);

completers[2].resolve("my value");
setImmediate(() => {
completers[2].resolve("my value");
});

const { order, result } = await testResultPromise;
expect(result).toStrictEqual("my value");
Expand Down Expand Up @@ -94,8 +98,10 @@ describe("PromiseCombinatorTracker with Promise.all", () => {
promises
);

completers[2].resolve("my value");
completers[0].reject("my error");
setImmediate(() => {
completers[2].resolve("my value");
completers[0].reject("my error");
});

const { order, result } = await testResultPromise;
expect(result).toStrictEqual("my error");
Expand All @@ -110,9 +116,11 @@ describe("PromiseCombinatorTracker with Promise.all", () => {
promises
);

completers[2].resolve("my value 2");
completers[0].resolve("my value 0");
completers[1].resolve("my value 1");
setImmediate(() => {
completers[2].resolve("my value 2");
completers[0].resolve("my value 0");
completers[1].resolve("my value 1");
});

const { order, result } = await testResultPromise;
expect(result).toStrictEqual(["my value 0", "my value 1", "my value 2"]);
Expand Down Expand Up @@ -183,23 +191,21 @@ async function testCombinatorInProcessingMode(
const resultMap = new Map<number, PromiseId[]>();
const tracker = new PromiseCombinatorTracker(
() => {
throw new Error("Unexpected call");
return undefined;
},
(combinatorIndex, order) => resultMap.set(combinatorIndex, order)
);

return tracker
.createCombinatorInProcessingMode(combinatorConstructor, promises)
.then(
(result) => ({
order: resultMap.get(0),
result,
}),
(result) => ({
order: resultMap.get(0),
result,
})
);
return tracker.createCombinator(combinatorConstructor, promises).transform(
(result) => ({
order: resultMap.get(0),
result,
}),
(result) => ({
order: resultMap.get(0),
result,
})
);
}

async function testCombinatorInReplayMode(
Expand All @@ -219,7 +225,7 @@ async function testCombinatorInReplayMode(

return (
tracker
.createCombinatorInReplayMode(combinatorConstructor, promises)
.createCombinator(combinatorConstructor, promises)
// To make sure it behaves like testCombinatorInProcessingMode and always succeeds
.transform(
(v) => v,
Expand Down

0 comments on commit 36752ff

Please sign in to comment.