Skip to content

Commit

Permalink
Align more with typescript sdk endpoint binding api
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 12, 2024
1 parent 7102ef0 commit 2c02142
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 140 deletions.
22 changes: 6 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@

[Restate](https://restate.dev/) is a system for easily building resilient applications using *distributed durable async/await*. This repository contains the Restate SDK for writing services in **Golang**.

This SDK is an individual effort to build a golang SDK for restate runtime. The implementation is based on the service protocol documentation found [here](https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md) and a lot of experimentation with the protocol.

This means that it's not granted that this SDK matches exactly what `restate` has intended but it's a best effort interpretation of the docs

Since **service discovery** was not documented (or at least I could not find any documentation for it), the implementation is based on reverse engineering the TypeScript SDK.

This implementation of the SDK **ONLY** supports `dynrpc`. There is noway yet that you can define your service interface with `gRPC`

Calling other services right now is done completely by name, hence it's not very safe since you can miss up arguments list/type for example but hopefully later on we can generate stubs or use `gRPC` interfaces to define services.

## Features implemented

- [x] Log replay (resume of execution on failure)
Expand Down Expand Up @@ -57,14 +47,14 @@ In yet a third terminal do the following steps
- Add tickets to basket

```bash
curl -v localhost:8080/UserSession/addTicket \
curl -v localhost:8080/UserSession/azmy/AddTicket \
-H 'content-type: application/json' \
-d '{"key": "azmy", "request": "ticket-1"}'
-d '"ticket-1"'

# {"response":true}
curl -v localhost:8080/UserSession/addTicket \
curl -v localhost:8080/UserSession/azmy/AddTicket \
-H 'content-type: application/json' \
-d '{"key": "azmy", "request": "ticket-2"}'
-d '"ticket-2"'
# {"response":true}
```

Expand All @@ -73,8 +63,8 @@ Trying adding the same tickets again should return `false` since they are alread
Finally checkout

```bash
curl localhost:8080/UserSession/checkout \
curl localhost:8080/UserSession/azmy/Checkout \
-H 'content-type: application/json' \
-d '{"key": "azmy", "request": null}'
-d 'null'
#{"response":true}
```
26 changes: 14 additions & 12 deletions example/checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"math/rand"

"github.com/google/uuid"
restate "github.com/restatedev/sdk-go"
Expand All @@ -19,7 +20,15 @@ type PaymentResponse struct {
Price int `json:"price"`
}

func payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) {
type checkout struct{}

func (c *checkout) Name() string {
return CheckoutServiceName
}

const CheckoutServiceName = "Checkout"

func (c *checkout) Payment(ctx restate.Context, request PaymentRequest) (response PaymentResponse, err error) {
uuid, err := restate.RunAs(ctx, func(ctx context.Context) (string, error) {
uuid := uuid.New()
return uuid.String(), nil
Expand All @@ -36,17 +45,15 @@ func payment(ctx restate.Context, request PaymentRequest) (response PaymentRespo
price := len(request.Tickets) * 30

response.Price = price
i := 0
_, err = restate.RunAs(ctx, func(ctx context.Context) (bool, error) {
log := log.With().Str("uuid", uuid).Int("price", price).Logger()
if i > 2 {
if rand.Float64() < 0.5 {
log.Info().Msg("payment succeeded")
return true, nil
} else {
log.Error().Msg("payment failed")
return false, fmt.Errorf("failed to pay")
}

log.Error().Msg("payment failed")
i += 1
return false, fmt.Errorf("failed to pay")
})

if err != nil {
Expand All @@ -57,8 +64,3 @@ func payment(ctx restate.Context, request PaymentRequest) (response PaymentRespo

return response, nil
}

var (
Checkout = restate.NewServiceRouter().
Handler("checkout", restate.NewServiceHandler(payment))
)
13 changes: 4 additions & 9 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,21 @@ import (
"context"
"os"

restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/server"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const (
UserSessionServiceName = "UserSession"
TicketServiceName = "TicketService"
CheckoutServiceName = "Checkout"
)

func main() {

log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
zerolog.SetGlobalLevel(zerolog.InfoLevel)

server := server.NewRestate().
Bind(UserSessionServiceName, UserSession).
Bind(TicketServiceName, TicketService).
Bind(CheckoutServiceName, Checkout)
Bind(restate.Object(&userSession{})).
Bind(restate.Object(&ticketService{})).
Bind(restate.Service(&checkout{}))

if err := server.Start(context.Background(), ":9080"); err != nil {
log.Error().Err(err).Msg("application exited unexpectedly")
Expand Down
19 changes: 9 additions & 10 deletions example/ticket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ const (
TicketSold TicketStatus = 2
)

func reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
const TicketServiceName = "TicketService"

type ticketService struct{}

func (t *ticketService) Name() string { return TicketServiceName }

func (t *ticketService) Reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
status, err := restate.GetAs[TicketStatus](ctx, "status")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return false, err
Expand All @@ -28,7 +34,7 @@ func reserve(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
return false, nil
}

func unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
func (t *ticketService) Unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
log.Info().Str("ticket", ticketId).Msg("un-reserving ticket")
status, err := restate.GetAs[TicketStatus](ctx, "status")
Expand All @@ -44,7 +50,7 @@ func unreserve(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, er
return void, nil
}

func markAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
func (t *ticketService) MarkAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, err error) {
ticketId := ctx.Key()
log.Info().Str("ticket", ticketId).Msg("mark ticket as sold")

Expand All @@ -59,10 +65,3 @@ func markAsSold(ctx restate.ObjectContext, _ restate.Void) (void restate.Void, e

return void, nil
}

var (
TicketService = restate.NewObjectRouter().
Handler("reserve", restate.NewObjectHandler(reserve)).
Handler("unreserve", restate.NewObjectHandler(unreserve)).
Handler("markAsSold", restate.NewObjectHandler(markAsSold))
)
31 changes: 16 additions & 15 deletions example/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (
"github.com/rs/zerolog/log"
)

func addTicket(ctx restate.ObjectContext, ticketId string) (bool, error) {
const UserSessionServiceName = "UserSession"

type userSession struct{}

func (u *userSession) Name() string {
return UserSessionServiceName
}

func (u *userSession) AddTicket(ctx restate.ObjectContext, ticketId string) (bool, error) {
userId := ctx.Key()

var success bool
if err := ctx.Object(TicketServiceName, ticketId).Method("reserve").Request(userId).Response(&success); err != nil {
if err := ctx.Object(TicketServiceName, ticketId).Method("Reserve").Request(userId).Response(&success); err != nil {
return false, err
}

Expand All @@ -34,14 +42,14 @@ func addTicket(ctx restate.ObjectContext, ticketId string) (bool, error) {
return false, err
}

if err := ctx.ObjectSend(UserSessionServiceName, ticketId, 15*time.Minute).Method("expireTicket").Request(ticketId); err != nil {
if err := ctx.ObjectSend(UserSessionServiceName, ticketId, 15*time.Minute).Method("ExpireTicket").Request(ticketId); err != nil {
return false, err
}

return true, nil
}

func expireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void, err error) {
func (u *userSession) ExpireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void, err error) {
tickets, err := restate.GetAs[[]string](ctx, "tickets")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
return void, err
Expand All @@ -63,10 +71,10 @@ func expireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void
return void, err
}

return void, ctx.ObjectSend(TicketServiceName, ticketId, 0).Method("unreserve").Request(nil)
return void, ctx.ObjectSend(TicketServiceName, ticketId, 0).Method("Unreserve").Request(nil)
}

func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
func (u *userSession) Checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
userId := ctx.Key()
tickets, err := restate.GetAs[[]string](ctx, "tickets")
if err != nil && !errors.Is(err, restate.ErrKeyNotFound) {
Expand All @@ -81,7 +89,7 @@ func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {

var response PaymentResponse
if err := ctx.Object(CheckoutServiceName, "").
Method("checkout").
Method("Payment").
Request(PaymentRequest{UserID: userId, Tickets: tickets}).
Response(&response); err != nil {
return false, err
Expand All @@ -90,7 +98,7 @@ func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
log.Info().Str("id", response.ID).Int("price", response.Price).Msg("payment details")

for _, ticket := range tickets {
call := ctx.ObjectSend(TicketServiceName, ticket, 0).Method("markAsSold")
call := ctx.ObjectSend(TicketServiceName, ticket, 0).Method("MarkAsSold")
if err := call.Request(nil); err != nil {
return false, err
}
Expand All @@ -99,10 +107,3 @@ func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
ctx.Clear("tickets")
return true, nil
}

var (
UserSession = restate.NewObjectRouter().
Handler("addTicket", restate.NewObjectHandler(addTicket)).
Handler("expireTicket", restate.NewObjectHandler(expireTicket)).
Handler("checkout", restate.NewObjectHandler(checkout))
)
80 changes: 32 additions & 48 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package restate
import (
"encoding/json"
"fmt"
"reflect"
)

// Void is a placeholder used usually for functions that their signature require that
Expand All @@ -19,95 +18,80 @@ func (v *Void) UnmarshalJSON(_ []byte) error {
return nil
}

type ServiceHandler struct {
fn reflect.Value
input reflect.Type
output reflect.Type
type serviceHandler[I any, O any] struct {
fn ServiceHandlerFn[I, O]
}

// NewServiceHandler create a new handler for a service
func NewServiceHandler[I any, O any](fn ServiceHandlerFn[I, O]) *ServiceHandler {
return &ServiceHandler{
fn: reflect.ValueOf(fn),
input: reflect.TypeFor[I](),
output: reflect.TypeFor[O](),
func NewServiceHandler[I any, O any](fn ServiceHandlerFn[I, O]) *serviceHandler[I, O] {
return &serviceHandler[I, O]{
fn: fn,
}
}

func (h *ServiceHandler) Call(ctx Context, bytes []byte) ([]byte, error) {
input := reflect.New(h.input)
func (h *serviceHandler[I, O]) Call(ctx Context, bytes []byte) ([]byte, error) {
input := new(I)

if len(bytes) > 0 {
// use the zero value if there is no input data at all
if err := json.Unmarshal(bytes, input.Interface()); err != nil {
if err := json.Unmarshal(bytes, input); err != nil {
return nil, TerminalError(fmt.Errorf("request doesn't match handler signature: %w", err))
}
}

// we are sure about the fn signature so it's safe to do this
output := h.fn.Call([]reflect.Value{
reflect.ValueOf(ctx),
input.Elem(),
})

outI := output[0].Interface()
errI := output[1].Interface()
if errI != nil {
return nil, errI.(error)
output, err := h.fn(
ctx,
*input,
)
if err != nil {
return nil, err
}

bytes, err := json.Marshal(outI)
bytes, err = json.Marshal(output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
}

return bytes, nil
}

func (h *ServiceHandler) sealed() {}
func (h *serviceHandler[I, O]) sealed() {}

type ObjectHandler struct {
fn reflect.Value
input reflect.Type
output reflect.Type
type objectHandler[I any, O any] struct {
fn ObjectHandlerFn[I, O]
}

func NewObjectHandler[I any, O any](fn ObjectHandlerFn[I, O]) *ObjectHandler {
return &ObjectHandler{
fn: reflect.ValueOf(fn),
input: reflect.TypeFor[I](),
output: reflect.TypeFor[O](),
func NewObjectHandler[I any, O any](fn ObjectHandlerFn[I, O]) *objectHandler[I, O] {
return &objectHandler[I, O]{
fn: fn,
}
}

func (h *ObjectHandler) Call(ctx ObjectContext, bytes []byte) ([]byte, error) {
input := reflect.New(h.input)
func (h *objectHandler[I, O]) Call(ctx ObjectContext, bytes []byte) ([]byte, error) {
input := new(I)

if len(bytes) > 0 {
// use the zero value if there is no input data at all
if err := json.Unmarshal(bytes, input.Interface()); err != nil {
if err := json.Unmarshal(bytes, input); err != nil {
return nil, TerminalError(fmt.Errorf("request doesn't match handler signature: %w", err))
}
}

// we are sure about the fn signature so it's safe to do this
output := h.fn.Call([]reflect.Value{
reflect.ValueOf(ctx),
input.Elem(),
})

outI := output[0].Interface()
errI := output[1].Interface()
if errI != nil {
return nil, errI.(error)
output, err := h.fn(
ctx,
*input,
)
if err != nil {
return nil, err
}

bytes, err := json.Marshal(outI)
bytes, err = json.Marshal(output)
if err != nil {
return nil, TerminalError(fmt.Errorf("failed to serialize output: %w", err))
}

return bytes, nil
}

func (h *ObjectHandler) sealed() {}
func (h *objectHandler[I, O]) sealed() {}
Loading

0 comments on commit 2c02142

Please sign in to comment.