diff --git a/internal/app/app.go b/internal/app/app.go index b032c34e..a35ba6fc 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -79,7 +79,7 @@ func (a *App) Run(ctx context.Context) error { return nil }) - rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor) + rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor, serviceRegistry) g.Go(func() error { a.logger.Info("Starting gRPC server...") rpcServer.Start() diff --git a/internal/messaging/processor.go b/internal/messaging/processor.go index f7c636b3..80e4d22e 100644 --- a/internal/messaging/processor.go +++ b/internal/messaging/processor.go @@ -168,7 +168,7 @@ func (p *processor) Respond(msg Message) error { ctx := grpc_metadata.NewOutgoingContext(context.Background(), msg.Metadata.ToGrpcMD()) var header grpc_metadata.MD - response, msgType, err := service.call(ctx, &msg.Content.RequestContent, grpc.Header(&header)) + response, msgType, err := service.Call(ctx, &msg.Content.RequestContent, grpc.Header(&header)) if err != nil { return err } diff --git a/internal/messaging/service.go b/internal/messaging/service.go index 05718a3d..18f8ded0 100644 --- a/internal/messaging/service.go +++ b/internal/messaging/service.go @@ -7,10 +7,10 @@ package messaging import ( "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc" + networkv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/network/v1alpha1" + partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1" "context" "errors" @@ -29,7 +29,7 @@ var ( ) type Service interface { - call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) + Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) } type activityService struct { @@ -39,10 +39,8 @@ type accommodationService struct { client *accommodationv1alpha1grpc.AccommodationSearchServiceClient } type networkService struct { - client *networkv1alpha1grpc.GetNetworkFeeServiceClient } type partnerService struct { - client *partnerv1alpha1grpc.GetPartnerConfigurationServiceClient } type pingService struct { client *pingv1alpha1grpc.PingServiceClient @@ -51,7 +49,7 @@ type transportService struct { client *transportv1alpha1grpc.TransportSearchServiceClient } -func (s activityService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s activityService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.ActivitySearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -63,7 +61,7 @@ func (s activityService) call(ctx context.Context, request *RequestContent, opts return responseContent, ActivitySearchResponse, err } -func (s accommodationService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s accommodationService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.AccommodationSearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -75,11 +73,15 @@ func (s accommodationService) call(ctx context.Context, request *RequestContent, return responseContent, AccommodationSearchResponse, err } -func (s networkService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s networkService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.GetNetworkFeeRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } - response, err := (*s.client).GetNetworkFee(ctx, &request.GetNetworkFeeRequest, opts...) + + //TODO implement + response, err := &networkv1alpha1.GetNetworkFeeResponse{ + NetworkFee: &networkv1alpha1.NetworkFee{Amount: 100000}, + }, (error)(nil) responseContent := ResponseContent{} if err == nil { responseContent.GetNetworkFeeResponse = *response // otherwise nil pointer dereference @@ -87,11 +89,16 @@ func (s networkService) call(ctx context.Context, request *RequestContent, opts return responseContent, GetNetworkFeeResponse, err } -func (s partnerService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s partnerService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.GetPartnerConfigurationRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } - response, err := (*s.client).GetPartnerConfiguration(ctx, &request.GetPartnerConfigurationRequest, opts...) + + //TODO implement + response, err := &partnerv1alpha1.GetPartnerConfigurationResponse{ + PartnerConfiguration: nil, + CurrentBlockHeight: 0, + }, (error)(nil) responseContent := ResponseContent{} if err == nil { responseContent.GetPartnerConfigurationResponse = *response // otherwise nil pointer dereference @@ -99,7 +106,7 @@ func (s partnerService) call(ctx context.Context, request *RequestContent, opts return responseContent, GetPartnerConfigurationResponse, err } -func (s pingService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s pingService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.PingRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } @@ -111,7 +118,7 @@ func (s pingService) call(ctx context.Context, request *RequestContent, opts ... return responseContent, PingResponse, err } -func (s transportService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { +func (s transportService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) { if &request.TransportSearchRequest == nil { return ResponseContent{}, "", ErrInvalidMessageType } diff --git a/internal/messaging/service_registry.go b/internal/messaging/service_registry.go index c528f552..f0f6edd5 100644 --- a/internal/messaging/service_registry.go +++ b/internal/messaging/service_registry.go @@ -8,8 +8,6 @@ package messaging import ( "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc" "github.com/chain4travel/camino-messenger-bot/config" @@ -42,11 +40,9 @@ func (s *ServiceRegistry) RegisterServices(requestTypes config.SupportedRequestT c := accommodationv1alpha1grpc.NewAccommodationSearchServiceClient(s.rpcClient.ClientConn) service = accommodationService{client: &c} case GetNetworkFeeRequest: - c := networkv1alpha1grpc.NewGetNetworkFeeServiceClient(s.rpcClient.ClientConn) - service = networkService{client: &c} + service = networkService{} // this service does not talk to partner plugin case GetPartnerConfigurationRequest: - c := partnerv1alpha1grpc.NewGetPartnerConfigurationServiceClient(s.rpcClient.ClientConn) - service = partnerService{client: &c} + service = partnerService{} // this service does not talk to partner plugin case PingRequest: c := pingv1alpha1grpc.NewPingServiceClient(s.rpcClient.ClientConn) service = pingService{client: &c} diff --git a/internal/messaging/types.go b/internal/messaging/types.go index 52d6fa7e..19b42b83 100644 --- a/internal/messaging/types.go +++ b/internal/messaging/types.go @@ -68,8 +68,6 @@ func (mt MessageType) Category() MessageCategory { switch mt { case ActivitySearchRequest, AccommodationSearchRequest, - GetNetworkFeeRequest, - GetPartnerConfigurationRequest, PingRequest, TransportSearchRequest: return Request diff --git a/internal/rpc/server/server.go b/internal/rpc/server/server.go index 7eb44694..9dbf23a3 100644 --- a/internal/rpc/server/server.go +++ b/internal/rpc/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "fmt" "log" "net" @@ -37,6 +38,8 @@ var ( _ partnerv1alpha1grpc.GetPartnerConfigurationServiceServer = (*server)(nil) _ pingv1alpha1grpc.PingServiceServer = (*server)(nil) _ transportv1alpha1grpc.TransportSearchServiceServer = (*server)(nil) + + errMissingRecipient = errors.New("missing recipient") ) type Server interface { @@ -45,17 +48,18 @@ type Server interface { Stop() } type server struct { - grpcServer *grpc.Server - cfg *config.RPCServerConfig - logger *zap.SugaredLogger - processor messaging.Processor + grpcServer *grpc.Server + cfg *config.RPCServerConfig + logger *zap.SugaredLogger + processor messaging.Processor + serviceRegistry *messaging.ServiceRegistry } func (s *server) Checkpoint() string { return "request-gateway" } -func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor) *server { +func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor, serviceRegistry *messaging.ServiceRegistry) *server { var opts []grpc.ServerOption if cfg.Unencrypted { logger.Warn("Running gRPC server without TLS!") @@ -66,7 +70,7 @@ func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor } opts = []grpc.ServerOption{grpc.Creds(creds)} } - server := &server{cfg: cfg, logger: logger, processor: processor} + server := &server{cfg: cfg, logger: logger, processor: processor, serviceRegistry: serviceRegistry} server.grpcServer = createGrpcServerAndRegisterServices(server, opts...) return server } @@ -96,39 +100,48 @@ func (s *server) Stop() { } func (s *server) AccommodationSearch(ctx context.Context, request *accommodationv1alpha1.AccommodationSearchRequest) (*accommodationv1alpha1.AccommodationSearchResponse, error) { - response, err := s.processRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request}) return &response.AccommodationSearchResponse, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } func (s *server) Ping(ctx context.Context, request *pingv1alpha1.PingRequest) (*pingv1alpha1.PingResponse, error) { - response, err := s.processRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) return &response.PingResponse, err } func (s *server) GetNetworkFee(ctx context.Context, request *networkv1alpha1.GetNetworkFeeRequest) (*networkv1alpha1.GetNetworkFeeResponse, error) { - response, err := s.processRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request}) + response, err := s.processInternalRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request}) return &response.GetNetworkFeeResponse, err } func (s *server) GetPartnerConfiguration(ctx context.Context, request *partnerv1alpha1.GetPartnerConfigurationRequest) (*partnerv1alpha1.GetPartnerConfigurationResponse, error) { - response, err := s.processRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request}) + response, err := s.processInternalRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request}) return &response.GetPartnerConfigurationResponse, err } func (s *server) ActivitySearch(ctx context.Context, request *activityv1alpha1.ActivitySearchRequest) (*activityv1alpha1.ActivitySearchResponse, error) { - response, err := s.processRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request}) return &response.ActivitySearchResponse, err } func (s *server) TransportSearch(ctx context.Context, request *transportv1alpha1.TransportSearchRequest) (*transportv1alpha1.TransportSearchResponse, error) { - response, err := s.processRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request}) return &response.TransportSearchResponse, err } -func (s *server) processRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (*messaging.ResponseContent, error) { +func (s *server) processInternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { + service, registered := s.serviceRegistry.GetService(requestType) + if !registered { + return messaging.ResponseContent{}, fmt.Errorf("%v: %s", messaging.ErrUnsupportedRequestType, requestType) + } + response, _, err := service.Call(ctx, request) + return response, err +} + +func (s *server) processExternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { err, md := s.processMetadata(ctx) if err != nil { - return nil, fmt.Errorf("error processing metadata: %v", err) + return messaging.ResponseContent{}, fmt.Errorf("error processing metadata: %v", err) } m := &messaging.Message{ @@ -141,7 +154,7 @@ func (s *server) processRequest(ctx context.Context, requestType messaging.Messa response, err := s.processor.ProcessOutbound(ctx, *m) response.Metadata.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "processed")) grpc.SendHeader(ctx, response.Metadata.ToGrpcMD()) - return &response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? + return response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) {