Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Sep 22, 2023
1 parent 63967b6 commit f56cb41
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
const msg = AwakeableEntryMessage.create();
const promise = this.stateMachine
.handleUserCodeMessage<Buffer>(AWAKEABLE_ENTRY_MESSAGE_TYPE, msg)
.then((result: Buffer | void) => {
.then2((result: Buffer | void) => {
if (!(result instanceof Buffer)) {
// This should either be a filled buffer or an empty buffer but never anything else.
throw RetryableError.internal(
Expand Down
52 changes: 41 additions & 11 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
completedFlag?: boolean,
protocolVersion?: number,
requiresAckFlag?: boolean
): Promise<T | void> {
): WrappedPromise<T | void> {
// if the state machine is already closed, return a promise that never
// completes, so that the user code does not resume
if (this.stateMachineClosed) {
return new CompletablePromise<T>().promise;
return wrapDeeply(new CompletablePromise<T>().promise);
}

const promise = this.journal.handleUserSideMessage(messageType, message);
Expand Down Expand Up @@ -151,17 +151,14 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
);
}

if (!p.SUSPENSION_TRIGGERS.includes(messageType)) {
return promise;
}

const proxified: Promise<T> = wrapDeeply(promise, () => {
return wrapDeeply(promise, () => {
if (!p.SUSPENSION_TRIGGERS.includes(messageType)) {
return;
}
if (this.journal.isUnResolved(journalIndex)) {
this.scheduleSuspension();
}
});

return proxified;
}

/**
Expand Down Expand Up @@ -499,9 +496,40 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
* Returns a promise that wraps the original promise and calls cb() at the first time
* this promise or any nested promise that is chained to it is awaited. (then-ed)
*/
const wrapDeeply = <T>(promise: Promise<T>, cb: () => void): Promise<T> => {

/* eslint-disable @typescript-eslint/no-explicit-any */
export type WrappedPromise<T> = Promise<T> & {
then2: <TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
| null
| undefined,
onrejected?:
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
| undefined
) => Promise<TResult1 | TResult2>;
};

const wrapDeeply = <T>(
promise: Promise<T>,
cb?: () => void
): WrappedPromise<T> => {
/* eslint-disable @typescript-eslint/no-explicit-any */
return {
then2: function <TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
| null
| undefined,
onrejected?:
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
| undefined
): Promise<TResult1 | TResult2> {
return wrapDeeply(promise.then(onfulfilled, onrejected), cb);
},

then: function <TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
Expand All @@ -512,7 +540,9 @@ const wrapDeeply = <T>(promise: Promise<T>, cb: () => void): Promise<T> => {
| null
| undefined
): Promise<TResult1 | TResult2> {
cb();
if (cb !== undefined) {
cb();
}
return promise.then(onfulfilled, onrejected);
},
catch: function <TResult = never>(
Expand Down

0 comments on commit f56cb41

Please sign in to comment.