
Commit

feat(scheduler): support optional cpu constraints, make sort by guest count optional
tamcore authored and pborn-ionos committed Jan 18, 2024
1 parent 7006aec commit 907a291
Showing 11 changed files with 277 additions and 135 deletions.
20 changes: 20 additions & 0 deletions api/v1alpha1/proxmoxcluster_types.go
@@ -78,6 +78,15 @@ type SchedulerHints struct {
// By default 100% of a node's memory will be used for allocation.
// +optional
MemoryAdjustment *uint64 `json:"memoryAdjustment,omitempty"`

// Like MemoryAdjustment, but for CPU resources.
// Defaults to 0 (disabled), as CPU is a compressible resource.
// +optional
CPUAdjustment *uint64 `json:"cpuAdjustment,omitempty"`

// +optional
// +kubebuilder:default=true
PreferLowerGuestCount bool `json:"preferLowerGuestCount,omitempty"`
}

// GetMemoryAdjustment returns the memory adjustment percentage to use within the scheduler.
@@ -91,6 +100,17 @@ func (sh *SchedulerHints) GetMemoryAdjustment() uint64 {
return memoryAdjustment
}

// GetCPUAdjustment returns the cpu adjustment percentage to use within the scheduler.
func (sh *SchedulerHints) GetCPUAdjustment() uint64 {
cpuAdjustment := uint64(0)

if sh != nil {
cpuAdjustment = ptr.Deref(sh.CPUAdjustment, 0)
}

return cpuAdjustment
}

// ProxmoxClusterStatus defines the observed state of ProxmoxCluster.
type ProxmoxClusterStatus struct {
// Ready indicates that the cluster is ready.
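
For context on how the new hints are consumed, here is a minimal sketch of setting them on a cluster spec and reading them back through the getters above. The getter behaviour shown (a nil CPUAdjustment resolves to 0 and disables the CPU constraint; memory falls back to the documented 100%) follows the diff, while the import path and the use of ptr.To are assumptions for illustration only.

package main

import (
	"fmt"

	infrav1 "github.com/ionos-cloud/cluster-api-provider-proxmox/api/v1alpha1" // import path assumed
	"k8s.io/utils/ptr"
)

func main() {
	// Allow CPU overprovisioning up to 400% of a node's cores and spread guests evenly.
	hints := &infrav1.SchedulerHints{
		CPUAdjustment:         ptr.To(uint64(400)),
		PreferLowerGuestCount: true,
	}

	fmt.Println(hints.GetCPUAdjustment())    // 400
	fmt.Println(hints.GetMemoryAdjustment()) // assumed 100, per the "100% of a node's memory" default above

	// Both getters are nil-safe, so callers don't have to check whether hints were set at all.
	var unset *infrav1.SchedulerHints
	fmt.Println(unset.GetCPUAdjustment()) // 0 -> CPU constraint disabled in selectNode
}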
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -137,6 +137,11 @@ spec:
to a node's resources, to allow for overprovisioning or to ensure
a node will always have a safety buffer.
properties:
cpuAdjustment:
description: Like MemoryAdjustment, but for CPU resources. Defaults
to 0 (disabled), as CPU is a compressible resource.
format: int64
type: integer
memoryAdjustment:
description: MemoryAdjustment allows to adjust a node's memory
by a given percentage. For example, setting it to 300 allows
@@ -146,6 +151,9 @@
default 100% of a node's memory will be used for allocation.
format: int64
type: integer
preferLowerGuestCount:
default: true
type: boolean
type: object
required:
- dnsServers
103 changes: 65 additions & 38 deletions internal/service/scheduler/vmscheduler.go
@@ -29,17 +29,16 @@ import (
"sigs.k8s.io/cluster-api/util"
)

// InsufficientMemoryError is used when the scheduler cannot assign a VM to a node because it would
// exceed the node's memory limit.
type InsufficientMemoryError struct {
node string
available uint64
requested uint64
// InsufficientResourcesError is used when the scheduler cannot assign a VM to a node because no node
// would be able to provide the requested resources.
type InsufficientResourcesError struct {
requestedMemory uint64
requestedCores uint64
}

func (err InsufficientMemoryError) Error() string {
return fmt.Sprintf("cannot reserve %dB of memory on node %s: %dB available memory left",
err.requested, err.node, err.available)
func (err InsufficientResourcesError) Error() string {
return fmt.Sprintf("cannot reserve %dB of memory and/or %d vCores in cluster",
err.requestedMemory, err.requestedCores)
}

// ScheduleVM decides which node a ProxmoxMachine should be scheduled on.
@@ -64,69 +63,92 @@ func selectNode(
allowedNodes []string,
schedulerHints *infrav1.SchedulerHints,
) (string, error) {
byMemory := make(sortByAvailableMemory, len(allowedNodes))
for i, nodeName := range allowedNodes {
mem, err := client.GetReservableMemoryBytes(ctx, nodeName, schedulerHints.GetMemoryAdjustment())
var nodes []nodeInfo

requestedMemory := uint64(machine.Spec.MemoryMiB) * 1024 * 1024 // convert to bytes
requestedCores := uint64(machine.Spec.NumCores)

for _, nodeName := range allowedNodes {
mem, cpu, err := client.GetReservableResources(
ctx,
nodeName,
schedulerHints.GetMemoryAdjustment(),
schedulerHints.GetCPUAdjustment(),
)
if err != nil {
return "", err
}
byMemory[i] = nodeInfo{Name: nodeName, AvailableMemory: mem}
}

sort.Sort(byMemory)
// if MemoryAdjustment is explicitly set to 0 (zero), pretend we have enough mem for the guest
if schedulerHints.GetMemoryAdjustment() == 0 {
mem = requestedMemory
}
// if CPUAdjustment is explicitly set to 0 (zero), pretend we have enough cpu for the guest
if schedulerHints.GetCPUAdjustment() == 0 {
cpu = requestedCores
}

requestedMemory := uint64(machine.Spec.MemoryMiB) * 1024 * 1024 // convert to bytes
if requestedMemory > byMemory[0].AvailableMemory {
// no more space on the node with the highest amount of available memory
return "", InsufficientMemoryError{
node: byMemory[0].Name,
available: byMemory[0].AvailableMemory,
requested: requestedMemory,
node := nodeInfo{Name: nodeName, AvailableMemory: mem, AvailableCPU: cpu}
if node.AvailableMemory >= requestedMemory && node.AvailableCPU >= requestedCores {
nodes = append(nodes, node)
}
}

if len(nodes) == 0 {
return "", InsufficientResourcesError{requestedMemory, requestedCores}
}

// Sort nodes by free memory and then free CPU in descending order
byResources := make(sortByResources, len(nodes))
copy(byResources, nodes)
sort.Sort(byResources)

decision := byResources[0].Name

// count the existing vms per node
nodeCounter := make(map[string]int)
for _, nl := range locations {
nodeCounter[nl.Node]++
}

for i, info := range byMemory {
for i, info := range byResources {
info.ScheduledVMs = nodeCounter[info.Name]
byMemory[i] = info
byResources[i] = info
}

byReplicas := make(sortByReplicas, len(byMemory))
copy(byReplicas, byMemory)
byReplicas := make(sortByReplicas, len(byResources))
copy(byReplicas, byResources)

sort.Sort(byReplicas)

decision := byMemory[0].Name
if requestedMemory < byReplicas[0].AvailableMemory {
// distribute round-robin when memory allows it
// if memory allocation allows it, pick the node with the least amount of guests
if schedulerHints.PreferLowerGuestCount {
decision = byReplicas[0].Name
}

if logger := logr.FromContextOrDiscard(ctx); logger.V(4).Enabled() {
// only construct values when message should actually be logged
logger.Info("Scheduler decision",
"byReplicas", byReplicas.String(),
"byMemory", byMemory.String(),
"byResources", byResources.String(),
"requestedMemory", requestedMemory,
"requestedCores", requestedCores,
"resultNode", decision,
"schedulerHints", schedulerHints,
)
}

return decision, nil
}

type resourceClient interface {
GetReservableMemoryBytes(context.Context, string, uint64) (uint64, error)
GetReservableResources(context.Context, string, uint64, uint64) (uint64, uint64, error)
}

type nodeInfo struct {
Name string `json:"node"`
AvailableMemory uint64 `json:"mem"`
AvailableCPU uint64 `json:"cpu"`
ScheduledVMs int `json:"vms"`
}

@@ -143,16 +165,21 @@ func (a sortByReplicas) String() string {
return string(o)
}

type sortByAvailableMemory []nodeInfo
type sortByResources []nodeInfo

func (a sortByResources) Len() int { return len(a) }
func (a sortByResources) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortByResources) Less(i, j int) bool {
// Compare by free memory and free CPU in descending order
if a[i].AvailableMemory != a[j].AvailableMemory {
return a[i].AvailableMemory > a[j].AvailableMemory
}

func (a sortByAvailableMemory) Len() int { return len(a) }
func (a sortByAvailableMemory) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortByAvailableMemory) Less(i, j int) bool {
// more available memory = lower index
return a[i].AvailableMemory > a[j].AvailableMemory
// If free memory is equal, sort by free CPU in descending order
return a[i].AvailableCPU > a[j].AvailableCPU || (a[i].AvailableCPU == a[j].AvailableCPU && a[i].ScheduledVMs < a[j].ScheduledVMs)
}

func (a sortByAvailableMemory) String() string {
func (a sortByResources) String() string {
o, _ := json.Marshal(a)
return string(o)
}
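
GetReservableResources itself lives in a file that is not rendered in this view; the sketch below only illustrates the contract selectNode appears to rely on, with hypothetical names and the percentage math inferred from the MemoryAdjustment docs (e.g. 300 permits allocating up to 3x a node's physical capacity). It is not the committed client code.

package main

import "fmt"

// reservable is a hypothetical helper: capacity scaled by the adjustment
// percentage, minus what existing guests already claim.
func reservable(total, allocated, adjustment uint64) uint64 {
	if adjustment == 0 {
		// 0 means the constraint is disabled; selectNode then pretends the request always fits.
		return 0
	}
	limit := total * adjustment / 100 // adjustment=300 -> up to 3x overprovisioning
	if allocated >= limit {
		return 0
	}
	return limit - allocated
}

func main() {
	// A node with 16 cores, 10 of them already assigned to guests, and cpuAdjustment=200:
	fmt.Println(reservable(16, 10, 200)) // 22 vCores still reservable
}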
55 changes: 35 additions & 20 deletions internal/service/scheduler/vmscheduler_test.go
@@ -25,10 +25,10 @@ import (
"github.com/stretchr/testify/require"
)

type fakeResourceClient map[string]uint64
type fakeResourceClient map[string]nodeInfo

func (c fakeResourceClient) GetReservableMemoryBytes(_ context.Context, nodeName string, _ uint64) (uint64, error) {
return c[nodeName], nil
func (c fakeResourceClient) GetReservableResources(_ context.Context, nodeName string, _ uint64, _ uint64) (uint64, uint64, error) {
return c[nodeName].AvailableMemory, c[nodeName].AvailableCPU, nil
}

func miBytes(in uint64) uint64 {
@@ -39,10 +39,18 @@ func TestSelectNode(t *testing.T) {
allowedNodes := []string{"pve1", "pve2", "pve3"}
var locations []infrav1.NodeLocation
const requestMiB = 8
availableMem := map[string]uint64{
"pve1": miBytes(20),
"pve2": miBytes(30),
"pve3": miBytes(15),
const requestCores = 2
cpuAdjustment := uint64(100)

schedulerHints := &infrav1.SchedulerHints{
// This defaults to true in our CRD
PreferLowerGuestCount: true,
CPUAdjustment: &cpuAdjustment,
}
availableResources := map[string]nodeInfo{
"pve1": {AvailableMemory: miBytes(20), AvailableCPU: uint64(16)},
"pve2": {AvailableMemory: miBytes(30), AvailableCPU: uint64(16)},
"pve3": {AvailableMemory: miBytes(15), AvailableCPU: uint64(16)},
}

expectedNodes := []string{
@@ -57,40 +65,47 @@
proxmoxMachine := &infrav1.ProxmoxMachine{
Spec: infrav1.ProxmoxMachineSpec{
MemoryMiB: requestMiB,
NumCores: requestCores,
},
}

client := fakeResourceClient(availableMem)
client := fakeResourceClient(availableResources)

node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, &infrav1.SchedulerHints{})
node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, schedulerHints)
require.NoError(t, err)
require.Equal(t, expectedNode, node)

require.Greater(t, availableMem[node], miBytes(requestMiB))
availableMem[node] -= miBytes(requestMiB)
require.Greater(t, availableResources[node].AvailableMemory, miBytes(requestMiB))
if entry, ok := availableResources[node]; ok {
entry.AvailableMemory -= miBytes(requestMiB)
entry.AvailableCPU -= requestCores
availableResources[node] = entry
}

locations = append(locations, infrav1.NodeLocation{Node: node})
})
}

t.Run("out of memory", func(t *testing.T) {
t.Run("out of resources", func(t *testing.T) {
proxmoxMachine := &infrav1.ProxmoxMachine{
Spec: infrav1.ProxmoxMachineSpec{
MemoryMiB: requestMiB,
NumCores: requestCores,
},
}

client := fakeResourceClient(availableMem)
client := fakeResourceClient(availableResources)

node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, &infrav1.SchedulerHints{})
require.ErrorAs(t, err, &InsufficientMemoryError{})
node, err := selectNode(context.Background(), client, proxmoxMachine, locations, allowedNodes, schedulerHints)
require.ErrorAs(t, err, &InsufficientResourcesError{})
require.Empty(t, node)

expectMem := map[string]uint64{
"pve1": miBytes(4), // 20 - 8 x 2
"pve2": miBytes(6), // 30 - 8 x 3
"pve3": miBytes(7), // 15 - 8 x 1
expectResources := map[string]nodeInfo{
"pve1": {AvailableMemory: miBytes(4), AvailableCPU: uint64(12)}, // 20 - 8 x 2
"pve2": {AvailableMemory: miBytes(6), AvailableCPU: uint64(10)}, // 30 - 8 x 3
"pve3": {AvailableMemory: miBytes(7), AvailableCPU: uint64(14)}, // 15 - 8 x 1
}
require.Equal(t, expectMem, availableMem)

require.Equal(t, expectResources, availableResources)
})
}
2 changes: 1 addition & 1 deletion internal/service/vmservice/vm.go
@@ -314,7 +314,7 @@ func createVM(ctx context.Context, scope *scope.MachineScope) (proxmox.VMCloneRe
var err error
options.Target, err = selectNextNode(ctx, scope)
if err != nil {
if errors.As(err, &scheduler.InsufficientMemoryError{}) {
if errors.As(err, &scheduler.InsufficientResourcesError{}) {
scope.SetFailureMessage(err)
scope.SetFailureReason(capierrors.InsufficientResourcesMachineError)
}
2 changes: 1 addition & 1 deletion internal/service/vmservice/vm_test.go
@@ -105,7 +105,7 @@ func TestEnsureVirtualMachine_CreateVM_SelectNode_InsufficientMemory(t *testing.
machineScope.InfraCluster.ProxmoxCluster.Spec.AllowedNodes = []string{"node1"}

selectNextNode = func(context.Context, *scope.MachineScope) (string, error) {
return "", fmt.Errorf("error: %w", scheduler.InsufficientMemoryError{})
return "", fmt.Errorf("error: %w", scheduler.InsufficientResourcesError{})
}
t.Cleanup(func() { selectNextNode = scheduler.ScheduleVM })

2 changes: 1 addition & 1 deletion pkg/proxmox/client.go
@@ -37,7 +37,7 @@ type Client interface {

GetTask(ctx context.Context, upID string) (*proxmox.Task, error)

GetReservableMemoryBytes(ctx context.Context, nodeName string, nodeMemoryAdjustment uint64) (uint64, error)
GetReservableResources(ctx context.Context, nodeName string, nodeMemoryAdjustment uint64, nodeCPUAdjustment uint64) (uint64, uint64, error)

ResizeDisk(ctx context.Context, vm *proxmox.VirtualMachine, disk, size string) error

