From d9e23a709ed42477bbf8d7e47cbc6a1fb6cfc56c Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Thu, 14 Dec 2023 16:14:15 +0200 Subject: [PATCH] Separate internal for external requests - integrate latest protobuf sdk changes Separate internal for external requests - integrate latest protobuf sdk changes --- examples/rpc/partner-plugin/server.go | 3 -- go.mod | 4 +-- go.sum | 6 ++++ internal/app/app.go | 2 +- internal/messaging/processor.go | 2 +- internal/messaging/service.go | 33 ++++++++++++-------- internal/messaging/service_registry.go | 8 ++--- internal/messaging/types.go | 2 -- internal/rpc/server/server.go | 43 +++++++++++++++++--------- 9 files changed, 60 insertions(+), 43 deletions(-) diff --git a/examples/rpc/partner-plugin/server.go b/examples/rpc/partner-plugin/server.go index 0e91a5ef..c9335307 100644 --- a/examples/rpc/partner-plugin/server.go +++ b/examples/rpc/partner-plugin/server.go @@ -47,7 +47,6 @@ func (p *partnerPlugin) ActivitySearch(ctx context.Context, request *activityv1a response := activityv1alpha1.ActivitySearchResponse{ Header: nil, Metadata: &typesv1alpha1.SearchResponseMetadata{SearchId: &typesv1alpha1.UUID{Value: md.RequestID}}, - Options: nil, } grpc.SendHeader(ctx, md.ToGrpcMD()) return &response, nil @@ -65,7 +64,6 @@ func (p *partnerPlugin) AccommodationSearch(ctx context.Context, request *accomm response := accommodationv1alpha1.AccommodationSearchResponse{ Header: nil, Metadata: &typesv1alpha1.SearchResponseMetadata{SearchId: &typesv1alpha1.UUID{Value: md.RequestID}}, - Options: nil, } grpc.SendHeader(ctx, md.ToGrpcMD()) return &response, nil @@ -133,7 +131,6 @@ func (p *partnerPlugin) TransportSearch(ctx context.Context, request *transportv response := transportv1alpha1.TransportSearchResponse{ Header: nil, Metadata: &typesv1alpha1.SearchResponseMetadata{SearchId: &typesv1alpha1.UUID{Value: md.RequestID}}, - Options: nil, } grpc.SendHeader(ctx, md.ToGrpcMD()) return &response, nil diff --git a/go.mod b/go.mod index f5fe3feb..52668c7c 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/chain4travel/camino-messenger-bot go 1.20 require ( - buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2 - buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2 + buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2 + buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2 github.com/ava-labs/avalanchego v1.9.16 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/google/uuid v1.4.0 diff --git a/go.sum b/go.sum index 25899ff1..67208072 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,13 @@ buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2 h1:Yy0x91aZhzQOikR33x5eEIFEWS1TZzuzRc+LP8NuCgQ= buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2/go.mod h1:xDIPwKMomacOmFbzRICgdUP/gpjEoetNVYVTVr29H0k= +buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2 h1:ykl0rTU4nNvPtJRm2lqOCRpgOd93RSt3ev8hNkbozDE= +buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2/go.mod h1:tKtDR8xG+DIFkSv8PiW1YM64GxJ/44n3UfZAN+5jfJ8= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231211091155-5467620e05ed.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c= +buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231214132539-21b35d953f3d.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2 h1:8HbCQyMVfu/+Spx4yOPwWThwJpr0JELRxJgt8Kdoso4= buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw= +buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2 h1:HTcdQrjEKtCEizgMVc1kmNtsGSQQ04WTh7fUNEuqCFE= +buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -658,6 +663,7 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/internal/app/app.go b/internal/app/app.go index b032c34e..a35ba6fc 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -79,7 +79,7 @@ func (a *App) Run(ctx context.Context) error { return nil }) - rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor) + rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor, serviceRegistry) g.Go(func() error { a.logger.Info("Starting gRPC server...") rpcServer.Start() diff --git a/internal/messaging/processor.go b/internal/messaging/processor.go index f7c636b3..80e4d22e 100644 --- a/internal/messaging/processor.go +++ b/internal/messaging/processor.go @@ -168,7 +168,7 @@ func (p *processor) Respond(msg Message) error { ctx := grpc_metadata.NewOutgoingContext(context.Background(), msg.Metadata.ToGrpcMD()) var header grpc_metadata.MD - response, msgType, err := service.call(ctx, &msg.Content.RequestContent, grpc.Header(&header)) + response, msgType, err := service.Call(ctx, &msg.Content.RequestContent, grpc.Header(&header)) if err != nil { return err } diff --git a/internal/messaging/service.go b/internal/messaging/service.go index 05718a3d..18f8ded0 100644 --- a/internal/messaging/service.go +++ b/internal/messaging/service.go @@ -7,10 +7,10 @@ package messaging import ( "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc" + networkv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/network/v1alpha1" + partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1" "context" "errors" @@ -29,7 +29,7 @@ var ( ) type Service interface { - call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) + Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) } type activityService struct { @@ -39,10 +39,8 @@ type accommodationService struct { client *accommodationv1alpha1grpc.AccommodationSearchServiceClient } type networkService struct { - client *networkv1alpha1grpc.GetNetworkFeeServiceClient } type partnerService struct { - client *partnerv1alpha1grpc.GetPartnerConfigurationServiceClient } type pingService struct { client *pingv1alpha1grpc.PingServiceClient @@ -51,7 +49,7 @@ type transportService struct { client *transportv1alpha1grpc.TransportSearchServiceClient } -func (s activityService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s activityService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.ActivitySearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -63,7 +61,7 @@ func (s activityService) call(ctx context.Context, request *RequestContent, opts return responseContent, ActivitySearchResponse, err } -func (s accommodationService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s accommodationService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.AccommodationSearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -75,11 +73,15 @@ func (s accommodationService) call(ctx context.Context, request *RequestContent, return responseContent, AccommodationSearchResponse, err } -func (s networkService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s networkService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.GetNetworkFeeRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } - response, err := (*s.client).GetNetworkFee(ctx, &request.GetNetworkFeeRequest, opts...) + + //TODO implement + response, err := &networkv1alpha1.GetNetworkFeeResponse{ + NetworkFee: &networkv1alpha1.NetworkFee{Amount: 100000}, + }, (error)(nil) responseContent := ResponseContent{} if err == nil { responseContent.GetNetworkFeeResponse = *response // otherwise nil pointer dereference @@ -87,11 +89,16 @@ func (s networkService) call(ctx context.Context, request *RequestContent, opts return responseContent, GetNetworkFeeResponse, err } -func (s partnerService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s partnerService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.GetPartnerConfigurationRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } - response, err := (*s.client).GetPartnerConfiguration(ctx, &request.GetPartnerConfigurationRequest, opts...) + + //TODO implement + response, err := &partnerv1alpha1.GetPartnerConfigurationResponse{ + PartnerConfiguration: nil, + CurrentBlockHeight: 0, + }, (error)(nil) responseContent := ResponseContent{} if err == nil { responseContent.GetPartnerConfigurationResponse = *response // otherwise nil pointer dereference @@ -99,7 +106,7 @@ func (s partnerService) call(ctx context.Context, request *RequestContent, opts return responseContent, GetPartnerConfigurationResponse, err } -func (s pingService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s pingService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.PingRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -111,7 +118,7 @@ func (s pingService) call(ctx context.Context, request *RequestContent, opts ... return responseContent, PingResponse, err } -func (s transportService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s transportService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.TransportSearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } diff --git a/internal/messaging/service_registry.go b/internal/messaging/service_registry.go index c528f552..f0f6edd5 100644 --- a/internal/messaging/service_registry.go +++ b/internal/messaging/service_registry.go @@ -8,8 +8,6 @@ package messaging import ( "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc" "github.com/chain4travel/camino-messenger-bot/config" @@ -42,11 +40,9 @@ func (s *ServiceRegistry) RegisterServices(requestTypes config.SupportedRequestT c := accommodationv1alpha1grpc.NewAccommodationSearchServiceClient(s.rpcClient.ClientConn) service = accommodationService{client: &c} case GetNetworkFeeRequest: - c := networkv1alpha1grpc.NewGetNetworkFeeServiceClient(s.rpcClient.ClientConn) - service = networkService{client: &c} + service = networkService{} // this service does not talk to partner plugin case GetPartnerConfigurationRequest: - c := partnerv1alpha1grpc.NewGetPartnerConfigurationServiceClient(s.rpcClient.ClientConn) - service = partnerService{client: &c} + service = partnerService{} // this service does not talk to partner plugin case PingRequest: c := pingv1alpha1grpc.NewPingServiceClient(s.rpcClient.ClientConn) service = pingService{client: &c} diff --git a/internal/messaging/types.go b/internal/messaging/types.go index 52d6fa7e..19b42b83 100644 --- a/internal/messaging/types.go +++ b/internal/messaging/types.go @@ -68,8 +68,6 @@ func (mt MessageType) Category() MessageCategory { switch mt { case ActivitySearchRequest, AccommodationSearchRequest, - GetNetworkFeeRequest, - GetPartnerConfigurationRequest, PingRequest, TransportSearchRequest: return Request diff --git a/internal/rpc/server/server.go b/internal/rpc/server/server.go index 7eb44694..9dbf23a3 100644 --- a/internal/rpc/server/server.go +++ b/internal/rpc/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "fmt" "log" "net" @@ -37,6 +38,8 @@ var ( _ partnerv1alpha1grpc.GetPartnerConfigurationServiceServer = (*server)(nil) _ pingv1alpha1grpc.PingServiceServer = (*server)(nil) _ transportv1alpha1grpc.TransportSearchServiceServer = (*server)(nil) + + errMissingRecipient = errors.New("missing recipient") ) type Server interface { @@ -45,17 +48,18 @@ type Server interface { Stop() } type server struct { - grpcServer *grpc.Server - cfg *config.RPCServerConfig - logger *zap.SugaredLogger - processor messaging.Processor + grpcServer *grpc.Server + cfg *config.RPCServerConfig + logger *zap.SugaredLogger + processor messaging.Processor + serviceRegistry *messaging.ServiceRegistry } func (s *server) Checkpoint() string { return "request-gateway" } -func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor) *server { +func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor, serviceRegistry *messaging.ServiceRegistry) *server { var opts []grpc.ServerOption if cfg.Unencrypted { logger.Warn("Running gRPC server without TLS!") @@ -66,7 +70,7 @@ func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor } opts = []grpc.ServerOption{grpc.Creds(creds)} } - server := &server{cfg: cfg, logger: logger, processor: processor} + server := &server{cfg: cfg, logger: logger, processor: processor, serviceRegistry: serviceRegistry} server.grpcServer = createGrpcServerAndRegisterServices(server, opts...) return server } @@ -96,39 +100,48 @@ func (s *server) Stop() { } func (s *server) AccommodationSearch(ctx context.Context, request *accommodationv1alpha1.AccommodationSearchRequest) (*accommodationv1alpha1.AccommodationSearchResponse, error) { - response, err := s.processRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request}) return &response.AccommodationSearchResponse, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } func (s *server) Ping(ctx context.Context, request *pingv1alpha1.PingRequest) (*pingv1alpha1.PingResponse, error) { - response, err := s.processRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) return &response.PingResponse, err } func (s *server) GetNetworkFee(ctx context.Context, request *networkv1alpha1.GetNetworkFeeRequest) (*networkv1alpha1.GetNetworkFeeResponse, error) { - response, err := s.processRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request}) + response, err := s.processInternalRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request}) return &response.GetNetworkFeeResponse, err } func (s *server) GetPartnerConfiguration(ctx context.Context, request *partnerv1alpha1.GetPartnerConfigurationRequest) (*partnerv1alpha1.GetPartnerConfigurationResponse, error) { - response, err := s.processRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request}) + response, err := s.processInternalRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request}) return &response.GetPartnerConfigurationResponse, err } func (s *server) ActivitySearch(ctx context.Context, request *activityv1alpha1.ActivitySearchRequest) (*activityv1alpha1.ActivitySearchResponse, error) { - response, err := s.processRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request}) return &response.ActivitySearchResponse, err } func (s *server) TransportSearch(ctx context.Context, request *transportv1alpha1.TransportSearchRequest) (*transportv1alpha1.TransportSearchResponse, error) { - response, err := s.processRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request}) return &response.TransportSearchResponse, err } -func (s *server) processRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (*messaging.ResponseContent, error) { +func (s *server) processInternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { + service, registered := s.serviceRegistry.GetService(requestType) + if !registered { + return messaging.ResponseContent{}, fmt.Errorf("%v: %s", messaging.ErrUnsupportedRequestType, requestType) + } + response, _, err := service.Call(ctx, request) + return response, err +} + +func (s *server) processExternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { err, md := s.processMetadata(ctx) if err != nil { - return nil, fmt.Errorf("error processing metadata: %v", err) + return messaging.ResponseContent{}, fmt.Errorf("error processing metadata: %v", err) } m := &messaging.Message{ @@ -141,7 +154,7 @@ func (s *server) processRequest(ctx context.Context, requestType messaging.Messa response, err := s.processor.ProcessOutbound(ctx, *m) response.Metadata.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "processed")) grpc.SendHeader(ctx, response.Metadata.ToGrpcMD()) - return &response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? + return response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) {