Skip to content

Commit

Permalink
[vchanmanager] Add handling clock sync message during provisioning
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksandr Grytsov <[email protected]>
  • Loading branch information
al1img committed Jan 26, 2024
1 parent 8c98dc9 commit 882b103
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 35 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func main() {
config.VChan.XSSecureRXPath, config.VChan.XSSecureTXPath, config.VChan.Domain, mTLSConfig)
defer vchanSecure.Close()

vch, err := vchanmanager.New(downloadmanager, unpackmanager, vchanOpen, vchanSecure)
vch, err := vchanmanager.New(downloadmanager, unpackmanager, vchanOpen, vchanSecure, *provisioningMode)
if err != nil {
log.Errorf("Can't create vchanmanager: %v", err)

Expand Down
108 changes: 74 additions & 34 deletions vchanmanager/vchanmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/protobuf/ptypes/empty"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/aoscloud/aos_messageproxy/filechunker"
)
Expand Down Expand Up @@ -87,14 +88,15 @@ type Message struct {
type VChanManager struct {
sync.Mutex

vchanOpen VChanItf
vchanSecure VChanItf
recvSMChan chan []byte
sendChan chan Message
cancel context.CancelFunc
downloadManager DownloaderItf
unpackerManager UnpackerItf
syncstream *syncstream.SyncStream
vchanOpen VChanItf
vchanSecure VChanItf
recvSMChan chan []byte
sendChan chan Message
cancel context.CancelFunc
downloadManager DownloaderItf
unpackerManager UnpackerItf
syncstream *syncstream.SyncStream
provisioningMode bool
}

/***********************************************************************************************************************
Expand All @@ -113,19 +115,21 @@ var (
// New creates new vchan instance.
func New(
downloadManager DownloaderItf, unpackerManager UnpackerItf, vchanOpen VChanItf, vchanSecure VChanItf,
provisioningMode bool,
) (*VChanManager, error) {
if vchanOpen == nil || vchanSecure == nil {
return nil, aoserrors.Errorf("vchan is nil")
}

v := &VChanManager{
recvSMChan: make(chan []byte, channelSize),
sendChan: make(chan Message, channelSize),
vchanOpen: vchanOpen,
vchanSecure: vchanSecure,
downloadManager: downloadManager,
unpackerManager: unpackerManager,
syncstream: syncstream.New(),
recvSMChan: make(chan []byte, channelSize),
sendChan: make(chan Message, channelSize),
vchanOpen: vchanOpen,
vchanSecure: vchanSecure,
downloadManager: downloadManager,
unpackerManager: unpackerManager,
syncstream: syncstream.New(),
provisioningMode: provisioningMode,
}

var ctx context.Context
Expand Down Expand Up @@ -387,40 +391,76 @@ func (v *VChanManager) reader(ctx context.Context, vchan VChanItf, errCh chan<-
return
}

if !v.handleImageContentRequest(msg, vchan, ctx, errCh) {
v.processMessage(msg)
if msg.MsgSource == SM {
if v.handleSMOutgoingMessages(ctx, msg, vchan, errCh) {
break
}
}

v.processMessage(msg)
}
}
}

func (v *VChanManager) handleImageContentRequest(
msg Message, vchan VChanItf, ctx context.Context, errCh chan<- error,
func (v *VChanManager) handleSMOutgoingMessages(ctx context.Context, msg Message,
vchan VChanItf, errCh chan<- error,
) bool {
if msg.MsgSource != SM {
return false
}
outgoingMessage := &pbSM.SMOutgoingMessages{}

request, ok := v.isImageContentRequest(msg.Data)
if !ok {
err := proto.Unmarshal(msg.Data, outgoingMessage)
if err != nil {
log.Errorf("Failed to unmarshal outgoing message: %v", aoserrors.Wrap(err))
return false
}

go func() {
if err := v.download(
request.ImageContentRequest.GetUrl(), request.ImageContentRequest.GetRequestId(),
request.ImageContentRequest.GetContentType(), vchan, ctx); err != nil {
if errors.Is(err, errVChanWrite) {
errCh <- err
switch smMessage := outgoingMessage.GetSMOutgoingMessage().(type) {
case *pbSM.SMOutgoingMessages_ImageContentRequest:
go v.handleImageContentRequest(ctx, *smMessage, vchan, errCh)
return true

return
case *pbSM.SMOutgoingMessages_ClockSyncRequest:
if v.provisioningMode {
log.Debug("Receive clock sync request")

clockSync := &pbSM.SMIncomingMessages{
SMIncomingMessage: &pbSM.SMIncomingMessages_ClockSync{
ClockSync: &pbSM.ClockSync{
CurrentTime: timestamppb.Now(),
},
},
}

data, err := proto.Marshal(clockSync)
if err != nil {
log.Errorf("Can't marshal clock sync: %v", aoserrors.Wrap(err))
return true
}

if err = v.SendSMMessage(data); err != nil {
log.Errorf("Can't send SM message: %v", aoserrors.Wrap(err))
}

log.Errorf("Failed to download image content: %v", err)
return true
}
}()
}

return false
}

return true
func (v *VChanManager) handleImageContentRequest(ctx context.Context,
contentRequest pbSM.SMOutgoingMessages_ImageContentRequest, vchan VChanItf, errCh chan<- error,
) {
if err := v.download(
contentRequest.ImageContentRequest.GetUrl(), contentRequest.ImageContentRequest.GetRequestId(),
contentRequest.ImageContentRequest.GetContentType(), vchan, ctx); err != nil {
if errors.Is(err, errVChanWrite) {
errCh <- err

return
}

log.Errorf("Failed to download image content: %v", err)
}
}

func (v *VChanManager) processMessage(msg Message) {
Expand Down

0 comments on commit 882b103

Please sign in to comment.