From 882b1038467eb4952670e01d47a3851913a2850e Mon Sep 17 00:00:00 2001 From: Oleksandr Grytsov Date: Wed, 24 Jan 2024 18:04:33 +0200 Subject: [PATCH] [vchanmanager] Add handling clock sync message during provisioning Signed-off-by: Oleksandr Grytsov --- main.go | 2 +- vchanmanager/vchanmanager.go | 108 ++++++++++++++++++++++++----------- 2 files changed, 75 insertions(+), 35 deletions(-) diff --git a/main.go b/main.go index fc524d8..d8a9abb 100644 --- a/main.go +++ b/main.go @@ -161,7 +161,7 @@ func main() { config.VChan.XSSecureRXPath, config.VChan.XSSecureTXPath, config.VChan.Domain, mTLSConfig) defer vchanSecure.Close() - vch, err := vchanmanager.New(downloadmanager, unpackmanager, vchanOpen, vchanSecure) + vch, err := vchanmanager.New(downloadmanager, unpackmanager, vchanOpen, vchanSecure, *provisioningMode) if err != nil { log.Errorf("Can't create vchanmanager: %v", err) diff --git a/vchanmanager/vchanmanager.go b/vchanmanager/vchanmanager.go index 18de4bc..5b5d954 100644 --- a/vchanmanager/vchanmanager.go +++ b/vchanmanager/vchanmanager.go @@ -32,6 +32,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/aoscloud/aos_messageproxy/filechunker" ) @@ -87,14 +88,15 @@ type Message struct { type VChanManager struct { sync.Mutex - vchanOpen VChanItf - vchanSecure VChanItf - recvSMChan chan []byte - sendChan chan Message - cancel context.CancelFunc - downloadManager DownloaderItf - unpackerManager UnpackerItf - syncstream *syncstream.SyncStream + vchanOpen VChanItf + vchanSecure VChanItf + recvSMChan chan []byte + sendChan chan Message + cancel context.CancelFunc + downloadManager DownloaderItf + unpackerManager UnpackerItf + syncstream *syncstream.SyncStream + provisioningMode bool } /*********************************************************************************************************************** @@ -113,19 +115,21 @@ var ( // New creates new vchan instance. func New( downloadManager DownloaderItf, unpackerManager UnpackerItf, vchanOpen VChanItf, vchanSecure VChanItf, + provisioningMode bool, ) (*VChanManager, error) { if vchanOpen == nil || vchanSecure == nil { return nil, aoserrors.Errorf("vchan is nil") } v := &VChanManager{ - recvSMChan: make(chan []byte, channelSize), - sendChan: make(chan Message, channelSize), - vchanOpen: vchanOpen, - vchanSecure: vchanSecure, - downloadManager: downloadManager, - unpackerManager: unpackerManager, - syncstream: syncstream.New(), + recvSMChan: make(chan []byte, channelSize), + sendChan: make(chan Message, channelSize), + vchanOpen: vchanOpen, + vchanSecure: vchanSecure, + downloadManager: downloadManager, + unpackerManager: unpackerManager, + syncstream: syncstream.New(), + provisioningMode: provisioningMode, } var ctx context.Context @@ -387,40 +391,76 @@ func (v *VChanManager) reader(ctx context.Context, vchan VChanItf, errCh chan<- return } - if !v.handleImageContentRequest(msg, vchan, ctx, errCh) { - v.processMessage(msg) + if msg.MsgSource == SM { + if v.handleSMOutgoingMessages(ctx, msg, vchan, errCh) { + break + } } + + v.processMessage(msg) } } } -func (v *VChanManager) handleImageContentRequest( - msg Message, vchan VChanItf, ctx context.Context, errCh chan<- error, +func (v *VChanManager) handleSMOutgoingMessages(ctx context.Context, msg Message, + vchan VChanItf, errCh chan<- error, ) bool { - if msg.MsgSource != SM { - return false - } + outgoingMessage := &pbSM.SMOutgoingMessages{} - request, ok := v.isImageContentRequest(msg.Data) - if !ok { + err := proto.Unmarshal(msg.Data, outgoingMessage) + if err != nil { + log.Errorf("Failed to unmarshal outgoing message: %v", aoserrors.Wrap(err)) return false } - go func() { - if err := v.download( - request.ImageContentRequest.GetUrl(), request.ImageContentRequest.GetRequestId(), - request.ImageContentRequest.GetContentType(), vchan, ctx); err != nil { - if errors.Is(err, errVChanWrite) { - errCh <- err + switch smMessage := outgoingMessage.GetSMOutgoingMessage().(type) { + case *pbSM.SMOutgoingMessages_ImageContentRequest: + go v.handleImageContentRequest(ctx, *smMessage, vchan, errCh) + return true - return + case *pbSM.SMOutgoingMessages_ClockSyncRequest: + if v.provisioningMode { + log.Debug("Receive clock sync request") + + clockSync := &pbSM.SMIncomingMessages{ + SMIncomingMessage: &pbSM.SMIncomingMessages_ClockSync{ + ClockSync: &pbSM.ClockSync{ + CurrentTime: timestamppb.Now(), + }, + }, + } + + data, err := proto.Marshal(clockSync) + if err != nil { + log.Errorf("Can't marshal clock sync: %v", aoserrors.Wrap(err)) + return true + } + + if err = v.SendSMMessage(data); err != nil { + log.Errorf("Can't send SM message: %v", aoserrors.Wrap(err)) } - log.Errorf("Failed to download image content: %v", err) + return true } - }() + } + + return false +} - return true +func (v *VChanManager) handleImageContentRequest(ctx context.Context, + contentRequest pbSM.SMOutgoingMessages_ImageContentRequest, vchan VChanItf, errCh chan<- error, +) { + if err := v.download( + contentRequest.ImageContentRequest.GetUrl(), contentRequest.ImageContentRequest.GetRequestId(), + contentRequest.ImageContentRequest.GetContentType(), vchan, ctx); err != nil { + if errors.Is(err, errVChanWrite) { + errCh <- err + + return + } + + log.Errorf("Failed to download image content: %v", err) + } } func (v *VChanManager) processMessage(msg Message) {