Skip to content

Commit

Permalink
Introduce orTimeout(millis) API, to easily combine a CombineablePromi…
Browse files Browse the repository at this point in the history
…se with a sleep to implement timeouts. (#234)
  • Loading branch information
slinkydeveloper authored Jan 30, 2024
1 parent 309ccc9 commit 56a00bf
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ import { RestateGrpcContextImpl } from "./restate_context_impl";
*/
export type CombineablePromise<T> = Promise<T> & {
__restate_context: RestateBaseContext;

/**
* Creates a promise that awaits for the current promise up to the specified timeout duration.
* If the timeout is fired, this Promise will be rejected with a {@link TimeoutError}.
*
* @param millis duration of the sleep in millis.
* This is a lower-bound.
*/
orTimeout(millis: number): Promise<T>;
};

/**
Expand Down
34 changes: 31 additions & 3 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
TerminalError,
ensureError,
errorToFailureWithTerminal,
TimeoutError,
} from "./types/errors";
import { jsonSerialize, jsonDeserialize } from "./utils/utils";
import { Empty } from "./generated/google/protobuf/empty";
Expand All @@ -62,7 +63,10 @@ import { Client, SendClient } from "./types/router";
import { RpcRequest, RpcResponse } from "./generated/proto/dynrpc";
import { requestFromArgs } from "./utils/assumptions";
import { RandImpl } from "./utils/rand";
import { newJournalEntryPromiseId } from "./promise_combinator_tracker";
import {
newJournalEntryPromiseId,
PromiseId,

Check warning on line 68 in src/restate_context_impl.ts

View workflow job for this annotation

GitHub Actions / build (19.x)

'PromiseId' is defined but never used
} from "./promise_combinator_tracker";
import { WrappedPromise } from "./utils/promises";

export enum CallContexType {
Expand Down Expand Up @@ -345,7 +349,7 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
return this.markCombineablePromise(this.sleepInternal(millis));
}

private sleepInternal(millis: number): Promise<void> {
private sleepInternal(millis: number): WrappedPromise<void> {
return this.stateMachine.handleUserCodeMessage<void>(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({ wakeUpTime: Date.now() + millis })
Expand Down Expand Up @@ -494,12 +498,36 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
private markCombineablePromise<T>(
p: Promise<T>
): InternalCombineablePromise<T> {
const journalIndex = this.stateMachine.getUserCodeJournalIndex();
const orTimeout = (millis: number): Promise<T> => {
const sleepPromise: Promise<T> = this.sleepInternal(millis).transform(
() => {
throw new TimeoutError();
}
);
const sleepPromiseIndex = this.stateMachine.getUserCodeJournalIndex();

return this.stateMachine.createCombinator(Promise.race.bind(Promise), [
{
id: newJournalEntryPromiseId(journalIndex),
promise: p,
},
{
id: newJournalEntryPromiseId(sleepPromiseIndex),
promise: sleepPromise,
},
]) as Promise<T>;
};

return Object.defineProperties(p, {
__restate_context: {
value: this,
},
journalIndex: {
value: this.stateMachine.getUserCodeJournalIndex(),
value: journalIndex,
},
orTimeout: {
value: orTimeout.bind(this),
},
}) as InternalCombineablePromise<T>;
}
Expand Down
6 changes: 6 additions & 0 deletions src/types/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ export class TerminalError extends RestateError {
}
}

export class TimeoutError extends TerminalError {
constructor() {
super("Timeout occurred", { errorCode: ErrorCodes.DEADLINE_EXCEEDED });
}
}

// Leads to Restate retries
export class RetryableError extends RestateError {
constructor(message: string, options?: { errorCode?: number; cause?: any }) {
Expand Down
61 changes: 61 additions & 0 deletions test/promise_combinators.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from "./protoutils";
import { TestGreeter, TestResponse } from "../src/generated/proto/test";
import { SLEEP_ENTRY_MESSAGE_TYPE } from "../src/types/protocol";
import { TimeoutError } from "../src/types/errors";
import { CombineablePromise } from "../src/restate_context";

class AwakeableSleepRaceGreeter implements TestGreeter {
Expand Down Expand Up @@ -327,3 +328,63 @@ describe("CombineablePromiseThenSideEffect", () => {
]);
});
});

class AwakeableOrTimeoutGreeter implements TestGreeter {
async greet(): Promise<TestResponse> {
const ctx = restate.useContext(this);

const { promise } = ctx.awakeable<string>();
try {
const result = await promise.orTimeout(100);
return TestResponse.create({
greeting: `Hello ${result}`,
});
} catch (e) {
if (e instanceof TimeoutError) {
return TestResponse.create({
greeting: `Hello timed-out`,
});
}
}

throw new Error("Unexpected result");
}
}

describe("AwakeableOrTimeoutGreeter", () => {
it("handles completion of awakeable", async () => {
const result = await new TestDriver(new AwakeableOrTimeoutGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
completionMessage(1, JSON.stringify("Francesco")),
ackMessage(3),
]).run();

expect(result.length).toStrictEqual(5);
expect(result[0]).toStrictEqual(awakeableMessage());
expect(result[1].messageType).toStrictEqual(SLEEP_ENTRY_MESSAGE_TYPE);
expect(result.slice(2)).toStrictEqual([
combinatorEntryMessage(0, [1]),
outputMessage(greetResponse(`Hello Francesco`)),
END_MESSAGE,
]);
});

it("handles completion of sleep", async () => {
const result = await new TestDriver(new AwakeableOrTimeoutGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
completionMessage(2, undefined, true),
ackMessage(3),
]).run();

expect(result.length).toStrictEqual(5);
expect(result[0]).toStrictEqual(awakeableMessage());
expect(result[1].messageType).toStrictEqual(SLEEP_ENTRY_MESSAGE_TYPE);
expect(result.slice(2)).toStrictEqual([
combinatorEntryMessage(0, [2]),
outputMessage(greetResponse(`Hello timed-out`)),
END_MESSAGE,
]);
});
});

0 comments on commit 56a00bf

Please sign in to comment.