Skip to content

Commit

Permalink
Rename side effect to run
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 12, 2024
1 parent 1a0578a commit 7102ef0
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 27 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ Calling other services right now is done completely by name, hence it's not very
- [x] Remote service call over restate runtime
- [X] Delayed execution of remote services
- [X] Sleep
- [x] Side effects
- Implementation might differ from as intended by restate since it's not documented and based on reverse engineering of the TypeScript SDK
- [ ] Awakeable
- [x] Run
- [x] Awakeable

## Basic usage

Expand Down
5 changes: 3 additions & 2 deletions example/checkout.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/google/uuid"
Expand All @@ -19,7 +20,7 @@ type PaymentResponse struct {
}

func payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) {
uuid, err := restate.SideEffectAs(ctx, func() (string, error) {
uuid, err := restate.RunAs(ctx, func(ctx context.Context) (string, error) {
uuid := uuid.New()
return uuid.String(), nil
})
Expand All @@ -36,7 +37,7 @@ func payment(ctx restate.Context, request PaymentRequest) (response PaymentRespo

response.Price = price
i := 0
_, err = restate.SideEffectAs(ctx, func() (bool, error) {
_, err = restate.RunAs(ctx, func(ctx context.Context) (bool, error) {
log := log.With().Str("uuid", uuid).Int("price", price).Logger()
if i > 2 {
log.Info().Msg("payment succeeded")
Expand Down
8 changes: 4 additions & 4 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (c *Context) ObjectSend(service, key string, delay time.Duration) restate.S
}
}

func (c *Context) SideEffect(fn func() ([]byte, error)) ([]byte, error) {
return c.machine.sideEffect(fn)
func (c *Context) Run(fn func(ctx context.Context) ([]byte, error)) ([]byte, error) {
return c.machine.run(fn)
}

func (c *Context) Awakeable() restate.Awakeable[[]byte] {
Expand Down Expand Up @@ -266,8 +266,8 @@ The journal entry at position %d was:
})

return
case *sideEffectFailure:
m.log.Error().Err(typ.err).Msg("Side effect returned a failure, returning error to Restate")
case *runFailure:
m.log.Error().Err(typ.err).Msg("Run returned a failure, returning error to Restate")

if err := m.protocol.Write(&wire.ErrorMessage{
ErrorMessage: protocol.ErrorMessage{
Expand Down
21 changes: 11 additions & 10 deletions internal/state/sys.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"bytes"
"context"
"fmt"
"sort"
"time"
Expand Down Expand Up @@ -273,18 +274,18 @@ func (m *Machine) _sleep(d time.Duration) *wire.SleepEntryMessage {
return msg
}

func (m *Machine) sideEffect(fn func() ([]byte, error)) ([]byte, error) {
func (m *Machine) run(fn func(context.Context) ([]byte, error)) ([]byte, error) {
entry, entryIndex := replayOrNew(
m,
func(entry *wire.RunEntryMessage) *wire.RunEntryMessage {
return entry
},
func() *wire.RunEntryMessage {
return m._sideEffect(fn)
return m._run(fn)
},
)

// side effect must be acknowledged before proceeding
// run entry must be acknowledged before proceeding
entry.Await(m.suspensionCtx, entryIndex)

switch result := entry.Result.(type) {
Expand All @@ -297,11 +298,11 @@ func (m *Machine) sideEffect(fn func() ([]byte, error)) ([]byte, error) {
return nil, nil
}

return nil, restate.TerminalError(fmt.Errorf("side effect entry had invalid result: %v", entry.Result), errors.ErrProtocolViolation)
return nil, restate.TerminalError(fmt.Errorf("run entry had invalid result: %v", entry.Result), errors.ErrProtocolViolation)
}

func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage {
bytes, err := fn()
func (m *Machine) _run(fn func(context.Context) ([]byte, error)) *wire.RunEntryMessage {
bytes, err := fn(m.ctx)

if err != nil {
if restate.IsTerminalError(err) {
Expand All @@ -319,7 +320,7 @@ func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage {

return msg
} else {
panic(m.newSideEffectFailure(err))
panic(m.newRunFailure(err))
}
} else {
msg := &wire.RunEntryMessage{
Expand All @@ -335,13 +336,13 @@ func (m *Machine) _sideEffect(fn func() ([]byte, error)) *wire.RunEntryMessage {
}
}

type sideEffectFailure struct {
type runFailure struct {
entryIndex uint32
err error
}

func (m *Machine) newSideEffectFailure(err error) *sideEffectFailure {
s := &sideEffectFailure{m.entryIndex, err}
func (m *Machine) newRunFailure(err error) *runFailure {
s := &runFailure{m.entryIndex, err}
m.failure = s
return s
}
2 changes: 1 addition & 1 deletion internal/wire/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ var (
RunEntryMessageType: func(header Header, bytes []byte) (Message, error) {
msg := &RunEntryMessage{}

// replayed side effects are inherently acked
// replayed run entries are inherently acked
msg.Ack()

return msg, proto.Unmarshal(bytes, msg)
Expand Down
14 changes: 7 additions & 7 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ type Context interface {
// and delay is the duration with which to delay requests
ObjectSend(object, key string, delay time.Duration) ServiceSendClient

// SideEffects runs the function (fn) until it succeeds or permanently fails.
// Run runs the function (fn) until it succeeds or permanently fails.
// this stores the results of the function inside restate runtime so a replay
// will produce the same value (think generating a unique id for example)
// Note: use the SideEffectAs helper function
SideEffect(fn func() ([]byte, error)) ([]byte, error)
// Note: use the RunAs helper function
Run(fn func(ctx context.Context) ([]byte, error)) ([]byte, error)

Awakeable() Awakeable[[]byte]
ResolveAwakeable(id string, value []byte)
Expand Down Expand Up @@ -240,11 +240,11 @@ func SetAs[T any](ctx ObjectContext, key string, value T) error {
return nil
}

// SideEffectAs helper function runs a side effect function with specific concrete type as a result
// RunAs helper function runs a run function with specific concrete type as a result
// it does encoding/decoding of bytes automatically using msgpack
func SideEffectAs[T any](ctx Context, fn func() (T, error)) (output T, err error) {
bytes, err := ctx.SideEffect(func() ([]byte, error) {
out, err := fn()
func RunAs[T any](ctx Context, fn func(context.Context) (T, error)) (output T, err error) {
bytes, err := ctx.Run(func(ctx context.Context) ([]byte, error) {
out, err := fn(ctx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7102ef0

Please sign in to comment.