Skip to content

Commit

Permalink
improve on call handling
Browse files Browse the repository at this point in the history
Also update docs with registration step
  • Loading branch information
muhamadazmy committed Mar 19, 2024
1 parent 0795f55 commit b123eae
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 33 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ cd restate-sdk-go/example
go run .
```

Registration

```bash
restate deployments register --force -y http://localhost:9080
```

In yet a third terminal do the following steps

- Add tickets to basket
Expand Down
67 changes: 34 additions & 33 deletions internal/state/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type serviceCall struct {

// Do makes a call and wait for the response
func (c *serviceCall) Do(key string, input any, output any) error {
return c.machine.doCall(c.service, c.method, key, input, output)
return c.machine.doDynCall(c.service, c.method, key, input, output)
}

// Send runs a call in the background after delay duration
Expand All @@ -69,15 +69,43 @@ func (m *Machine) makeRequest(key string, body any) ([]byte, error) {
return proto.Marshal(params)
}

func (m *Machine) doCall(service, method, key string, input, output any) error {
func (m *Machine) doDynCall(service, method, key string, input, output any) error {
m.log.Debug().Str("service", service).Str("method", method).Msg("in do call")

params, err := m.makeRequest(key, input)
if err != nil {
return err
}

bytes, err := replayOrNew(
bytes, err := m.doCall(service, method, params)
if err != nil {
return err
}

var rpcResponse dynrpc.RpcResponse
if err := proto.Unmarshal(bytes, &rpcResponse); err != nil {
return fmt.Errorf("failed to decode rpc response: %w", err)
}

js, err := rpcResponse.Response.MarshalJSON()
if err != nil {
return fmt.Errorf("failed to process response payload")
}

if output == nil {
return nil
}

if err := json.Unmarshal(js, output); err != nil {
// TODO: is this should be a terminal error or not?
return restate.TerminalError(fmt.Errorf("failed to decode response (%s): %w", string(bytes), err))
}

return nil
}

func (m *Machine) doCall(service, method string, params []byte) ([]byte, error) {
return replayOrNew(
m,
wire.InvokeEntryMessageType,
func(entry *wire.InvokeEntryMessage) ([]byte, error) {
Expand All @@ -91,32 +119,13 @@ func (m *Machine) doCall(service, method, key string, input, output any) error {
case *protocol.InvokeEntryMessage_Failure:
return nil, fmt.Errorf("[%d] %s", result.Failure.Code, result.Failure.Message)
case *protocol.InvokeEntryMessage_Value:
var rpcResponse dynrpc.RpcResponse
if err := proto.Unmarshal(result.Value, &rpcResponse); err != nil {
return nil, fmt.Errorf("failed to decode rpc response: %w", err)
}

return rpcResponse.Response.MarshalJSON()
return result.Value, nil
}

return nil, errUnreachable
}, func() ([]byte, error) {
return m._doCall(service, method, params)
})

if err != nil {
return err
}

if output == nil {
return nil
}

if err := json.Unmarshal(bytes, output); err != nil {
return restate.TerminalError(fmt.Errorf("failed to decode response (%s): %w", string(bytes), err))
}

return nil
}

func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error) {
Expand All @@ -139,11 +148,8 @@ func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error)
return nil, ErrUnexpectedMessage
}

//response := msg.(*wire.CompletionMessage)

completion := response.(*wire.CompletionMessage)

var output []byte
switch value := completion.Payload.Result.(type) {
case *protocol.CompletionMessage_Empty:
return nil, nil
Expand All @@ -152,15 +158,10 @@ func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error)
// never happen
return nil, fmt.Errorf("[%d] %s", value.Failure.Code, value.Failure.Message)
case *protocol.CompletionMessage_Value:
output = value.Value
}

var rpcResponse dynrpc.RpcResponse
if err := proto.Unmarshal(output, &rpcResponse); err != nil {
return nil, fmt.Errorf("failed to decode rpc response(%s,%s): %w", service, method, err)
return value.Value, nil
}

return rpcResponse.Response.MarshalJSON()
return nil, errUnreachable
}

func (c *Machine) sendCall(service, method, key string, body any, delay time.Duration) error {
Expand Down
1 change: 1 addition & 0 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (r *UnKeyedRouter) Handlers() map[string]Handler {
return r.handlers
}

// KeyedRouter
type KeyedRouter struct {
handlers map[string]Handler
}
Expand Down

0 comments on commit b123eae

Please sign in to comment.