From 86a228b5121d64935cb1bce015bce1e1a442da19 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 7 Feb 2024 18:03:13 +0100 Subject: [PATCH] WIP --- proto/protocol.proto | 14 ++++++++++++++ src/local_state_store.ts | 15 +++++++++++++-- src/restate_context.ts | 2 ++ src/restate_context_impl.ts | 23 +++++++++++++++++++++-- src/types/protocol.ts | 8 ++++++++ 5 files changed, 58 insertions(+), 4 deletions(-) diff --git a/proto/protocol.proto b/proto/protocol.proto index 410632ad..32b0f910 100644 --- a/proto/protocol.proto +++ b/proto/protocol.proto @@ -151,6 +151,20 @@ message ClearStateEntryMessage { bytes key = 1; } +// Completable: Yes +// Fallible: No +// Type: 0x0800 + 4 +message GetStateKeysEntryMessage { + message StateKeys { + repeated bytes keys = 1; + } + + oneof result { + StateKeys value = 14; + Failure failure = 15; + }; +} + // ------ Syscalls ------ // Completable: Yes diff --git a/src/local_state_store.ts b/src/local_state_store.ts index 800a227d..9443f0cd 100644 --- a/src/local_state_store.ts +++ b/src/local_state_store.ts @@ -11,8 +11,7 @@ import { ClearStateEntryMessage, - GetStateEntryMessage, - SetStateEntryMessage, + GetStateEntryMessage, GetStateKeysEntryMessage, SetStateEntryMessage, StartMessage_StateEntry, } from "./generated/proto/protocol"; import { Empty } from "./generated/google/protobuf/empty"; @@ -55,6 +54,18 @@ export class LocalStateStore { } } + public getStateKeys(): GetStateKeysEntryMessage { + if (this.isPartial) { + return {} + } + + return { + value: { + keys: Array.from(this.state.keys()).map(b => Buffer.from(b)) + } + } + } + public set(key: string, value: T): SetStateEntryMessage { const bytes = Buffer.from(jsonSerialize(value)); this.state.set(key, bytes); diff --git a/src/restate_context.ts b/src/restate_context.ts index 68d4f309..a4292582 100644 --- a/src/restate_context.ts +++ b/src/restate_context.ts @@ -78,6 +78,8 @@ export interface RestateBaseContext { */ get(name: string): Promise; + stateKeys(): Promise> + /** * Set/store state in the Restate runtime. * Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))` diff --git a/src/restate_context_impl.ts b/src/restate_context_impl.ts index 99421498..083493bd 100644 --- a/src/restate_context_impl.ts +++ b/src/restate_context_impl.ts @@ -23,7 +23,7 @@ import { AwakeableEntryMessage, BackgroundInvokeEntryMessage, CompleteAwakeableEntryMessage, - DeepPartial, + DeepPartial, GetStateKeysEntryMessage_StateKeys, InvokeEntryMessage, SleepEntryMessage, } from "./generated/proto/protocol"; @@ -33,7 +33,7 @@ import { BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE, CLEAR_STATE_ENTRY_MESSAGE_TYPE, COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE, - GET_STATE_ENTRY_MESSAGE_TYPE, + GET_STATE_ENTRY_MESSAGE_TYPE, GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, INVOKE_ENTRY_MESSAGE_TYPE, SET_STATE_ENTRY_MESSAGE_TYPE, SIDE_EFFECT_ENTRY_MESSAGE_TYPE, @@ -135,6 +135,25 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { return getState(); } + // DON'T make this function async!!! see sideEffect comment for details. + public stateKeys(): Promise> { + // Check if this is a valid action + this.checkState("state keys"); + + // Create the message and let the state machine process it + const msg = this.stateMachine.localStateStore.getStateKeys(); + + const getState = async (): Promise> => { + const result = await this.stateMachine.handleUserCodeMessage( + GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, + msg + ); + + return (result as GetStateKeysEntryMessage_StateKeys).keys.map(b => b.toString()); + }; + return getState(); + } + public set(name: string, value: T): void { this.checkState("set state"); const msg = this.stateMachine.localStateStore.set(name, value); diff --git a/src/types/protocol.ts b/src/types/protocol.ts index c3259eb9..7bc52d17 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -23,6 +23,7 @@ import { ErrorMessage, EndMessage, GetStateEntryMessage, + GetStateKeysEntryMessage, InvokeEntryMessage, OutputStreamEntryMessage, PollInputStreamEntryMessage, @@ -42,6 +43,7 @@ export { ErrorMessage, EndMessage, GetStateEntryMessage, + GetStateKeysEntryMessage, InvokeEntryMessage, OutputStreamEntryMessage, PollInputStreamEntryMessage, @@ -64,6 +66,7 @@ export const OUTPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0401n; export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n; export const SET_STATE_ENTRY_MESSAGE_TYPE = 0x0801n; export const CLEAR_STATE_ENTRY_MESSAGE_TYPE = 0x0802n; +export const GET_STATE_KEYS_ENTRY_MESSAGE_TYPE = 0x0804n; export const SLEEP_ENTRY_MESSAGE_TYPE = 0x0c00n; export const INVOKE_ENTRY_MESSAGE_TYPE = 0x0c01n; export const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE = 0x0c02n; @@ -91,6 +94,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([ POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, GET_STATE_ENTRY_MESSAGE_TYPE, + GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, SET_STATE_ENTRY_MESSAGE_TYPE, CLEAR_STATE_ENTRY_MESSAGE_TYPE, SLEEP_ENTRY_MESSAGE_TYPE, @@ -112,6 +116,7 @@ const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map([ [POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, "PollInputStreamEntryMessage"], [OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, "OutputStreamEntryMessage"], [GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"], + [GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, "GetStateKeysEntryMessage"], [SET_STATE_ENTRY_MESSAGE_TYPE, "SetStateEntryMessage"], [CLEAR_STATE_ENTRY_MESSAGE_TYPE, "ClearStateEntryMessage"], [SLEEP_ENTRY_MESSAGE_TYPE, "SleepEntryMessage"], @@ -140,6 +145,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [ [POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, PollInputStreamEntryMessage], [OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, OutputStreamEntryMessage], [GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage], + [GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, GetStateKeysEntryMessage], [SET_STATE_ENTRY_MESSAGE_TYPE, SetStateEntryMessage], [CLEAR_STATE_ENTRY_MESSAGE_TYPE, ClearStateEntryMessage], [SLEEP_ENTRY_MESSAGE_TYPE, SleepEntryMessage], @@ -163,6 +169,7 @@ export type ProtocolMessage = | PollInputStreamEntryMessage | OutputStreamEntryMessage | GetStateEntryMessage + | GetStateKeysEntryMessage | SetStateEntryMessage | ClearStateEntryMessage | SleepEntryMessage @@ -178,6 +185,7 @@ export type ProtocolMessage = export const SUSPENSION_TRIGGERS: bigint[] = [ INVOKE_ENTRY_MESSAGE_TYPE, GET_STATE_ENTRY_MESSAGE_TYPE, + GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, AWAKEABLE_ENTRY_MESSAGE_TYPE, SLEEP_ENTRY_MESSAGE_TYPE, COMBINATOR_ENTRY_MESSAGE,