Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Feb 7, 2024
1 parent b33a8dd commit 86a228b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 4 deletions.
14 changes: 14 additions & 0 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,20 @@ message ClearStateEntryMessage {
bytes key = 1;
}

// Completable: Yes
// Fallible: No
// Type: 0x0800 + 4
message GetStateKeysEntryMessage {
message StateKeys {
repeated bytes keys = 1;
}

oneof result {
StateKeys value = 14;
Failure failure = 15;
};
}

// ------ Syscalls ------

// Completable: Yes
Expand Down
15 changes: 13 additions & 2 deletions src/local_state_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

import {
ClearStateEntryMessage,
GetStateEntryMessage,
SetStateEntryMessage,
GetStateEntryMessage, GetStateKeysEntryMessage, SetStateEntryMessage,
StartMessage_StateEntry,
} from "./generated/proto/protocol";
import { Empty } from "./generated/google/protobuf/empty";
Expand Down Expand Up @@ -55,6 +54,18 @@ export class LocalStateStore {
}
}

public getStateKeys(): GetStateKeysEntryMessage {
if (this.isPartial) {
return {}
}

return {
value: {
keys: Array.from(this.state.keys()).map(b => Buffer.from(b))
}
}
}

public set<T>(key: string, value: T): SetStateEntryMessage {
const bytes = Buffer.from(jsonSerialize(value));
this.state.set(key, bytes);
Expand Down
2 changes: 2 additions & 0 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ export interface RestateBaseContext {
*/
get<T>(name: string): Promise<T | null>;

stateKeys(): Promise<Array<string>>

/**
* Set/store state in the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
Expand Down
23 changes: 21 additions & 2 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
AwakeableEntryMessage,
BackgroundInvokeEntryMessage,
CompleteAwakeableEntryMessage,
DeepPartial,
DeepPartial, GetStateKeysEntryMessage_StateKeys,
InvokeEntryMessage,
SleepEntryMessage,
} from "./generated/proto/protocol";
Expand All @@ -33,7 +33,7 @@ import {
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE, GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
INVOKE_ENTRY_MESSAGE_TYPE,
SET_STATE_ENTRY_MESSAGE_TYPE,
SIDE_EFFECT_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -135,6 +135,25 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
return getState();
}

// DON'T make this function async!!! see sideEffect comment for details.
public stateKeys(): Promise<Array<string>> {
// Check if this is a valid action
this.checkState("state keys");

// Create the message and let the state machine process it
const msg = this.stateMachine.localStateStore.getStateKeys();

const getState = async (): Promise<Array<string>> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
msg
);

return (result as GetStateKeysEntryMessage_StateKeys).keys.map(b => b.toString());
};
return getState();
}

public set<T>(name: string, value: T): void {
this.checkState("set state");
const msg = this.stateMachine.localStateStore.set(name, value);
Expand Down
8 changes: 8 additions & 0 deletions src/types/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ErrorMessage,
EndMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
PollInputStreamEntryMessage,
Expand All @@ -42,6 +43,7 @@ export {
ErrorMessage,
EndMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
PollInputStreamEntryMessage,
Expand All @@ -64,6 +66,7 @@ export const OUTPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0401n;
export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n;
export const SET_STATE_ENTRY_MESSAGE_TYPE = 0x0801n;
export const CLEAR_STATE_ENTRY_MESSAGE_TYPE = 0x0802n;
export const GET_STATE_KEYS_ENTRY_MESSAGE_TYPE = 0x0804n;
export const SLEEP_ENTRY_MESSAGE_TYPE = 0x0c00n;
export const INVOKE_ENTRY_MESSAGE_TYPE = 0x0c01n;
export const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE = 0x0c02n;
Expand Down Expand Up @@ -91,6 +94,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
SET_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
SLEEP_ENTRY_MESSAGE_TYPE,
Expand All @@ -112,6 +116,7 @@ const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map<bigint, string>([
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, "PollInputStreamEntryMessage"],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, "OutputStreamEntryMessage"],
[GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"],
[GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, "GetStateKeysEntryMessage"],
[SET_STATE_ENTRY_MESSAGE_TYPE, "SetStateEntryMessage"],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, "ClearStateEntryMessage"],
[SLEEP_ENTRY_MESSAGE_TYPE, "SleepEntryMessage"],
Expand Down Expand Up @@ -140,6 +145,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, PollInputStreamEntryMessage],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, OutputStreamEntryMessage],
[GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage],
[GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, GetStateKeysEntryMessage],
[SET_STATE_ENTRY_MESSAGE_TYPE, SetStateEntryMessage],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, ClearStateEntryMessage],
[SLEEP_ENTRY_MESSAGE_TYPE, SleepEntryMessage],
Expand All @@ -163,6 +169,7 @@ export type ProtocolMessage =
| PollInputStreamEntryMessage
| OutputStreamEntryMessage
| GetStateEntryMessage
| GetStateKeysEntryMessage
| SetStateEntryMessage
| ClearStateEntryMessage
| SleepEntryMessage
Expand All @@ -178,6 +185,7 @@ export type ProtocolMessage =
export const SUSPENSION_TRIGGERS: bigint[] = [
INVOKE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
AWAKEABLE_ENTRY_MESSAGE_TYPE,
SLEEP_ENTRY_MESSAGE_TYPE,
COMBINATOR_ENTRY_MESSAGE,
Expand Down

0 comments on commit 86a228b

Please sign in to comment.