Skip to content

Commit

Permalink
Separate internal for external requests
Browse files Browse the repository at this point in the history
- integrate latest protobuf sdk changes
  • Loading branch information
Kleonikos Kyriakis committed Dec 14, 2023
1 parent 29863d1 commit 5f47341
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 38 deletions.
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (a *App) Run(ctx context.Context) error {
return nil
})

rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor)
rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor, serviceRegistry)
g.Go(func() error {
a.logger.Info("Starting gRPC server...")
rpcServer.Start()
Expand Down
2 changes: 1 addition & 1 deletion internal/messaging/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (p *processor) Respond(msg Message) error {

ctx := grpc_metadata.NewOutgoingContext(context.Background(), msg.Metadata.ToGrpcMD())
var header grpc_metadata.MD
response, msgType, err := service.call(ctx, &msg.Content.RequestContent, grpc.Header(&header))
response, msgType, err := service.Call(ctx, &msg.Content.RequestContent, grpc.Header(&header))
if err != nil {
return err
}
Expand Down
33 changes: 20 additions & 13 deletions internal/messaging/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package messaging

import (
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc"
networkv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/network/v1alpha1"
partnerv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/partner/v1alpha1"
"context"
"errors"

Expand All @@ -29,7 +29,7 @@ var (
)

type Service interface {
call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error)
Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error)
}

type activityService struct {
Expand All @@ -39,10 +39,8 @@ type accommodationService struct {
client *accommodationv1alpha1grpc.AccommodationSearchServiceClient
}
type networkService struct {
client *networkv1alpha1grpc.GetNetworkFeeServiceClient
}
type partnerService struct {
client *partnerv1alpha1grpc.GetPartnerConfigurationServiceClient
}
type pingService struct {
client *pingv1alpha1grpc.PingServiceClient
Expand All @@ -51,7 +49,7 @@ type transportService struct {
client *transportv1alpha1grpc.TransportSearchServiceClient
}

func (s activityService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s activityService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.ActivitySearchRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
Expand All @@ -63,7 +61,7 @@ func (s activityService) call(ctx context.Context, request *RequestContent, opts
return responseContent, ActivitySearchResponse, err
}

func (s accommodationService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s accommodationService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.AccommodationSearchRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
Expand All @@ -75,31 +73,40 @@ func (s accommodationService) call(ctx context.Context, request *RequestContent,
return responseContent, AccommodationSearchResponse, err
}

func (s networkService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s networkService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.GetNetworkFeeRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
response, err := (*s.client).GetNetworkFee(ctx, &request.GetNetworkFeeRequest, opts...)

//TODO implement
response, err := &networkv1alpha1.GetNetworkFeeResponse{
NetworkFee: &networkv1alpha1.NetworkFee{Amount: 100000},
}, (error)(nil)
responseContent := ResponseContent{}
if err == nil {
responseContent.GetNetworkFeeResponse = *response // otherwise nil pointer dereference
}
return responseContent, GetNetworkFeeResponse, err
}

func (s partnerService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s partnerService) Call(_ context.Context, request *RequestContent, _ ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.GetPartnerConfigurationRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
response, err := (*s.client).GetPartnerConfiguration(ctx, &request.GetPartnerConfigurationRequest, opts...)

//TODO implement
response, err := &partnerv1alpha1.GetPartnerConfigurationResponse{
PartnerConfiguration: nil,
CurrentBlockHeight: 0,
}, (error)(nil)
responseContent := ResponseContent{}
if err == nil {
responseContent.GetPartnerConfigurationResponse = *response // otherwise nil pointer dereference
}
return responseContent, GetPartnerConfigurationResponse, err
}

func (s pingService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s pingService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.PingRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
Expand All @@ -111,7 +118,7 @@ func (s pingService) call(ctx context.Context, request *RequestContent, opts ...
return responseContent, PingResponse, err
}

func (s transportService) call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
func (s transportService) Call(ctx context.Context, request *RequestContent, opts ...grpc.CallOption) (ResponseContent, MessageType, error) {
if &request.TransportSearchRequest == nil {
return ResponseContent{}, "", ErrInvalidMessageType
}
Expand Down
8 changes: 2 additions & 6 deletions internal/messaging/service_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package messaging
import (
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha1/activityv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha1/networkv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha1/partnerv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha1/pingv1alpha1grpc"
"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/transport/v1alpha1/transportv1alpha1grpc"
"github.com/chain4travel/camino-messenger-bot/config"
Expand Down Expand Up @@ -42,11 +40,9 @@ func (s *ServiceRegistry) RegisterServices(requestTypes config.SupportedRequestT
c := accommodationv1alpha1grpc.NewAccommodationSearchServiceClient(s.rpcClient.ClientConn)
service = accommodationService{client: &c}
case GetNetworkFeeRequest:
c := networkv1alpha1grpc.NewGetNetworkFeeServiceClient(s.rpcClient.ClientConn)
service = networkService{client: &c}
service = networkService{} // this service does not talk to partner plugin
case GetPartnerConfigurationRequest:
c := partnerv1alpha1grpc.NewGetPartnerConfigurationServiceClient(s.rpcClient.ClientConn)
service = partnerService{client: &c}
service = partnerService{} // this service does not talk to partner plugin
case PingRequest:
c := pingv1alpha1grpc.NewPingServiceClient(s.rpcClient.ClientConn)
service = pingService{client: &c}
Expand Down
2 changes: 0 additions & 2 deletions internal/messaging/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func (mt MessageType) Category() MessageCategory {
switch mt {
case ActivitySearchRequest,
AccommodationSearchRequest,
GetNetworkFeeRequest,
GetPartnerConfigurationRequest,
PingRequest,
TransportSearchRequest:
return Request
Expand Down
43 changes: 28 additions & 15 deletions internal/rpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"errors"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -37,6 +38,8 @@ var (
_ partnerv1alpha1grpc.GetPartnerConfigurationServiceServer = (*server)(nil)
_ pingv1alpha1grpc.PingServiceServer = (*server)(nil)
_ transportv1alpha1grpc.TransportSearchServiceServer = (*server)(nil)

errMissingRecipient = errors.New("missing recipient")
)

type Server interface {
Expand All @@ -45,17 +48,18 @@ type Server interface {
Stop()
}
type server struct {
grpcServer *grpc.Server
cfg *config.RPCServerConfig
logger *zap.SugaredLogger
processor messaging.Processor
grpcServer *grpc.Server
cfg *config.RPCServerConfig
logger *zap.SugaredLogger
processor messaging.Processor
serviceRegistry *messaging.ServiceRegistry
}

func (s *server) Checkpoint() string {
return "request-gateway"
}

func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor) *server {
func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor, serviceRegistry *messaging.ServiceRegistry) *server {
var opts []grpc.ServerOption
if cfg.Unencrypted {
logger.Warn("Running gRPC server without TLS!")
Expand All @@ -66,7 +70,7 @@ func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
server := &server{cfg: cfg, logger: logger, processor: processor}
server := &server{cfg: cfg, logger: logger, processor: processor, serviceRegistry: serviceRegistry}
server.grpcServer = createGrpcServerAndRegisterServices(server, opts...)
return server
}
Expand Down Expand Up @@ -96,39 +100,48 @@ func (s *server) Stop() {
}

func (s *server) AccommodationSearch(ctx context.Context, request *accommodationv1alpha1.AccommodationSearchRequest) (*accommodationv1alpha1.AccommodationSearchResponse, error) {
response, err := s.processRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request})
response, err := s.processExternalRequest(ctx, messaging.AccommodationSearchRequest, &messaging.RequestContent{AccommodationSearchRequest: *request})
return &response.AccommodationSearchResponse, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ?
}

func (s *server) Ping(ctx context.Context, request *pingv1alpha1.PingRequest) (*pingv1alpha1.PingResponse, error) {
response, err := s.processRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request})
response, err := s.processExternalRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request})
return &response.PingResponse, err
}

func (s *server) GetNetworkFee(ctx context.Context, request *networkv1alpha1.GetNetworkFeeRequest) (*networkv1alpha1.GetNetworkFeeResponse, error) {
response, err := s.processRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request})
response, err := s.processInternalRequest(ctx, messaging.GetNetworkFeeRequest, &messaging.RequestContent{GetNetworkFeeRequest: *request})
return &response.GetNetworkFeeResponse, err
}

func (s *server) GetPartnerConfiguration(ctx context.Context, request *partnerv1alpha1.GetPartnerConfigurationRequest) (*partnerv1alpha1.GetPartnerConfigurationResponse, error) {
response, err := s.processRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request})
response, err := s.processInternalRequest(ctx, messaging.GetPartnerConfigurationRequest, &messaging.RequestContent{GetPartnerConfigurationRequest: *request})
return &response.GetPartnerConfigurationResponse, err
}

func (s *server) ActivitySearch(ctx context.Context, request *activityv1alpha1.ActivitySearchRequest) (*activityv1alpha1.ActivitySearchResponse, error) {
response, err := s.processRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request})
response, err := s.processExternalRequest(ctx, messaging.ActivitySearchRequest, &messaging.RequestContent{ActivitySearchRequest: *request})
return &response.ActivitySearchResponse, err
}

func (s *server) TransportSearch(ctx context.Context, request *transportv1alpha1.TransportSearchRequest) (*transportv1alpha1.TransportSearchResponse, error) {
response, err := s.processRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request})
response, err := s.processExternalRequest(ctx, messaging.TransportSearchRequest, &messaging.RequestContent{TransportSearchRequest: *request})
return &response.TransportSearchResponse, err
}

func (s *server) processRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (*messaging.ResponseContent, error) {
func (s *server) processInternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) {
service, registered := s.serviceRegistry.GetService(requestType)
if !registered {
return messaging.ResponseContent{}, fmt.Errorf("%v: %s", messaging.ErrUnsupportedRequestType, requestType)
}
response, _, err := service.Call(ctx, request)
return response, err
}

func (s *server) processExternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) {
err, md := s.processMetadata(ctx)
if err != nil {
return nil, fmt.Errorf("error processing metadata: %v", err)
return messaging.ResponseContent{}, fmt.Errorf("error processing metadata: %v", err)
}

m := &messaging.Message{
Expand All @@ -141,7 +154,7 @@ func (s *server) processRequest(ctx context.Context, requestType messaging.Messa
response, err := s.processor.ProcessOutbound(ctx, *m)
response.Metadata.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "processed"))
grpc.SendHeader(ctx, response.Metadata.ToGrpcMD())
return &response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ?
return response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ?
}

func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) {
Expand Down

0 comments on commit 5f47341

Please sign in to comment.