Skip to content

Commit

Permalink
Some feedback about workflow api (#261)
Browse files Browse the repository at this point in the history
* Remove the message feature
* Use clearAll to dispose the workflow
* start -> submit rename
* Add check on set state if state is null
* Add support for empty value in awakeables/durable promise resolution
* Make DurablePromise extend CombineablePromise
  • Loading branch information
slinkydeveloper authored Feb 26, 2024
1 parent ac43e6c commit 04dd951
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 221 deletions.
4 changes: 2 additions & 2 deletions examples/workflow_example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const myworkflow = restate.workflow.workflow("acme.myworkflow", {

// to listen to signals, also use promises
const signal = ctx.promise<string>("thesignal");
const message = await signal.promise();
const message = await signal;

const result = `${message} my dear ${params.name}`;
ctx.console.log(">>>>>>>>>>> Finishing workflow with: " + result);
Expand All @@ -66,7 +66,7 @@ const myworkflow = restate.workflow.workflow("acme.myworkflow", {
},

awaitName: async (ctx: restate.workflow.SharedWfContext): Promise<string> => {
return ctx.promise<string>("name_promise").promise();
return ctx.promise<string>("name_promise");
},
});

Expand Down
2 changes: 1 addition & 1 deletion src/clients/workflow_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export function connect(restateUri: string): RestateClient {

let result: restate.workflow.WorkflowStartResult;
try {
result = await makeCall(restateUri, path, "start", workflowId, params);
result = await makeCall(restateUri, path, "submit", workflowId, params);
} catch (err) {
const error = ensureError(err);
throw new Error("Cannot start workflow: " + error.message, {
Expand Down
2 changes: 1 addition & 1 deletion src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export interface Context {
* // The sleeping service should have sent the awakeableIdentifier string to this service.
* ctx.resolveAwakeable(awakeableIdentifier, "hello");
*/
resolveAwakeable<T>(id: string, payload: T): void;
resolveAwakeable<T>(id: string, payload?: T): void;

/**
* Reject an awakeable of another service. When rejecting, the service waiting on this awakeable will be woken up with a terminal error with the provided reason.
Expand Down
7 changes: 5 additions & 2 deletions src/context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,13 @@ export class ContextImpl implements KeyedContext, RestateGrpcChannel {
};
}

public resolveAwakeable<T>(id: string, payload: T): void {
public resolveAwakeable<T>(id: string, payload?: T): void {
// We coerce undefined to null as null can be stringified by JSON.stringify
const payloadToWrite = payload === undefined ? null : payload;

this.checkState("resolveAwakeable");
this.completeAwakeable(id, {
value: Buffer.from(JSON.stringify(payload)),
value: Buffer.from(JSON.stringify(payloadToWrite)),
});
}

Expand Down
11 changes: 2 additions & 9 deletions src/workflows/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,12 @@ export interface WorkflowServices<R, T, U> extends restate.ServiceBundle {
// workflow-specific types (promises, contexts)
// ----------------------------------------------------------------------------

export interface DurablePromise<T> {
promise(): Promise<T>;
export type DurablePromise<T> = restate.CombineablePromise<T> & {
peek(): Promise<T | null>;

resolve(value?: T): void;
fail(errorMsg: string): void;
}
};

/**
* The context for the workflow's interaction methods, which are all methods
Expand Down Expand Up @@ -142,12 +141,6 @@ export enum WorkflowStartResult {
ALREADY_FINISHED = "ALREADY_FINISHED",
}

export type StatusMessage = {
sequenceNum: number;
message: string;
timestamp: Date;
};

// ----------------------------------------------------------------------------
// types and signatures for typed clients
// ----------------------------------------------------------------------------
Expand Down
125 changes: 12 additions & 113 deletions src/workflows/workflow_state_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,14 @@
*/

import * as restate from "../public_api";
import {
LifecycleStatus,
StatusMessage,
WorkflowStartResult,
} from "./workflow";
import { LifecycleStatus, WorkflowStartResult } from "./workflow";

const LIFECYCLE_STATUS_STATE_NAME = "status";
const RESULT_STATE_NAME = "result";
const RESULT_LISTENERS_NAME = "result_listeners";
const STATUS_MESSAGES_STATE_NAME = "messages";
const STATUS_MESSAGE_LISTENERS = "message_listeners";
const PROMISE_STATE_PREFIX = "prom_s_";
const USER_STATE_PREFIX = "state_";
const PROMISE_AWAKEABLE_PREFIX = "prom_l_";
const ALL_NAMES_STATE_NAME = "all_state_names";

const RESERVED_STATE_NAMES = [
LIFECYCLE_STATUS_STATE_NAME,
RESULT_STATE_NAME,
RESULT_LISTENERS_NAME,
ALL_NAMES_STATE_NAME,
];
const RESERVED_STATE_PREFIXES = [
PROMISE_STATE_PREFIX,
PROMISE_AWAKEABLE_PREFIX,
];

export type ValueOrError<T> = {
value?: T;
Expand Down Expand Up @@ -168,7 +151,7 @@ export const workflowStateService = restate.keyedRouter({
_workflowId: string,
stateName: string
): Promise<T | null> => {
return ctx.get(stateName);
return ctx.get(USER_STATE_PREFIX + stateName);
},

setState: async <T>(
Expand All @@ -190,107 +173,36 @@ export const workflowStateService = restate.keyedRouter({
return;
}

const stateName = request.stateName;

// guard against overwriting built-in states
for (const reservedStateName of RESERVED_STATE_NAMES) {
if (stateName === reservedStateName) {
throw new restate.TerminalError(
"State name is reserved: " + reservedStateName
);
}
}
for (const reservedStatePrefix of RESERVED_STATE_PREFIXES) {
if (stateName.startsWith(reservedStatePrefix)) {
throw new restate.TerminalError(
"State prefix is reserved: " + reservedStatePrefix
);
}
}
const stateName = USER_STATE_PREFIX + request.stateName;

ctx.set(stateName, request.value);
await rememberNewStateName(ctx, stateName);
},

clearState: async (
ctx: restate.KeyedContext,
_workflowId: string,
stateName: string
): Promise<void> => {
ctx.clear(stateName);
ctx.clear(USER_STATE_PREFIX + stateName);
},

stateKeys: async (ctx: restate.KeyedContext): Promise<Array<string>> => {
return (await ctx.get<string[]>(ALL_NAMES_STATE_NAME)) ?? [];
return (await ctx.stateKeys()).filter((name) =>
name.startsWith(USER_STATE_PREFIX)
);
},

clearAllState: async (ctx: restate.KeyedContext): Promise<void> => {
const stateNames = (await ctx.get<string[]>(ALL_NAMES_STATE_NAME)) ?? [];
const stateNames = (await ctx.stateKeys()).filter((name) =>
name.startsWith(USER_STATE_PREFIX)
);
for (const stateName of stateNames) {
ctx.clear(stateName);
}
},

publishMessage: async (
ctx: restate.KeyedContext,
_workflowId: string,
msg: { message: string; timestamp: Date }
): Promise<void> => {
// append message
const msgs =
(await ctx.get<StatusMessage[]>(STATUS_MESSAGES_STATE_NAME)) ?? [];
msgs.push({ sequenceNum: msgs.length, ...msg });
ctx.set(STATUS_MESSAGES_STATE_NAME, msgs);

// wake up all listeners
const listeners = (await ctx.get<string[]>(STATUS_MESSAGE_LISTENERS)) ?? [];
for (const awkId of listeners) {
ctx.resolveAwakeable(awkId, {});
}
ctx.clear(STATUS_MESSAGE_LISTENERS);
},

getLatestMessage: async (
ctx: restate.KeyedContext
): Promise<StatusMessage | null> => {
const msgs =
(await ctx.get<StatusMessage[]>(STATUS_MESSAGES_STATE_NAME)) ?? [];
if (msgs.length === 0) {
return null;
} else {
return msgs[msgs.length - 1];
}
},

pollNextMessages: async (
ctx: restate.KeyedContext,
_workflowId: string,
req: { from: number; awakId: string }
): Promise<StatusMessage[] | null> => {
const msgs =
(await ctx.get<StatusMessage[]>(STATUS_MESSAGES_STATE_NAME)) ?? [];
if (msgs.length > req.from) {
return msgs.slice(req.from);
}

// not yet available, register a listener to be woken up when more is available
const listeners = (await ctx.get<string[]>(STATUS_MESSAGE_LISTENERS)) ?? [];
listeners.push(req.awakId);
ctx.set(STATUS_MESSAGE_LISTENERS, listeners);
return null;
},

dispose: async (ctx: restate.KeyedContext): Promise<void> => {
const stateNames = (await ctx.get<string[]>(ALL_NAMES_STATE_NAME)) ?? [];
for (const stateName of stateNames) {
ctx.clear(stateName);
}
ctx.clear(ALL_NAMES_STATE_NAME);
ctx.clear(STATUS_MESSAGE_LISTENERS);
ctx.clear(STATUS_MESSAGES_STATE_NAME);
ctx.clear(RESULT_LISTENERS_NAME);
ctx.clear(RESULT_STATE_NAME);
ctx.clear(LIFECYCLE_STATUS_STATE_NAME);
ctx.clearAll();
},
});

Expand Down Expand Up @@ -325,7 +237,6 @@ async function completePromise<T>(
// first completor
// (a) set state
ctx.set(stateName, completion);
await rememberNewStateName(ctx, stateName);

// (b) complete awaiting awakeables
const listeners = (await ctx.get<string[]>(awakeableStateName)) ?? [];
Expand Down Expand Up @@ -370,9 +281,6 @@ async function subscribePromise<T>(
}

const listeners = (await ctx.get<string[]>(awakeableStateName)) ?? [];
if (listeners.length === 0) {
await rememberNewStateName(ctx, awakeableStateName);
}
listeners.push(awakeableId);
ctx.set(awakeableStateName, listeners);
return null;
Expand All @@ -385,15 +293,6 @@ async function peekPromise<T>(
return ctx.get<ValueOrError<T>>(stateName);
}

async function rememberNewStateName(
ctx: restate.KeyedContext,
stateName: string
) {
const names = (await ctx.get<string[]>(ALL_NAMES_STATE_NAME)) ?? [];
names.push(stateName);
ctx.set(ALL_NAMES_STATE_NAME, names);
}

async function checkIfRunning(ctx: restate.KeyedContext): Promise<boolean> {
const status = await ctx.get<LifecycleStatus>(LIFECYCLE_STATUS_STATE_NAME);
return status === LifecycleStatus.RUNNING;
Expand Down
Loading

0 comments on commit 04dd951

Please sign in to comment.