Skip to content

Commit

Permalink
Add ctx.clearAll()
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Feb 7, 2024
1 parent b33a8dd commit d1bfed1
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 4 deletions.
6 changes: 6 additions & 0 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ message ClearStateEntryMessage {
bytes key = 1;
}

// Completable: No
// Fallible: No
// Type: 0x0800 + 3
message ClearAllStateEntryMessage {
}

// ------ Syscalls ------

// Completable: Yes
Expand Down
4 changes: 3 additions & 1 deletion src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { Failure } from "./generated/proto/protocol";
import {
AWAKEABLE_ENTRY_MESSAGE_TYPE,
AwakeableEntryMessage,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMBINATOR_ENTRY_MESSAGE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -115,6 +115,7 @@ export class Journal<I, O> {
}
case p.SET_STATE_ENTRY_MESSAGE_TYPE:
case p.CLEAR_STATE_ENTRY_MESSAGE_TYPE:
case p.CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE:
case p.COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE:
case p.BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE: {
// Do not need completion
Expand Down Expand Up @@ -312,6 +313,7 @@ export class Journal<I, O> {
}
case SET_STATE_ENTRY_MESSAGE_TYPE:
case CLEAR_STATE_ENTRY_MESSAGE_TYPE:
case CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE:
case COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE:
case BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE:
case COMBINATOR_ENTRY_MESSAGE: {
Expand Down
11 changes: 10 additions & 1 deletion src/local_state_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/

import {
ClearAllStateEntryMessage,
ClearStateEntryMessage,
GetStateEntryMessage,
SetStateEntryMessage,
Expand All @@ -19,12 +20,14 @@ import { Empty } from "./generated/google/protobuf/empty";
import { jsonSerialize } from "./utils/utils";

export class LocalStateStore {
private isPartial: boolean;
private state: Map<string, Buffer | Empty>;

constructor(readonly isPartial: boolean, state: StartMessage_StateEntry[]) {
constructor(isPartial: boolean, state: StartMessage_StateEntry[]) {
this.state = new Map<string, Buffer | Empty>(
state.map(({ key, value }) => [key.toString(), value])
);
this.isPartial = isPartial;
}

public get(key: string): GetStateEntryMessage {
Expand Down Expand Up @@ -75,4 +78,10 @@ export class LocalStateStore {
public add(key: string, result: Buffer | Empty): void {
this.state.set(key, result);
}

public clearAll(): ClearAllStateEntryMessage {
this.state.clear();
this.isPartial = false;
return {};
}
}
13 changes: 11 additions & 2 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export interface RestateBaseContext {
*
* @example
* const ctx = restate.useContext(this);
* const state = ctx.set("STATE", "Hello");
* ctx.set("STATE", "Hello");
*/
set<T>(name: string, value: T): void;

Expand All @@ -98,10 +98,19 @@ export interface RestateBaseContext {
*
* @example
* const ctx = restate.useContext(this);
* const state = ctx.clear("STATE");
* ctx.clear("STATE");
*/
clear(name: string): void;

/**
* Clear/delete all the state entries in the Restate runtime.
*
* @example
* const ctx = restate.useContext(this);
* ctx.clearAll();
*/
clearAll(): void;

/**
* Execute a side effect and store the result in Restate. The side effect will thus not
* be re-executed during a later replay, but take the durable result from Restate.
Expand Down
11 changes: 11 additions & 0 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
AWAKEABLE_ENTRY_MESSAGE_TYPE,
AWAKEABLE_IDENTIFIER_PREFIX,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -151,6 +152,16 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
);
}

public clearAll(): void {
this.checkState("clear all state");

const msg = this.stateMachine.localStateStore.clearAll();
this.stateMachine.handleUserCodeMessage(
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
msg
);
}

// --- Calls, background calls, etc

public request(
Expand Down
7 changes: 7 additions & 0 deletions src/types/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
AwakeableEntryMessage,
BackgroundInvokeEntryMessage,
ClearStateEntryMessage,
ClearAllStateEntryMessage,
CompleteAwakeableEntryMessage,
CompletionMessage,
EntryAckMessage,
Expand All @@ -37,6 +38,7 @@ export {
AwakeableEntryMessage,
BackgroundInvokeEntryMessage,
ClearStateEntryMessage,
ClearAllStateEntryMessage,
CompleteAwakeableEntryMessage,
CompletionMessage,
ErrorMessage,
Expand Down Expand Up @@ -64,6 +66,7 @@ export const OUTPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0401n;
export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n;
export const SET_STATE_ENTRY_MESSAGE_TYPE = 0x0801n;
export const CLEAR_STATE_ENTRY_MESSAGE_TYPE = 0x0802n;
export const CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE = 0x0803n;
export const SLEEP_ENTRY_MESSAGE_TYPE = 0x0c00n;
export const INVOKE_ENTRY_MESSAGE_TYPE = 0x0c01n;
export const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE = 0x0c02n;
Expand Down Expand Up @@ -93,6 +96,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([
GET_STATE_ENTRY_MESSAGE_TYPE,
SET_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
SLEEP_ENTRY_MESSAGE_TYPE,
INVOKE_ENTRY_MESSAGE_TYPE,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
Expand All @@ -114,6 +118,7 @@ const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map<bigint, string>([
[GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"],
[SET_STATE_ENTRY_MESSAGE_TYPE, "SetStateEntryMessage"],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, "ClearStateEntryMessage"],
[CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE, "ClearAllStateEntryMessage"],
[SLEEP_ENTRY_MESSAGE_TYPE, "SleepEntryMessage"],
[INVOKE_ENTRY_MESSAGE_TYPE, "InvokeEntryMessage"],
[BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, "BackgroundInvokeEntryMessage"],
Expand Down Expand Up @@ -142,6 +147,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [
[GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage],
[SET_STATE_ENTRY_MESSAGE_TYPE, SetStateEntryMessage],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, ClearStateEntryMessage],
[CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE, ClearAllStateEntryMessage],
[SLEEP_ENTRY_MESSAGE_TYPE, SleepEntryMessage],
[INVOKE_ENTRY_MESSAGE_TYPE, InvokeEntryMessage],
[BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, BackgroundInvokeEntryMessage],
Expand All @@ -165,6 +171,7 @@ export type ProtocolMessage =
| GetStateEntryMessage
| SetStateEntryMessage
| ClearStateEntryMessage
| ClearAllStateEntryMessage
| SleepEntryMessage
| InvokeEntryMessage
| BackgroundInvokeEntryMessage
Expand Down
14 changes: 14 additions & 0 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import {
COMPLETION_MESSAGE_TYPE,
ENTRY_ACK_MESSAGE_TYPE,
ERROR_MESSAGE_TYPE,
formatMessageType,
GET_STATE_ENTRY_MESSAGE_TYPE,
KNOWN_MESSAGE_TYPES,
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PROTOBUF_MESSAGE_BY_TYPE,
ProtocolMessage,
SLEEP_ENTRY_MESSAGE_TYPE,
START_MESSAGE_TYPE,
Expand All @@ -31,6 +33,18 @@ export class Message {
readonly protocolVersion?: number,
readonly requiresAck?: boolean
) {}

// For debugging purposes
toJSON(): unknown {
const pbType = PROTOBUF_MESSAGE_BY_TYPE.get(this.messageType);
if (pbType === undefined) {
return this;
}
return {
messageType: formatMessageType(this.messageType),
message: pbType.toJSON(this.message),
};
}
}

class MessageType {
Expand Down
68 changes: 68 additions & 0 deletions test/eager_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import {
import * as restate from "../src/public_api";
import { TestDriver } from "./testdriver";
import {
CLEAR_ALL_STATE_ENTRY_MESSAGE,
clearStateMessage,
completionMessage,
completionMessageWithEmpty,
END_MESSAGE,
getStateMessage,
getStateMessageWithEmptyResult,
greetRequest,
greetResponse,
inputMessage,
Expand Down Expand Up @@ -280,3 +283,68 @@ describe("MultipleGet", () => {
]);
});
});

class GetClearAllThenGet implements TestGreeter {
async greet(): Promise<TestResponse> {
const ctx = restate.useContext(this);

const state1 = (await ctx.get<string>("STATE")) || "nothing";
const state2 = (await ctx.get<string>("ANOTHER_STATE")) || "nothing";

ctx.clearAll();

const state3 = (await ctx.get<string>("STATE")) || "nothing";
const state4 = (await ctx.get<string>("ANOTHER_STATE")) || "nothing";

return {
greeting: [state1, state2, state3, state4].join(),
};
}
}

describe("GetClearAllThenGet", () => {
it("with complete state in the eager state map", async () => {
const result = await new TestDriver(
new GetClearAllThenGet(),
[
startMessage(1, false, [keyVal("STATE", "One")]),
inputMessage(greetRequest("")),
],
ProtocolMode.REQUEST_RESPONSE
).run();

// First get goes to the runtime, the others get completed with local state
expect(result).toStrictEqual([
getStateMessage("STATE", "One"),
getStateMessageWithEmptyResult("ANOTHER_STATE"),
CLEAR_ALL_STATE_ENTRY_MESSAGE,
getStateMessageWithEmptyResult("STATE"),
getStateMessageWithEmptyResult("ANOTHER_STATE"),
outputMessage(
greetResponse(["One", "nothing", "nothing", "nothing"].join())
),
END_MESSAGE,
]);
});

it("with lazy state in the eager state map", async () => {
const result = await new TestDriver(new GetClearAllThenGet(), [
startMessage(1, true, [keyVal("STATE", "One")]),
inputMessage(greetRequest("")),
completionMessageWithEmpty(2),
]).run();

// First get goes to the runtime, the others get completed with local state
expect(result).toStrictEqual([
getStateMessage("STATE", "One"),
getStateMessage("ANOTHER_STATE"),
CLEAR_ALL_STATE_ENTRY_MESSAGE,
getStateMessageWithEmptyResult("STATE"),
getStateMessageWithEmptyResult("ANOTHER_STATE"),
outputMessage(
greetResponse(["One", "nothing", "nothing", "nothing"].join())
),
END_MESSAGE,
]);
});
});
26 changes: 26 additions & 0 deletions test/protoutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import {
EndMessage,
AWAKEABLE_IDENTIFIER_PREFIX,
COMBINATOR_ENTRY_MESSAGE,
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
ClearAllStateEntryMessage,
} from "../src/types/protocol";
import { Message } from "../src/types/types";
import { TestRequest, TestResponse } from "../src/generated/proto/test";
Expand Down Expand Up @@ -183,6 +185,16 @@ export function getStateMessage<T>(
}
}

export function getStateMessageWithEmptyResult(key: string): Message {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
empty: Empty.create({}),
})
);
}

export function setStateMessage<T>(key: string, value: T): Message {
return new Message(
SET_STATE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -274,6 +286,16 @@ export function completionMessage(
}
}

export function completionMessageWithEmpty(index: number): Message {
return new Message(
COMPLETION_MESSAGE_TYPE,
CompletionMessage.create({
entryIndex: index,
empty: Empty.create(),
})
);
}

export function ackMessage(index: number): Message {
return new Message(
ENTRY_ACK_MESSAGE_TYPE,
Expand Down Expand Up @@ -520,6 +542,10 @@ export function keyVal(key: string, value: any): Buffer[] {
}

export const END_MESSAGE = new Message(END_MESSAGE_TYPE, EndMessage.create());
export const CLEAR_ALL_STATE_ENTRY_MESSAGE = new Message(
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
ClearAllStateEntryMessage.create()
);

// a utility function to print the results of a test
export function printResults(results: Message[]) {
Expand Down

0 comments on commit d1bfed1

Please sign in to comment.