Skip to content

Commit

Permalink
Implement shared context
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 16, 2024
1 parent 1830197 commit e1d2a35
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 108 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
- [X] Sleep
- [x] Run
- [x] Awakeable
- [x] Shared object handlers
- [ ] Workflows

## Basic usage

Expand Down Expand Up @@ -64,7 +66,6 @@ Finally checkout

```bash
curl localhost:8080/UserSession/azmy/Checkout \
-H 'content-type: application/json' \
-d 'null'
-H 'content-type: application/json'
#{"response":true}
```
32 changes: 22 additions & 10 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ type Context interface {

// Service gets a Service accessor by name where service
// must be another service known by restate runtime
// Note: use the CallAs and SendAs helper functions to send and receive serialised values
Service(service, method string) CallClient[[]byte, []byte]

// Object gets a Object accessor by name where object
// must be another object known by restate runtime and
// key is any string representing the key for the object
// Note: use the CallAs and SendAs helper functions to send and receive serialised values
Object(object, key, method string) CallClient[[]byte, []byte]

// Run runs the function (fn), storing final results (including terminal errors)
Expand Down Expand Up @@ -115,26 +117,36 @@ type After interface {

type ObjectContext interface {
Context
KeyValueStore
KeyValueReader
KeyValueWriter
// Key retrieves the key for this virtual object invocation. This is a no-op and is
// always safe to call.
Key() string
}

type KeyValueStore interface {
// Set sets a byte array against a key
// Note: Use SetAs helper function to seamlessly store
// a value of specific type.
Set(key string, value []byte)
type ObjectSharedContext interface {
Context
KeyValueReader
// Key retrieves the key for this virtual object invocation. This is a no-op and is
// always safe to call.
Key() string
}

type KeyValueReader interface {
// Get gets value (bytes array) associated with key
// If key does not exist, this function return a nil bytes array
// Note: Use GetAs helper function to seamlessly get value
// as specific type.
// Note: Use GetAs helper function to read serialised values
Get(key string) []byte
// Keys returns a list of all associated key
Keys() []string
}

type KeyValueWriter interface {
// Set sets a byte array against a key
// Note: Use SetAs helper function to store serialised values
Set(key string, value []byte)
// Clear deletes a key
Clear(key string)
// ClearAll drops all stored state associated with key
ClearAll()
// Keys returns a list of all associated key
Keys() []string
}
25 changes: 22 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,31 @@ import (
func main() {

server := server.NewRestate().
// Handlers can be inferred from object methods
Bind(restate.Object(&userSession{})).
Bind(restate.Object(&ticketService{})).
Bind(restate.Service(&checkout{})).
Bind(restate.NewServiceRouter("health").Handler("ping", restate.NewServiceHandler(func(restate.Context, struct{}) (restate.Void, error) {
return restate.Void{}, nil
})))
// Or registered explicitly
Bind(restate.NewServiceRouter("health").Handler("ping", restate.NewServiceHandler(
func(restate.Context, struct{}) (restate.Void, error) {
return restate.Void{}, nil
}))).
Bind(restate.NewObjectRouter("counter").Handler("add", restate.NewObjectHandler(
func(ctx restate.ObjectContext, delta int) (int, error) {
count, err := restate.GetAs[int](ctx, "counter")
if err != nil && err != restate.ErrKeyNotFound {
return 0, err
}
count += delta
if err := restate.SetAs(ctx, "counter", count); err != nil {
return 0, err
}

return count, nil
})).Handler("get", restate.NewObjectSharedHandler(
func(ctx restate.ObjectSharedContext, input restate.Void) (int, error) {
return restate.GetAs[int](ctx, "counter")
})))

if err := server.Start(context.Background(), ":9080"); err != nil {
slog.Error("application exited unexpectedly", "err", err.Error())
Expand Down
12 changes: 12 additions & 0 deletions example/ticket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,15 @@ func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (v

return void, nil
}

func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void) (TicketStatus, error) {
ticketId := ctx.Key()
ctx.Log().Info("mark ticket as sold", "ticket", ticketId)

status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return status, err
}

return status, nil
}
59 changes: 3 additions & 56 deletions facilitators.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type GetOption interface {
// if there is no associated value with key, an error ErrKeyNotFound is
// returned
// it does encoding/decoding of bytes, defaulting to json codec
func GetAs[T any](ctx ObjectContext, key string, options ...GetOption) (output T, err error) {
func GetAs[T any](ctx ObjectSharedContext, key string, options ...GetOption) (output T, err error) {
opts := getOptions{}
for _, opt := range options {
opt.beforeGet(&opts)
Expand Down Expand Up @@ -220,7 +220,7 @@ func (c codecCallClient[O]) Send(input any, delay time.Duration) error {
return c.client.Send(bytes, delay)
}

// CallClientAs helper function to use a codec for encoding and decoding, defaulting to JSON
// CallAs helper function to use a codec for encoding and decoding, defaulting to JSON
func CallAs[O any](client CallClient[[]byte, []byte], options ...CallOption) CallClient[any, O] {
opts := callOptions{}
for _, opt := range options {
Expand All @@ -232,6 +232,7 @@ func CallAs[O any](client CallClient[[]byte, []byte], options ...CallOption) Cal
return codecCallClient[O]{client, opts}
}

// SendAs helper function to use a codec for encoding .Send request parameters, defaulting to JSON
func SendAs(client CallClient[[]byte, []byte], options ...CallOption) SendClient[any] {
opts := callOptions{}
for _, opt := range options {
Expand All @@ -243,18 +244,6 @@ func SendAs(client CallClient[[]byte, []byte], options ...CallOption) SendClient
return codecCallClient[struct{}]{client, opts}
}

// // ResponseFutureAs helper function to receive JSON without immediately blocking
// func ResponseFutureAs[O any](responseFuture ResponseFuture[[]byte], options ...CallOption) ResponseFuture[O] {
// opts := callOptions{}
// for _, opt := range options {
// opt.beforeCall(&opts)
// }
// if opts.codec == nil {
// opts.codec = encoding.JSONCodec{}
// }
// return decodingResponseFuture[O]{responseFuture, opts}
// }

type decodingResponseFuture[O any] struct {
ResponseFuture[[]byte]
options callOptions
Expand All @@ -268,45 +257,3 @@ func (d decodingResponseFuture[O]) Response() (output O, err error) {

return output, d.options.codec.Unmarshal(bytes, &output)
}

// // CallAsFuture helper function to send JSON and allow receiving JSON later
// func CallAsFuture[O any, I any](client CallClient[[]byte, []byte], input I) (ResponseFuture[O], error) {
// var bytes []byte
// switch any(input).(type) {
// case Void:
// default:
// var err error
// bytes, err = json.Marshal(input)
// if err != nil {
// return nil, err
// }
// }

// return ResponseFutureAs[O](client.Request(bytes)), nil
// }

// type codecSendClient struct {
// client SendClient[[]byte]
// options callOptions
// }

// func (c codecSendClient) Request(input any) error {
// bytes, err := c.options.codec.Marshal(input)
// if err != nil {
// return TerminalError(err)
// }
// return c.client.Request(bytes)
// }

// // CallClientAs helper function to use a codec for encoding, defaulting to JSON
// func SendClientAs(client SendClient[[]byte], options ...CallOption) SendClient[any] {
// opts := callOptions{}
// for _, opt := range options {
// opt.beforeCall(&opts)
// }
// if opts.codec == nil {
// opts.codec = encoding.JSONCodec{}
// }

// return codecSendClient{client, opts}
// }
61 changes: 52 additions & 9 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/restatedev/sdk-go/encoding"
"github.com/restatedev/sdk-go/internal"
)

// Void is a placeholder to signify 'no value' where a type is otherwise needed. It can be used in several contexts:
Expand All @@ -28,13 +29,17 @@ type Handler interface {
sealed()
InputPayload() *encoding.InputPayload
OutputPayload() *encoding.OutputPayload
HandlerType() *internal.ServiceHandlerType
}

// ServiceHandlerFn signature of service (unkeyed) handler function
type ServiceHandlerFn[I any, O any] func(ctx Context, input I) (output O, err error)
type ServiceHandlerFn[I any, O any] func(ctx Context, input I) (O, error)

// ObjectHandlerFn signature for object (keyed) handler function
type ObjectHandlerFn[I any, O any] func(ctx ObjectContext, input I) (output O, err error)
type ObjectHandlerFn[I any, O any] func(ctx ObjectContext, input I) (O, error)

// ObjectHandlerFn signature for object (keyed) handler function that can run concurrently with other handlers against a snapshot of state
type ObjectSharedHandlerFn[I any, O any] func(ctx ObjectSharedContext, input I) (O, error)

type serviceHandlerOptions struct {
codec encoding.PayloadCodec
Expand Down Expand Up @@ -96,6 +101,10 @@ func (h *serviceHandler[I, O]) OutputPayload() *encoding.OutputPayload {
return h.options.codec.OutputPayload()
}

func (h *serviceHandler[I, O]) HandlerType() *internal.ServiceHandlerType {
return nil
}

func (h *serviceHandler[I, O]) getOptions() *serviceHandlerOptions {
return &h.options
}
Expand All @@ -107,8 +116,11 @@ type objectHandlerOptions struct {
}

type objectHandler[I any, O any] struct {
fn ObjectHandlerFn[I, O]
options objectHandlerOptions
// only one of exclusiveFn or sharedFn should be set, as indicated by handlerType
exclusiveFn ObjectHandlerFn[I, O]
sharedFn ObjectSharedHandlerFn[I, O]
options objectHandlerOptions
handlerType internal.ServiceHandlerType
}

var _ ObjectHandler = (*objectHandler[struct{}, struct{}])(nil)
Expand All @@ -126,7 +138,24 @@ func NewObjectHandler[I any, O any](fn ObjectHandlerFn[I, O], options ...ObjectH
opts.codec = encoding.PartialVoidCodec[I, O]()
}
return &objectHandler[I, O]{
fn: fn,
exclusiveFn: fn,
options: opts,
handlerType: internal.ServiceHandlerType_EXCLUSIVE,
}
}

func NewObjectSharedHandler[I any, O any](fn ObjectSharedHandlerFn[I, O], options ...ObjectHandlerOption) *objectHandler[I, O] {
opts := objectHandlerOptions{}
for _, opt := range options {
opt.beforeObjectHandler(&opts)
}
if opts.codec == nil {
opts.codec = encoding.PartialVoidCodec[I, O]()
}
return &objectHandler[I, O]{
sharedFn: fn,
options: opts,
handlerType: internal.ServiceHandlerType_SHARED,
}
}

Expand All @@ -136,10 +165,20 @@ func (h *objectHandler[I, O]) Call(ctx ObjectContext, bytes []byte) ([]byte, err
return nil, TerminalError(fmt.Errorf("request could not be decoded into handler input type: %w", err), http.StatusBadRequest)
}

output, err := h.fn(
ctx,
input,
)
var output O
var err error
switch h.handlerType {
case internal.ServiceHandlerType_EXCLUSIVE:
output, err = h.exclusiveFn(
ctx,
input,
)
case internal.ServiceHandlerType_SHARED:
output, err = h.sharedFn(
ctx,
input,
)
}
if err != nil {
return nil, err
}
Expand All @@ -164,4 +203,8 @@ func (h *objectHandler[I, O]) getOptions() *objectHandlerOptions {
return &h.options
}

func (h *objectHandler[I, O]) HandlerType() *internal.ServiceHandlerType {
return &h.handlerType
}

func (h *objectHandler[I, O]) sealed() {}
2 changes: 1 addition & 1 deletion internal/errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type CodeError struct {
}

func (e *CodeError) Error() string {
return fmt.Sprintf("[CODE %04X] %s", e.Code, e.Inner)
return fmt.Sprintf("[%d] %s", e.Code, e.Inner)
}

func (e *CodeError) Unwrap() error {
Expand Down
19 changes: 0 additions & 19 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ func (c *Context) Service(service, method string) restate.CallClient[[]byte, []b
}
}

// func (c *Context) ServiceSend(service, method string, delay time.Duration) restate.SendClient[[]byte] {
// return &serviceSend{
// machine: c.machine,
// service: service,
// method: method,
// delay: delay,
// }
// }

func (c *Context) Object(service, key, method string) restate.CallClient[[]byte, []byte] {
return &serviceCall{
machine: c.machine,
Expand All @@ -108,16 +99,6 @@ func (c *Context) Object(service, key, method string) restate.CallClient[[]byte,
}
}

// func (c *Context) ObjectSend(service, key, method string, delay time.Duration) restate.SendClient[[]byte] {
// return &serviceSend{
// machine: c.machine,
// service: service,
// method: method,
// key: key,
// delay: delay,
// }
// }

func (c *Context) Run(fn func(ctx restate.RunContext) ([]byte, error)) ([]byte, error) {
return c.machine.run(fn)
}
Expand Down
Loading

0 comments on commit e1d2a35

Please sign in to comment.