From 22397130d0a7ff0a34f33414ae986f71cb9791a8 Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Sat, 6 Jul 2024 20:24:52 +0100
Subject: [PATCH] experimental awakeable support

---
 error.go                    | 139 +---------------------------
 internal/state/awakeable.go | 179 ++++++++++++++++++++++++++++++++++++
 internal/state/state.go     |  25 +++--
 internal/wire/wire.go       |  32 ++++++-
 router.go                   |  72 ++++++++++++++
 server/restate.go           |   2 +-
 6 files changed, 303 insertions(+), 146 deletions(-)
 create mode 100644 internal/state/awakeable.go

diff --git a/error.go b/error.go
index 56d19e9..7ff33b1 100644
--- a/error.go
+++ b/error.go
@@ -7,143 +7,6 @@ import (
 
 type Code uint16
 
-const (
-	/**
-	 * The operation was cancelled, typically by the caller.
-	 * HTTP 408
-	 */
-	CANCELLED Code = 1
-	/**
-	 * Unknown error. For example, this error may be returned when a
-	 * Status value received from another address space belongs to an error
-	 * space that is not known in this address space. Also errors raised by APIs
-	 * that do not return enough error information may be converted to this
-	 * error.
-	 * HTTP 500
-	 */
-	UNKNOWN Code = 2
-	/**
-	 * The client specified an invalid argument. Note that
-	 * this differs from FAILED_PRECONDITION. INVALID_ARGUMENT indicates
-	 * arguments that are problematic regardless of the state of the system
-	 * (e.g., a malformed file name).
-	 * HTTP 400
-	 */
-	INVALID_ARGUMENT Code = 3
-	/**
-	 * The deadline expired before the operation could
-	 * complete. For operations that change the state of the system, this error
-	 * may be returned even if the operation has completed successfully. For
-	 * example, a successful response from a server could have been delayed
-	 * long.
-	 * HTTP 408
-	 */
-	DEADLINE_EXCEEDED Code = 4
-	/**
-	 * Some requested entity (e.g., file or directory) was not
-	 * found. Note to server developers: if a request is denied for an entire
-	 * class of users, such as gradual feature rollout or undocumented
-	 * allowlist, NOT_FOUND may be used. If a request is denied for some users
-	 * within a class of users, such as user-based access control,
-	 * PERMISSION_DENIED must be used.
-	 * HTTP 404
-	 */
-	NOT_FOUND Code = 5
-	/**
-	 * The entity that a client attempted to create (e.g., file
-	 * or directory) already exists.
-	 * HTTP 409
-	 */
-	ALREADY_EXISTS Code = 6
-	/**
-	 * The caller does not have permission to execute the
-	 * specified operation. PERMISSION_DENIED must not be used for rejections
-	 * caused by exhausting some resource (use RESOURCE_EXHAUSTED instead for
-	 * those errors). PERMISSION_DENIED must not be used if the caller can not
-	 * be identified (use UNAUTHENTICATED instead for those errors). This error
-	 * code does not imply the request is valid or the requested entity exists
-	 * or satisfies other pre-conditions.
-	 * HTTP 403
-	 */
-	PERMISSION_DENIED Code = 7
-	/**
-	 * Some resource has been exhausted, perhaps a per-user
-	 * quota, or perhaps the entire file system is out of space.
-	 * HTTP 413
-	 */
-	RESOURCE_EXHAUSTED Code = 8
-	/**
-	 * The operation was rejected because the system is
-	 * not in a state required for the operation's execution. For example, the
-	 * directory to be deleted is non-empty, an rmdir operation is applied to a
-	 * non-directory, etc. Service implementors can use the following guidelines
-	 * to decide between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: (a) Use
-	 * UNAVAILABLE if the client can retry just the failing call. (b) Use
-	 * ABORTED if the client should retry at a higher level (e.g., when a
-	 * client-specified test-and-set fails, indicating the client should restart
-	 * a read-modify-write sequence). (c) Use FAILED_PRECONDITION if the client
-	 * should not retry until the system state has been explicitly fixed. E.g.,
-	 * if an "rmdir" fails because the directory is non-empty,
-	 * FAILED_PRECONDITION should be returned since the client should not retry
-	 * unless the files are deleted from the directory.
-	 * HTTP 412
-	 */
-	FAILED_PRECONDITION Code = 9
-	/**
-	 * The operation was aborted, typically due to a concurrency issue
-	 * such as a sequencer check failure or transaction abort. See the
-	 * guidelines above for deciding between FAILED_PRECONDITION, ABORTED, and
-	 * UNAVAILABLE.
-	 * HTTP 409
-	 */
-	ABORTED Code = 10
-	/**
-	 * The operation was attempted past the valid range. E.g.,
-	 * seeking or reading past end-of-file. Unlike INVALID_ARGUMENT, this error
-	 * indicates a problem that may be fixed if the system state changes. For
-	 * example, a 32-bit file system will generate INVALID_ARGUMENT if asked to
-	 * read at an offset that is not in the range [0,2^32-1], but it will
-	 * generate OUT_OF_RANGE if asked to read from an offset past the current
-	 * file size. There is a fair bit of overlap between FAILED_PRECONDITION and
-	 * OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error)
-	 * when it applies so that callers who are iterating through a space can
-	 * easily look for an OUT_OF_RANGE error to detect when they are done.
-	 * HTTP 400
-	 */
-	OUT_OF_RANGE Code = 11
-	/**
-	 * The operation is not implemented or is not
-	 * supported/enabled in this service.
-	 * HTTP 501
-	 */
-	UNIMPLEMENTED Code = 12
-	/**
-	 * Internal errors. This means that some invariants expected by
-	 * the underlying system have been broken. This error code is reserved for
-	 * serious errors.
-	 * HTTP 500
-	 */
-	INTERNAL Code = 13
-	/**
-	 * The service is currently unavailable. This is most likely a
-	 * transient condition, which can be corrected by retrying with a backoff.
-	 * Note that it is not always safe to retry non-idempotent operations.
-	 * HTTP 503
-	 */
-	UNAVAILABLE Code = 14
-	/**
-	 * Unrecoverable data loss or corruption.
-	 * HTTP 500
-	 */
-	DATA_LOSS Code = 15
-	/**
-	 * The request does not have valid authentication
-	 * credentials for the operation.
-	 * HTTP 401
-	 */
-	UNAUTHENTICATED Code = 16
-)
-
 type codeError struct {
 	code  Code
 	inner error
@@ -224,5 +87,5 @@ func ErrorCode(err error) Code {
 		return e.code
 	}
 
-	return UNKNOWN
+	return 500
 }
diff --git a/internal/state/awakeable.go b/internal/state/awakeable.go
new file mode 100644
index 0000000..9af4360
--- /dev/null
+++ b/internal/state/awakeable.go
@@ -0,0 +1,179 @@
+package state
+
+import (
+	"bytes"
+	"encoding/base64"
+	"encoding/binary"
+	"fmt"
+
+	restate "github.com/restatedev/sdk-go"
+	"github.com/restatedev/sdk-go/generated/proto/protocol"
+	"github.com/restatedev/sdk-go/internal/wire"
+)
+
+// AWAKEABLE_IDENTIFIER_PREFIX is prepended to every awakeable ID built by awakeableID.
+const AWAKEABLE_IDENTIFIER_PREFIX = "prom_1"
+
+// awakeable is a restate.Awakeable whose journal entry index is assigned after
+// construction. Implementations need pointer receivers: setEntryIndex is called
+// through this interface and must mutate the stored value, not a copy of it.
+type awakeable[T any] interface {
+	restate.Awakeable[T]
+	setEntryIndex(entryIndex uint32)
+}
+
+// completedAwakeable carries the result read from a replayed journal entry.
+type completedAwakeable[T any] struct {
+	invocationID []byte
+	entryIndex   uint32
+	result       restate.Result[T]
+}
+
+func (c *completedAwakeable[T]) Id() string { return awakeableID(c.invocationID, c.entryIndex) }
+func (c *completedAwakeable[T]) Chan() <-chan restate.Result[T] {
+	ch := make(chan restate.Result[T], 1)
+	ch <- c.result
+	return ch
+}
+func (c *completedAwakeable[T]) setEntryIndex(entryIndex uint32) { c.entryIndex = entryIndex }
+
+// suspendingAwakeable is an awakeable for which no result is available yet.
+type suspendingAwakeable[T any] struct {
+	invocationID []byte
+	entryIndex   uint32
+}
+
+func (c *suspendingAwakeable[T]) Id() string { return awakeableID(c.invocationID, c.entryIndex) }
+
+// this is a temporary hack; always suspend when this channel is read
+// currently needed because we don't have a way to process the completion while the invocation is in progress
+// and so can only deal with it on replay
+func (c *suspendingAwakeable[T]) Chan() <-chan restate.Result[T] {
+	panic(&suspend{resumeEntry: c.entryIndex})
+}
+func (c *suspendingAwakeable[T]) setEntryIndex(entryIndex uint32) { c.entryIndex = entryIndex }
+
+// awakeableID builds the ID handed to users: AWAKEABLE_IDENTIFIER_PREFIX followed
+// by the URL-safe base64 encoding of the invocation ID and the big-endian entry index.
+func awakeableID(invocationID []byte, entryIndex uint32) string {
+	buf := make([]byte, 0, len(invocationID)+4)
+	buf = append(buf, invocationID...)
+	buf = binary.BigEndian.AppendUint32(buf, entryIndex)
+	return AWAKEABLE_IDENTIFIER_PREFIX + base64.URLEncoding.EncodeToString(buf)
+}
+
+// awakeable replays or creates an awakeable journal entry. A replayed entry that
+// already has a result yields a completed awakeable; otherwise the returned
+// awakeable suspends the invocation when its channel is requested.
+func (c *Machine) awakeable() (restate.Awakeable[[]byte], error) {
+	aw, err := replayOrNew(
+		c,
+		wire.AwakeableEntryMessageType,
+		func(entry *wire.AwakeableEntryMessage) (awakeable[[]byte], error) {
+			if entry.Payload.Result == nil {
+				return &suspendingAwakeable[[]byte]{invocationID: c.id}, nil
+			}
+			switch result := entry.Payload.Result.(type) {
+			case *protocol.AwakeableEntryMessage_Value:
+				return &completedAwakeable[[]byte]{invocationID: c.id, result: restate.Result[[]byte]{Value: result.Value}}, nil
+			case *protocol.AwakeableEntryMessage_Failure:
+				// "%s" keeps a '%' in the failure message from being read as a format verb
+				return &completedAwakeable[[]byte]{invocationID: c.id, result: restate.Result[[]byte]{Err: restate.TerminalError(fmt.Errorf("%s", result.Failure.Message), restate.Code(result.Failure.Code))}}, nil
+			default:
+				panic("unreachable")
+			}
+		},
+		func() (awakeable[[]byte], error) {
+			if err := c._awakeable(); err != nil {
+				return nil, err
+			}
+			return &suspendingAwakeable[[]byte]{invocationID: c.id}, nil
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	// This needs to be done after handling the message in the state machine
+	// otherwise the index is not yet incremented.
+	aw.setEntryIndex(uint32(c.entryIndex))
+	return aw, nil
+}
+
+// _awakeable writes a new, empty awakeable entry to the protocol stream.
+func (c *Machine) _awakeable() error {
+	if err := c.protocol.Write(&protocol.AwakeableEntryMessage{}); err != nil {
+		return err
+	}
+	return nil
+}
+
+// resolveAwakeable completes the awakeable with the given ID with value. On replay
+// the journal entry must carry the same ID and value, else errEntryMismatch is returned.
+func (c *Machine) resolveAwakeable(id string, value []byte) error {
+	_, err := replayOrNew(
+		c,
+		wire.CompleteAwakeableEntryMessageType,
+		func(entry *wire.CompleteAwakeableEntryMessage) (restate.Void, error) {
+			messageValue, ok := entry.Payload.Result.(*protocol.CompleteAwakeableEntryMessage_Value)
+			if entry.Payload.Id != id || !ok || !bytes.Equal(messageValue.Value, value) {
+				return restate.Void{}, errEntryMismatch
+			}
+			return restate.Void{}, nil
+		},
+		func() (restate.Void, error) {
+			if err := c._resolveAwakeable(id, value); err != nil {
+				return restate.Void{}, err
+			}
+			return restate.Void{}, nil
+		},
+	)
+	return err
+}
+
+// _resolveAwakeable writes a complete-awakeable entry carrying value.
+func (c *Machine) _resolveAwakeable(id string, value []byte) error {
+	if err := c.protocol.Write(&protocol.CompleteAwakeableEntryMessage{
+		Id:     id,
+		Result: &protocol.CompleteAwakeableEntryMessage_Value{Value: value},
+	}); err != nil {
+		return err
+	}
+	return nil
+}
+
+// rejectAwakeable completes the awakeable with the given ID with a failure (code 500)
+// whose message is reason.Error(). On replay the journal entry must match exactly.
+func (c *Machine) rejectAwakeable(id string, reason error) error {
+	_, err := replayOrNew(
+		c,
+		wire.CompleteAwakeableEntryMessageType,
+		func(entry *wire.CompleteAwakeableEntryMessage) (restate.Void, error) {
+			messageFailure, ok := entry.Payload.Result.(*protocol.CompleteAwakeableEntryMessage_Failure)
+			if entry.Payload.Id != id || !ok || messageFailure.Failure.Code != 500 || messageFailure.Failure.Message != reason.Error() {
+				return restate.Void{}, errEntryMismatch
+			}
+			return restate.Void{}, nil
+		},
+		func() (restate.Void, error) {
+			if err := c._rejectAwakeable(id, reason); err != nil {
+				return restate.Void{}, err
+			}
+			return restate.Void{}, nil
+		},
+	)
+	return err
+}
+
+// _rejectAwakeable writes a complete-awakeable entry carrying a failure with code 500.
+func (c *Machine) _rejectAwakeable(id string, reason error) error {
+	if err := c.protocol.Write(&protocol.CompleteAwakeableEntryMessage{
+		Id: id,
+		Result: &protocol.CompleteAwakeableEntryMessage_Failure{Failure: &protocol.Failure{
+			Code:    500,
+			Message: reason.Error(),
+		}},
+	}); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/internal/state/state.go b/internal/state/state.go
index 66c6abd..080c3f6 100644
--- a/internal/state/state.go
+++ b/internal/state/state.go
@@ -93,6 +93,21 @@ func (c *Context) SideEffect(fn func() ([]byte, error)) ([]byte, error) {
 	return c.machine.sideEffect(fn)
 }
 
+// Awakeable creates an awakeable via the underlying state machine.
+func (c *Context) Awakeable() (restate.Awakeable[[]byte], error) {
+	return c.machine.awakeable()
+}
+
+// ResolveAwakeable resolves the awakeable with the given id with value.
+func (c *Context) ResolveAwakeable(id string, value []byte) error {
+	return c.machine.resolveAwakeable(id, value)
+}
+
+// RejectAwakeable rejects the awakeable with the given id with reason.
+func (c *Context) RejectAwakeable(id string, reason error) error {
+	return c.machine.rejectAwakeable(id, reason)
+}
+
 func (c *Context) Key() string {
 	return c.machine.key
 }
@@ -118,7 +133,7 @@ type Machine struct {
 	mutex sync.Mutex
 
 	// state
-	id  string
+	id  []byte
 	key string
 
 	partial bool
@@ -153,7 +168,7 @@ func (m *Machine) Start(inner context.Context, trace string) error {
 
 	start := msg.(*wire.StartMessage)
 
-	m.id = start.Payload.DebugId
+	m.id = start.Payload.Id
 	m.key = start.Payload.Key
 
 	m.log = log.With().Str("id", start.Payload.DebugId).Str("method", trace).Logger()
@@ -197,7 +212,7 @@ func (m *Machine) output(bytes []byte, err error) proto.Message {
 	}
 }
 
-func (m *Machine) invoke(ctx *Context, key string, input []byte) error {
+func (m *Machine) invoke(ctx *Context, input []byte) error {
 	// always terminate the invocation with
 	// an end message.
 	// this will always terminate the connection
@@ -228,7 +243,7 @@ func (m *Machine) invoke(ctx *Context, key string, input []byte) error {
 			// unknown panic!
 			// send an error message (retryable)
 			err := m.protocol.Write(&protocol.ErrorMessage{
-				Code:        uint32(restate.INTERNAL),
+				Code:        500,
 				Message:     fmt.Sprint(typ),
 				Description: string(debug.Stack()),
 			})
@@ -279,7 +294,7 @@ func (m *Machine) process(ctx *Context, start *wire.StartMessage) error {
 	inputMsg := msg.(*wire.InputEntryMessage)
 	value := inputMsg.Payload.GetValue()
 
-	return m.invoke(ctx, start.Payload.Key, value)
+	return m.invoke(ctx, value)
 
 }
 
diff --git a/internal/wire/wire.go b/internal/wire/wire.go
index a750d0f..46f2d62 100644
--- a/internal/wire/wire.go
+++ b/internal/wire/wire.go
@@ -203,14 +203,18 @@ func (s *Protocol) Write(message proto.Message, flags ...Flag) error {
 		typ = ClearStateEntryMessageType
 	case *protocol.ClearAllStateEntryMessage:
 		typ = ClearAllStateEntryMessageType
+	case *protocol.GetStateKeysEntryMessage:
+		typ = GetStateKeysEntryMessageType
 	case *protocol.SleepEntryMessage:
 		typ = SleepEntryMessageType
 	case *protocol.CallEntryMessage:
 		typ = CallEntryMessageType
 	case *protocol.OneWayCallEntryMessage:
 		typ = OneWayCallEntryMessageType
-	case *protocol.GetStateKeysEntryMessage:
-		typ = GetStateKeysEntryMessageType
+	case *protocol.AwakeableEntryMessage:
+		typ = AwakeableEntryMessageType
+	case *protocol.CompleteAwakeableEntryMessage:
+		typ = CompleteAwakeableEntryMessageType
 	case *protocol.RunEntryMessage:
 		typ = RunEntryMessageType
 	default:
@@ -341,6 +345,20 @@ var (
 
 			return msg, proto.Unmarshal(bytes, &msg.Payload)
 		},
+		AwakeableEntryMessageType: func(header Header, bytes []byte) (Message, error) {
+			msg := &AwakeableEntryMessage{
+				Header: header,
+			}
+
+			return msg, proto.Unmarshal(bytes, &msg.Payload)
+		},
+		CompleteAwakeableEntryMessageType: func(header Header, bytes []byte) (Message, error) {
+			msg := &CompleteAwakeableEntryMessage{
+				Header: header,
+			}
+
+			return msg, proto.Unmarshal(bytes, &msg.Payload)
+		},
 		RunEntryMessageType: func(header Header, bytes []byte) (Message, error) {
 			msg := &RunEntryMessage{
 				Header: header,
@@ -411,6 +429,16 @@ type OneWayCallEntryMessage struct {
 	Payload protocol.OneWayCallEntryMessage
 }
 
+type AwakeableEntryMessage struct {
+	Header
+	Payload protocol.AwakeableEntryMessage
+}
+
+type CompleteAwakeableEntryMessage struct {
+	Header
+	Payload protocol.CompleteAwakeableEntryMessage
+}
+
 type RunEntryMessage struct {
 	Header
 	Payload protocol.RunEntryMessage
diff --git a/router.go b/router.go
index de8f5bb..6a5fe14 100644
--- a/router.go
+++ b/router.go
@@ -2,6 +2,7 @@ package restate
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
@@ -48,6 +49,16 @@ type Context interface {
 	// will produce the same value (think generating a unique id for example)
 	// Note: use the SideEffectAs helper function
 	SideEffect(fn func() ([]byte, error)) ([]byte, error)
+
+	// Awakeable creates an awakeable and returns a handle exposing its ID
+	// and a channel that delivers its result.
+	// Note: use the AwakeableAs helper function
+	Awakeable() (Awakeable[[]byte], error)
+	// ResolveAwakeable completes the awakeable with the given id with value.
+	// Note: use the ResolveAwakeableAs helper function
+	ResolveAwakeable(id string, value []byte) error
+	// RejectAwakeable completes the awakeable with the given id with a failure built from reason.
+	RejectAwakeable(id string, reason error) error
 }
 
 // Router interface
@@ -225,3 +236,64 @@ func SideEffectAs[T any](ctx Context, fn func() (T, error)) (output T, err error
 
 	return output, TerminalError(err)
 }
+
+// Awakeable exposes the ID of an awakeable and a channel that delivers its result.
+type Awakeable[T any] interface {
+	Id() string
+	Chan() <-chan Result[T]
+}
+
+// Result is the outcome delivered by an Awakeable: a Value, or a non-nil Err.
+type Result[T any] struct {
+	Value T
+	Err   error
+}
+
+// decodingAwakeable wraps a raw awakeable and JSON-decodes its value into T.
+type decodingAwakeable[T any] struct {
+	inner Awakeable[[]byte]
+}
+
+func (d decodingAwakeable[T]) Id() string { return d.inner.Id() }
+
+// Chan returns a channel delivering the inner result with its value JSON-decoded into T.
+func (d decodingAwakeable[T]) Chan() <-chan Result[T] {
+	// Called before the goroutine starts, so a panic raised by the inner Chan
+	// surfaces on the caller's goroutine.
+	inner := d.inner.Chan()
+	out := make(chan Result[T], 1)
+	go func() {
+		result := <-inner
+		if result.Err != nil {
+			out <- Result[T]{Err: result.Err}
+			return
+		}
+		var value T
+		if err := json.Unmarshal(result.Value, &value); err != nil {
+			out <- Result[T]{Err: TerminalError(err)}
+			return
+		}
+		out <- Result[T]{Value: value}
+	}()
+	return out
+}
+
+// AwakeableAs creates an awakeable whose result value is JSON-decoded into T.
+func AwakeableAs[T any](ctx Context) (Awakeable[T], error) {
+	inner, err := ctx.Awakeable()
+	if err != nil {
+		return nil, err
+	}
+
+	return decodingAwakeable[T]{inner: inner}, nil
+}
+
+// ResolveAwakeableAs JSON-encodes value and resolves the awakeable with the given id.
+// An encoding failure is returned as a terminal error.
+func ResolveAwakeableAs[T any](ctx Context, id string, value T) error {
+	encoded, err := json.Marshal(value)
+	if err != nil {
+		return TerminalError(err)
+	}
+	return ctx.ResolveAwakeable(id, encoded)
+}
diff --git a/server/restate.go b/server/restate.go
index 5b90144..c8b0256 100644
--- a/server/restate.go
+++ b/server/restate.go
@@ -110,7 +110,7 @@ func (r *Restate) discoverHandler(writer http.ResponseWriter, req *http.Request)
 	serviceDiscoveryProtocolVersion := selectSupportedServiceDiscoveryProtocolVersion(acceptVersionsString)
 
 	if serviceDiscoveryProtocolVersion == discovery.ServiceDiscoveryProtocolVersion_SERVICE_DISCOVERY_PROTOCOL_VERSION_UNSPECIFIED {
-		writer.Write([]byte(fmt.Sprint("Unsupported service discovery protocol version '%s'", acceptVersionsString)))
 		writer.WriteHeader(http.StatusUnsupportedMediaType)
+		writer.Write([]byte(fmt.Sprintf("Unsupported service discovery protocol version '%s'", acceptVersionsString)))
 		return
 	}