Skip to content

Commit

Permalink
[unitstatushandler] Handle unit subject events
Browse files Browse the repository at this point in the history
Signed-off-by: Mykhailo Lohvynenko <[email protected]>
  • Loading branch information
mlohvynenko committed Oct 18, 2024
1 parent 8eb97de commit e8a9298
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 52 deletions.
10 changes: 5 additions & 5 deletions unitstatushandler/softwaremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type softwareManager struct {

statusChannel chan cmserver.UpdateSOTAStatus

nodeManager NodeManager
unitManager UnitManager
unitConfigUpdater UnitConfigUpdater
downloader softwareDownloader
statusHandler softwareStatusHandler
Expand Down Expand Up @@ -114,15 +114,15 @@ type softwareManager struct {
* Interface
**********************************************************************************************************************/

func newSoftwareManager(statusHandler softwareStatusHandler, downloader softwareDownloader, nodeManager NodeManager,
func newSoftwareManager(statusHandler softwareStatusHandler, downloader softwareDownloader, unitManager UnitManager,
unitConfigUpdater UnitConfigUpdater, softwareUpdater SoftwareUpdater, instanceRunner InstanceRunner,
storage Storage, defaultTTL time.Duration,
) (manager *softwareManager, err error) {
manager = &softwareManager{
statusChannel: make(chan cmserver.UpdateSOTAStatus, 1),
downloader: downloader,
statusHandler: statusHandler,
nodeManager: nodeManager,
unitManager: unitManager,
unitConfigUpdater: unitConfigUpdater,
softwareUpdater: softwareUpdater,
instanceRunner: instanceRunner,
Expand Down Expand Up @@ -1499,7 +1499,7 @@ func (manager *softwareManager) updateNodes() (nodesErr error) {
if nodeStatus.Status == cloudprotocol.NodeStatusPaused {
log.WithField("nodeID", nodeStatus.NodeID).Debug("Pause node")

if err := manager.nodeManager.PauseNode(nodeStatus.NodeID); err != nil && nodesErr == nil {
if err := manager.unitManager.PauseNode(nodeStatus.NodeID); err != nil && nodesErr == nil {
log.WithField("nodeID", nodeStatus.NodeID).Errorf("Can't pause node: %v", err)

nodesErr = aoserrors.Wrap(err)
Expand All @@ -1509,7 +1509,7 @@ func (manager *softwareManager) updateNodes() (nodesErr error) {
if nodeStatus.Status == cloudprotocol.NodeStatusProvisioned {
log.WithField("nodeID", nodeStatus.NodeID).Debug("Resume node")

if err := manager.nodeManager.ResumeNode(nodeStatus.NodeID); err != nil && nodesErr == nil {
if err := manager.unitManager.ResumeNode(nodeStatus.NodeID); err != nil && nodesErr == nil {
log.WithField("nodeID", nodeStatus.NodeID).Errorf("Can't resume node: %v", err)

nodesErr = aoserrors.Wrap(err)
Expand Down
74 changes: 58 additions & 16 deletions unitstatushandler/unitstatushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import (
* Types
**********************************************************************************************************************/

// NodeManager manages nodes.
type NodeManager interface {
// UnitManager manages unit.
type UnitManager interface {
GetAllNodeIDs() ([]string, error)
GetNodeInfo(nodeID string) (cloudprotocol.NodeInfo, error)
SubscribeNodeInfoChange() <-chan cloudprotocol.NodeInfo
PauseNode(nodeID string) error
ResumeNode(nodeID string) error
GetUnitSubjects() ([]string, error)
SubscribeUnitSubjectsChanged() <-chan []string
}

// Downloader downloads packages.
Expand Down Expand Up @@ -128,7 +130,7 @@ type LayerStatus struct {
type Instance struct {
sync.Mutex

nodeManager NodeManager
unitManager UnitManager
statusSender StatusSender

statusMutex sync.Mutex
Expand All @@ -140,9 +142,10 @@ type Instance struct {
firmwareManager *firmwareManager
softwareManager *softwareManager

newComponentsChannel <-chan []cloudprotocol.ComponentStatus
nodeChangedChannel <-chan cloudprotocol.NodeInfo
systemQuotaAlertChannel <-chan cloudprotocol.SystemQuotaAlert
newComponentsChannel <-chan []cloudprotocol.ComponentStatus
nodeChangedChannel <-chan cloudprotocol.NodeInfo
unitSubjectsChangedChannel <-chan []string
systemQuotaAlertChannel <-chan cloudprotocol.SystemQuotaAlert

initDone bool
isConnected bool
Expand All @@ -155,7 +158,7 @@ type Instance struct {
// New creates new unit status handler instance.
func New(
cfg *config.Config,
nodeManager NodeManager,
unitManager UnitManager,
unitConfigUpdater UnitConfigUpdater,
firmwareUpdater FirmwareUpdater,
softwareUpdater SoftwareUpdater,
Expand All @@ -168,12 +171,13 @@ func New(
log.Debug("Create unit status handler")

instance = &Instance{
nodeManager: nodeManager,
statusSender: statusSender,
sendStatusPeriod: cfg.UnitStatusSendTimeout.Duration,
newComponentsChannel: firmwareUpdater.NewComponentsChannel(),
nodeChangedChannel: nodeManager.SubscribeNodeInfoChange(),
systemQuotaAlertChannel: systemQuotaAlertProvider.GetSystemQuoteAlertChannel(),
unitManager: unitManager,
statusSender: statusSender,
sendStatusPeriod: cfg.UnitStatusSendTimeout.Duration,
newComponentsChannel: firmwareUpdater.NewComponentsChannel(),
nodeChangedChannel: unitManager.SubscribeNodeInfoChange(),
unitSubjectsChangedChannel: unitManager.SubscribeUnitSubjectsChanged(),
systemQuotaAlertChannel: systemQuotaAlertProvider.GetSystemQuoteAlertChannel(),
}

instance.resetUnitStatus()
Expand All @@ -185,7 +189,7 @@ func New(
return nil, aoserrors.Wrap(err)
}

if instance.softwareManager, err = newSoftwareManager(instance, groupDownloader, nodeManager, unitConfigUpdater,
if instance.softwareManager, err = newSoftwareManager(instance, groupDownloader, unitManager, unitConfigUpdater,
softwareUpdater, instanceRunner, storage, cfg.SMController.UpdateTTL.Duration); err != nil {
return nil, aoserrors.Wrap(err)
}
Expand Down Expand Up @@ -448,6 +452,19 @@ func (instance *Instance) initNodesStatus() error {
return nil
}

// initUnitSubjects fetches the current unit subjects from the unit manager
// and caches them in the unit status. A fetch error is logged but not
// propagated, so status initialization proceeds best effort.
func (instance *Instance) initUnitSubjects() error {
	subjects, err := instance.unitManager.GetUnitSubjects()
	if err != nil {
		// Best effort: log and fall through with whatever was returned.
		log.Errorf("Can't get unit subjects: %v", err)
	}

	if len(subjects) == 0 {
		return nil
	}

	instance.setSubjects(subjects)

	return nil
}

func (instance *Instance) initCurrentStatus() error {
if err := instance.initUnitConfigStatus(); err != nil {
return err
Expand All @@ -473,6 +490,10 @@ func (instance *Instance) initCurrentStatus() error {
return err
}

if err := instance.initUnitSubjects(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -636,6 +657,20 @@ func (instance *Instance) updateNodeInfo(nodeInfo cloudprotocol.NodeInfo) {
instance.statusChanged()
}

// setSubjects stores the given subjects in the cached unit status under the
// status mutex. It only updates the cached value; it does not schedule
// sending the status (callers use updateSubjects for that).
func (instance *Instance) setSubjects(subjects []string) {
	instance.statusMutex.Lock()
	defer instance.statusMutex.Unlock()

	log.WithField("subjects", subjects).Debug("Set subjects")

	instance.unitStatus.UnitSubjects = subjects
}

// updateSubjects stores the new subjects in the cached unit status and marks
// the status as changed so it is (re)sent on the next send cycle.
func (instance *Instance) updateSubjects(subjects []string) {
	instance.setSubjects(subjects)
	instance.statusChanged()
}

func (instance *Instance) statusChanged() {
if instance.statusTimer != nil {
return
Expand Down Expand Up @@ -686,15 +721,15 @@ func (instance *Instance) sendCurrentStatus(deltaStatus bool) {
}

func (instance *Instance) getAllNodesInfo() ([]cloudprotocol.NodeInfo, error) {
nodeIDs, err := instance.nodeManager.GetAllNodeIDs()
nodeIDs, err := instance.unitManager.GetAllNodeIDs()
if err != nil {
return nil, aoserrors.Wrap(err)
}

nodesInfo := make([]cloudprotocol.NodeInfo, 0, len(nodeIDs))

for _, nodeID := range nodeIDs {
nodeInfo, err := instance.nodeManager.GetNodeInfo(nodeID)
nodeInfo, err := instance.unitManager.GetNodeInfo(nodeID)
if err != nil {
log.WithField("nodeID", nodeID).Errorf("Can't get node info: %s", err)
continue
Expand Down Expand Up @@ -744,6 +779,13 @@ func (instance *Instance) handleChannels() {
log.Errorf("Can't perform rebalancing: %v", err)
}

case subjects, ok := <-instance.unitSubjectsChangedChannel:
if !ok {
return
}

instance.updateSubjects(subjects)

case systemQuotaAlert, ok := <-instance.systemQuotaAlertChannel:
if !ok {
return
Expand Down
54 changes: 38 additions & 16 deletions unitstatushandler/unitstatushandler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ const waitStatusTimeout = 5 * time.Second
* Types
**********************************************************************************************************************/

type TestNodeManager struct {
type TestUnitManager struct {
nodesInfo map[string]*cloudprotocol.NodeInfo
nodeInfoChannel chan cloudprotocol.NodeInfo
subjectsChannel chan []string
currentSubjects []string
}

type TestSender struct {
Expand Down Expand Up @@ -1276,10 +1278,11 @@ func TestSoftwareManager(t *testing.T) {
},
}

nodeManager := NewTestNodeManager([]cloudprotocol.NodeInfo{
unitManager := NewTestUnitManager([]cloudprotocol.NodeInfo{
{NodeID: "node1", NodeType: "type1", Status: cloudprotocol.NodeStatusProvisioned},
{NodeID: "node2", NodeType: "type2", Status: cloudprotocol.NodeStatusProvisioned},
})
},
nil)
unitConfigUpdater := NewTestUnitConfigUpdater(cloudprotocol.UnitConfigStatus{})
softwareUpdater := NewTestSoftwareUpdater(nil, nil)
instanceRunner := NewTestInstanceRunner()
Expand All @@ -1304,7 +1307,7 @@ func TestSoftwareManager(t *testing.T) {

// Create software manager

softwareManager, err := newSoftwareManager(newTestStatusHandler(), softwareDownloader, nodeManager,
softwareManager, err := newSoftwareManager(newTestStatusHandler(), softwareDownloader, unitManager,
unitConfigUpdater, softwareUpdater, instanceRunner, testStorage, 30*time.Second)
if err != nil {
t.Errorf("Can't create software manager: %s", err)
Expand Down Expand Up @@ -1362,7 +1365,7 @@ func TestSoftwareManager(t *testing.T) {

if item.desiredStatus != nil && item.desiredStatus.Nodes != nil {
for _, nodeStatus := range item.desiredStatus.Nodes {
nodeInfo, err := nodeManager.GetNodeInfo(nodeStatus.NodeID)
nodeInfo, err := unitManager.GetNodeInfo(nodeStatus.NodeID)
if err != nil {
t.Errorf("Get node info error: %v", err)
}
Expand Down Expand Up @@ -1641,23 +1644,24 @@ func TestSyncExecutor(t *testing.T) {
**********************************************************************************************************************/

/***********************************************************************************************************************
* TestNodeManager
* TestUnitManager
**********************************************************************************************************************/

func NewTestNodeManager(nodesInfo []cloudprotocol.NodeInfo) *TestNodeManager {
func NewTestUnitManager(nodesInfo []cloudprotocol.NodeInfo, subjects []string) *TestUnitManager {
nodesInfoMap := make(map[string]*cloudprotocol.NodeInfo)

for _, nodeInfo := range nodesInfo {
nodesInfoMap[nodeInfo.NodeID] = &cloudprotocol.NodeInfo{}
*nodesInfoMap[nodeInfo.NodeID] = nodeInfo
}

return &TestNodeManager{
nodesInfo: nodesInfoMap,
return &TestUnitManager{
nodesInfo: nodesInfoMap,
currentSubjects: subjects,
}
}

func (manager *TestNodeManager) GetAllNodeIDs() ([]string, error) {
func (manager *TestUnitManager) GetAllNodeIDs() ([]string, error) {
nodeIDs := make([]string, 0, len(manager.nodesInfo))

for nodeID := range manager.nodesInfo {
Expand All @@ -1667,7 +1671,7 @@ func (manager *TestNodeManager) GetAllNodeIDs() ([]string, error) {
return nodeIDs, nil
}

func (manager *TestNodeManager) GetNodeInfo(nodeID string) (cloudprotocol.NodeInfo, error) {
func (manager *TestUnitManager) GetNodeInfo(nodeID string) (cloudprotocol.NodeInfo, error) {
nodeInfo, ok := manager.nodesInfo[nodeID]
if !ok {
return cloudprotocol.NodeInfo{}, aoserrors.New("node not found")
Expand All @@ -1676,13 +1680,13 @@ func (manager *TestNodeManager) GetNodeInfo(nodeID string) (cloudprotocol.NodeIn
return *nodeInfo, nil
}

func (manager *TestNodeManager) SubscribeNodeInfoChange() <-chan cloudprotocol.NodeInfo {
// SubscribeNodeInfoChange creates and returns a buffered channel that
// delivers node info updates. Each call replaces any previously returned
// channel, so only the latest subscriber receives events.
func (manager *TestUnitManager) SubscribeNodeInfoChange() <-chan cloudprotocol.NodeInfo {
	manager.nodeInfoChannel = make(chan cloudprotocol.NodeInfo, 1)

	return manager.nodeInfoChannel
}

func (manager *TestNodeManager) NodeInfoChanged(nodeInfo cloudprotocol.NodeInfo) {
func (manager *TestUnitManager) NodeInfoChanged(nodeInfo cloudprotocol.NodeInfo) {
if _, ok := manager.nodesInfo[nodeInfo.NodeID]; !ok {
manager.nodesInfo[nodeInfo.NodeID] = &cloudprotocol.NodeInfo{}
}
Expand All @@ -1694,7 +1698,7 @@ func (manager *TestNodeManager) NodeInfoChanged(nodeInfo cloudprotocol.NodeInfo)
}
}

func (manager *TestNodeManager) GetAllNodesInfo() []cloudprotocol.NodeInfo {
func (manager *TestUnitManager) GetAllNodesInfo() []cloudprotocol.NodeInfo {
nodesInfo := make([]cloudprotocol.NodeInfo, 0, len(manager.nodesInfo))

for _, nodeInfo := range manager.nodesInfo {
Expand All @@ -1704,7 +1708,7 @@ func (manager *TestNodeManager) GetAllNodesInfo() []cloudprotocol.NodeInfo {
return nodesInfo
}

func (manager *TestNodeManager) PauseNode(nodeID string) error {
func (manager *TestUnitManager) PauseNode(nodeID string) error {
if _, ok := manager.nodesInfo[nodeID]; !ok {
return aoserrors.New("node not found")
}
Expand All @@ -1714,7 +1718,7 @@ func (manager *TestNodeManager) PauseNode(nodeID string) error {
return nil
}

func (manager *TestNodeManager) ResumeNode(nodeID string) error {
func (manager *TestUnitManager) ResumeNode(nodeID string) error {
if _, ok := manager.nodesInfo[nodeID]; !ok {
return aoserrors.New("node not found")
}
Expand All @@ -1724,6 +1728,24 @@ func (manager *TestNodeManager) ResumeNode(nodeID string) error {
return nil
}

// GetUnitSubjects returns the subjects currently stored in the test manager.
// It never fails; the error result exists only to satisfy the UnitManager
// interface.
func (manager *TestUnitManager) GetUnitSubjects() ([]string, error) {
	// Unnamed results: the original named results (subjects, err) were never
	// used, and sibling methods of this type use unnamed results.
	return manager.currentSubjects, nil
}

// SubscribeUnitSubjectsChanged creates and returns a buffered channel that
// delivers unit subjects updates. Each call replaces any previously returned
// channel, so only the latest subscriber receives events.
func (manager *TestUnitManager) SubscribeUnitSubjectsChanged() <-chan []string {
	manager.subjectsChannel = make(chan []string, 1)

	return manager.subjectsChannel
}

// SubjectsChanged records the new subjects and, if a subscriber exists,
// publishes them on the subjects channel.
// NOTE(review): the channel buffer is 1 — a second call before the
// subscriber drains the first event would block; confirm tests never do that.
func (manager *TestUnitManager) SubjectsChanged(subjects []string) {
	manager.currentSubjects = subjects

	if manager.subjectsChannel != nil {
		manager.subjectsChannel <- subjects
	}
}

/***********************************************************************************************************************
* TestSender
**********************************************************************************************************************/
Expand Down
Loading

0 comments on commit e8a9298

Please sign in to comment.