Skip to content

Commit

Permalink
Send client objects
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 9, 2024
1 parent 6a07ef3 commit 30ac6d4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 29 deletions.
8 changes: 4 additions & 4 deletions example/user_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func addTicket(ctx restate.ObjectContext, ticketId string) (bool, error) {
return false, err
}

if err := ctx.Object(UserSessionServiceName, ticketId).Method("expireTicket").Send(ticketId, 15*time.Minute); err != nil {
if err := ctx.ObjectSend(UserSessionServiceName, ticketId, 15*time.Minute).Method("expireTicket").Request(ticketId); err != nil {
return false, err
}

Expand Down Expand Up @@ -63,7 +63,7 @@ func expireTicket(ctx restate.ObjectContext, ticketId string) (void restate.Void
return void, err
}

return void, ctx.Object(TicketServiceName, ticketId).Method("unreserve").Send(nil, 0)
return void, ctx.ObjectSend(TicketServiceName, ticketId, 0).Method("unreserve").Request(nil)
}

func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
Expand All @@ -90,8 +90,8 @@ func checkout(ctx restate.ObjectContext, _ restate.Void) (bool, error) {
log.Info().Str("id", response.ID).Int("price", response.Price).Msg("payment details")

for _, ticket := range tickets {
call := ctx.Object(ticket, TicketServiceName).Method("markAsSold")
if err := call.Send(nil, 0); err != nil {
call := ctx.ObjectSend(ticket, TicketServiceName, 0).Method("markAsSold")
if err := call.Request(nil); err != nil {
return false, err
}
}
Expand Down
52 changes: 38 additions & 14 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ import (
)

var (
_ restate.Service = (*serviceProxy)(nil)
_ restate.Object = (*serviceProxy)(nil)
_ restate.Call = (*serviceCall)(nil)
_ restate.ServiceClient = (*serviceProxy)(nil)
_ restate.ServiceSendClient = (*serviceSendProxy)(nil)
_ restate.CallClient = (*serviceCall)(nil)
_ restate.SendClient = (*serviceSend)(nil)
)

// service proxy only works as an extension to context
// to implement other services function calls
type serviceProxy struct {
*Context
service string
key string
}

func (c *serviceProxy) Method(fn string) restate.Call {
func (c *serviceProxy) Method(fn string) restate.CallClient {
return &serviceCall{
Context: c.Context,
service: c.service,
Expand All @@ -35,17 +34,27 @@ func (c *serviceProxy) Method(fn string) restate.Call {
}
}

type serviceCall struct {
type serviceSendProxy struct {
*Context
service string
key string
method string
delay time.Duration
}

type responseFuture struct {
ctx context.Context
err error
msg *wire.CallEntryMessage
func (c *serviceSendProxy) Method(fn string) restate.SendClient {
return &serviceSend{
Context: c.Context,
service: c.service,
key: c.key,
method: fn,
}
}

type serviceCall struct {
*Context
service string
key string
method string
}

// Do makes a call and wait for the response
Expand All @@ -57,9 +66,24 @@ func (c *serviceCall) Request(input any) restate.ResponseFuture {
}
}

type serviceSend struct {
*Context
service string
key string
method string

delay time.Duration
}

// Send runs a call in the background after delay duration
func (c *serviceCall) Send(body any, delay time.Duration) error {
return c.machine.sendCall(c.service, c.key, c.method, body, delay)
func (c *serviceSend) Request(input any) error {
return c.machine.sendCall(c.service, c.key, c.method, input, c.delay)
}

type responseFuture struct {
ctx context.Context
err error
msg *wire.CallEntryMessage
}

func (r *responseFuture) Err() error {
Expand Down
21 changes: 19 additions & 2 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,38 @@ func (c *Context) Sleep(until time.Time) error {
return c.machine.sleep(until)
}

func (c *Context) Service(service string) restate.Service {
func (c *Context) Service(service string) restate.ServiceClient {
return &serviceProxy{
Context: c,
service: service,
}
}

func (c *Context) Object(service, key string) restate.Object {
func (c *Context) ServiceSend(service string, delay time.Duration) restate.ServiceSendClient {
return &serviceSendProxy{
Context: c,
service: service,
delay: delay,
}
}

func (c *Context) Object(service, key string) restate.ServiceClient {
return &serviceProxy{
Context: c,
service: service,
key: key,
}
}

func (c *Context) ObjectSend(service, key string, delay time.Duration) restate.ServiceSendClient {
return &serviceSendProxy{
Context: c,
service: service,
key: key,
delay: delay,
}
}

func (c *Context) SideEffect(fn func() ([]byte, error)) ([]byte, error) {
return c.machine.sideEffect(fn)
}
Expand Down
32 changes: 23 additions & 9 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ var (
ErrKeyNotFound = fmt.Errorf("key not found")
)

type Call interface {
type CallClient interface {
// Request makes a call and returns a handle on a future response
Request(input any) ResponseFuture
// Send makes a call in the background (doesn't wait for response) after delay duration
Send(body any, delay time.Duration) error
}

type SendClient interface {
// Send makes a call in the background (doesn't wait for response)
Request(input any) error
}

type ResponseFuture interface {
Expand All @@ -28,28 +31,39 @@ type ResponseFuture interface {
Response(output any) error
}

type Service interface {
type ServiceClient interface {
// Method creates a call to method with name
Method(method string) Call
Method(method string) CallClient
}

type Object interface {
type ServiceSendClient interface {
// Method creates a call to method with name
Method(method string) Call
Method(method string) SendClient
}

type Context interface {
// Context of request.
Ctx() context.Context
// Sleep sleep during the execution until time is reached
Sleep(until time.Time) error

// Service gets a Service accessor by name where service
// must be another service known by restate runtime
Service(service string) Service
Service(service string) ServiceClient
// Service gets a Service send accessor by name where service
// must be another service known by restate runtime
// and delay is the duration with which to delay requests
ServiceSend(service string, delay time.Duration) ServiceSendClient

// Object gets a Object accessor by name where object
// must be another object known by restate runtime and
// key is any string representing the key for the object
Object(object, key string) Object
Object(object, key string) ServiceClient
// Object gets a Object accessor by name where object
// must be another object known by restate runtime,
// key is any string representing the key for the object,
// and delay is the duration with which to delay requests
ObjectSend(object, key string, delay time.Duration) ServiceSendClient

// SideEffects runs the function (fn) until it succeeds or permanently fails.
// this stores the results of the function inside restate runtime so a replay
Expand Down

0 comments on commit 30ac6d4

Please sign in to comment.