diff --git a/README.md b/README.md index 2e0c1dc..c7869e0 100644 --- a/README.md +++ b/README.md @@ -22,9 +22,8 @@ Calling other services right now is done completely by name, hence it's not very - [x] Remote service call over restate runtime - [X] Delayed execution of remote services - [X] Sleep -- [x] Side effects - - Implementation might differ from as intended by restate since it's not documented and based on reverse engineering of the TypeScript SDK -- [ ] Awakeable +- [x] Run +- [x] Awakeable ## Basic usage diff --git a/example/checkout.go b/example/checkout.go index 5d18d5d..63f435c 100644 --- a/example/checkout.go +++ b/example/checkout.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/google/uuid" @@ -19,7 +20,7 @@ type PaymentResponse struct { } func payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) { - uuid, err := restate.SideEffectAs(ctx, func() (string, error) { + uuid, err := restate.RunAs(ctx, func(ctx context.Context) (string, error) { uuid := uuid.New() return uuid.String(), nil }) @@ -36,7 +37,7 @@ func payment(ctx restate.Context, request PaymentRequest) (response PaymentRespo response.Price = price i := 0 - _, err = restate.SideEffectAs(ctx, func() (bool, error) { + _, err = restate.RunAs(ctx, func(ctx context.Context) (bool, error) { log := log.With().Str("uuid", uuid).Int("price", price).Logger() if i > 2 { log.Info().Msg("payment succeeded") diff --git a/internal/state/state.go b/internal/state/state.go index 929f829..31d5c02 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -103,8 +103,8 @@ func (c *Context) ObjectSend(service, key string, delay time.Duration) restate.S } } -func (c *Context) SideEffect(fn func() ([]byte, error)) ([]byte, error) { - return c.machine.sideEffect(fn) +func (c *Context) Run(fn func(ctx context.Context) ([]byte, error)) ([]byte, error) { + return c.machine.run(fn) } func (c *Context) Awakeable() restate.Awakeable[[]byte] { @@ -266,8 +266,8 @@ The journal entry at position %d was: }) return - case *sideEffectFailure: - m.log.Error().Err(typ.err).Msg("Side effect returned a failure, returning error to Restate") + case *runFailure: + m.log.Error().Err(typ.err).Msg("Run returned a failure, returning error to Restate") if err := m.protocol.Write(&wire.ErrorMessage{ ErrorMessage: protocol.ErrorMessage{ diff --git a/internal/state/sys.go b/internal/state/sys.go index 6564eaf..3de7d12 100644 --- a/internal/state/sys.go +++ b/internal/state/sys.go @@ -2,6 +2,7 @@ package state import ( "bytes" + "context" "fmt" "sort" "time" @@ -273,18 +274,18 @@ func (m *Machine) _sleep(d time.Duration) *wire.SleepEntryMessage { return msg } -func (m *Machine) sideEffect(fn func() ([]byte, error)) ([]byte, error) { +func (m *Machine) run(fn func(context.Context) ([]byte, error)) ([]byte, error) { entry, entryIndex := replayOrNew( m, func(entry *wire.RunEntryMessage) *wire.RunEntryMessage { return entry }, func() *wire.RunEntryMessage { - return m._sideEffect(fn) + return m._run(fn) }, ) - // side effect must be acknowledged before proceeding + // run entry must be acknowledged before proceeding entry.Await(m.suspensionCtx, entryIndex) switch result := entry.Result.(type) { @@ -297,11 +298,11 @@ func (m *Machine) sideEffect(fn func() ([]byte, error)) ([]byte, error) { return nil, nil } - return nil, restate.TerminalError(fmt.Errorf("side effect entry had invalid result: %v", entry.Result), errors.ErrProtocolViolation) + return nil, restate.TerminalError(fmt.Errorf("run entry had invalid result: %v", entry.Result), errors.ErrProtocolViolation) } -func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage { - bytes, err := fn() +func (m *Machine) _run(fn func(context.Context) ([]byte, error)) *wire.RunEntryMessage { + bytes, err := fn(m.ctx) if err != nil { if restate.IsTerminalError(err) { @@ -319,7 +320,7 @@ func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage { return msg } else { - panic(m.newSideEffectFailure(err)) + panic(m.newRunFailure(err)) } } else { msg := &wire.RunEntryMessage{ @@ -335,13 +336,13 @@ func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage { } } -type sideEffectFailure struct { +type runFailure struct { entryIndex uint32 err error } -func (m *Machine) newSideEffectFailure(err error) *sideEffectFailure { - s := &sideEffectFailure{m.entryIndex, err} +func (m *Machine) newRunFailure(err error) *runFailure { + s := &runFailure{m.entryIndex, err} m.failure = s return s } diff --git a/internal/wire/wire.go b/internal/wire/wire.go index 0eb048e..85967b8 100644 --- a/internal/wire/wire.go +++ b/internal/wire/wire.go @@ -374,7 +374,7 @@ var ( RunEntryMessageType: func(header Header, bytes []byte) (Message, error) { msg := &RunEntryMessage{} - // replayed side effects are inherently acked + // replayed run entries are inherently acked msg.Ack() return msg, proto.Unmarshal(bytes, msg) diff --git a/router.go b/router.go index 8a97c4f..0ab8857 100644 --- a/router.go +++ b/router.go @@ -72,11 +72,11 @@ type Context interface { // and delay is the duration with which to delay requests ObjectSend(object, key string, delay time.Duration) ServiceSendClient - // SideEffects runs the function (fn) until it succeeds or permanently fails. + // Run runs the function (fn) until it succeeds or permanently fails. // this stores the results of the function inside restate runtime so a replay // will produce the same value (think generating a unique id for example) - // Note: use the SideEffectAs helper function - SideEffect(fn func() ([]byte, error)) ([]byte, error) + // Note: use the RunAs helper function + Run(fn func(ctx context.Context) ([]byte, error)) ([]byte, error) Awakeable() Awakeable[[]byte] ResolveAwakeable(id string, value []byte) @@ -240,11 +240,11 @@ func SetAs[T any](ctx ObjectContext, key string, value T) error { return nil } -// SideEffectAs helper function runs a side effect function with specific concrete type as a result +// RunAs helper function runs a run function with specific concrete type as a result // it does encoding/decoding of bytes automatically using msgpack -func SideEffectAs[T any](ctx Context, fn func() (T, error)) (output T, err error) { - bytes, err := ctx.SideEffect(func() ([]byte, error) { - out, err := fn() +func RunAs[T any](ctx Context, fn func(context.Context) (T, error)) (output T, err error) { + bytes, err := ctx.Run(func(ctx context.Context) ([]byte, error) { + out, err := fn(ctx) if err != nil { return nil, err }