Skip to content

Commit

Permalink
[cmserver] CM server adaptation to renew certificates
Browse files Browse the repository at this point in the history
Signed-off-by: Mykola Kobets <[email protected]>
  • Loading branch information
mykola-kobets-epam committed Oct 17, 2024
1 parent f133729 commit 9a84c96
Showing 1 changed file with 73 additions and 40 deletions.
113 changes: 73 additions & 40 deletions cmserver/cmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ package cmserver

import (
"context"
"net"
"sync"
"time"

"github.com/aosedge/aos_common/aoserrors"
"github.com/aosedge/aos_common/api/cloudprotocol"
pb "github.com/aosedge/aos_common/api/communicationmanager"
"github.com/aosedge/aos_common/api/iamanager"
"github.com/aosedge/aos_common/utils/cryptutils"
"github.com/aosedge/aos_common/utils/grpchelpers"
"github.com/aosedge/aos_common/utils/pbconvert"
"github.com/golang/protobuf/ptypes/empty"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/aosedge/aos_communicationmanager/config"
Expand All @@ -48,6 +49,8 @@ const (
Updating
)

const cmRestartInterval = 10 * time.Second

/***********************************************************************************************************************
* Vars
**********************************************************************************************************************/
Expand Down Expand Up @@ -94,20 +97,28 @@ type UpdateHandler interface {

// CMServer CM server instance.
type CMServer struct {
grpcServer *grpc.Server
listener net.Listener
config *config.Config
certProvider CertificateProvider
cryptocontext *cryptutils.CryptoContext
insecureConn bool

grpcServer *grpchelpers.GRPCServer
pb.UnimplementedUpdateSchedulerServiceServer
clients []pb.UpdateSchedulerService_SubscribeNotificationsServer
currentFOTAStatus UpdateFOTAStatus
currentSOTAStatus UpdateSOTAStatus
certChan <-chan *iamanager.CertInfo
stopChannel chan bool
updatehandler UpdateHandler
restartTimer *time.Timer

sync.Mutex
}

// CertificateProvider certificate and key provider interface.
type CertificateProvider interface {
GetCertificate(certType string, issuer []byte, serial string) (certURL, keyURL string, err error)
SubscribeCertChanged(certType string) (<-chan *iamanager.CertInfo, error)
}

/***********************************************************************************************************************
Expand All @@ -117,52 +128,35 @@ type CertificateProvider interface {
// New creates new IAM server instance.
func New(
cfg *config.Config, handler UpdateHandler, certProvider CertificateProvider,
cryptcoxontext *cryptutils.CryptoContext, insecure bool,
cryptocontext *cryptutils.CryptoContext, insecure bool,
) (server *CMServer, err error) {
server = &CMServer{
config: cfg,
certProvider: certProvider,
cryptocontext: cryptocontext,
insecureConn: insecure,

grpcServer: grpchelpers.NewGRPCServer(cfg.CMServerURL),
currentFOTAStatus: handler.GetFOTAStatus(),
currentSOTAStatus: handler.GetSOTAStatus(),
certChan: make(<-chan *iamanager.CertInfo),
stopChannel: make(chan bool, 1),
updatehandler: handler,
}

pb.RegisterUpdateSchedulerServiceServer(server.grpcServer, server)

if cfg.CMServerURL != "" {
var opts []grpc.ServerOption
if err := server.startGRPCServer(); err != nil {
return nil, err
}

if !insecure {
certURL, keyURL, err := certProvider.GetCertificate(cfg.CertStorage, nil, "")
server.certChan, err = certProvider.SubscribeCertChanged(server.config.CertStorage)
if err != nil {
return nil, aoserrors.Wrap(err)
}

tlsConfig, err := cryptcoxontext.GetClientMutualTLSConfig(certURL, keyURL)
if err != nil {
return nil, aoserrors.Wrap(err)
}

opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
} else {
log.Info("CM GRPC server starts in insecure mode")
}

server.grpcServer = grpc.NewServer(opts...)

pb.RegisterUpdateSchedulerServiceServer(server.grpcServer, server)

log.Debug("Start update scheduler gRPC server")

server.clients = []pb.UpdateSchedulerService_SubscribeNotificationsServer{}

server.listener, err = net.Listen("tcp", cfg.CMServerURL)
if err != nil {
return server, aoserrors.Wrap(err)
}

go func() {
if err := server.grpcServer.Serve(server.listener); err != nil {
log.Errorf("Can't serve gRPC server: %s", err)
}
}()
}

go server.handleChannels()
Expand All @@ -175,11 +169,7 @@ func (server *CMServer) Close() {
log.Debug("Close update scheduler gRPC server")

if server.grpcServer != nil {
server.grpcServer.Stop()
}

if server.listener != nil {
server.listener.Close()
server.grpcServer.StopServer()
}

server.clients = nil
Expand Down Expand Up @@ -303,6 +293,9 @@ func (server *CMServer) handleChannels() {

server.Unlock()

case <-server.certChan:
server.restartGRPCServer()

case <-server.stopChannel:
return
}
Expand All @@ -317,6 +310,46 @@ func (server *CMServer) notifyAllClients(notification *pb.SchedulerNotifications
}
}

func (server *CMServer) startGRPCServer() error {
log.Debug("Starting update scheduler gRPC server")

server.clients = []pb.UpdateSchedulerService_SubscribeNotificationsServer{}

var opts []grpc.ServerOption

opts, err := grpchelpers.NewProtectedServerOptions(server.cryptocontext, server.certProvider,
server.config.CertStorage, server.insecureConn)
if err != nil {
return aoserrors.Wrap(err)
}

err = server.grpcServer.RestartServer(opts)
if err != nil {
return aoserrors.Wrap(err)
}

return nil
}

func (server *CMServer) restartGRPCServer() {
server.Lock()
defer server.Unlock()

if server.restartTimer != nil {
server.restartTimer.Stop()
server.restartTimer = nil
}

if err := server.startGRPCServer(); err != nil {
log.WithField("err", err).Error("CMServer failed to start GRPC server")

server.restartTimer = time.AfterFunc(cmRestartInterval, func() {
server.restartTimer = nil
server.restartGRPCServer()
})
}
}

func (updateStatus *UpdateSOTAStatus) convertToPBStatus() (pbStatus *pb.UpdateSOTAStatus) {
pbStatus = &pb.UpdateSOTAStatus{
Error: pbconvert.ErrorInfoToPB(updateStatus.Error),
Expand Down

0 comments on commit 9a84c96

Please sign in to comment.