Skip to content

Commit

Permalink
Final wireup with API definition for all and race (more to follow)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Jan 17, 2024
1 parent b4dcfe6 commit b14b6f5
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/promise_combinator_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export function newJournalEntryPromiseId(entryIndex: number): PromiseId {
*/
function prepareReplayedPromiseCombinator(
combinatorIndex: number,
combinatorConstructor: (promises: Promise<any>[]) => Promise<any>,
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>,
readReplayOrder: (combinatorIndex: number) => PromiseId[]
): WrappedPromise<any> {
Expand Down Expand Up @@ -133,7 +133,7 @@ export class PromiseCombinatorTracker {
) {}

public createCombinatorInReplayMode(
combinatorConstructor: (promises: Promise<any>[]) => Promise<any>,
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>
): WrappedPromise<any> {
const combinatorIndex = this.nextCombinatorIndex;
Expand Down
28 changes: 28 additions & 0 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,34 @@ export interface RestateBaseContext {
* await ctx.sleep(1000);
*/
sleep(millis: number): CombineablePromise<void>;

// --- Promise combinators

/**
* Creates a Promise that is resolved with an array of results when all of the provided Promises
* resolve, or rejected when any Promise is rejected.
*
* See {@link Promise.all} for more details.
*
* @param values An iterable of Promises.
* @returns A new Promise.
*/
all<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;

/**
* Creates a Promise that is resolved or rejected when any of the provided Promises are resolved
* or rejected.
*
* See {@link Promise.race} for more details.
*
* @param values An iterable of Promises.
* @returns A new Promise.
*/
race<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<Awaited<T[number]>>;
}

export interface Rand {
Expand Down
64 changes: 60 additions & 4 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ import { Client, SendClient } from "./types/router";
import { RpcRequest, RpcResponse } from "./generated/proto/dynrpc";
import { requestFromArgs } from "./utils/assumpsions";
import { RandImpl } from "./utils/rand";
import {
newJournalEntryPromiseId,
PromiseId,
} from "./promise_combinator_tracker";

export enum CallContexType {
None,
Expand Down Expand Up @@ -225,6 +229,10 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
);
}

rpcGateway(): RpcGateway {
return new RpcContextImpl(this);
}

// DON'T make this function async!!!
// The reason is that we want the erros thrown by the initial checks to be propagated in the caller context,
// and not in the promise context. To understand the semantic difference, make this function async and run the
Expand Down Expand Up @@ -344,6 +352,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
);
}

// -- Awakeables

public awakeable<T>(): { id: string; promise: CombineablePromise<T> } {
this.checkState("awakeable");

Expand Down Expand Up @@ -399,6 +409,44 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
);
}

// -- Combinators

all<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }> {
return this.stateMachine.createCombinator(
Promise.all.bind(Promise),
this.extractPromisesWithIds(values)
);
}

race<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<Awaited<T[number]>> {
return this.stateMachine.createCombinator(
Promise.race.bind(Promise),
this.extractPromisesWithIds(values)
);
}

private extractPromisesWithIds(
promises: Iterable<CombineablePromise<any>>
): Array<{ id: PromiseId; promise: Promise<any> }> {
const outPromises = [];

for (const promise of promises) {
const index = (promise as InternalCombineablePromise<any>).journalIndex;
outPromises.push({
id: newJournalEntryPromiseId(index),
promise: promise,
});
}

return outPromises;
}

// -- Various private methods

private isInSideEffect(): boolean {
const context = RestateGrpcContextImpl.callContext.getStore();
return context?.type === CallContexType.SideEffect;
Expand Down Expand Up @@ -460,10 +508,6 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
},
}) as InternalCombineablePromise<T>;
}

rpcGateway(): RpcGateway {
return new RpcContextImpl(this);
}
}

async function executeWithRetries<T>(
Expand Down Expand Up @@ -626,6 +670,18 @@ export class RpcContextImpl implements RpcContext {
return this.ctx.sleep(millis);
}

all<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }> {
return this.ctx.all(values);
}

race<T extends readonly CombineablePromise<unknown>[] | []>(
values: T
): Promise<Awaited<T[number]>> {
return this.ctx.race(values);
}

grpcChannel(): RestateGrpcChannel {
return this.ctx;
}
Expand Down
32 changes: 23 additions & 9 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,30 @@ export class StateMachine<I, O> implements RestateStreamConsumer {

// -- Methods related to combinators to wire up promise combinator API with PromiseCombinatorTracker

public createCombinator() {
// TODO create combinator
// if replay { combinator in replay mode } else { combinator in processing mode }
public createCombinator(
combinatorConstructor: (promises: PromiseLike<any>[]) => Promise<any>,
promises: Array<{ id: PromiseId; promise: Promise<any> }>
) {
if (this.stateMachineClosed) {
return WRAPPED_PROMISE_PENDING as WrappedPromise<any>;
}

// We need to wrap deeply again to schedule suspension here!
return wrapDeeply(Promise.resolve("TODO"), () => {
if (this.journal.isUnResolved(0 /* TODO */)) {
this.scheduleSuspension();
}
});
if (this.journal.isProcessing()) {
return wrapDeeply(
this.promiseCombinatorTracker.createCombinatorInProcessingMode(
combinatorConstructor,
promises
),
() => {
this.scheduleSuspension();
}
);
} else {
return this.promiseCombinatorTracker.createCombinatorInReplayMode(
combinatorConstructor,
promises
);
}
}

readCombinatorOrderEntry(combinatorId: number): PromiseId[] {
Expand Down

0 comments on commit b14b6f5

Please sign in to comment.