Skip to content

Commit

Permalink
Avoid channels in awakeables
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 9, 2024
1 parent 6cbc82d commit 7f7514e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 46 deletions.
41 changes: 13 additions & 28 deletions internal/state/awakeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/base64"
"encoding/binary"
"fmt"

restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/generated/proto/protocol"
Expand Down Expand Up @@ -32,35 +33,19 @@ type completionAwakeable struct {
}

func (c *completionAwakeable) Id() string { return awakeableID(c.invocationID, c.entryIndex) }
func (c *completionAwakeable) Chan() <-chan restate.Result[[]byte] {
ch := make(chan restate.Result[[]byte], 1)
if c.entry.Completed() {
// fast path
ch <- resultFromAwakeable(c.entry)
return ch
}
// slow path
go func() {
if err := c.entry.Await(c.ctx); err != nil {
ch <- restate.Result[[]byte]{Err: err}
} else {
ch <- resultFromAwakeable(c.entry)
func (c *completionAwakeable) Result() ([]byte, error) {
if err := c.entry.Await(c.ctx); err != nil {
return nil, err
} else {
switch result := c.entry.Result.(type) {
case *protocol.AwakeableEntryMessage_Value:
return result.Value, nil
case *protocol.AwakeableEntryMessage_Failure:
return nil, ErrorFromFailure(result.Failure)
default:
return nil, fmt.Errorf("unexpected result in completed awakeable entry: %v", c.entry.Result)
}
}()
return ch
}

type completedAwakeable[T any] struct {
invocationID []byte
entryIndex uint32
result restate.Result[T]
}

func (c *completedAwakeable[T]) Id() string { return awakeableID(c.invocationID, c.entryIndex) }
func (c *completedAwakeable[T]) Chan() <-chan restate.Result[T] {
ch := make(chan restate.Result[T], 1)
ch <- c.result
return ch
}
}

func awakeableID(invocationID []byte, entryIndex uint32) string {
Expand Down
28 changes: 10 additions & 18 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func SideEffectAs[T any](ctx Context, fn func() (T, error)) (output T, err error

type Awakeable[T any] interface {
Id() string
Chan() <-chan Result[T]
Result() (T, error)
}

type Result[T any] struct {
Expand All @@ -267,23 +267,15 @@ type decodingAwakeable[T any] struct {
}

func (d decodingAwakeable[T]) Id() string { return d.inner.Id() }
func (d decodingAwakeable[T]) Chan() <-chan Result[T] {
inner := d.inner.Chan()
out := make(chan Result[T], 1)
go func() {
result := <-inner
if result.Err != nil {
out <- Result[T]{Err: result.Err}
} else {
var value T
if err := json.Unmarshal(result.Value, &value); err != nil {
out <- Result[T]{Err: TerminalError(err)}
} else {
out <- Result[T]{Value: value}
}
}
}()
return out
func (d decodingAwakeable[T]) Result() (out T, err error) {
bytes, err := d.inner.Result()
if err != nil {
return out, err
}
if err := json.Unmarshal(bytes, &out); err != nil {
return out, err
}
return
}

func AwakeableAs[T any](ctx Context) (Awakeable[T], error) {
Expand Down

0 comments on commit 7f7514e

Please sign in to comment.