From e1d2a3551f78952ac777ca903323bc5f9ce194ec Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Tue, 16 Jul 2024 15:17:26 +0100 Subject: [PATCH] Implement shared context --- README.md | 5 ++-- context.go | 32 +++++++++++++------- example/main.go | 25 ++++++++++++++-- example/ticket_service.go | 12 ++++++++ facilitators.go | 59 ++----------------------------------- handler.go | 61 +++++++++++++++++++++++++++++++++------ internal/errors/error.go | 2 +- internal/state/state.go | 19 ------------ reflect.go | 34 +++++++++++++++++----- router.go | 3 +- server/restate.go | 1 + 11 files changed, 145 insertions(+), 108 deletions(-) diff --git a/README.md b/README.md index 45b2f78..f1d4a92 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ - [X] Sleep - [x] Run - [x] Awakeable +- [x] Shared object handlers +- [ ] Workflows ## Basic usage @@ -64,7 +66,6 @@ Finally checkout ```bash curl localhost:8080/UserSession/azmy/Checkout \ - -H 'content-type: application/json' \ - -d 'null' + -H 'content-type: application/json' #{"response":true} ``` diff --git a/context.go b/context.go index 28bfe54..4027314 100644 --- a/context.go +++ b/context.go @@ -26,11 +26,13 @@ type Context interface { // Service gets a Service accessor by name where service // must be another service known by restate runtime + // Note: use the CallAs and SendAs helper functions to send and receive serialised values Service(service, method string) CallClient[[]byte, []byte] // Object gets a Object accessor by name where object // must be another object known by restate runtime and // key is any string representing the key for the object + // Note: use the CallAs and SendAs helper functions to send and receive serialised values Object(object, key, method string) CallClient[[]byte, []byte] // Run runs the function (fn), storing final results (including terminal errors) @@ -115,26 +117,36 @@ type After interface { type ObjectContext interface { Context - KeyValueStore + KeyValueReader + KeyValueWriter // Key retrieves the key for this virtual object invocation. This is a no-op and is // always safe to call. Key() string } -type KeyValueStore interface { - // Set sets a byte array against a key - // Note: Use SetAs helper function to seamlessly store - // a value of specific type. - Set(key string, value []byte) +type ObjectSharedContext interface { + Context + KeyValueReader + // Key retrieves the key for this virtual object invocation. This is a no-op and is + // always safe to call. + Key() string +} + +type KeyValueReader interface { // Get gets value (bytes array) associated with key // If key does not exist, this function return a nil bytes array - // Note: Use GetAs helper function to seamlessly get value - // as specific type. + // Note: Use GetAs helper function to read serialised values Get(key string) []byte + // Keys returns a list of all associated key + Keys() []string +} + +type KeyValueWriter interface { + // Set sets a byte array against a key + // Note: Use SetAs helper function to store serialised values + Set(key string, value []byte) // Clear deletes a key Clear(key string) // ClearAll drops all stored state associated with key ClearAll() - // Keys returns a list of all associated key - Keys() []string } diff --git a/example/main.go b/example/main.go index d1c9127..d8cb761 100644 --- a/example/main.go +++ b/example/main.go @@ -12,12 +12,31 @@ import ( func main() { server := server.NewRestate(). + // Handlers can be inferred from object methods Bind(restate.Object(&userSession{})). Bind(restate.Object(&ticketService{})). Bind(restate.Service(&checkout{})). - Bind(restate.NewServiceRouter("health").Handler("ping", restate.NewServiceHandler(func(restate.Context, struct{}) (restate.Void, error) { - return restate.Void{}, nil - }))) + // Or registered explicitly + Bind(restate.NewServiceRouter("health").Handler("ping", restate.NewServiceHandler( + func(restate.Context, struct{}) (restate.Void, error) { + return restate.Void{}, nil + }))). + Bind(restate.NewObjectRouter("counter").Handler("add", restate.NewObjectHandler( + func(ctx restate.ObjectContext, delta int) (int, error) { + count, err := restate.GetAs[int](ctx, "counter") + if err != nil && err != restate.ErrKeyNotFound { + return 0, err + } + count += delta + if err := restate.SetAs(ctx, "counter", count); err != nil { + return 0, err + } + + return count, nil + })).Handler("get", restate.NewObjectSharedHandler( + func(ctx restate.ObjectSharedContext, input restate.Void) (int, error) { + return restate.GetAs[int](ctx, "counter") + }))) if err := server.Start(context.Background(), ":9080"); err != nil { slog.Error("application exited unexpectedly", "err", err.Error()) diff --git a/example/ticket_service.go b/example/ticket_service.go index de9a0aa..513ddde 100644 --- a/example/ticket_service.go +++ b/example/ticket_service.go @@ -64,3 +64,15 @@ func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (v return void, nil } + +func (t *ticketService) Status(ctx restate.ObjectSharedContext, _ restate.Void) (TicketStatus, error) { + ticketId := ctx.Key() + ctx.Log().Info("mark ticket as sold", "ticket", ticketId) + + status, err := restate.GetAs[TicketStatus](ctx, "status") + if err != nil && !errors.Is(err, restate.ErrKeyNotFound) { + return status, err + } + + return status, nil +} diff --git a/facilitators.go b/facilitators.go index 5d8cecb..57162b0 100644 --- a/facilitators.go +++ b/facilitators.go @@ -19,7 +19,7 @@ type GetOption interface { // if there is no associated value with key, an error ErrKeyNotFound is // returned // it does encoding/decoding of bytes, defaulting to json codec -func GetAs[T any](ctx ObjectContext, key string, options ...GetOption) (output T, err error) { +func GetAs[T any](ctx ObjectSharedContext, key string, options ...GetOption) (output T, err error) { opts := getOptions{} for _, opt := range options { opt.beforeGet(&opts) @@ -220,7 +220,7 @@ func (c codecCallClient[O]) Send(input any, delay time.Duration) error { return c.client.Send(bytes, delay) } -// CallClientAs helper function to use a codec for encoding and decoding, defaulting to JSON +// CallAs helper function to use a codec for encoding and decoding, defaulting to JSON func CallAs[O any](client CallClient[[]byte, []byte], options ...CallOption) CallClient[any, O] { opts := callOptions{} for _, opt := range options { @@ -232,6 +232,7 @@ func CallAs[O any](client CallClient[[]byte, []byte], options ...CallOption) Cal return codecCallClient[O]{client, opts} } +// SendAs helper function to use a codec for encoding .Send request parameters, defaulting to JSON func SendAs(client CallClient[[]byte, []byte], options ...CallOption) SendClient[any] { opts := callOptions{} for _, opt := range options { @@ -243,18 +244,6 @@ func SendAs(client CallClient[[]byte, []byte], options ...CallOption) SendClient return codecCallClient[struct{}]{client, opts} } -// // ResponseFutureAs helper function to receive JSON without immediately blocking -// func ResponseFutureAs[O any](responseFuture ResponseFuture[[]byte], options ...CallOption) ResponseFuture[O] { -// opts := callOptions{} -// for _, opt := range options { -// opt.beforeCall(&opts) -// } -// if opts.codec == nil { -// opts.codec = encoding.JSONCodec{} -// } -// return decodingResponseFuture[O]{responseFuture, opts} -// } - type decodingResponseFuture[O any] struct { ResponseFuture[[]byte] options callOptions @@ -268,45 +257,3 @@ func (d decodingResponseFuture[O]) Response() (output O, err error) { return output, d.options.codec.Unmarshal(bytes, &output) } - -// // CallAsFuture helper function to send JSON and allow receiving JSON later -// func CallAsFuture[O any, I any](client CallClient[[]byte, []byte], input I) (ResponseFuture[O], error) { -// var bytes []byte -// switch any(input).(type) { -// case Void: -// default: -// var err error -// bytes, err = json.Marshal(input) -// if err != nil { -// return nil, err -// } -// } - -// return ResponseFutureAs[O](client.Request(bytes)), nil -// } - -// type codecSendClient struct { -// client SendClient[[]byte] -// options callOptions -// } - -// func (c codecSendClient) Request(input any) error { -// bytes, err := c.options.codec.Marshal(input) -// if err != nil { -// return TerminalError(err) -// } -// return c.client.Request(bytes) -// } - -// // CallClientAs helper function to use a codec for encoding, defaulting to JSON -// func SendClientAs(client SendClient[[]byte], options ...CallOption) SendClient[any] { -// opts := callOptions{} -// for _, opt := range options { -// opt.beforeCall(&opts) -// } -// if opts.codec == nil { -// opts.codec = encoding.JSONCodec{} -// } - -// return codecSendClient{client, opts} -// } diff --git a/handler.go b/handler.go index 66a28e9..37a8b6d 100644 --- a/handler.go +++ b/handler.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/restatedev/sdk-go/encoding" + "github.com/restatedev/sdk-go/internal" ) // Void is a placeholder to signify 'no value' where a type is otherwise needed. It can be used in several contexts: @@ -28,13 +29,17 @@ type Handler interface { sealed() InputPayload() *encoding.InputPayload OutputPayload() *encoding.OutputPayload + HandlerType() *internal.ServiceHandlerType } // ServiceHandlerFn signature of service (unkeyed) handler function -type ServiceHandlerFn[I any, O any] func(ctx Context, input I) (output O, err error) +type ServiceHandlerFn[I any, O any] func(ctx Context, input I) (O, error) // ObjectHandlerFn signature for object (keyed) handler function -type ObjectHandlerFn[I any, O any] func(ctx ObjectContext, input I) (output O, err error) +type ObjectHandlerFn[I any, O any] func(ctx ObjectContext, input I) (O, error) + +// ObjectHandlerFn signature for object (keyed) handler function that can run concurrently with other handlers against a snapshot of state +type ObjectSharedHandlerFn[I any, O any] func(ctx ObjectSharedContext, input I) (O, error) type serviceHandlerOptions struct { codec encoding.PayloadCodec @@ -96,6 +101,10 @@ func (h *serviceHandler[I, O]) OutputPayload() *encoding.OutputPayload { return h.options.codec.OutputPayload() } +func (h *serviceHandler[I, O]) HandlerType() *internal.ServiceHandlerType { + return nil +} + func (h *serviceHandler[I, O]) getOptions() *serviceHandlerOptions { return &h.options } @@ -107,8 +116,11 @@ type objectHandlerOptions struct { } type objectHandler[I any, O any] struct { - fn ObjectHandlerFn[I, O] - options objectHandlerOptions + // only one of exclusiveFn or sharedFn should be set, as indicated by handlerType + exclusiveFn ObjectHandlerFn[I, O] + sharedFn ObjectSharedHandlerFn[I, O] + options objectHandlerOptions + handlerType internal.ServiceHandlerType } var _ ObjectHandler = (*objectHandler[struct{}, struct{}])(nil) @@ -126,7 +138,24 @@ func NewObjectHandler[I any, O any](fn ObjectHandlerFn[I, O], options ...ObjectH opts.codec = encoding.PartialVoidCodec[I, O]() } return &objectHandler[I, O]{ - fn: fn, + exclusiveFn: fn, + options: opts, + handlerType: internal.ServiceHandlerType_EXCLUSIVE, + } +} + +func NewObjectSharedHandler[I any, O any](fn ObjectSharedHandlerFn[I, O], options ...ObjectHandlerOption) *objectHandler[I, O] { + opts := objectHandlerOptions{} + for _, opt := range options { + opt.beforeObjectHandler(&opts) + } + if opts.codec == nil { + opts.codec = encoding.PartialVoidCodec[I, O]() + } + return &objectHandler[I, O]{ + sharedFn: fn, + options: opts, + handlerType: internal.ServiceHandlerType_SHARED, } } @@ -136,10 +165,20 @@ func (h *objectHandler[I, O]) Call(ctx ObjectContext, bytes []byte) ([]byte, err return nil, TerminalError(fmt.Errorf("request could not be decoded into handler input type: %w", err), http.StatusBadRequest) } - output, err := h.fn( - ctx, - input, - ) + var output O + var err error + switch h.handlerType { + case internal.ServiceHandlerType_EXCLUSIVE: + output, err = h.exclusiveFn( + ctx, + input, + ) + case internal.ServiceHandlerType_SHARED: + output, err = h.sharedFn( + ctx, + input, + ) + } if err != nil { return nil, err } @@ -164,4 +203,8 @@ func (h *objectHandler[I, O]) getOptions() *objectHandlerOptions { return &h.options } +func (h *objectHandler[I, O]) HandlerType() *internal.ServiceHandlerType { + return &h.handlerType +} + func (h *objectHandler[I, O]) sealed() {} diff --git a/internal/errors/error.go b/internal/errors/error.go index e7ca7ff..1cd0ca5 100644 --- a/internal/errors/error.go +++ b/internal/errors/error.go @@ -19,7 +19,7 @@ type CodeError struct { } func (e *CodeError) Error() string { - return fmt.Sprintf("[CODE %04X] %s", e.Code, e.Inner) + return fmt.Sprintf("[%d] %s", e.Code, e.Inner) } func (e *CodeError) Unwrap() error { diff --git a/internal/state/state.go b/internal/state/state.go index 3970c2e..3bc05ad 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -90,15 +90,6 @@ func (c *Context) Service(service, method string) restate.CallClient[[]byte, []b } } -// func (c *Context) ServiceSend(service, method string, delay time.Duration) restate.SendClient[[]byte] { -// return &serviceSend{ -// machine: c.machine, -// service: service, -// method: method, -// delay: delay, -// } -// } - func (c *Context) Object(service, key, method string) restate.CallClient[[]byte, []byte] { return &serviceCall{ machine: c.machine, @@ -108,16 +99,6 @@ func (c *Context) Object(service, key, method string) restate.CallClient[[]byte, } } -// func (c *Context) ObjectSend(service, key, method string, delay time.Duration) restate.SendClient[[]byte] { -// return &serviceSend{ -// machine: c.machine, -// service: service, -// method: method, -// key: key, -// delay: delay, -// } -// } - func (c *Context) Run(fn func(ctx restate.RunContext) ([]byte, error)) ([]byte, error) { return c.machine.run(fn) } diff --git a/reflect.go b/reflect.go index 8dd146c..f83e251 100644 --- a/reflect.go +++ b/reflect.go @@ -6,6 +6,7 @@ import ( "reflect" "github.com/restatedev/sdk-go/encoding" + "github.com/restatedev/sdk-go/internal" ) type serviceNamer interface { @@ -13,16 +14,17 @@ type serviceNamer interface { } var ( - typeOfContext = reflect.TypeOf((*Context)(nil)).Elem() - typeOfObjectContext = reflect.TypeOf((*ObjectContext)(nil)).Elem() - typeOfVoid = reflect.TypeOf((*Void)(nil)).Elem() - typeOfError = reflect.TypeOf((*error)(nil)).Elem() + typeOfContext = reflect.TypeOf((*Context)(nil)).Elem() + typeOfObjectContext = reflect.TypeOf((*ObjectContext)(nil)).Elem() + typeOfSharedObjectContext = reflect.TypeOf((*ObjectSharedContext)(nil)).Elem() + typeOfVoid = reflect.TypeOf((*Void)(nil)).Elem() + typeOfError = reflect.TypeOf((*error)(nil)).Elem() ) // Object converts a struct with methods into a Virtual Object where each correctly-typed // and exported method of the struct will become a handler on the Object. The Object name defaults // to the name of the struct, but this can be overidden by providing a `ServiceName() string` method. -// The handler name is the name of the method. Handler methods should be of the type `ObjectHandlerFn[I, O]`. +// The handler name is the name of the method. Handler methods should be of the type `ObjectHandlerFn[I, O]` or `ObjectSharedHandlerFn[I, O]`. // Input types I will be deserialised with the provided codec (defaults to JSON) except when they are restate.Void, // in which case no input bytes or content type may be sent. // Output types O will be serialised with the provided codec (defaults to JSON) except when they are restate.Void, @@ -51,7 +53,15 @@ func Object(object any, options ...ObjectRouterOption) *ObjectRouter { continue } - if ctxType := mtype.In(1); ctxType != typeOfObjectContext { + var handlerType internal.ServiceHandlerType + + switch mtype.In(1) { + case typeOfObjectContext: + handlerType = internal.ServiceHandlerType_EXCLUSIVE + case typeOfSharedObjectContext: + handlerType = internal.ServiceHandlerType_SHARED + default: + // first parameter is not an object context continue } @@ -82,6 +92,7 @@ func Object(object any, options ...ObjectRouterOption) *ObjectRouter { router.Handler(mname, &objectReflectHandler{ objectHandlerOptions{codec}, + handlerType, reflectHandler{ fn: method.Func, receiver: val, @@ -181,7 +192,8 @@ type reflectHandler struct { func (h *reflectHandler) sealed() {} type objectReflectHandler struct { - options objectHandlerOptions + options objectHandlerOptions + handlerType internal.ServiceHandlerType reflectHandler } @@ -227,6 +239,10 @@ func (h *objectReflectHandler) OutputPayload() *encoding.OutputPayload { return h.options.codec.OutputPayload() } +func (h *objectReflectHandler) HandlerType() *internal.ServiceHandlerType { + return &h.handlerType +} + type serviceReflectHandler struct { options serviceHandlerOptions reflectHandler @@ -273,3 +289,7 @@ func (h *serviceReflectHandler) InputPayload() *encoding.InputPayload { func (h *serviceReflectHandler) OutputPayload() *encoding.OutputPayload { return h.options.codec.OutputPayload() } + +func (h *serviceReflectHandler) HandlerType() *internal.ServiceHandlerType { + return nil +} diff --git a/router.go b/router.go index 1663fe0..3659ca2 100644 --- a/router.go +++ b/router.go @@ -2,13 +2,14 @@ package restate import ( "fmt" + "net/http" "github.com/restatedev/sdk-go/encoding" "github.com/restatedev/sdk-go/internal" ) var ( - ErrKeyNotFound = fmt.Errorf("key not found") + ErrKeyNotFound = TerminalError(fmt.Errorf("key not found"), http.StatusNotFound) ) // Router interface diff --git a/server/restate.go b/server/restate.go index 366717f..8b60b78 100644 --- a/server/restate.go +++ b/server/restate.go @@ -109,6 +109,7 @@ func (r *Restate) discover() (resource *internal.Endpoint, err error) { Name: name, Input: handler.InputPayload(), Output: handler.OutputPayload(), + Ty: handler.HandlerType(), }) } resource.Services = append(resource.Services, service)