Skip to content

Commit

Permalink
add topology hints
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <[email protected]>
  • Loading branch information
miklezzzz committed Dec 13, 2024
1 parent 95c8f23 commit 54715d5
Show file tree
Hide file tree
Showing 10 changed files with 603 additions and 314 deletions.
18 changes: 3 additions & 15 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"path"
"path/filepath"
"runtime/trace"
"sort"
"strings"
"sync"
"time"
Expand All @@ -33,7 +32,6 @@ import (
"github.com/flant/addon-operator/pkg/module_manager/models/modules"
"github.com/flant/addon-operator/pkg/module_manager/models/modules/events"
dynamic_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/dynamically_enabled"
"github.com/flant/addon-operator/pkg/module_manager/scheduler/node"
"github.com/flant/addon-operator/pkg/task"
"github.com/flant/addon-operator/pkg/utils"
"github.com/flant/kube-client/client"
Expand Down Expand Up @@ -2471,25 +2469,15 @@ func (op *AddonOperator) CreateConvergeModulesTasks(state *module_manager.Module
// Add ModuleRun tasks to install or reload enabled modules.
newlyEnabled := utils.ListToMapStringStruct(state.ModulesToEnable)
log.Debugf("The following modules are going to be enabled/rerun: %v", state.AllEnabledModulesByOrder)
// sort modules' orders
sortedOrders := make(node.NodeWeightRange, 0, len(state.AllEnabledModulesByOrder))
for order := range state.AllEnabledModulesByOrder {
sortedOrders = append(sortedOrders, order)
}
sort.Sort(sortedOrders)

for _, order := range sortedOrders {
for _, modules := range state.AllEnabledModulesByOrder {
newLogLabels := utils.MergeLabels(logLabels)
delete(newLogLabels, "task.id")
modules := state.AllEnabledModulesByOrder[order]
// create parallel moduleRun task
switch {
// create parallel moduleRun task
case len(modules) > 1:
parallelRunMetadata := task.ParallelRunMetadata{}
newLogLabels["modules"] = strings.Join(modules, ",")
newLogLabels["order"] = order.String()
parallelRunMetadata := task.ParallelRunMetadata{
Order: order,
}
for _, moduleName := range modules {
ev := events.ModuleEvent{
ModuleName: moduleName,
Expand Down
5 changes: 3 additions & 2 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
kube_config_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/kube_config"
script_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/script_enabled"
static_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/static"
"github.com/flant/addon-operator/pkg/module_manager/scheduler/node"
"github.com/flant/addon-operator/pkg/task"
"github.com/flant/addon-operator/pkg/utils"
. "github.com/flant/shell-operator/pkg/hook/binding_context"
Expand All @@ -55,7 +54,7 @@ type ModulesState struct {
// All enabled modules.
AllEnabledModules []string
// All enabled modules grouped by order.
AllEnabledModulesByOrder map[node.NodeWeight][]string
AllEnabledModulesByOrder [][]string
// Modules that should be deleted.
ModulesToDisable []string
// Modules that was disabled and now are enabled.
Expand Down Expand Up @@ -881,6 +880,7 @@ func (mm *ModuleManager) RecalculateGraph(logLabels map[string]string) bool {
EventType: events.ModuleStateChanged,
})
}

return stateChanged
}

Expand All @@ -893,6 +893,7 @@ func (mm *ModuleManager) GlobalSynchronizationNeeded() bool {
return true
}
}

return false
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/module_manager/scheduler/extenders/extenders.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ type NotificationExtender interface {
SetNotifyChannel(context.Context, chan ExtenderEvent)
}

type TopologicalExtender interface {
// GetTopologicalHints returns the list of vertices that should be connected to the specified vertex
GetTopologicalHints(string) []string
}

// Hail to enabled scripts
type ResettableExtender interface {
// Reset resets the extender's cache
Reset()
type StatefulExtender interface {
// SetModulesStateHelper sets a helper function to get the list of enabled modules according to the latest vertex state buffer
SetModulesStateHelper(func() []string)
}
52 changes: 52 additions & 0 deletions pkg/module_manager/scheduler/extenders/mock/extenders_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,55 @@ func (f *TerminatorThree) Filter(_ string, _ map[string]string) (*bool, error) {
func (f *TerminatorThree) IsTerminator() bool {
return true
}

type TopologicalOne struct{}

func (f *TopologicalOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("TopologicalOne")
}

func (f *TopologicalOne) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TopologicalOne) IsTerminator() bool {
return true
}

func (f *TopologicalOne) GetTopologicalHints(moduleName string) []string {
switch moduleName {
case "echo":
return []string{"prometheus", "cert-manager"}
case "prometheus":
return []string{"prometheus-crd"}
case "foobar":
return []string{"foo", "bar"}
case "operator-trivy":
return []string{"istio", "admission-policy-engine"}
}

return nil
}

type TopologicalTwo struct{}

func (f *TopologicalTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("TopologicalTwo")
}

func (f *TopologicalTwo) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TopologicalTwo) IsTerminator() bool {
return true
}

func (f *TopologicalTwo) GetTopologicalHints(moduleName string) []string {
switch moduleName {

Check failure on line 148 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case "my-module":
return []string{"unknown-module"}
}

return nil
}
33 changes: 16 additions & 17 deletions pkg/module_manager/scheduler/extenders/script_enabled/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

"github.com/deckhouse/deckhouse/pkg/log"
pointer "k8s.io/utils/ptr"
Expand All @@ -30,9 +29,7 @@ type scriptState string
type Extender struct {
tmpDir string
basicModuleDescriptors map[string]moduleDescriptor

l sync.RWMutex
enabledModules []string
modulesStateHelper func() []string
}

type moduleDescriptor struct {
Expand All @@ -53,7 +50,6 @@ func NewExtender(tmpDir string) (*Extender, error) {

e := &Extender{
basicModuleDescriptors: make(map[string]moduleDescriptor),
enabledModules: make([]string, 0),
tmpDir: tmpDir,
}

Expand Down Expand Up @@ -89,12 +85,6 @@ func (e *Extender) Name() extenders.ExtenderName {
return Name
}

func (e *Extender) Reset() {
e.l.Lock()
e.enabledModules = make([]string, 0)
e.l.Unlock()
}

func (e *Extender) Filter(moduleName string, logLabels map[string]string) (*bool, error) {
if moduleDescriptor, found := e.basicModuleDescriptors[moduleName]; found {
var err error
Expand All @@ -106,7 +96,7 @@ func (e *Extender) Filter(moduleName string, logLabels map[string]string) (*bool
refreshLogLabels := utils.MergeLabels(logLabels, map[string]string{
"extender": "ScriptEnabled",
})
isEnabled, err = moduleDescriptor.module.RunEnabledScript(e.tmpDir, e.enabledModules, refreshLogLabels)
isEnabled, err = moduleDescriptor.module.RunEnabledScript(e.tmpDir, e.GetEnabledModules(), refreshLogLabels)
if err != nil {
err = fmt.Errorf("failed to execute '%s' module's enabled script: %v", moduleDescriptor.module.GetName(), err)
}
Expand All @@ -124,19 +114,28 @@ func (e *Extender) Filter(moduleName string, logLabels map[string]string) (*bool
log.Debugf("MODULE '%s' is ENABLED. Enabled script doesn't exist!", moduleDescriptor.module.GetName())
}

if enabled == nil || (enabled != nil && *enabled) {
e.l.Lock()
e.enabledModules = append(e.enabledModules, moduleDescriptor.module.GetName())
e.l.Unlock()
}
if err != nil {
return enabled, exerror.Permanent(err)
}

return enabled, nil
}

return nil, nil
}

func (e *Extender) IsTerminator() bool {
return true
}

func (e *Extender) SetModulesStateHelper(f func() []string) {
e.modulesStateHelper = f
}

func (e *Extender) GetEnabledModules() []string {
if e.modulesStateHelper == nil {
return nil
}

return e.modulesStateHelper()
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,19 @@ func TestExtender(t *testing.T) {
},
}

sharedEnabledModules := make([]string, 0)
helper := func() []string {
return sharedEnabledModules
}
e.SetModulesStateHelper(helper)

logLabels := map[string]string{"source": "TestExtender"}
for _, m := range basicModules {
e.AddBasicModule(m)
enabled, err := e.Filter(m.Name, logLabels)
if enabled == nil || (enabled != nil && *enabled) {
sharedEnabledModules = append(sharedEnabledModules, m.Name)
}
switch m.GetName() {
case "foo-bar":
assert.Equal(t, true, *enabled)
Expand All @@ -87,7 +96,7 @@ func TestExtender(t *testing.T) {
}

expected := []string{"ingress-nginx", "cert-manager", "foo-bar"}
assert.Equal(t, expected, e.enabledModules)
assert.Equal(t, expected, e.GetEnabledModules())

err = os.RemoveAll(tmp)
assert.NoError(t, err)
Expand Down
19 changes: 3 additions & 16 deletions pkg/module_manager/scheduler/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,10 @@ type Node struct {
module ModuleInterface
}

type NodeWeightRange []NodeWeight

func (r NodeWeightRange) Len() int {
return len(r)
}

func (r NodeWeightRange) Less(i, j int) bool {
return r[i] < r[j]
}

func (r NodeWeightRange) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

const (
ModuleType NodeType = "module"
WeightType NodeType = "weight"
ModuleType NodeType = "module"
WeightType NodeType = "weight"
TypeAttribute string = "type"
)

func NewNode() *Node {
Expand Down
Loading

0 comments on commit 54715d5

Please sign in to comment.