From b123eaeb32bce7ea38b762e8e80f6576c238eba4 Mon Sep 17 00:00:00 2001 From: Muhamad Azamy Date: Tue, 19 Mar 2024 08:53:39 +0100 Subject: [PATCH] improve on call handling Also update docs with registration step --- README.md | 6 ++++ internal/state/call.go | 67 +++++++++++++++++++++--------------------- router.go | 1 + 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 1c81d02..23066f5 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,12 @@ cd restate-sdk-go/example go run . ``` +Registration + +```bash +restate deployments register --force -y http://localhost:9080 +``` + In yet a third terminal do the following steps - Add tickets to basket diff --git a/internal/state/call.go b/internal/state/call.go index 47a79db..5c65717 100644 --- a/internal/state/call.go +++ b/internal/state/call.go @@ -42,7 +42,7 @@ type serviceCall struct { // Do makes a call and wait for the response func (c *serviceCall) Do(key string, input any, output any) error { - return c.machine.doCall(c.service, c.method, key, input, output) + return c.machine.doDynCall(c.service, c.method, key, input, output) } // Send runs a call in the background after delay duration @@ -69,7 +69,7 @@ func (m *Machine) makeRequest(key string, body any) ([]byte, error) { return proto.Marshal(params) } -func (m *Machine) doCall(service, method, key string, input, output any) error { +func (m *Machine) doDynCall(service, method, key string, input, output any) error { m.log.Debug().Str("service", service).Str("method", method).Msg("in do call") params, err := m.makeRequest(key, input) @@ -77,7 +77,35 @@ func (m *Machine) doCall(service, method, key string, input, output any) error { return err } - bytes, err := replayOrNew( + bytes, err := m.doCall(service, method, params) + if err != nil { + return err + } + + var rpcResponse dynrpc.RpcResponse + if err := proto.Unmarshal(bytes, &rpcResponse); err != nil { + return fmt.Errorf("failed to decode rpc response: %w", err) + } + + js, err := rpcResponse.Response.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to process response payload") + } + + if output == nil { + return nil + } + + if err := json.Unmarshal(js, output); err != nil { + // TODO: is this should be a terminal error or not? + return restate.TerminalError(fmt.Errorf("failed to decode response (%s): %w", string(bytes), err)) + } + + return nil +} + +func (m *Machine) doCall(service, method string, params []byte) ([]byte, error) { + return replayOrNew( m, wire.InvokeEntryMessageType, func(entry *wire.InvokeEntryMessage) ([]byte, error) { @@ -91,32 +119,13 @@ func (m *Machine) doCall(service, method, key string, input, output any) error { case *protocol.InvokeEntryMessage_Failure: return nil, fmt.Errorf("[%d] %s", result.Failure.Code, result.Failure.Message) case *protocol.InvokeEntryMessage_Value: - var rpcResponse dynrpc.RpcResponse - if err := proto.Unmarshal(result.Value, &rpcResponse); err != nil { - return nil, fmt.Errorf("failed to decode rpc response: %w", err) - } - - return rpcResponse.Response.MarshalJSON() + return result.Value, nil } return nil, errUnreachable }, func() ([]byte, error) { return m._doCall(service, method, params) }) - - if err != nil { - return err - } - - if output == nil { - return nil - } - - if err := json.Unmarshal(bytes, output); err != nil { - return restate.TerminalError(fmt.Errorf("failed to decode response (%s): %w", string(bytes), err)) - } - - return nil } func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error) { @@ -139,11 +148,8 @@ func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error) return nil, ErrUnexpectedMessage } - //response := msg.(*wire.CompletionMessage) - completion := response.(*wire.CompletionMessage) - var output []byte switch value := completion.Payload.Result.(type) { case *protocol.CompletionMessage_Empty: return nil, nil @@ -152,15 +158,10 @@ func (m *Machine) _doCall(service, method string, params []byte) ([]byte, error) // never happen return nil, fmt.Errorf("[%d] %s", value.Failure.Code, value.Failure.Message) case *protocol.CompletionMessage_Value: - output = value.Value - } - - var rpcResponse dynrpc.RpcResponse - if err := proto.Unmarshal(output, &rpcResponse); err != nil { - return nil, fmt.Errorf("failed to decode rpc response(%s,%s): %w", service, method, err) + return value.Value, nil } - return rpcResponse.Response.MarshalJSON() + return nil, errUnreachable } func (c *Machine) sendCall(service, method, key string, body any, delay time.Duration) error { diff --git a/router.go b/router.go index 1a4cda1..ee9f050 100644 --- a/router.go +++ b/router.go @@ -114,6 +114,7 @@ func (r *UnKeyedRouter) Handlers() map[string]Handler { return r.handlers } +// KeyedRouter type KeyedRouter struct { handlers map[string]Handler }