diff --git a/cmserver/cmserver.go b/cmserver/cmserver.go index ac0af917..e5685d72 100644 --- a/cmserver/cmserver.go +++ b/cmserver/cmserver.go @@ -19,18 +19,19 @@ package cmserver import ( "context" - "net" "sync" + "time" "github.com/aosedge/aos_common/aoserrors" "github.com/aosedge/aos_common/api/cloudprotocol" pb "github.com/aosedge/aos_common/api/communicationmanager" + "github.com/aosedge/aos_common/api/iamanager" "github.com/aosedge/aos_common/utils/cryptutils" + "github.com/aosedge/aos_common/utils/grpchelpers" "github.com/aosedge/aos_common/utils/pbconvert" "github.com/golang/protobuf/ptypes/empty" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "google.golang.org/protobuf/types/known/emptypb" "github.com/aosedge/aos_communicationmanager/config" @@ -48,6 +49,8 @@ const ( Updating ) +const cmRestartInterval = 10 * time.Second + /*********************************************************************************************************************** * Vars **********************************************************************************************************************/ @@ -94,20 +97,28 @@ type UpdateHandler interface { // CMServer CM server instance. type CMServer struct { - grpcServer *grpc.Server - listener net.Listener + config *config.Config + certProvider CertificateProvider + cryptocontext *cryptutils.CryptoContext + insecureConn bool + + grpcServer *grpchelpers.GRPCServer pb.UnimplementedUpdateSchedulerServiceServer clients []pb.UpdateSchedulerService_SubscribeNotificationsServer currentFOTAStatus UpdateFOTAStatus currentSOTAStatus UpdateSOTAStatus + certChan <-chan *iamanager.CertInfo stopChannel chan bool updatehandler UpdateHandler + restartTimer *time.Timer + sync.Mutex } // CertificateProvider certificate and key provider interface. type CertificateProvider interface { GetCertificate(certType string, issuer []byte, serial string) (certURL, keyURL string, err error) + SubscribeCertChanged(certType string) (<-chan *iamanager.CertInfo, error) } /*********************************************************************************************************************** @@ -117,52 +128,35 @@ type CertificateProvider interface { // New creates new IAM server instance. func New( cfg *config.Config, handler UpdateHandler, certProvider CertificateProvider, - cryptcoxontext *cryptutils.CryptoContext, insecure bool, + cryptocontext *cryptutils.CryptoContext, insecure bool, ) (server *CMServer, err error) { server = &CMServer{ + config: cfg, + certProvider: certProvider, + cryptocontext: cryptocontext, + insecureConn: insecure, + + grpcServer: grpchelpers.NewGRPCServer(cfg.CMServerURL), currentFOTAStatus: handler.GetFOTAStatus(), currentSOTAStatus: handler.GetSOTAStatus(), + certChan: make(<-chan *iamanager.CertInfo), stopChannel: make(chan bool, 1), updatehandler: handler, } + pb.RegisterUpdateSchedulerServiceServer(server.grpcServer, server) + if cfg.CMServerURL != "" { - var opts []grpc.ServerOption + if err := server.startGRPCServer(); err != nil { + return nil, err + } if !insecure { - certURL, keyURL, err := certProvider.GetCertificate(cfg.CertStorage, nil, "") + server.certChan, err = certProvider.SubscribeCertChanged(server.config.CertStorage) if err != nil { return nil, aoserrors.Wrap(err) } - - tlsConfig, err := cryptcoxontext.GetClientMutualTLSConfig(certURL, keyURL) - if err != nil { - return nil, aoserrors.Wrap(err) - } - - opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig))) - } else { - log.Info("CM GRPC server starts in insecure mode") } - - server.grpcServer = grpc.NewServer(opts...) - - pb.RegisterUpdateSchedulerServiceServer(server.grpcServer, server) - - log.Debug("Start update scheduler gRPC server") - - server.clients = []pb.UpdateSchedulerService_SubscribeNotificationsServer{} - - server.listener, err = net.Listen("tcp", cfg.CMServerURL) - if err != nil { - return server, aoserrors.Wrap(err) - } - - go func() { - if err := server.grpcServer.Serve(server.listener); err != nil { - log.Errorf("Can't serve gRPC server: %s", err) - } - }() } go server.handleChannels() @@ -175,11 +169,7 @@ func (server *CMServer) Close() { log.Debug("Close update scheduler gRPC server") if server.grpcServer != nil { - server.grpcServer.Stop() - } - - if server.listener != nil { - server.listener.Close() + server.grpcServer.StopServer() } server.clients = nil @@ -303,6 +293,9 @@ func (server *CMServer) handleChannels() { server.Unlock() + case <-server.certChan: + server.restartGRPCServer() + case <-server.stopChannel: return } @@ -317,6 +310,46 @@ func (server *CMServer) notifyAllClients(notification *pb.SchedulerNotifications } } +func (server *CMServer) startGRPCServer() error { + log.Debug("Starting update scheduler gRPC server") + + server.clients = []pb.UpdateSchedulerService_SubscribeNotificationsServer{} + + var opts []grpc.ServerOption + + opts, err := grpchelpers.NewProtectedServerOptions(server.cryptocontext, server.certProvider, + server.config.CertStorage, server.insecureConn) + if err != nil { + return aoserrors.Wrap(err) + } + + err = server.grpcServer.RestartServer(opts) + if err != nil { + return aoserrors.Wrap(err) + } + + return nil +} + +func (server *CMServer) restartGRPCServer() { + server.Lock() + defer server.Unlock() + + if server.restartTimer != nil { + server.restartTimer.Stop() + server.restartTimer = nil + } + + if err := server.startGRPCServer(); err != nil { + log.WithField("err", err).Error("CMServer failed to start GRPC server") + + server.restartTimer = time.AfterFunc(cmRestartInterval, func() { + server.restartTimer = nil + server.restartGRPCServer() + }) + } +} + func (updateStatus *UpdateSOTAStatus) convertToPBStatus() (pbStatus *pb.UpdateSOTAStatus) { pbStatus = &pb.UpdateSOTAStatus{ Error: pbconvert.ErrorInfoToPB(updateStatus.Error),