Skip to content

Commit

Permalink
experimental awakeable support
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 6, 2024
1 parent ed19fa2 commit 2239713
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 146 deletions.
139 changes: 1 addition & 138 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,143 +7,6 @@ import (

type Code uint16

const (
/**
* The operation was cancelled, typically by the caller.
* HTTP 408
*/
CANCELLED Code = 1
/**
* Unknown error. For example, this error may be returned when a
* Status value received from another address space belongs to an error
* space that is not known in this address space. Also errors raised by APIs
* that do not return enough error information may be converted to this
* error.
* HTTP 500
*/
UNKNOWN Code = 2
/**
* The client specified an invalid argument. Note that
* this differs from FAILED_PRECONDITION. INVALID_ARGUMENT indicates
* arguments that are problematic regardless of the state of the system
* (e.g., a malformed file name).
* HTTP 400
*/
INVALID_ARGUMENT Code = 3
/**
* The deadline expired before the operation could
* complete. For operations that change the state of the system, this error
* may be returned even if the operation has completed successfully. For
* example, a successful response from a server could have been delayed
* long.
* HTTP 408
*/
DEADLINE_EXCEEDED Code = 4
/**
* Some requested entity (e.g., file or directory) was not
* found. Note to server developers: if a request is denied for an entire
* class of users, such as gradual feature rollout or undocumented
* allowlist, NOT_FOUND may be used. If a request is denied for some users
* within a class of users, such as user-based access control,
* PERMISSION_DENIED must be used.
* HTTP 404
*/
NOT_FOUND Code = 5
/**
* The entity that a client attempted to create (e.g., file
* or directory) already exists.
* HTTP 409
*/
ALREADY_EXISTS Code = 6
/**
* The caller does not have permission to execute the
* specified operation. PERMISSION_DENIED must not be used for rejections
* caused by exhausting some resource (use RESOURCE_EXHAUSTED instead for
* those errors). PERMISSION_DENIED must not be used if the caller can not
* be identified (use UNAUTHENTICATED instead for those errors). This error
* code does not imply the request is valid or the requested entity exists
* or satisfies other pre-conditions.
* HTTP 403
*/
PERMISSION_DENIED Code = 7
/**
* Some resource has been exhausted, perhaps a per-user
* quota, or perhaps the entire file system is out of space.
* HTTP 413
*/
RESOURCE_EXHAUSTED Code = 8
/**
* The operation was rejected because the system is
* not in a state required for the operation's execution. For example, the
* directory to be deleted is non-empty, an rmdir operation is applied to a
* non-directory, etc. Service implementors can use the following guidelines
* to decide between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: (a) Use
* UNAVAILABLE if the client can retry just the failing call. (b) Use
* ABORTED if the client should retry at a higher level (e.g., when a
* client-specified test-and-set fails, indicating the client should restart
* a read-modify-write sequence). (c) Use FAILED_PRECONDITION if the client
* should not retry until the system state has been explicitly fixed. E.g.,
* if an "rmdir" fails because the directory is non-empty,
* FAILED_PRECONDITION should be returned since the client should not retry
* unless the files are deleted from the directory.
* HTTP 412
*/
FAILED_PRECONDITION Code = 9
/**
* The operation was aborted, typically due to a concurrency issue
* such as a sequencer check failure or transaction abort. See the
* guidelines above for deciding between FAILED_PRECONDITION, ABORTED, and
* UNAVAILABLE.
* HTTP 409
*/
ABORTED Code = 10
/**
* The operation was attempted past the valid range. E.g.,
* seeking or reading past end-of-file. Unlike INVALID_ARGUMENT, this error
* indicates a problem that may be fixed if the system state changes. For
* example, a 32-bit file system will generate INVALID_ARGUMENT if asked to
* read at an offset that is not in the range [0,2^32-1], but it will
* generate OUT_OF_RANGE if asked to read from an offset past the current
* file size. There is a fair bit of overlap between FAILED_PRECONDITION and
* OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error)
* when it applies so that callers who are iterating through a space can
* easily look for an OUT_OF_RANGE error to detect when they are done.
* HTTP 400
*/
OUT_OF_RANGE Code = 11
/**
* The operation is not implemented or is not
* supported/enabled in this service.
* HTTP 501
*/
UNIMPLEMENTED Code = 12
/**
* Internal errors. This means that some invariants expected by
* the underlying system have been broken. This error code is reserved for
* serious errors.
* HTTP 500
*/
INTERNAL Code = 13
/**
* The service is currently unavailable. This is most likely a
* transient condition, which can be corrected by retrying with a backoff.
* Note that it is not always safe to retry non-idempotent operations.
* HTTP 503
*/
UNAVAILABLE Code = 14
/**
* Unrecoverable data loss or corruption.
* HTTP 500
*/
DATA_LOSS Code = 15
/**
* The request does not have valid authentication
* credentials for the operation.
* HTTP 401
*/
UNAUTHENTICATED Code = 16
)

type codeError struct {
code Code
inner error
Expand Down Expand Up @@ -224,5 +87,5 @@ func ErrorCode(err error) Code {
return e.code
}

return UNKNOWN
return 500
}
160 changes: 160 additions & 0 deletions internal/state/awakeable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package state

import (
"bytes"
"encoding/base64"
"encoding/binary"
"fmt"

restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/generated/proto/protocol"
"github.com/restatedev/sdk-go/internal/wire"
)

const AWAKEABLE_IDENTIFIER_PREFIX = "prom_1"

type awakeable[T any] interface {
restate.Awakeable[T]
setEntryIndex(entryIndex uint32)
}

type completedAwakeable[T any] struct {
invocationID []byte
entryIndex uint32
result restate.Result[T]
}

func (c completedAwakeable[T]) Id() string { return awakeableID(c.invocationID, c.entryIndex) }
func (c completedAwakeable[T]) Chan() <-chan restate.Result[T] {
ch := make(chan restate.Result[T], 1)
ch <- c.result
return ch
}
func (c completedAwakeable[T]) setEntryIndex(entryIndex uint32) { c.entryIndex = entryIndex }

type suspendingAwakeable[T any] struct {
invocationID []byte
entryIndex uint32
}

func (c suspendingAwakeable[T]) Id() string { return awakeableID(c.invocationID, c.entryIndex) }

// this is a temporary hack; always suspend when this channel is read
// currently needed because we don't have a way to process the completion while the invocation is in progress
// and so can only deal with it on replay
func (c suspendingAwakeable[T]) Chan() <-chan restate.Result[T] {
panic(&suspend{resumeEntry: c.entryIndex})
}
func (c suspendingAwakeable[T]) setEntryIndex(entryIndex uint32) { c.entryIndex = entryIndex }

func awakeableID(invocationID []byte, entryIndex uint32) string {
bytes := make([]byte, 0, len(invocationID)+4)
bytes = append(bytes, invocationID...)
bytes = binary.BigEndian.AppendUint32(bytes, entryIndex)
return base64.URLEncoding.EncodeToString(bytes)
}

func (c *Machine) awakeable() (restate.Awakeable[[]byte], error) {
awakeable, err := replayOrNew(
c,
wire.AwakeableEntryMessageType,
func(entry *wire.AwakeableEntryMessage) (awakeable[[]byte], error) {
if entry.Payload.Result == nil {
return suspendingAwakeable[[]byte]{invocationID: c.id}, nil
}
switch result := entry.Payload.Result.(type) {
case *protocol.AwakeableEntryMessage_Value:
return completedAwakeable[[]byte]{invocationID: c.id, result: restate.Result[[]byte]{Value: result.Value}}, nil
case *protocol.AwakeableEntryMessage_Failure:
return completedAwakeable[[]byte]{invocationID: c.id, result: restate.Result[[]byte]{Err: restate.TerminalError(fmt.Errorf(result.Failure.Message), restate.Code(result.Failure.Code))}}, nil
default:
panic("unreachable")
}
},
func() (awakeable[[]byte], error) {
if err := c._awakeable(); err != nil {
return nil, err
}
return suspendingAwakeable[[]byte]{invocationID: c.id}, nil
},
)
if err != nil {
return nil, err
}
// This needs to be done after handling the message in the state machine
// otherwise the index is not yet incremented.
awakeable.setEntryIndex(uint32(c.entryIndex))
return awakeable, nil
}

func (c *Machine) _awakeable() error {
if err := c.protocol.Write(&protocol.AwakeableEntryMessage{}); err != nil {
return err
}
return nil
}

func (c *Machine) resolveAwakeable(id string, value []byte) error {
_, err := replayOrNew(
c,
wire.CompleteAwakeableEntryMessageType,
func(entry *wire.CompleteAwakeableEntryMessage) (restate.Void, error) {
messageValue, ok := entry.Payload.Result.(*protocol.CompleteAwakeableEntryMessage_Value)
if entry.Payload.Id != id || !ok || !bytes.Equal(messageValue.Value, value) {
return restate.Void{}, errEntryMismatch
}
return restate.Void{}, nil
},
func() (restate.Void, error) {
if err := c._resolveAwakeable(id, value); err != nil {
return restate.Void{}, err
}
return restate.Void{}, nil
},
)
return err
}

func (c *Machine) _resolveAwakeable(id string, value []byte) error {
if err := c.protocol.Write(&protocol.CompleteAwakeableEntryMessage{
Id: id,
Result: &protocol.CompleteAwakeableEntryMessage_Value{Value: value},
}); err != nil {
return err
}
return nil
}

func (c *Machine) rejectAwakeable(id string, reason error) error {
_, err := replayOrNew(
c,
wire.CompleteAwakeableEntryMessageType,
func(entry *wire.CompleteAwakeableEntryMessage) (restate.Void, error) {
messageFailure, ok := entry.Payload.Result.(*protocol.CompleteAwakeableEntryMessage_Failure)
if entry.Payload.Id != id || !ok || messageFailure.Failure.Code != 500 || messageFailure.Failure.Message != reason.Error() {
return restate.Void{}, errEntryMismatch
}
return restate.Void{}, nil
},
func() (restate.Void, error) {
if err := c._rejectAwakeable(id, reason); err != nil {
return restate.Void{}, err
}
return restate.Void{}, nil
},
)
return err
}

func (c *Machine) _rejectAwakeable(id string, reason error) error {
if err := c.protocol.Write(&protocol.CompleteAwakeableEntryMessage{
Id: id,
Result: &protocol.CompleteAwakeableEntryMessage_Failure{Failure: &protocol.Failure{
Code: 500,
Message: reason.Error(),
}},
}); err != nil {
return err
}
return nil
}
22 changes: 17 additions & 5 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func (c *Context) SideEffect(fn func() ([]byte, error)) ([]byte, error) {
return c.machine.sideEffect(fn)
}

func (c *Context) Awakeable() (restate.Awakeable[[]byte], error) {
return c.machine.awakeable()
}

func (c *Context) ResolveAwakeable(id string, value []byte) error {
return c.machine.resolveAwakeable(id, value)
}

func (c *Context) RejectAwakeable(id string, reason error) error {
return c.machine.rejectAwakeable(id, reason)
}

func (c *Context) Key() string {
return c.machine.key
}
Expand All @@ -118,7 +130,7 @@ type Machine struct {
mutex sync.Mutex

// state
id string
id []byte
key string

partial bool
Expand Down Expand Up @@ -153,7 +165,7 @@ func (m *Machine) Start(inner context.Context, trace string) error {

start := msg.(*wire.StartMessage)

m.id = start.Payload.DebugId
m.id = start.Payload.Id
m.key = start.Payload.Key

m.log = log.With().Str("id", start.Payload.DebugId).Str("method", trace).Logger()
Expand Down Expand Up @@ -197,7 +209,7 @@ func (m *Machine) output(bytes []byte, err error) proto.Message {
}
}

func (m *Machine) invoke(ctx *Context, key string, input []byte) error {
func (m *Machine) invoke(ctx *Context, input []byte) error {
// always terminate the invocation with
// an end message.
// this will always terminate the connection
Expand Down Expand Up @@ -228,7 +240,7 @@ func (m *Machine) invoke(ctx *Context, key string, input []byte) error {
// unknown panic!
// send an error message (retryable)
err := m.protocol.Write(&protocol.ErrorMessage{
Code: uint32(restate.INTERNAL),
Code: 500,
Message: fmt.Sprint(typ),
Description: string(debug.Stack()),
})
Expand Down Expand Up @@ -279,7 +291,7 @@ func (m *Machine) process(ctx *Context, start *wire.StartMessage) error {

inputMsg := msg.(*wire.InputEntryMessage)
value := inputMsg.Payload.GetValue()
return m.invoke(ctx, start.Payload.Key, value)
return m.invoke(ctx, value)

}

Expand Down
Loading

0 comments on commit 2239713

Please sign in to comment.