From c1aaa6f7690645293ab536ac05ab599e18f178f7 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 6 Feb 2024 16:31:49 +0100 Subject: [PATCH 01/11] First draft of workflow API --- examples/workflow_example.ts | 143 ++++++++ package.json | 1 + src/clients/client.ts | 215 ++++++++++++ src/workflows/workflow.ts | 126 ++++++++ src/workflows/workflow_state_service.ts | 378 ++++++++++++++++++++++ src/workflows/workflow_wrapper_service.ts | 330 +++++++++++++++++++ 6 files changed, 1193 insertions(+) create mode 100644 examples/workflow_example.ts create mode 100644 src/clients/client.ts create mode 100644 src/workflows/workflow.ts create mode 100644 src/workflows/workflow_state_service.ts create mode 100644 src/workflows/workflow_wrapper_service.ts diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts new file mode 100644 index 00000000..bc29acd8 --- /dev/null +++ b/examples/workflow_example.ts @@ -0,0 +1,143 @@ +import * as restate from "../src/public_api"; +import * as restate_wf from "../src/workflows/workflow"; +import * as restate_clients from "../src/clients/client"; +import { randomUUID } from "crypto"; + +/* eslint-disable no-console */ + +const restateIngressUrl = process.argv[2] || "http://localhost:8080"; +const restateAdminUrl = process.argv[3] || "http://localhost:9070"; + +// +// (1) Definition of the workflow +// +const myworkflow = restate_wf.workflow("acme.myworkflow", { + // + // Each workflow must have exactly one run() function, which defines + // the life cycle. This function isn't directly started, but indirectly + // via the synthetic start() function. + // + run: async (ctx: restate_wf.WfContext, params: { name: string }) => { + if (!params?.name) { + throw new restate.TerminalError("Missing parameter 'name'"); + } + + ctx.console.log(">>>>>>>>>>> Starting workflow for " + params.name); + + // workflow state can be accessed from other methods. the state becomes + // eventually visible, there is no linearizability for this state + ctx.set("name", params.name); + + // to publish state in a way that other method calls can access it with + // guarantees (await until visible), use promises + ctx.promise("name_promise").resolve(params.name); + + // to listen to signals, also use promises + const signal = ctx.promise("thesignal"); + const message = await signal.promise(); + + const result = `${message} my dear ${params.name}`; + ctx.console.log(">>>>>>>>>>> Finishing workflow with: " + result); + return result; + }, + + // + // Workflows may have an arbitrary number of other functions that take + // a 'SharedWfContext' and have shared access to state and promises + // + + signal: async (ctx: restate_wf.SharedWfContext, req: { signal: string }) => { + ctx.promise("thesignal").resolve(req.signal); + }, + + getName: async (ctx: restate_wf.SharedWfContext): Promise => { + return (await ctx.get("name")) ?? "(not yet set)"; + }, + + awaitName: async (ctx: restate_wf.SharedWfContext): Promise => { + return ctx.promise("name_promise").promise(); + }, +}); + +// typed API similar to how other Restate RPC services work +const workflowApi = myworkflow.api; + +const server = restate.createServer(); +myworkflow.registerServices(server); +server.listen(9080); + +// +// (2) Code to nteract with the workflow using an external client +// +// This submits a workflow and sends signals / queries to the workflow. +// + +async function startWorkflowAndInteract(restateUrl: string) { + const restate = restate_clients.connectRestate(restateUrl); + + const args = { name: "Restatearius" }; + const workflowId = randomUUID(); + + // Option a) we can create clients either with just the workflow service path + await restate.submitWorkflow("acme.myworkflow", workflowId, args); + + // Option b) we can supply the API signature and get a typed interface for all the methods + // Because the submit is idempotent, this call here will effectively attach to the + // previous workflow + const client = await restate.submitWorkflow(workflowApi, workflowId, args); + + // check the status (should be RUNNING) + const status = await client.status(); + console.log("Workflow status: " + status); + + // call method that reads the 'name' state + const get_name = await client.workflowInterface().getName(); + console.log("Workflow getName() (snapshot read): " + get_name); + + // call method that awaits the 'name' promise + const await_name = await client.workflowInterface().awaitName(); + console.log("Workflow awaitName() (promise): " + await_name); + + // send a signal + client.workflowInterface().signal({ signal: "hey ho!" }); + + // wait until everything is done + const result = await client.result(); + console.log("Workflow result: " + result); +} + +// +// (3) To make this example work end-to-end, with the external client below, +// we issue a registration here +// +registerDeployment(restateAdminUrl, 9080) + .then(() => startWorkflowAndInteract(restateIngressUrl)) + .then(() => process.exit(0)) + .catch((err) => { + console.error(err); + process.exit(-1); + }); + +// --------------------- utils ----------------- + +async function registerDeployment(restateAdminAddress: string, port: number) { + const serviceEndpoint = `http://host.docker.internal:${port}`; + const httpResponse = await fetch(restateAdminAddress + "/deployments", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + uri: serviceEndpoint, + }), + }); + + const responseText = await httpResponse.text(); + if (!httpResponse.ok) { + throw new Error( + `Registration failed: STATUS ${httpResponse.status} ; ${responseText}` + ); + } else { + return `Registration succeeded: STATUS ${httpResponse.status} ; ${responseText}`; + } +} diff --git a/package.json b/package.json index ffefd9be..805fa685 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "release": "release-it", "example": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/example.ts", "grpcexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/grpc_example.ts", + "workflowexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/workflow_example.ts", "handlerexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/handler_example.ts", "expressexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/embedded_example.ts" }, diff --git a/src/clients/client.ts b/src/clients/client.ts new file mode 100644 index 00000000..048671b1 --- /dev/null +++ b/src/clients/client.ts @@ -0,0 +1,215 @@ +import * as restate from "../public_api"; +import { + LifecycleStatus, + StatusMessage, + WorkflowConnectedSignature, + WorkflowExternalSignature, + WorkflowRequest, +} from "../workflows/workflow"; + +/* eslint-disable no-console */ + +export interface Restate { + submitWorkflow( + path: string, + workflowId: string, + params: T + ): Promise>; + + submitWorkflow( + workflowApi: restate.ServiceApi>, + workflowId: string, + params: T + ): Promise>; + + connectToWorkflow( + path: string, + workflowId: string + ): Promise>; + + connectToWorkflow( + workflowApi: restate.ServiceApi>, + workflowId: string + ): Promise>; +} + +export interface WorkflowClient { + workflowId(): string; + status(): Promise; // RUNNING / FINISHED / FAILED + + result(): Promise; + + workflowInterface(): restate.Client>; // call methods on workflow + + latestMessage(): Promise; + + getMessages( + fromSeqNum: number + ): AsyncGenerator; +} + +export function connectRestate(uri: string) { + return new RestateImpl(uri); +} + +// ------------------------------ implementation ------------------------------ + +class WorkflowClientImpl implements WorkflowClient { + constructor( + private readonly restateUri: string, + private readonly serviceName: string, + private readonly wfId: string + ) {} + + workflowId(): string { + return this.wfId; + } + + status(): Promise { + return this.makeCall("status", {}); + } + + result(): Promise { + return this.makeCall("waitForResult", {}); + } + + workflowInterface(): restate.Client> { + const clientProxy = new Proxy( + {}, + { + get: (_target, prop) => { + const method = prop as string; + return async (args: unknown) => { + return this.makeCall(method, args); + }; + }, + } + ); + + return clientProxy as restate.Client>; + } + + latestMessage(): Promise { + return this.makeCall("getLatestMessage", {}); + } + + async *getMessages(fromSeqNum: number) { + while (true) { + const msgs: StatusMessage[] = await this.makeCall("pollNextMessages", { + from: fromSeqNum, + }); + for (const msg of msgs) { + yield msg; + } + fromSeqNum += msgs.length; + } + } + + private async makeCall(method: string, args: TT): Promise { + return await makeCall( + this.restateUri, + this.serviceName, + method, + this.wfId, + args + ); + } +} + +class RestateImpl implements Restate { + constructor(private readonly restateUri: string) {} + + async submitWorkflow( + pathOrApi: string | restate.ServiceApi>, + workflowId: string, + params: T + ): Promise> { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + const response = await makeCall( + this.restateUri, + path, + "start", + workflowId, + params + ); + console.log("Start() call completed: Workflow is " + response); + + return new WorkflowClientImpl(this.restateUri, path, workflowId); + } + + async connectToWorkflow( + pathOrApi: string | restate.ServiceApi>, + workflowId: string + ): Promise> { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + return new WorkflowClientImpl(this.restateUri, path, workflowId); + } +} + +// ---------------------------------------------------------------------------- +// Utils +// ---------------------------------------------------------------------------- + +async function makeCall( + restateUri: string, + serviceName: string, + method: string, + workflowId: string, + params: T +): Promise { + if (typeof workflowId !== "string" || workflowId.length === 0) { + throw new Error("missing workflowId"); + } + if (params === undefined) { + params = {} as T; + } + if (typeof params !== "object") { + throw new Error("invalid parameters: must be an object"); + } + + const url = `${restateUri}/${serviceName}/${method}`; + const data = { + request: { workflowId, ...params } satisfies WorkflowRequest, + }; + + let body: string; + try { + body = JSON.stringify(data); + } catch (err) { + throw new Error("Cannot encode request: " + err, { cause: err }); + } + + console.log(`Making call to Restate workflow at ${url} with ${body}`); + + const httpResponse = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body, + }); + + const responseText = await httpResponse.text(); + if (!httpResponse.ok) { + throw new Error(`Request failed: ${httpResponse.status}\n${responseText}`); + } + + let response; + try { + response = JSON.parse(responseText); + } catch (err) { + throw new Error("Cannot parse response JSON: " + err, { cause: err }); + } + + if (response.error) { + throw new Error(response.error); + } + if (response.response) { + return response.response as R; + } + if (Object.keys(response).length === 0) { + return undefined as R; + } + + throw new Error("Unrecognized response object: " + responseText); +} diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts new file mode 100644 index 00000000..58fe35f0 --- /dev/null +++ b/src/workflows/workflow.ts @@ -0,0 +1,126 @@ +import * as restate from "../public_api"; +import * as wws from "./workflow_wrapper_service"; +import * as wss from "./workflow_state_service"; + +const STATE_SERVICE_PATH_SUFFIX = "_state"; + +export interface DurablePromise { + promise(): Promise; + peek(): Promise; + + resolve(value?: T): void; + fail(errorMsg: string): void; +} + +export interface SharedWfContext { + workflowId(): string; + + get(stateName: string): Promise; + + promise(name: string): DurablePromise; +} + +export interface WfContext extends SharedWfContext, restate.RpcContext { + // publishMessage(message: string): void; +} + +export enum LifecycleStatus { + NOT_STARTED = "NOT_STARTED", + RUNNING = "RUNNING", + FINISHED = "FINISHED", + FAILED = "FAILED", +} + +export enum WorkflowStartResult { + STARTED = "STARTED", + ALREADY_STARTED = "ALREADY_STARTED", + ALREADY_FINISHED = "ALREADY_FINISHED", +} + +export type StatusMessage = { + sequenceNum: number; + message: string; + timestamp: Date; +}; + +export type WorkflowRequest = T & { workflowId: string }; + +type RunMethodSignature = (ctx: WfContext, params: T) => Promise; +type InteractionMethodSignature = ( + ctx: SharedWfContext, + params: T +) => Promise; + +type WorkflowMethodsSignatures = { + [K in keyof U]: K extends "run" + ? U[K] extends RunMethodSignature + ? U[K] + : "The 'run' methods needs to follow the signature: (ctx: WfContext, params: any) => Promise " + : // eslint-disable-next-line @typescript-eslint/no-explicit-any + U[K] extends InteractionMethodSignature + ? U[K] + : "Methods other than 'run' are interaction methods and need to follow the signature: (ctx: SharedWfContext, params: any) => Promise"; +}; + +export type Workflow = { + run: RunMethodSignature; +} & WorkflowMethodsSignatures; + +export type WorkflowExternalSignature = { + start: (param: WorkflowRequest) => Promise; + waitForResult: (request: WorkflowRequest) => Promise; +} & Omit< + { + [K in keyof U]: U[K] extends InteractionMethodSignature + ? (request: WorkflowRequest) => Promise + : never; + }, + "run" +>; + +export type WorkflowConnectedSignature = Omit< + { + [K in keyof U]: U[K] extends (ctx: SharedWfContext) => Promise + ? () => Promise + : U[K] extends (ctx: SharedWfContext, params: infer T) => Promise + ? (request: T) => Promise + : never; + }, + "run" +>; + +export interface WorkflowServices { + readonly api: restate.ServiceApi>; + + registerServices( + server: restate.RestateServer | restate.LambdaRestateServer + ): void; +} + +export function workflow( + path: string, + workflow: Workflow +): WorkflowServices { + // the state service manages all state and promises for us + const stateServiceRouter = wss.workflowStateService; + const stateServiceApi: restate.ServiceApi = { + path: path + STATE_SERVICE_PATH_SUFFIX, + }; + + // the wrapper service manages life cycle, contexts, delegation to the state service + const wrapperServiceRouter = wws.createWrapperService( + workflow, + path, + stateServiceApi + ); + + return { + api: { path } as restate.ServiceApi>, + registerServices: ( + server: restate.RestateServer | restate.LambdaRestateServer + ) => { + server.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); + server.bindRouter(path, wrapperServiceRouter); + }, + } satisfies WorkflowServices; +} diff --git a/src/workflows/workflow_state_service.ts b/src/workflows/workflow_state_service.ts new file mode 100644 index 00000000..7fd14659 --- /dev/null +++ b/src/workflows/workflow_state_service.ts @@ -0,0 +1,378 @@ +import * as restate from "../public_api"; +import { + LifecycleStatus, + StatusMessage, + WorkflowStartResult, +} from "./workflow"; + +const LIFECYCLE_STATUS_STATE_NAME = "status"; +const RESULT_STATE_NAME = "result"; +const RESULT_LISTENERS_NAME = "result_listeners"; +const STATUS_MESSAGES_STATE_NAME = "messages"; +const STATUS_MESSAGE_LISTENERS = "message_listeners"; +const PROMISE_STATE_PREFIX = "prom_s_"; +const PROMISE_AWAKEABLE_PREFIX = "prom_l_"; +const ALL_NAMES_STATE_NAME = "all_state_names"; + +const RESERVED_STATE_NAMES = [ + LIFECYCLE_STATUS_STATE_NAME, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + ALL_NAMES_STATE_NAME, +]; +const RESERVED_STATE_PREFIXES = [ + PROMISE_STATE_PREFIX, + PROMISE_AWAKEABLE_PREFIX, +]; + +export type ValueOrError = { + value?: T; + error?: string; +}; + +export const workflowStateService = restate.keyedRouter({ + startWorkflow: async ( + ctx: restate.RpcContext + ): Promise => { + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + + if (status !== LifecycleStatus.NOT_STARTED) { + return status === LifecycleStatus.RUNNING + ? WorkflowStartResult.ALREADY_STARTED + : WorkflowStartResult.ALREADY_FINISHED; + } + + ctx.set(LIFECYCLE_STATUS_STATE_NAME, LifecycleStatus.RUNNING); + return WorkflowStartResult.STARTED; + }, + + finishOrFailWorkflow: async ( + ctx: restate.RpcContext, + _workflowId: string, + result: ValueOrError + ): Promise => { + if (result.error === undefined && result.value === undefined) { + throw new restate.TerminalError("Result is undefined"); + } + + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + + if (status !== LifecycleStatus.RUNNING) { + // not sure this can ever happen, but we put this here defensively + throw new restate.TerminalError("Unexpected state: " + status); + } + + const newStatus = result.error + ? LifecycleStatus.FAILED + : LifecycleStatus.FINISHED; + ctx.set(LIFECYCLE_STATUS_STATE_NAME, newStatus); + + await completePromise( + ctx, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + result + ); + }, + + getStatus: async (ctx: restate.RpcContext): Promise => { + return ( + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED + ); + }, + + completePromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string; completion: ValueOrError } + ): Promise => { + // we don't accept writes after the workflow is done + if (!(await checkIfRunning(ctx))) { + return; + } + + await completePromise( + ctx, + PROMISE_STATE_PREFIX + req.promiseName, + PROMISE_AWAKEABLE_PREFIX + req.promiseName, + req.completion + ); + }, + + peekPromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string } + ): Promise | null> => { + return peekPromise(ctx, PROMISE_STATE_PREFIX + req.promiseName); + }, + + subscribePromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string; awkId: string } + ): Promise | null> => { + return subscribePromise( + ctx, + PROMISE_STATE_PREFIX + req.promiseName, + PROMISE_AWAKEABLE_PREFIX + req.promiseName, + req.awkId + ); + }, + + getResult: async ( + ctx: restate.RpcContext + ): Promise | null> => { + return peekPromise(ctx, RESULT_STATE_NAME); + }, + + subscribeResult: async ( + ctx: restate.RpcContext, + workflowId: string, + awkId: string + ): Promise | null> => { + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + if (status === LifecycleStatus.NOT_STARTED) { + throw new restate.TerminalError( + `Workflow with id '${workflowId}' does not exist.` + ); + } + return subscribePromise( + ctx, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + awkId + ); + }, + + getState: async ( + ctx: restate.RpcContext, + _workflowId: string, + stateName: string + ): Promise => { + return ctx.get(stateName); + }, + + setState: async ( + ctx: restate.RpcContext, + _workflowId: string, + request: { stateName: string; value: T } + ): Promise => { + if (!request?.stateName) { + throw new restate.TerminalError("missing state name"); + } + if (request.value === undefined || request.value === null) { + throw new restate.TerminalError("invalid state value: " + request.value); + } + + // if the workflow isn't running (any more) we don't accept state updates + // shouldn't be possible anyways (because only workflow method has access to writable state) + // but we are defensive here against API errors + if (!(await checkIfRunning(ctx))) { + return; + } + + const stateName = request.stateName; + + // guard against overwriting built-in states + for (const reservedStateName of RESERVED_STATE_NAMES) { + if (stateName === reservedStateName) { + throw new restate.TerminalError( + "State name is reserved: " + reservedStateName + ); + } + } + for (const reservedStatePrefix of RESERVED_STATE_PREFIXES) { + if (stateName.startsWith(reservedStatePrefix)) { + throw new restate.TerminalError( + "State prefix is reserved: " + reservedStatePrefix + ); + } + } + + ctx.set(stateName, request.value); + await rememberNewStateName(ctx, stateName); + }, + + clearState: async ( + ctx: restate.RpcContext, + _workflowId: string, + stateName: string + ): Promise => { + ctx.clear(stateName); + }, + + publishMessage: async ( + ctx: restate.RpcContext, + _workflowId: string, + msg: { message: string; timestamp: Date } + ): Promise => { + // append message + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + msgs.push({ sequenceNum: msgs.length, ...msg }); + ctx.set(STATUS_MESSAGES_STATE_NAME, msgs); + + // wake up all listeners + const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; + for (const awkId of listeners) { + ctx.resolveAwakeable(awkId, {}); + } + ctx.clear(STATUS_MESSAGE_LISTENERS); + }, + + getLatestMessage: async ( + ctx: restate.RpcContext + ): Promise => { + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + if (msgs.length === 0) { + return null; + } else { + return msgs[msgs.length - 1]; + } + }, + + pollNextMessages: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { from: number; awakId: string } + ): Promise => { + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + if (msgs.length > req.from) { + return msgs.slice(req.from); + } + + // not yet available, register a listener to be woken up when more is available + const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; + listeners.push(req.awakId); + ctx.set(STATUS_MESSAGE_LISTENERS, listeners); + return null; + }, + + dispose: async (ctx: restate.RpcContext): Promise => { + const stateNames = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + for (const stateName of stateNames) { + ctx.clear(stateName); + } + ctx.clear(ALL_NAMES_STATE_NAME); + ctx.clear(STATUS_MESSAGE_LISTENERS); + ctx.clear(STATUS_MESSAGES_STATE_NAME); + ctx.clear(RESULT_LISTENERS_NAME); + ctx.clear(RESULT_STATE_NAME); + ctx.clear(LIFECYCLE_STATUS_STATE_NAME); + }, +}); + +export type api = typeof workflowStateService; + +// ---------------------------------------------------------------------------- + +async function completePromise( + ctx: restate.RpcContext, + stateName: string, + awakeableStateName: string, + completion: ValueOrError +): Promise> { + if (completion.value !== undefined && completion.error !== undefined) { + throw new restate.TerminalError( + "Completion can only be either with value or with error" + ); + } + if (completion.value !== undefined && completion.value === null) { + throw new restate.TerminalError("promise cannot be completed with null"); + } + if (completion.error !== undefined && completion.error === null) { + throw new restate.TerminalError("promise cannot be rejected with null"); + } + + const currVal = await ctx.get>(stateName); + if (currVal !== null) { + // promise already completed + return currVal; + } + + // first completor + // (a) set state + ctx.set(stateName, completion); + await rememberNewStateName(ctx, stateName); + + // (b) complete awaiting awakeables + const listeners = (await ctx.get(awakeableStateName)) ?? []; + listeners.forEach((awkId: string) => { + if (completion.error !== undefined) { + ctx.rejectAwakeable(awkId, completion.error); + } else { + ctx.resolveAwakeable(awkId, completion.value); + } + }); + ctx.clear(awakeableStateName); + + return completion; +} + +async function subscribePromise( + ctx: restate.RpcContext, + stateName: string, + awakeableStateName: string, + awakeableId: string +): Promise | null> { + const currVal = await ctx.get>(stateName); + + // case (a), we have a value already + if (currVal !== null) { + if (currVal.error !== undefined) { + ctx.rejectAwakeable(awakeableId, currVal.error); + } else { + ctx.resolveAwakeable(awakeableId, currVal.value); + } + return currVal; + } + + // case (b), we remember the awk Id and get when we have a value + // but only if the workflow is still running + if (!(await checkIfRunning(ctx))) { + const response = { + error: "Promised will never resolve because workflow is not running", + }; + ctx.rejectAwakeable(awakeableId, response.error); + return response; + } + + const listeners = (await ctx.get(awakeableStateName)) ?? []; + if (listeners.length === 0) { + await rememberNewStateName(ctx, awakeableStateName); + } + listeners.push(awakeableId); + ctx.set(awakeableStateName, listeners); + return null; +} + +async function peekPromise( + ctx: restate.RpcContext, + stateName: string +): Promise | null> { + return ctx.get>(stateName); +} + +async function rememberNewStateName( + ctx: restate.RpcContext, + stateName: string +) { + const names = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + names.push(stateName); + ctx.set(ALL_NAMES_STATE_NAME, names); +} + +async function checkIfRunning(ctx: restate.RpcContext): Promise { + const status = await ctx.get(LIFECYCLE_STATUS_STATE_NAME); + return status === LifecycleStatus.RUNNING; +} diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts new file mode 100644 index 00000000..c3de1196 --- /dev/null +++ b/src/workflows/workflow_wrapper_service.ts @@ -0,0 +1,330 @@ +import * as restate from "../public_api"; +import * as wf from "./workflow"; +import * as wss from "./workflow_state_service"; + +// ---------------------------------------------------------------------------- +// Workflow Context Implementations +// ---------------------------------------------------------------------------- + +class SharedPromiseImpl implements wf.DurablePromise { + constructor( + private readonly workflowId: string, + private readonly promiseName: string, + private readonly ctx: restate.RpcContext, + private readonly stateServiceApi: restate.ServiceApi + ) {} + + promise(): Promise { + const awk = this.ctx.awakeable(); + + this.ctx.send(this.stateServiceApi).subscribePromise(this.workflowId, { + promiseName: this.promiseName, + awkId: awk.id, + }); + + return awk.promise; + } + + async peek(): Promise { + const result = await this.ctx + .rpc(this.stateServiceApi) + .peekPromise(this.workflowId, { promiseName: this.promiseName }); + + if (result === null) { + return null; + } + if (result.error !== undefined) { + return Promise.reject(new Error(result.error)); + } + return Promise.resolve(result.value as T); + } + + resolve(value?: T): void { + this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { + promiseName: this.promiseName, + completion: { value }, + }); + } + + fail(errorMsg: string): void { + this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { + promiseName: this.promiseName, + completion: { error: errorMsg }, + }); + } +} + +class SharedContextImpl implements wf.SharedWfContext { + constructor( + protected readonly ctx: restate.RpcContext, + protected readonly wfId: string, + protected readonly stateServiceApi: restate.ServiceApi + ) {} + + workflowId(): string { + return this.wfId; + } + + get(stateName: string): Promise { + return this.ctx + .rpc(this.stateServiceApi) + .getState(this.wfId, stateName) as Promise; + } + + promise(name: string): wf.DurablePromise { + return new SharedPromiseImpl( + this.wfId, + name, + this.ctx, + this.stateServiceApi + ); + } +} + +class ExclusiveContextImpl extends SharedContextImpl implements wf.WfContext { + public readonly id: Buffer; + public readonly serviceName: string; + public readonly rand: restate.Rand; + public readonly console: Console; + + constructor( + ctx: restate.RpcContext, + wfId: string, + stateServiceApi: restate.ServiceApi + ) { + super(ctx, wfId, stateServiceApi); + this.id = ctx.id; + this.serviceName = ctx.serviceName; + this.rand = ctx.rand; + this.console = ctx.console; + } + + publishMessage(message: string): void { + this.ctx + .send(this.stateServiceApi) + .publishMessage(this.wfId, { message, timestamp: new Date() }); + } + + grpcChannel(): restate.RestateGrpcChannel { + return this.ctx.grpcChannel(); + } + + set(stateName: string, value: T): void { + this.ctx + .send(this.stateServiceApi) + .setState(this.wfId, { stateName, value }); + } + + clear(stateName: string): void { + this.ctx.send(this.stateServiceApi).clearState(this.wfId, stateName); + } + + sideEffect( + fn: () => Promise, + retryPolicy?: restate.RestateUtils.RetrySettings + ): Promise { + return this.ctx.sideEffect(fn, retryPolicy); + } + + awakeable(): { id: string; promise: restate.CombineablePromise } { + return this.ctx.awakeable(); + } + resolveAwakeable(id: string, payload: T): void { + this.ctx.resolveAwakeable(id, payload); + } + rejectAwakeable(id: string, reason: string): void { + this.ctx.rejectAwakeable(id, reason); + } + + sleep(millis: number): restate.CombineablePromise { + return this.ctx.sleep(millis); + } + + rpc(opts: restate.ServiceApi): restate.Client { + return this.ctx.rpc(opts); + } + send(opts: restate.ServiceApi): restate.SendClient { + return this.ctx.send(opts); + } + sendDelayed( + opts: restate.ServiceApi, + delay: number + ): restate.SendClient { + return this.ctx.sendDelayed(opts, delay); + } +} + +// ---------------------------------------------------------------------------- + +// ---------------------------------------------------------------------------- + +export function createWrapperService( + workflow: wf.Workflow, + path: string, + stateServiceApi: restate.ServiceApi +) { + const wrapperService = { + start: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const started = await ctx + .rpc(stateServiceApi) + .startWorkflow(request.workflowId); + if (started === wf.WorkflowStartResult.STARTED) { + ctx.send(wrapperServiceApi).run(request); + } + return started; + }, + + run: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const wfCtx = new ExclusiveContextImpl( + ctx, + request.workflowId, + stateServiceApi + ); + try { + const result = await workflow.run(wfCtx, request); + const resultValue = result !== undefined ? result : {}; + await ctx + .rpc(stateServiceApi) + .finishOrFailWorkflow(request.workflowId, { value: resultValue }); + return result; + } catch (err) { + const msg = stringifyError(err); + await ctx + .rpc(stateServiceApi) + .finishOrFailWorkflow(request.workflowId, { error: msg }); + throw err; + } finally { + ctx + .sendDelayed(stateServiceApi, 2 * 60 * 1000) + .dispose(request.workflowId); + } + }, + + waitForResult: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const awakeable = ctx.awakeable(); + await ctx + .rpc(stateServiceApi) + .subscribeResult(request.workflowId, awakeable.id); + return awakeable.promise; + }, + + status: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + return ctx.rpc(stateServiceApi).getStatus(request.workflowId); + }, + + getLatestMessage: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + return ctx.rpc(stateServiceApi).getLatestMessage(request.workflowId); + }, + + pollNextMessages: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest<{ from: number }> + ): Promise => { + checkRequestAndWorkflowId(request); + + // eslint-disable-next-line no-constant-condition + while (true) { + const awk = ctx.awakeable(); + const messages = await ctx + .rpc(stateServiceApi) + .pollNextMessages(request.workflowId, { + from: request.from, + awakId: awk.id, + }); + if (messages !== undefined && messages !== null) { + return messages; + } + + await awk.promise; + } + }, + }; + + // add all the interaction methods to the wrapper service + for (const [route, handler] of Object.entries(workflow)) { + if (typeof handler !== "function" || route === "run") { + continue; + } + if (handler.length < 1 || handler.length > 2) { + throw new Error( + "Workflow function does not conform to correct signature: must have at least one argument (SharedWfContext) and at most a second argument (the request parameter)" + ); + } + + const wrappingHandler = async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + const wfCtx = new SharedContextImpl( + ctx, + request.workflowId, + stateServiceApi + ); + + // impl. note: we need the extra cast to 'unknown', because the 'run' method is + // otherwise incompatible with the cast. we exclude that method in the filter above, + // but the compiler doesn't recognize that. + return ( + handler as unknown as (ctx: wf.SharedWfContext, req: IN) => Promise + )(wfCtx, request); + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (wrapperService as any)[route] = wrappingHandler; + } + + const wrapperServiceRouter = restate.router(wrapperService); + const wrapperServiceApi: restate.ServiceApi = { + path, + }; + + return wrapperServiceRouter; +} + +function checkRequestAndWorkflowId(request: wf.WorkflowRequest): void { + if (request === undefined) { + throw new restate.TerminalError("Request parameter is undefined"); + } + if (request.workflowId === undefined) { + throw new restate.TerminalError("Request is missing property 'workflowId'"); + } +} + +function stringifyError(error: unknown): string { + if (typeof error === "string") { + return error; + } + if (error instanceof Error) { + const e = error as Error; + return `${e.name}: ${e.message}\nStack: ${e.stack}`; + } + try { + return JSON.stringify(error); + } catch (err) { + return "(cause not stringify-able)"; + } +} From a3ab41aed3e108baf1e491080fe8d6d3edbaefb3 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 6 Feb 2024 20:49:46 +0100 Subject: [PATCH 02/11] Add license headers to new files --- src/clients/client.ts | 11 +++++++++++ src/workflows/workflow.ts | 11 +++++++++++ src/workflows/workflow_state_service.ts | 11 +++++++++++ src/workflows/workflow_wrapper_service.ts | 11 +++++++++++ 4 files changed, 44 insertions(+) diff --git a/src/clients/client.ts b/src/clients/client.ts index 048671b1..1bd4442c 100644 --- a/src/clients/client.ts +++ b/src/clients/client.ts @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + import * as restate from "../public_api"; import { LifecycleStatus, diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts index 58fe35f0..45e38e3a 100644 --- a/src/workflows/workflow.ts +++ b/src/workflows/workflow.ts @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + import * as restate from "../public_api"; import * as wws from "./workflow_wrapper_service"; import * as wss from "./workflow_state_service"; diff --git a/src/workflows/workflow_state_service.ts b/src/workflows/workflow_state_service.ts index 7fd14659..4535e3bd 100644 --- a/src/workflows/workflow_state_service.ts +++ b/src/workflows/workflow_state_service.ts @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + import * as restate from "../public_api"; import { LifecycleStatus, diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts index c3de1196..e7341371 100644 --- a/src/workflows/workflow_wrapper_service.ts +++ b/src/workflows/workflow_wrapper_service.ts @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + import * as restate from "../public_api"; import * as wf from "./workflow"; import * as wss from "./workflow_state_service"; From 21d1d01c9a7a6be71bb790eb45df67d5ca945614 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 6 Feb 2024 20:50:38 +0100 Subject: [PATCH 03/11] Rename client to workflow_client --- examples/workflow_example.ts | 2 +- src/clients/{client.ts => workflow_client.ts} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/clients/{client.ts => workflow_client.ts} (100%) diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index bc29acd8..e487a760 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -1,6 +1,6 @@ import * as restate from "../src/public_api"; import * as restate_wf from "../src/workflows/workflow"; -import * as restate_clients from "../src/clients/client"; +import * as restate_clients from "../src/clients/workflow_client"; import { randomUUID } from "crypto"; /* eslint-disable no-console */ diff --git a/src/clients/client.ts b/src/clients/workflow_client.ts similarity index 100% rename from src/clients/client.ts rename to src/clients/workflow_client.ts From 01ab378a57a6e877f9f130b9fdd3628861e9517f Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 6 Feb 2024 20:51:26 +0100 Subject: [PATCH 04/11] Temporarily remove methods for message subscription --- src/clients/workflow_client.ts | 39 +++++++++++++++++----------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/src/clients/workflow_client.ts b/src/clients/workflow_client.ts index 1bd4442c..c332ed71 100644 --- a/src/clients/workflow_client.ts +++ b/src/clients/workflow_client.ts @@ -12,7 +12,6 @@ import * as restate from "../public_api"; import { LifecycleStatus, - StatusMessage, WorkflowConnectedSignature, WorkflowExternalSignature, WorkflowRequest, @@ -52,11 +51,11 @@ export interface WorkflowClient { workflowInterface(): restate.Client>; // call methods on workflow - latestMessage(): Promise; + // latestMessage(): Promise; - getMessages( - fromSeqNum: number - ): AsyncGenerator; + // getMessages( + // fromSeqNum: number + // ): AsyncGenerator; } export function connectRestate(uri: string) { @@ -100,21 +99,21 @@ class WorkflowClientImpl implements WorkflowClient { return clientProxy as restate.Client>; } - latestMessage(): Promise { - return this.makeCall("getLatestMessage", {}); - } - - async *getMessages(fromSeqNum: number) { - while (true) { - const msgs: StatusMessage[] = await this.makeCall("pollNextMessages", { - from: fromSeqNum, - }); - for (const msg of msgs) { - yield msg; - } - fromSeqNum += msgs.length; - } - } + // latestMessage(): Promise { + // return this.makeCall("getLatestMessage", {}); + // } + + // async *getMessages(fromSeqNum: number) { + // while (true) { + // const msgs: StatusMessage[] = await this.makeCall("pollNextMessages", { + // from: fromSeqNum, + // }); + // for (const msg of msgs) { + // yield msg; + // } + // fromSeqNum += msgs.length; + // } + // } private async makeCall(method: string, args: TT): Promise { return await makeCall( From a41d78b5eaa8feae3967183836fef4e29147bac9 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 6 Feb 2024 21:05:54 +0100 Subject: [PATCH 05/11] Export workflows in pubic API --- examples/workflow_example.ts | 14 ++++++++------ src/public_api.ts | 1 + 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index e487a760..13413972 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -1,5 +1,4 @@ import * as restate from "../src/public_api"; -import * as restate_wf from "../src/workflows/workflow"; import * as restate_clients from "../src/clients/workflow_client"; import { randomUUID } from "crypto"; @@ -11,13 +10,13 @@ const restateAdminUrl = process.argv[3] || "http://localhost:9070"; // // (1) Definition of the workflow // -const myworkflow = restate_wf.workflow("acme.myworkflow", { +const myworkflow = restate.workflow.workflow("acme.myworkflow", { // // Each workflow must have exactly one run() function, which defines // the life cycle. This function isn't directly started, but indirectly // via the synthetic start() function. // - run: async (ctx: restate_wf.WfContext, params: { name: string }) => { + run: async (ctx: restate.workflow.WfContext, params: { name: string }) => { if (!params?.name) { throw new restate.TerminalError("Missing parameter 'name'"); } @@ -46,15 +45,18 @@ const myworkflow = restate_wf.workflow("acme.myworkflow", { // a 'SharedWfContext' and have shared access to state and promises // - signal: async (ctx: restate_wf.SharedWfContext, req: { signal: string }) => { + signal: async ( + ctx: restate.workflow.SharedWfContext, + req: { signal: string } + ) => { ctx.promise("thesignal").resolve(req.signal); }, - getName: async (ctx: restate_wf.SharedWfContext): Promise => { + getName: async (ctx: restate.workflow.SharedWfContext): Promise => { return (await ctx.get("name")) ?? "(not yet set)"; }, - awaitName: async (ctx: restate_wf.SharedWfContext): Promise => { + awaitName: async (ctx: restate.workflow.SharedWfContext): Promise => { return ctx.promise("name_promise").promise(); }, }); diff --git a/src/public_api.ts b/src/public_api.ts index f4285f98..67e9341a 100644 --- a/src/public_api.ts +++ b/src/public_api.ts @@ -47,3 +47,4 @@ export { connection, RestateConnectionOptions, } from "./embedded/api"; +export * as workflow from "./workflows/workflow"; From 877e66dbbb1df04f710cc7c8399b2c3469f866c4 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 7 Feb 2024 02:35:17 +0100 Subject: [PATCH 06/11] Use ServiceBundle for nicer registration. --- examples/workflow_example.ts | 4 +--- src/workflows/workflow.ts | 14 ++++---------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index 13413972..3055a629 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -64,9 +64,7 @@ const myworkflow = restate.workflow.workflow("acme.myworkflow", { // typed API similar to how other Restate RPC services work const workflowApi = myworkflow.api; -const server = restate.createServer(); -myworkflow.registerServices(server); -server.listen(9080); +restate.createServer().bind(myworkflow).listen(9080); // // (2) Code to nteract with the workflow using an external client diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts index 45e38e3a..f2bc3814 100644 --- a/src/workflows/workflow.ts +++ b/src/workflows/workflow.ts @@ -100,12 +100,8 @@ export type WorkflowConnectedSignature = Omit< "run" >; -export interface WorkflowServices { +export interface WorkflowServices extends restate.ServiceBundle { readonly api: restate.ServiceApi>; - - registerServices( - server: restate.RestateServer | restate.LambdaRestateServer - ): void; } export function workflow( @@ -127,11 +123,9 @@ export function workflow( return { api: { path } as restate.ServiceApi>, - registerServices: ( - server: restate.RestateServer | restate.LambdaRestateServer - ) => { - server.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); - server.bindRouter(path, wrapperServiceRouter); + registerServices: (endpoint: restate.ServiceEndpoint) => { + endpoint.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); + endpoint.bindRouter(path, wrapperServiceRouter); }, } satisfies WorkflowServices; } From b67fcac8a15a726596e5c71c4f1228caf6b2ab8a Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 7 Feb 2024 06:21:30 +0100 Subject: [PATCH 07/11] Restructure external clients and add comments --- examples/workflow_example.ts | 21 ++- src/clients/workflow_client.ts | 241 +++++++++++++++++++++------------ src/public_api.ts | 1 + 3 files changed, 174 insertions(+), 89 deletions(-) diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index 3055a629..e44f4c58 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -1,5 +1,4 @@ import * as restate from "../src/public_api"; -import * as restate_clients from "../src/clients/workflow_client"; import { randomUUID } from "crypto"; /* eslint-disable no-console */ @@ -67,24 +66,34 @@ const workflowApi = myworkflow.api; restate.createServer().bind(myworkflow).listen(9080); // -// (2) Code to nteract with the workflow using an external client +// (2) Code to interact with the workflow using an external client // // This submits a workflow and sends signals / queries to the workflow. // - async function startWorkflowAndInteract(restateUrl: string) { - const restate = restate_clients.connectRestate(restateUrl); + const restateServer = restate.clients.connect(restateUrl); const args = { name: "Restatearius" }; const workflowId = randomUUID(); // Option a) we can create clients either with just the workflow service path - await restate.submitWorkflow("acme.myworkflow", workflowId, args); + const submit1 = await restateServer.submitWorkflow( + "acme.myworkflow", + workflowId, + args + ); + console.log("Submitted workflow with result: " + submit1.status); // Option b) we can supply the API signature and get a typed interface for all the methods // Because the submit is idempotent, this call here will effectively attach to the // previous workflow - const client = await restate.submitWorkflow(workflowApi, workflowId, args); + const submit2 = await restateServer.submitWorkflow( + workflowApi, + workflowId, + args + ); + console.log("Submitted workflow with result: " + submit2.status); + const client = submit2.client; // check the status (should be RUNNING) const status = await client.status(); diff --git a/src/clients/workflow_client.ts b/src/clients/workflow_client.ts index c332ed71..8d906159 100644 --- a/src/clients/workflow_client.ts +++ b/src/clients/workflow_client.ts @@ -10,60 +10,173 @@ */ import * as restate from "../public_api"; -import { - LifecycleStatus, - WorkflowConnectedSignature, - WorkflowExternalSignature, - WorkflowRequest, -} from "../workflows/workflow"; +import { ensureError } from "../types/errors"; -/* eslint-disable no-console */ +/** + * A client to interact with running workflows. + */ +export interface WorkflowClient { + /** + * Gets the ID of the workflow that this client talks to. + */ + workflowId(): string; + + /** + * Gets the status of the workflow, as a {@link restate.workflow.LifecycleStatus}. + * This will take on the values "NOT_STARTED", "RUNNING", "FINISHED", "FAILED". + */ + status(): Promise; + + /** + * Returns a promise completed with the result. This will resolve successfully on successful + * termination of the workflow, and will be rejected if the workflow throws an Error. + */ + result(): Promise; + + /** + * Gets the interface to the workflow through which all the workflow's additional methods + * can be called. + * + * To get the proper typed client, use the {@link WorkflowConnection.submitWorkflow} or + * {@link WorkflowConnection.connectToWorkflow} functions that accpet a typed ServiceApi + * object, as in the example below. + * + * @example + * In the workflow definition: + * ``` + * const myWorkflow = restate.workflow.workflow("acme.myworkflow", { ... }); + * export const myWorkflowApi = myworkflow.api; + * ``` + * In the client code: + * ``` + * import { myWorkflowApi } from "../server/myWorkflow" + * ... + * const restate = connectWorkflows("https://restatehost:8080"); + * restate.submitWorkflow(myWorkflowApi, workflowId, args); + * restate.connectToWorkflow(myWorkflowApi, workflowId); + * ``` + */ + workflowInterface(): restate.Client< + restate.workflow.WorkflowConnectedSignature + >; +} -export interface Restate { +/** + * A connection to Restate that let's you submit workflows or connect to workflows. + * This is a typed client that internally makes HTTP calls to Restate to launch trigger + * an execution of a workflow service, or to connect to an existing execution. + */ +export interface RestateClient { submitWorkflow( path: string, workflowId: string, params: T - ): Promise>; + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }>; submitWorkflow( - workflowApi: restate.ServiceApi>, + workflowApi: restate.ServiceApi< + restate.workflow.WorkflowExternalSignature + >, workflowId: string, params: T - ): Promise>; + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }>; connectToWorkflow( path: string, workflowId: string - ): Promise>; + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }>; connectToWorkflow( - workflowApi: restate.ServiceApi>, + workflowApi: restate.ServiceApi< + restate.workflow.WorkflowExternalSignature + >, workflowId: string - ): Promise>; + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }>; } -export interface WorkflowClient { - workflowId(): string; - status(): Promise; // RUNNING / FINISHED / FAILED - - result(): Promise; - - workflowInterface(): restate.Client>; // call methods on workflow - - // latestMessage(): Promise; +/** + * Creates a typed client to start and interact with workflow executions. + * The specifiec URI must point to the Restate request endpoint (ingress). + * + * This function doesn't immediately verify the connection, it will not fail + * if Restate is unreachable. Connection failures will only manifest when + * attempting to submit or connect a specific workflow. + */ +export function connect(restateUri: string): RestateClient { + return { + submitWorkflow: async ( + pathOrApi: + | string + | restate.ServiceApi< + restate.workflow.WorkflowExternalSignature + >, + workflowId: string, + params: T + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }> => { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + + let result: restate.workflow.WorkflowStartResult; + try { + result = await makeCall(restateUri, path, "start", workflowId, params); + } catch (err) { + const error = ensureError(err); + throw new Error("Cannot start workflow: " + error.message, { + cause: error, + }); + } - // getMessages( - // fromSeqNum: number - // ): AsyncGenerator; -} + return { + status: result, + client: new WorkflowClientImpl(restateUri, path, workflowId), + }; + }, -export function connectRestate(uri: string) { - return new RestateImpl(uri); + async connectToWorkflow( + pathOrApi: + | string + | restate.ServiceApi< + restate.workflow.WorkflowExternalSignature + >, + workflowId: string + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }> { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + const client: WorkflowClient = new WorkflowClientImpl( + restateUri, + path, + workflowId + ); + const status = await client.status(); + if (status === restate.workflow.LifecycleStatus.NOT_STARTED) { + throw new Error( + "No workflow running/finished/failed with ID " + workflowId + ); + } + return { + status, + client: new WorkflowClientImpl(restateUri, path, workflowId), + }; + }, + } satisfies RestateClient; } -// ------------------------------ implementation ------------------------------ - class WorkflowClientImpl implements WorkflowClient { constructor( private readonly restateUri: string, @@ -75,7 +188,7 @@ class WorkflowClientImpl implements WorkflowClient { return this.wfId; } - status(): Promise { + status(): Promise { return this.makeCall("status", {}); } @@ -83,7 +196,9 @@ class WorkflowClientImpl implements WorkflowClient { return this.makeCall("waitForResult", {}); } - workflowInterface(): restate.Client> { + workflowInterface(): restate.Client< + restate.workflow.WorkflowConnectedSignature + > { const clientProxy = new Proxy( {}, { @@ -96,25 +211,11 @@ class WorkflowClientImpl implements WorkflowClient { } ); - return clientProxy as restate.Client>; + return clientProxy as restate.Client< + restate.workflow.WorkflowConnectedSignature + >; } - // latestMessage(): Promise { - // return this.makeCall("getLatestMessage", {}); - // } - - // async *getMessages(fromSeqNum: number) { - // while (true) { - // const msgs: StatusMessage[] = await this.makeCall("pollNextMessages", { - // from: fromSeqNum, - // }); - // for (const msg of msgs) { - // yield msg; - // } - // fromSeqNum += msgs.length; - // } - // } - private async makeCall(method: string, args: TT): Promise { return await makeCall( this.restateUri, @@ -126,36 +227,6 @@ class WorkflowClientImpl implements WorkflowClient { } } -class RestateImpl implements Restate { - constructor(private readonly restateUri: string) {} - - async submitWorkflow( - pathOrApi: string | restate.ServiceApi>, - workflowId: string, - params: T - ): Promise> { - const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; - const response = await makeCall( - this.restateUri, - path, - "start", - workflowId, - params - ); - console.log("Start() call completed: Workflow is " + response); - - return new WorkflowClientImpl(this.restateUri, path, workflowId); - } - - async connectToWorkflow( - pathOrApi: string | restate.ServiceApi>, - workflowId: string - ): Promise> { - const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; - return new WorkflowClientImpl(this.restateUri, path, workflowId); - } -} - // ---------------------------------------------------------------------------- // Utils // ---------------------------------------------------------------------------- @@ -167,7 +238,7 @@ async function makeCall( workflowId: string, params: T ): Promise { - if (typeof workflowId !== "string" || workflowId.length === 0) { + if (!workflowId || typeof workflowId !== "string") { throw new Error("missing workflowId"); } if (params === undefined) { @@ -179,7 +250,10 @@ async function makeCall( const url = `${restateUri}/${serviceName}/${method}`; const data = { - request: { workflowId, ...params } satisfies WorkflowRequest, + request: { + workflowId, + ...params, + } satisfies restate.workflow.WorkflowRequest, }; let body: string; @@ -189,7 +263,8 @@ async function makeCall( throw new Error("Cannot encode request: " + err, { cause: err }); } - console.log(`Making call to Restate workflow at ${url} with ${body}`); + // eslint-disable-next-line no-console + console.debug(`Making call to Restate at ${url}`); const httpResponse = await fetch(url, { method: "POST", diff --git a/src/public_api.ts b/src/public_api.ts index 67e9341a..6599aba7 100644 --- a/src/public_api.ts +++ b/src/public_api.ts @@ -48,3 +48,4 @@ export { RestateConnectionOptions, } from "./embedded/api"; export * as workflow from "./workflows/workflow"; +export * as clients from "./clients/workflow_client"; From 09412fd4320e8e6fe62e8543d25575d8ca3c647b Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 7 Feb 2024 07:29:23 +0100 Subject: [PATCH 08/11] Set retention to one week for now --- src/workflows/workflow_wrapper_service.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts index e7341371..6bc1feaa 100644 --- a/src/workflows/workflow_wrapper_service.ts +++ b/src/workflows/workflow_wrapper_service.ts @@ -13,6 +13,8 @@ import * as restate from "../public_api"; import * as wf from "./workflow"; import * as wss from "./workflow_state_service"; +const DEFAULT_RETENTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // 1 week + // ---------------------------------------------------------------------------- // Workflow Context Implementations // ---------------------------------------------------------------------------- @@ -166,7 +168,7 @@ class ExclusiveContextImpl extends SharedContextImpl implements wf.WfContext { } // ---------------------------------------------------------------------------- - +// the service that wraps the workflow methods // ---------------------------------------------------------------------------- export function createWrapperService( @@ -216,7 +218,7 @@ export function createWrapperService( throw err; } finally { ctx - .sendDelayed(stateServiceApi, 2 * 60 * 1000) + .sendDelayed(stateServiceApi, DEFAULT_RETENTION_PERIOD) .dispose(request.workflowId); } }, From eb4bd4f011b4d995a46919ea81ec6e3e1180ff55 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 7 Feb 2024 07:36:43 +0100 Subject: [PATCH 09/11] Clean up workflows and simplify some names --- src/clients/workflow_client.ts | 24 ++-- src/workflows/workflow.ts | 200 +++++++++++++++++++++------------ 2 files changed, 134 insertions(+), 90 deletions(-) diff --git a/src/clients/workflow_client.ts b/src/clients/workflow_client.ts index 8d906159..d42e4dea 100644 --- a/src/clients/workflow_client.ts +++ b/src/clients/workflow_client.ts @@ -56,9 +56,7 @@ export interface WorkflowClient { * restate.connectToWorkflow(myWorkflowApi, workflowId); * ``` */ - workflowInterface(): restate.Client< - restate.workflow.WorkflowConnectedSignature - >; + workflowInterface(): restate.Client>; } /** @@ -78,7 +76,7 @@ export interface RestateClient { submitWorkflow( workflowApi: restate.ServiceApi< - restate.workflow.WorkflowExternalSignature + restate.workflow.WorkflowRestateRpcApi >, workflowId: string, params: T @@ -97,7 +95,7 @@ export interface RestateClient { connectToWorkflow( workflowApi: restate.ServiceApi< - restate.workflow.WorkflowExternalSignature + restate.workflow.WorkflowRestateRpcApi >, workflowId: string ): Promise<{ @@ -119,9 +117,7 @@ export function connect(restateUri: string): RestateClient { submitWorkflow: async ( pathOrApi: | string - | restate.ServiceApi< - restate.workflow.WorkflowExternalSignature - >, + | restate.ServiceApi>, workflowId: string, params: T ): Promise<{ @@ -149,9 +145,7 @@ export function connect(restateUri: string): RestateClient { async connectToWorkflow( pathOrApi: | string - | restate.ServiceApi< - restate.workflow.WorkflowExternalSignature - >, + | restate.ServiceApi>, workflowId: string ): Promise<{ status: restate.workflow.LifecycleStatus; @@ -196,9 +190,7 @@ class WorkflowClientImpl implements WorkflowClient { return this.makeCall("waitForResult", {}); } - workflowInterface(): restate.Client< - restate.workflow.WorkflowConnectedSignature - > { + workflowInterface(): restate.Client> { const clientProxy = new Proxy( {}, { @@ -211,9 +203,7 @@ class WorkflowClientImpl implements WorkflowClient { } ); - return clientProxy as restate.Client< - restate.workflow.WorkflowConnectedSignature - >; + return clientProxy as restate.Client>; } private async makeCall(method: string, args: TT): Promise { diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts index f2bc3814..ed137a25 100644 --- a/src/workflows/workflow.ts +++ b/src/workflows/workflow.ts @@ -15,6 +15,90 @@ import * as wss from "./workflow_state_service"; const STATE_SERVICE_PATH_SUFFIX = "_state"; +// ---------------------------------------------------------------------------- +// workflow definition / registration +// ---------------------------------------------------------------------------- + +/** + * Creates a new workflow service that will be served under the given path. + * + * A workflow must consist of + * - one run method: `run(ctx: WfContext, params: T) => Promise` + * - an arbitrary number of interaction methods: `foo(ctx: SharedWfContext, params: X) => Promise` + */ +export function workflow( + path: string, + workflow: Workflow +): WorkflowServices { + // the state service manages all state and promises for us + const stateServiceRouter = wss.workflowStateService; + const stateServiceApi: restate.ServiceApi = { + path: path + STATE_SERVICE_PATH_SUFFIX, + }; + + // the wrapper service manages life cycle, contexts, delegation to the state service + const wrapperServiceRouter = wws.createWrapperService( + workflow, + path, + stateServiceApi + ); + + return { + api: { path } as restate.ServiceApi>, + registerServices: (endpoint: restate.ServiceEndpoint) => { + endpoint.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); + endpoint.bindRouter(path, wrapperServiceRouter); + }, + } satisfies WorkflowServices; +} + +/** + * The type signature of a workflow. + * A workflow must consist of + * - one run method: `run(ctx: WfContext, params: T) => Promise` + * - an arbitrary number of interaction methods: `foo(ctx: SharedWfContext, params: T) => Promise` + */ +export type Workflow = { + run: RunMethod; +} & WorkflowMethods; + +type RunMethod = (ctx: WfContext, params: T) => Promise; +type InteractionMethod = (ctx: SharedWfContext, params: T) => Promise; + +type WorkflowMethods = { + [K in keyof U]: K extends "run" + ? U[K] extends RunMethod + ? U[K] + : "The 'run' methods needs to follow the signature: (ctx: WfContext, params: any) => Promise " + : // eslint-disable-next-line @typescript-eslint/no-explicit-any + U[K] extends InteractionMethod + ? U[K] + : "Methods other than 'run' are interaction methods and need to follow the signature: (ctx: SharedWfContext, params: any) => Promise"; +}; + +/** + * The workflow service(s) and API. + * + * Register at a Restate endpoint (HTTP/2, Lambda, etc.) as follows: + * ``` + * const myWorkflow = restate.workflows.workflow("org.acme.myworkflow", { + * // workflow implementation + * }) + * restate.createServer().bind(myWorkflow) + * ``` + * + * The {@link WorkflowServices.api} can be used to create typed clients, both + * from other Restate-backed serviced (e.g., `ctx.rpc(api).triggerMySignal()`) + * or from external clients (`clients.connectWorkflows(restateUri).connectToWorkflow(api, id);`). + */ +export interface WorkflowServices extends restate.ServiceBundle { + readonly api: restate.ServiceApi>; +} + +// ---------------------------------------------------------------------------- +// workflow-specific types (promises, contexts) +// ---------------------------------------------------------------------------- + export interface DurablePromise { promise(): Promise; peek(): Promise; @@ -23,6 +107,12 @@ export interface DurablePromise { fail(errorMsg: string): void; } +/** + * The context for the workflow's interaction methods, which are all methods + * other than the 'run()' method. + * + * This gives primarily access to state reads and promises. + */ export interface SharedWfContext { workflowId(): string; @@ -31,9 +121,13 @@ export interface SharedWfContext { promise(name: string): DurablePromise; } -export interface WfContext extends SharedWfContext, restate.RpcContext { - // publishMessage(message: string): void; -} +/** + * The context for the workflow's 'run()' function. + * + * This is a full context as for stateful durable keyed services, plus the + * workflow-specific bits, like workflowID and durable promises. + */ +export interface WfContext extends SharedWfContext, restate.RpcContext {} export enum LifecycleStatus { NOT_STARTED = "NOT_STARTED", @@ -54,78 +148,38 @@ export type StatusMessage = { timestamp: Date; }; -export type WorkflowRequest = T & { workflowId: string }; - -type RunMethodSignature = (ctx: WfContext, params: T) => Promise; -type InteractionMethodSignature = ( - ctx: SharedWfContext, - params: T -) => Promise; - -type WorkflowMethodsSignatures = { - [K in keyof U]: K extends "run" - ? U[K] extends RunMethodSignature - ? U[K] - : "The 'run' methods needs to follow the signature: (ctx: WfContext, params: any) => Promise " - : // eslint-disable-next-line @typescript-eslint/no-explicit-any - U[K] extends InteractionMethodSignature - ? U[K] - : "Methods other than 'run' are interaction methods and need to follow the signature: (ctx: SharedWfContext, params: any) => Promise"; -}; +// ---------------------------------------------------------------------------- +// types and signatures for typed clients +// ---------------------------------------------------------------------------- -export type Workflow = { - run: RunMethodSignature; -} & WorkflowMethodsSignatures; +/** + * The type of requests accepted by the workflow service. + * Must contain the 'workflowId' property. + */ +export type WorkflowRequest = T & { workflowId: string }; -export type WorkflowExternalSignature = { +/** + * The API signature of the workflow for use with RPC operations from Restate services. + */ +export type WorkflowRestateRpcApi = { start: (param: WorkflowRequest) => Promise; waitForResult: (request: WorkflowRequest) => Promise; -} & Omit< - { - [K in keyof U]: U[K] extends InteractionMethodSignature - ? (request: WorkflowRequest) => Promise - : never; - }, - "run" ->; - -export type WorkflowConnectedSignature = Omit< - { - [K in keyof U]: U[K] extends (ctx: SharedWfContext) => Promise - ? () => Promise - : U[K] extends (ctx: SharedWfContext, params: infer T) => Promise - ? (request: T) => Promise - : never; - }, - "run" ->; - -export interface WorkflowServices extends restate.ServiceBundle { - readonly api: restate.ServiceApi>; -} - -export function workflow( - path: string, - workflow: Workflow -): WorkflowServices { - // the state service manages all state and promises for us - const stateServiceRouter = wss.workflowStateService; - const stateServiceApi: restate.ServiceApi = { - path: path + STATE_SERVICE_PATH_SUFFIX, - }; - - // the wrapper service manages life cycle, contexts, delegation to the state service - const wrapperServiceRouter = wws.createWrapperService( - workflow, - path, - stateServiceApi - ); + status: (request: WorkflowRequest) => Promise; +} & { + [K in keyof Omit]: U[K] extends InteractionMethod + ? (request: WorkflowRequest) => Promise + : never; +}; - return { - api: { path } as restate.ServiceApi>, - registerServices: (endpoint: restate.ServiceEndpoint) => { - endpoint.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); - endpoint.bindRouter(path, wrapperServiceRouter); - }, - } satisfies WorkflowServices; -} +/** + * The API signature of the workflow for external clients. + */ +export type WorkflowClientApi = { + [K in keyof Omit]: U[K] extends ( + ctx: SharedWfContext + ) => Promise + ? () => Promise + : U[K] extends (ctx: SharedWfContext, params: infer T) => Promise + ? (request: T) => Promise + : never; +}; From ad1f1fd83dd0cc75f5d7cdd931e6c9019e3246bc Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 7 Feb 2024 11:11:38 +0100 Subject: [PATCH 10/11] Add a temporary comment about running the workflow example --- examples/workflow_example.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index e44f4c58..32b5160d 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -3,8 +3,18 @@ import { randomUUID } from "crypto"; /* eslint-disable no-console */ +// ------------- NOTE !!! ------------------ +// unlike the other dev samples, this one includes a client and interaction +// with the workflow, so it needs a running Restate runtime. +// The protocol switched to a new version some days ago, so one needs the +// latest nightly runtime build to run the current SDK main branch. +// +// start that via: +// docker run --name restate_dev --rm -p 8080:8080 -p 9070:9070 -p 9071:9071 --add-host=host.docker.internal:host-gateway ghcr.io/restatedev/restate:main + const restateIngressUrl = process.argv[2] || "http://localhost:8080"; const restateAdminUrl = process.argv[3] || "http://localhost:9070"; +const serviceHost = process.argv[4] || "host.docker.internal"; // // (1) Definition of the workflow @@ -130,7 +140,7 @@ registerDeployment(restateAdminUrl, 9080) // --------------------- utils ----------------- async function registerDeployment(restateAdminAddress: string, port: number) { - const serviceEndpoint = `http://host.docker.internal:${port}`; + const serviceEndpoint = `http://${serviceHost}:${port}`; const httpResponse = await fetch(restateAdminAddress + "/deployments", { method: "POST", headers: { From 5a69ec65fdcd03162384fb5668df458a44eb4c08 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Mon, 12 Feb 2024 16:04:02 +0100 Subject: [PATCH 11/11] Add new ctx methods --- src/workflows/workflow_wrapper_service.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts index 6bc1feaa..f91bd46f 100644 --- a/src/workflows/workflow_wrapper_service.ts +++ b/src/workflows/workflow_wrapper_service.ts @@ -132,6 +132,14 @@ class ExclusiveContextImpl extends SharedContextImpl implements wf.WfContext { this.ctx.send(this.stateServiceApi).clearState(this.wfId, stateName); } + stateKeys(): Promise { + return this.ctx.stateKeys(); + } + + clearAll(): void { + this.ctx.clearAll(); + } + sideEffect( fn: () => Promise, retryPolicy?: restate.RestateUtils.RetrySettings