diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts new file mode 100644 index 00000000..32b5160d --- /dev/null +++ b/examples/workflow_example.ts @@ -0,0 +1,162 @@ +import * as restate from "../src/public_api"; +import { randomUUID } from "crypto"; + +/* eslint-disable no-console */ + +// ------------- NOTE !!! ------------------ +// unlike the other dev samples, this one includes a client and interaction +// with the workflow, so it needs a running Restate runtime. +// The protocol switched to a new version some days ago, so one needs the +// latest nightly runtime build to run the current SDK main branch. +// +// start that via: +// docker run --name restate_dev --rm -p 8080:8080 -p 9070:9070 -p 9071:9071 --add-host=host.docker.internal:host-gateway ghcr.io/restatedev/restate:main + +const restateIngressUrl = process.argv[2] || "http://localhost:8080"; +const restateAdminUrl = process.argv[3] || "http://localhost:9070"; +const serviceHost = process.argv[4] || "host.docker.internal"; + +// +// (1) Definition of the workflow +// +const myworkflow = restate.workflow.workflow("acme.myworkflow", { + // + // Each workflow must have exactly one run() function, which defines + // the life cycle. This function isn't directly started, but indirectly + // via the synthetic start() function. + // + run: async (ctx: restate.workflow.WfContext, params: { name: string }) => { + if (!params?.name) { + throw new restate.TerminalError("Missing parameter 'name'"); + } + + ctx.console.log(">>>>>>>>>>> Starting workflow for " + params.name); + + // workflow state can be accessed from other methods. the state becomes + // eventually visible, there is no linearizability for this state + ctx.set("name", params.name); + + // to publish state in a way that other method calls can access it with + // guarantees (await until visible), use promises + ctx.promise("name_promise").resolve(params.name); + + // to listen to signals, also use promises + const signal = ctx.promise("thesignal"); + const message = await signal.promise(); + + const result = `${message} my dear ${params.name}`; + ctx.console.log(">>>>>>>>>>> Finishing workflow with: " + result); + return result; + }, + + // + // Workflows may have an arbitrary number of other functions that take + // a 'SharedWfContext' and have shared access to state and promises + // + + signal: async ( + ctx: restate.workflow.SharedWfContext, + req: { signal: string } + ) => { + ctx.promise("thesignal").resolve(req.signal); + }, + + getName: async (ctx: restate.workflow.SharedWfContext): Promise => { + return (await ctx.get("name")) ?? "(not yet set)"; + }, + + awaitName: async (ctx: restate.workflow.SharedWfContext): Promise => { + return ctx.promise("name_promise").promise(); + }, +}); + +// typed API similar to how other Restate RPC services work +const workflowApi = myworkflow.api; + +restate.createServer().bind(myworkflow).listen(9080); + +// +// (2) Code to interact with the workflow using an external client +// +// This submits a workflow and sends signals / queries to the workflow. +// +async function startWorkflowAndInteract(restateUrl: string) { + const restateServer = restate.clients.connect(restateUrl); + + const args = { name: "Restatearius" }; + const workflowId = randomUUID(); + + // Option a) we can create clients either with just the workflow service path + const submit1 = await restateServer.submitWorkflow( + "acme.myworkflow", + workflowId, + args + ); + console.log("Submitted workflow with result: " + submit1.status); + + // Option b) we can supply the API signature and get a typed interface for all the methods + // Because the submit is idempotent, this call here will effectively attach to the + // previous workflow + const submit2 = await restateServer.submitWorkflow( + workflowApi, + workflowId, + args + ); + console.log("Submitted workflow with result: " + submit2.status); + const client = submit2.client; + + // check the status (should be RUNNING) + const status = await client.status(); + console.log("Workflow status: " + status); + + // call method that reads the 'name' state + const get_name = await client.workflowInterface().getName(); + console.log("Workflow getName() (snapshot read): " + get_name); + + // call method that awaits the 'name' promise + const await_name = await client.workflowInterface().awaitName(); + console.log("Workflow awaitName() (promise): " + await_name); + + // send a signal + client.workflowInterface().signal({ signal: "hey ho!" }); + + // wait until everything is done + const result = await client.result(); + console.log("Workflow result: " + result); +} + +// +// (3) To make this example work end-to-end, with the external client below, +// we issue a registration here +// +registerDeployment(restateAdminUrl, 9080) + .then(() => startWorkflowAndInteract(restateIngressUrl)) + .then(() => process.exit(0)) + .catch((err) => { + console.error(err); + process.exit(-1); + }); + +// --------------------- utils ----------------- + +async function registerDeployment(restateAdminAddress: string, port: number) { + const serviceEndpoint = `http://${serviceHost}:${port}`; + const httpResponse = await fetch(restateAdminAddress + "/deployments", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + uri: serviceEndpoint, + }), + }); + + const responseText = await httpResponse.text(); + if (!httpResponse.ok) { + throw new Error( + `Registration failed: STATUS ${httpResponse.status} ; ${responseText}` + ); + } else { + return `Registration succeeded: STATUS ${httpResponse.status} ; ${responseText}`; + } +} diff --git a/package.json b/package.json index ffefd9be..805fa685 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "release": "release-it", "example": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/example.ts", "grpcexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/grpc_example.ts", + "workflowexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/workflow_example.ts", "handlerexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/handler_example.ts", "expressexample": "RESTATE_DEBUG_LOGGING=OFF ts-node-dev --transpile-only ./examples/embedded_example.ts" }, diff --git a/src/clients/workflow_client.ts b/src/clients/workflow_client.ts new file mode 100644 index 00000000..d42e4dea --- /dev/null +++ b/src/clients/workflow_client.ts @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +import * as restate from "../public_api"; +import { ensureError } from "../types/errors"; + +/** + * A client to interact with running workflows. + */ +export interface WorkflowClient { + /** + * Gets the ID of the workflow that this client talks to. + */ + workflowId(): string; + + /** + * Gets the status of the workflow, as a {@link restate.workflow.LifecycleStatus}. + * This will take on the values "NOT_STARTED", "RUNNING", "FINISHED", "FAILED". + */ + status(): Promise; + + /** + * Returns a promise completed with the result. This will resolve successfully on successful + * termination of the workflow, and will be rejected if the workflow throws an Error. + */ + result(): Promise; + + /** + * Gets the interface to the workflow through which all the workflow's additional methods + * can be called. + * + * To get the proper typed client, use the {@link WorkflowConnection.submitWorkflow} or + * {@link WorkflowConnection.connectToWorkflow} functions that accpet a typed ServiceApi + * object, as in the example below. + * + * @example + * In the workflow definition: + * ``` + * const myWorkflow = restate.workflow.workflow("acme.myworkflow", { ... }); + * export const myWorkflowApi = myworkflow.api; + * ``` + * In the client code: + * ``` + * import { myWorkflowApi } from "../server/myWorkflow" + * ... + * const restate = connectWorkflows("https://restatehost:8080"); + * restate.submitWorkflow(myWorkflowApi, workflowId, args); + * restate.connectToWorkflow(myWorkflowApi, workflowId); + * ``` + */ + workflowInterface(): restate.Client>; +} + +/** + * A connection to Restate that let's you submit workflows or connect to workflows. + * This is a typed client that internally makes HTTP calls to Restate to launch trigger + * an execution of a workflow service, or to connect to an existing execution. + */ +export interface RestateClient { + submitWorkflow( + path: string, + workflowId: string, + params: T + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }>; + + submitWorkflow( + workflowApi: restate.ServiceApi< + restate.workflow.WorkflowRestateRpcApi + >, + workflowId: string, + params: T + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }>; + + connectToWorkflow( + path: string, + workflowId: string + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }>; + + connectToWorkflow( + workflowApi: restate.ServiceApi< + restate.workflow.WorkflowRestateRpcApi + >, + workflowId: string + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }>; +} + +/** + * Creates a typed client to start and interact with workflow executions. + * The specifiec URI must point to the Restate request endpoint (ingress). + * + * This function doesn't immediately verify the connection, it will not fail + * if Restate is unreachable. Connection failures will only manifest when + * attempting to submit or connect a specific workflow. + */ +export function connect(restateUri: string): RestateClient { + return { + submitWorkflow: async ( + pathOrApi: + | string + | restate.ServiceApi>, + workflowId: string, + params: T + ): Promise<{ + status: restate.workflow.WorkflowStartResult; + client: WorkflowClient; + }> => { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + + let result: restate.workflow.WorkflowStartResult; + try { + result = await makeCall(restateUri, path, "start", workflowId, params); + } catch (err) { + const error = ensureError(err); + throw new Error("Cannot start workflow: " + error.message, { + cause: error, + }); + } + + return { + status: result, + client: new WorkflowClientImpl(restateUri, path, workflowId), + }; + }, + + async connectToWorkflow( + pathOrApi: + | string + | restate.ServiceApi>, + workflowId: string + ): Promise<{ + status: restate.workflow.LifecycleStatus; + client: WorkflowClient; + }> { + const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path; + const client: WorkflowClient = new WorkflowClientImpl( + restateUri, + path, + workflowId + ); + const status = await client.status(); + if (status === restate.workflow.LifecycleStatus.NOT_STARTED) { + throw new Error( + "No workflow running/finished/failed with ID " + workflowId + ); + } + return { + status, + client: new WorkflowClientImpl(restateUri, path, workflowId), + }; + }, + } satisfies RestateClient; +} + +class WorkflowClientImpl implements WorkflowClient { + constructor( + private readonly restateUri: string, + private readonly serviceName: string, + private readonly wfId: string + ) {} + + workflowId(): string { + return this.wfId; + } + + status(): Promise { + return this.makeCall("status", {}); + } + + result(): Promise { + return this.makeCall("waitForResult", {}); + } + + workflowInterface(): restate.Client> { + const clientProxy = new Proxy( + {}, + { + get: (_target, prop) => { + const method = prop as string; + return async (args: unknown) => { + return this.makeCall(method, args); + }; + }, + } + ); + + return clientProxy as restate.Client>; + } + + private async makeCall(method: string, args: TT): Promise { + return await makeCall( + this.restateUri, + this.serviceName, + method, + this.wfId, + args + ); + } +} + +// ---------------------------------------------------------------------------- +// Utils +// ---------------------------------------------------------------------------- + +async function makeCall( + restateUri: string, + serviceName: string, + method: string, + workflowId: string, + params: T +): Promise { + if (!workflowId || typeof workflowId !== "string") { + throw new Error("missing workflowId"); + } + if (params === undefined) { + params = {} as T; + } + if (typeof params !== "object") { + throw new Error("invalid parameters: must be an object"); + } + + const url = `${restateUri}/${serviceName}/${method}`; + const data = { + request: { + workflowId, + ...params, + } satisfies restate.workflow.WorkflowRequest, + }; + + let body: string; + try { + body = JSON.stringify(data); + } catch (err) { + throw new Error("Cannot encode request: " + err, { cause: err }); + } + + // eslint-disable-next-line no-console + console.debug(`Making call to Restate at ${url}`); + + const httpResponse = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body, + }); + + const responseText = await httpResponse.text(); + if (!httpResponse.ok) { + throw new Error(`Request failed: ${httpResponse.status}\n${responseText}`); + } + + let response; + try { + response = JSON.parse(responseText); + } catch (err) { + throw new Error("Cannot parse response JSON: " + err, { cause: err }); + } + + if (response.error) { + throw new Error(response.error); + } + if (response.response) { + return response.response as R; + } + if (Object.keys(response).length === 0) { + return undefined as R; + } + + throw new Error("Unrecognized response object: " + responseText); +} diff --git a/src/public_api.ts b/src/public_api.ts index f4285f98..6599aba7 100644 --- a/src/public_api.ts +++ b/src/public_api.ts @@ -47,3 +47,5 @@ export { connection, RestateConnectionOptions, } from "./embedded/api"; +export * as workflow from "./workflows/workflow"; +export * as clients from "./clients/workflow_client"; diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts new file mode 100644 index 00000000..ed137a25 --- /dev/null +++ b/src/workflows/workflow.ts @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +import * as restate from "../public_api"; +import * as wws from "./workflow_wrapper_service"; +import * as wss from "./workflow_state_service"; + +const STATE_SERVICE_PATH_SUFFIX = "_state"; + +// ---------------------------------------------------------------------------- +// workflow definition / registration +// ---------------------------------------------------------------------------- + +/** + * Creates a new workflow service that will be served under the given path. + * + * A workflow must consist of + * - one run method: `run(ctx: WfContext, params: T) => Promise` + * - an arbitrary number of interaction methods: `foo(ctx: SharedWfContext, params: X) => Promise` + */ +export function workflow( + path: string, + workflow: Workflow +): WorkflowServices { + // the state service manages all state and promises for us + const stateServiceRouter = wss.workflowStateService; + const stateServiceApi: restate.ServiceApi = { + path: path + STATE_SERVICE_PATH_SUFFIX, + }; + + // the wrapper service manages life cycle, contexts, delegation to the state service + const wrapperServiceRouter = wws.createWrapperService( + workflow, + path, + stateServiceApi + ); + + return { + api: { path } as restate.ServiceApi>, + registerServices: (endpoint: restate.ServiceEndpoint) => { + endpoint.bindKeyedRouter(stateServiceApi.path, stateServiceRouter); + endpoint.bindRouter(path, wrapperServiceRouter); + }, + } satisfies WorkflowServices; +} + +/** + * The type signature of a workflow. + * A workflow must consist of + * - one run method: `run(ctx: WfContext, params: T) => Promise` + * - an arbitrary number of interaction methods: `foo(ctx: SharedWfContext, params: T) => Promise` + */ +export type Workflow = { + run: RunMethod; +} & WorkflowMethods; + +type RunMethod = (ctx: WfContext, params: T) => Promise; +type InteractionMethod = (ctx: SharedWfContext, params: T) => Promise; + +type WorkflowMethods = { + [K in keyof U]: K extends "run" + ? U[K] extends RunMethod + ? U[K] + : "The 'run' methods needs to follow the signature: (ctx: WfContext, params: any) => Promise " + : // eslint-disable-next-line @typescript-eslint/no-explicit-any + U[K] extends InteractionMethod + ? U[K] + : "Methods other than 'run' are interaction methods and need to follow the signature: (ctx: SharedWfContext, params: any) => Promise"; +}; + +/** + * The workflow service(s) and API. + * + * Register at a Restate endpoint (HTTP/2, Lambda, etc.) as follows: + * ``` + * const myWorkflow = restate.workflows.workflow("org.acme.myworkflow", { + * // workflow implementation + * }) + * restate.createServer().bind(myWorkflow) + * ``` + * + * The {@link WorkflowServices.api} can be used to create typed clients, both + * from other Restate-backed serviced (e.g., `ctx.rpc(api).triggerMySignal()`) + * or from external clients (`clients.connectWorkflows(restateUri).connectToWorkflow(api, id);`). + */ +export interface WorkflowServices extends restate.ServiceBundle { + readonly api: restate.ServiceApi>; +} + +// ---------------------------------------------------------------------------- +// workflow-specific types (promises, contexts) +// ---------------------------------------------------------------------------- + +export interface DurablePromise { + promise(): Promise; + peek(): Promise; + + resolve(value?: T): void; + fail(errorMsg: string): void; +} + +/** + * The context for the workflow's interaction methods, which are all methods + * other than the 'run()' method. + * + * This gives primarily access to state reads and promises. + */ +export interface SharedWfContext { + workflowId(): string; + + get(stateName: string): Promise; + + promise(name: string): DurablePromise; +} + +/** + * The context for the workflow's 'run()' function. + * + * This is a full context as for stateful durable keyed services, plus the + * workflow-specific bits, like workflowID and durable promises. + */ +export interface WfContext extends SharedWfContext, restate.RpcContext {} + +export enum LifecycleStatus { + NOT_STARTED = "NOT_STARTED", + RUNNING = "RUNNING", + FINISHED = "FINISHED", + FAILED = "FAILED", +} + +export enum WorkflowStartResult { + STARTED = "STARTED", + ALREADY_STARTED = "ALREADY_STARTED", + ALREADY_FINISHED = "ALREADY_FINISHED", +} + +export type StatusMessage = { + sequenceNum: number; + message: string; + timestamp: Date; +}; + +// ---------------------------------------------------------------------------- +// types and signatures for typed clients +// ---------------------------------------------------------------------------- + +/** + * The type of requests accepted by the workflow service. + * Must contain the 'workflowId' property. + */ +export type WorkflowRequest = T & { workflowId: string }; + +/** + * The API signature of the workflow for use with RPC operations from Restate services. + */ +export type WorkflowRestateRpcApi = { + start: (param: WorkflowRequest) => Promise; + waitForResult: (request: WorkflowRequest) => Promise; + status: (request: WorkflowRequest) => Promise; +} & { + [K in keyof Omit]: U[K] extends InteractionMethod + ? (request: WorkflowRequest) => Promise + : never; +}; + +/** + * The API signature of the workflow for external clients. + */ +export type WorkflowClientApi = { + [K in keyof Omit]: U[K] extends ( + ctx: SharedWfContext + ) => Promise + ? () => Promise + : U[K] extends (ctx: SharedWfContext, params: infer T) => Promise + ? (request: T) => Promise + : never; +}; diff --git a/src/workflows/workflow_state_service.ts b/src/workflows/workflow_state_service.ts new file mode 100644 index 00000000..4535e3bd --- /dev/null +++ b/src/workflows/workflow_state_service.ts @@ -0,0 +1,389 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +import * as restate from "../public_api"; +import { + LifecycleStatus, + StatusMessage, + WorkflowStartResult, +} from "./workflow"; + +const LIFECYCLE_STATUS_STATE_NAME = "status"; +const RESULT_STATE_NAME = "result"; +const RESULT_LISTENERS_NAME = "result_listeners"; +const STATUS_MESSAGES_STATE_NAME = "messages"; +const STATUS_MESSAGE_LISTENERS = "message_listeners"; +const PROMISE_STATE_PREFIX = "prom_s_"; +const PROMISE_AWAKEABLE_PREFIX = "prom_l_"; +const ALL_NAMES_STATE_NAME = "all_state_names"; + +const RESERVED_STATE_NAMES = [ + LIFECYCLE_STATUS_STATE_NAME, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + ALL_NAMES_STATE_NAME, +]; +const RESERVED_STATE_PREFIXES = [ + PROMISE_STATE_PREFIX, + PROMISE_AWAKEABLE_PREFIX, +]; + +export type ValueOrError = { + value?: T; + error?: string; +}; + +export const workflowStateService = restate.keyedRouter({ + startWorkflow: async ( + ctx: restate.RpcContext + ): Promise => { + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + + if (status !== LifecycleStatus.NOT_STARTED) { + return status === LifecycleStatus.RUNNING + ? WorkflowStartResult.ALREADY_STARTED + : WorkflowStartResult.ALREADY_FINISHED; + } + + ctx.set(LIFECYCLE_STATUS_STATE_NAME, LifecycleStatus.RUNNING); + return WorkflowStartResult.STARTED; + }, + + finishOrFailWorkflow: async ( + ctx: restate.RpcContext, + _workflowId: string, + result: ValueOrError + ): Promise => { + if (result.error === undefined && result.value === undefined) { + throw new restate.TerminalError("Result is undefined"); + } + + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + + if (status !== LifecycleStatus.RUNNING) { + // not sure this can ever happen, but we put this here defensively + throw new restate.TerminalError("Unexpected state: " + status); + } + + const newStatus = result.error + ? LifecycleStatus.FAILED + : LifecycleStatus.FINISHED; + ctx.set(LIFECYCLE_STATUS_STATE_NAME, newStatus); + + await completePromise( + ctx, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + result + ); + }, + + getStatus: async (ctx: restate.RpcContext): Promise => { + return ( + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED + ); + }, + + completePromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string; completion: ValueOrError } + ): Promise => { + // we don't accept writes after the workflow is done + if (!(await checkIfRunning(ctx))) { + return; + } + + await completePromise( + ctx, + PROMISE_STATE_PREFIX + req.promiseName, + PROMISE_AWAKEABLE_PREFIX + req.promiseName, + req.completion + ); + }, + + peekPromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string } + ): Promise | null> => { + return peekPromise(ctx, PROMISE_STATE_PREFIX + req.promiseName); + }, + + subscribePromise: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { promiseName: string; awkId: string } + ): Promise | null> => { + return subscribePromise( + ctx, + PROMISE_STATE_PREFIX + req.promiseName, + PROMISE_AWAKEABLE_PREFIX + req.promiseName, + req.awkId + ); + }, + + getResult: async ( + ctx: restate.RpcContext + ): Promise | null> => { + return peekPromise(ctx, RESULT_STATE_NAME); + }, + + subscribeResult: async ( + ctx: restate.RpcContext, + workflowId: string, + awkId: string + ): Promise | null> => { + const status = + (await ctx.get(LIFECYCLE_STATUS_STATE_NAME)) ?? + LifecycleStatus.NOT_STARTED; + if (status === LifecycleStatus.NOT_STARTED) { + throw new restate.TerminalError( + `Workflow with id '${workflowId}' does not exist.` + ); + } + return subscribePromise( + ctx, + RESULT_STATE_NAME, + RESULT_LISTENERS_NAME, + awkId + ); + }, + + getState: async ( + ctx: restate.RpcContext, + _workflowId: string, + stateName: string + ): Promise => { + return ctx.get(stateName); + }, + + setState: async ( + ctx: restate.RpcContext, + _workflowId: string, + request: { stateName: string; value: T } + ): Promise => { + if (!request?.stateName) { + throw new restate.TerminalError("missing state name"); + } + if (request.value === undefined || request.value === null) { + throw new restate.TerminalError("invalid state value: " + request.value); + } + + // if the workflow isn't running (any more) we don't accept state updates + // shouldn't be possible anyways (because only workflow method has access to writable state) + // but we are defensive here against API errors + if (!(await checkIfRunning(ctx))) { + return; + } + + const stateName = request.stateName; + + // guard against overwriting built-in states + for (const reservedStateName of RESERVED_STATE_NAMES) { + if (stateName === reservedStateName) { + throw new restate.TerminalError( + "State name is reserved: " + reservedStateName + ); + } + } + for (const reservedStatePrefix of RESERVED_STATE_PREFIXES) { + if (stateName.startsWith(reservedStatePrefix)) { + throw new restate.TerminalError( + "State prefix is reserved: " + reservedStatePrefix + ); + } + } + + ctx.set(stateName, request.value); + await rememberNewStateName(ctx, stateName); + }, + + clearState: async ( + ctx: restate.RpcContext, + _workflowId: string, + stateName: string + ): Promise => { + ctx.clear(stateName); + }, + + publishMessage: async ( + ctx: restate.RpcContext, + _workflowId: string, + msg: { message: string; timestamp: Date } + ): Promise => { + // append message + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + msgs.push({ sequenceNum: msgs.length, ...msg }); + ctx.set(STATUS_MESSAGES_STATE_NAME, msgs); + + // wake up all listeners + const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; + for (const awkId of listeners) { + ctx.resolveAwakeable(awkId, {}); + } + ctx.clear(STATUS_MESSAGE_LISTENERS); + }, + + getLatestMessage: async ( + ctx: restate.RpcContext + ): Promise => { + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + if (msgs.length === 0) { + return null; + } else { + return msgs[msgs.length - 1]; + } + }, + + pollNextMessages: async ( + ctx: restate.RpcContext, + _workflowId: string, + req: { from: number; awakId: string } + ): Promise => { + const msgs = + (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; + if (msgs.length > req.from) { + return msgs.slice(req.from); + } + + // not yet available, register a listener to be woken up when more is available + const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; + listeners.push(req.awakId); + ctx.set(STATUS_MESSAGE_LISTENERS, listeners); + return null; + }, + + dispose: async (ctx: restate.RpcContext): Promise => { + const stateNames = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + for (const stateName of stateNames) { + ctx.clear(stateName); + } + ctx.clear(ALL_NAMES_STATE_NAME); + ctx.clear(STATUS_MESSAGE_LISTENERS); + ctx.clear(STATUS_MESSAGES_STATE_NAME); + ctx.clear(RESULT_LISTENERS_NAME); + ctx.clear(RESULT_STATE_NAME); + ctx.clear(LIFECYCLE_STATUS_STATE_NAME); + }, +}); + +export type api = typeof workflowStateService; + +// ---------------------------------------------------------------------------- + +async function completePromise( + ctx: restate.RpcContext, + stateName: string, + awakeableStateName: string, + completion: ValueOrError +): Promise> { + if (completion.value !== undefined && completion.error !== undefined) { + throw new restate.TerminalError( + "Completion can only be either with value or with error" + ); + } + if (completion.value !== undefined && completion.value === null) { + throw new restate.TerminalError("promise cannot be completed with null"); + } + if (completion.error !== undefined && completion.error === null) { + throw new restate.TerminalError("promise cannot be rejected with null"); + } + + const currVal = await ctx.get>(stateName); + if (currVal !== null) { + // promise already completed + return currVal; + } + + // first completor + // (a) set state + ctx.set(stateName, completion); + await rememberNewStateName(ctx, stateName); + + // (b) complete awaiting awakeables + const listeners = (await ctx.get(awakeableStateName)) ?? []; + listeners.forEach((awkId: string) => { + if (completion.error !== undefined) { + ctx.rejectAwakeable(awkId, completion.error); + } else { + ctx.resolveAwakeable(awkId, completion.value); + } + }); + ctx.clear(awakeableStateName); + + return completion; +} + +async function subscribePromise( + ctx: restate.RpcContext, + stateName: string, + awakeableStateName: string, + awakeableId: string +): Promise | null> { + const currVal = await ctx.get>(stateName); + + // case (a), we have a value already + if (currVal !== null) { + if (currVal.error !== undefined) { + ctx.rejectAwakeable(awakeableId, currVal.error); + } else { + ctx.resolveAwakeable(awakeableId, currVal.value); + } + return currVal; + } + + // case (b), we remember the awk Id and get when we have a value + // but only if the workflow is still running + if (!(await checkIfRunning(ctx))) { + const response = { + error: "Promised will never resolve because workflow is not running", + }; + ctx.rejectAwakeable(awakeableId, response.error); + return response; + } + + const listeners = (await ctx.get(awakeableStateName)) ?? []; + if (listeners.length === 0) { + await rememberNewStateName(ctx, awakeableStateName); + } + listeners.push(awakeableId); + ctx.set(awakeableStateName, listeners); + return null; +} + +async function peekPromise( + ctx: restate.RpcContext, + stateName: string +): Promise | null> { + return ctx.get>(stateName); +} + +async function rememberNewStateName( + ctx: restate.RpcContext, + stateName: string +) { + const names = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + names.push(stateName); + ctx.set(ALL_NAMES_STATE_NAME, names); +} + +async function checkIfRunning(ctx: restate.RpcContext): Promise { + const status = await ctx.get(LIFECYCLE_STATUS_STATE_NAME); + return status === LifecycleStatus.RUNNING; +} diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts new file mode 100644 index 00000000..f91bd46f --- /dev/null +++ b/src/workflows/workflow_wrapper_service.ts @@ -0,0 +1,351 @@ +/* + * Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +import * as restate from "../public_api"; +import * as wf from "./workflow"; +import * as wss from "./workflow_state_service"; + +const DEFAULT_RETENTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // 1 week + +// ---------------------------------------------------------------------------- +// Workflow Context Implementations +// ---------------------------------------------------------------------------- + +class SharedPromiseImpl implements wf.DurablePromise { + constructor( + private readonly workflowId: string, + private readonly promiseName: string, + private readonly ctx: restate.RpcContext, + private readonly stateServiceApi: restate.ServiceApi + ) {} + + promise(): Promise { + const awk = this.ctx.awakeable(); + + this.ctx.send(this.stateServiceApi).subscribePromise(this.workflowId, { + promiseName: this.promiseName, + awkId: awk.id, + }); + + return awk.promise; + } + + async peek(): Promise { + const result = await this.ctx + .rpc(this.stateServiceApi) + .peekPromise(this.workflowId, { promiseName: this.promiseName }); + + if (result === null) { + return null; + } + if (result.error !== undefined) { + return Promise.reject(new Error(result.error)); + } + return Promise.resolve(result.value as T); + } + + resolve(value?: T): void { + this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { + promiseName: this.promiseName, + completion: { value }, + }); + } + + fail(errorMsg: string): void { + this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { + promiseName: this.promiseName, + completion: { error: errorMsg }, + }); + } +} + +class SharedContextImpl implements wf.SharedWfContext { + constructor( + protected readonly ctx: restate.RpcContext, + protected readonly wfId: string, + protected readonly stateServiceApi: restate.ServiceApi + ) {} + + workflowId(): string { + return this.wfId; + } + + get(stateName: string): Promise { + return this.ctx + .rpc(this.stateServiceApi) + .getState(this.wfId, stateName) as Promise; + } + + promise(name: string): wf.DurablePromise { + return new SharedPromiseImpl( + this.wfId, + name, + this.ctx, + this.stateServiceApi + ); + } +} + +class ExclusiveContextImpl extends SharedContextImpl implements wf.WfContext { + public readonly id: Buffer; + public readonly serviceName: string; + public readonly rand: restate.Rand; + public readonly console: Console; + + constructor( + ctx: restate.RpcContext, + wfId: string, + stateServiceApi: restate.ServiceApi + ) { + super(ctx, wfId, stateServiceApi); + this.id = ctx.id; + this.serviceName = ctx.serviceName; + this.rand = ctx.rand; + this.console = ctx.console; + } + + publishMessage(message: string): void { + this.ctx + .send(this.stateServiceApi) + .publishMessage(this.wfId, { message, timestamp: new Date() }); + } + + grpcChannel(): restate.RestateGrpcChannel { + return this.ctx.grpcChannel(); + } + + set(stateName: string, value: T): void { + this.ctx + .send(this.stateServiceApi) + .setState(this.wfId, { stateName, value }); + } + + clear(stateName: string): void { + this.ctx.send(this.stateServiceApi).clearState(this.wfId, stateName); + } + + stateKeys(): Promise { + return this.ctx.stateKeys(); + } + + clearAll(): void { + this.ctx.clearAll(); + } + + sideEffect( + fn: () => Promise, + retryPolicy?: restate.RestateUtils.RetrySettings + ): Promise { + return this.ctx.sideEffect(fn, retryPolicy); + } + + awakeable(): { id: string; promise: restate.CombineablePromise } { + return this.ctx.awakeable(); + } + resolveAwakeable(id: string, payload: T): void { + this.ctx.resolveAwakeable(id, payload); + } + rejectAwakeable(id: string, reason: string): void { + this.ctx.rejectAwakeable(id, reason); + } + + sleep(millis: number): restate.CombineablePromise { + return this.ctx.sleep(millis); + } + + rpc(opts: restate.ServiceApi): restate.Client { + return this.ctx.rpc(opts); + } + send(opts: restate.ServiceApi): restate.SendClient { + return this.ctx.send(opts); + } + sendDelayed( + opts: restate.ServiceApi, + delay: number + ): restate.SendClient { + return this.ctx.sendDelayed(opts, delay); + } +} + +// ---------------------------------------------------------------------------- +// the service that wraps the workflow methods +// ---------------------------------------------------------------------------- + +export function createWrapperService( + workflow: wf.Workflow, + path: string, + stateServiceApi: restate.ServiceApi +) { + const wrapperService = { + start: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const started = await ctx + .rpc(stateServiceApi) + .startWorkflow(request.workflowId); + if (started === wf.WorkflowStartResult.STARTED) { + ctx.send(wrapperServiceApi).run(request); + } + return started; + }, + + run: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const wfCtx = new ExclusiveContextImpl( + ctx, + request.workflowId, + stateServiceApi + ); + try { + const result = await workflow.run(wfCtx, request); + const resultValue = result !== undefined ? result : {}; + await ctx + .rpc(stateServiceApi) + .finishOrFailWorkflow(request.workflowId, { value: resultValue }); + return result; + } catch (err) { + const msg = stringifyError(err); + await ctx + .rpc(stateServiceApi) + .finishOrFailWorkflow(request.workflowId, { error: msg }); + throw err; + } finally { + ctx + .sendDelayed(stateServiceApi, DEFAULT_RETENTION_PERIOD) + .dispose(request.workflowId); + } + }, + + waitForResult: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + + const awakeable = ctx.awakeable(); + await ctx + .rpc(stateServiceApi) + .subscribeResult(request.workflowId, awakeable.id); + return awakeable.promise; + }, + + status: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + return ctx.rpc(stateServiceApi).getStatus(request.workflowId); + }, + + getLatestMessage: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + return ctx.rpc(stateServiceApi).getLatestMessage(request.workflowId); + }, + + pollNextMessages: async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest<{ from: number }> + ): Promise => { + checkRequestAndWorkflowId(request); + + // eslint-disable-next-line no-constant-condition + while (true) { + const awk = ctx.awakeable(); + const messages = await ctx + .rpc(stateServiceApi) + .pollNextMessages(request.workflowId, { + from: request.from, + awakId: awk.id, + }); + if (messages !== undefined && messages !== null) { + return messages; + } + + await awk.promise; + } + }, + }; + + // add all the interaction methods to the wrapper service + for (const [route, handler] of Object.entries(workflow)) { + if (typeof handler !== "function" || route === "run") { + continue; + } + if (handler.length < 1 || handler.length > 2) { + throw new Error( + "Workflow function does not conform to correct signature: must have at least one argument (SharedWfContext) and at most a second argument (the request parameter)" + ); + } + + const wrappingHandler = async ( + ctx: restate.RpcContext, + request: wf.WorkflowRequest + ): Promise => { + checkRequestAndWorkflowId(request); + const wfCtx = new SharedContextImpl( + ctx, + request.workflowId, + stateServiceApi + ); + + // impl. note: we need the extra cast to 'unknown', because the 'run' method is + // otherwise incompatible with the cast. we exclude that method in the filter above, + // but the compiler doesn't recognize that. + return ( + handler as unknown as (ctx: wf.SharedWfContext, req: IN) => Promise + )(wfCtx, request); + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (wrapperService as any)[route] = wrappingHandler; + } + + const wrapperServiceRouter = restate.router(wrapperService); + const wrapperServiceApi: restate.ServiceApi = { + path, + }; + + return wrapperServiceRouter; +} + +function checkRequestAndWorkflowId(request: wf.WorkflowRequest): void { + if (request === undefined) { + throw new restate.TerminalError("Request parameter is undefined"); + } + if (request.workflowId === undefined) { + throw new restate.TerminalError("Request is missing property 'workflowId'"); + } +} + +function stringifyError(error: unknown): string { + if (typeof error === "string") { + return error; + } + if (error instanceof Error) { + const e = error as Error; + return `${e.name}: ${e.message}\nStack: ${e.stack}`; + } + try { + return JSON.stringify(error); + } catch (err) { + return "(cause not stringify-able)"; + } +}