From 2d6ad79c4930c129600c4c89559dc21c561ced3e Mon Sep 17 00:00:00 2001 From: Joseph Date: Fri, 7 Apr 2023 10:04:39 +0800 Subject: [PATCH] koord-scheduler: support Reservation reserved CPU Cores (#1140) Signed-off-by: Joseph --- apis/extension/resource.go | 13 +- .../nodenumaresource/cpu_accumulator.go | 23 ++ .../nodenumaresource/cpu_accumulator_test.go | 33 ++- .../nodenumaresource/cpu_allocation.go | 20 +- .../nodenumaresource/cpu_allocation_test.go | 30 ++- .../plugins/nodenumaresource/cpu_manager.go | 85 +++++-- .../cpu_topology_manager_test.go | 4 +- .../plugins/nodenumaresource/plugin.go | 188 +++++++++++++--- .../plugins/nodenumaresource/plugin_test.go | 213 ++++++++++++++++-- .../nodenumaresource/pod_eventhandler.go | 8 + 10 files changed, 530 insertions(+), 87 deletions(-) diff --git a/apis/extension/resource.go b/apis/extension/resource.go index e7880f11c..1b215a312 100644 --- a/apis/extension/resource.go +++ b/apis/extension/resource.go @@ -20,6 +20,7 @@ import ( "encoding/json" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -155,18 +156,20 @@ func GetResourceStatus(annotations map[string]string) (*ResourceStatus, error) { return resourceStatus, nil } -func SetResourceStatus(pod *corev1.Pod, status *ResourceStatus) error { - if pod == nil { +func SetResourceStatus(obj metav1.Object, status *ResourceStatus) error { + if obj == nil { return nil } - if pod.Annotations == nil { - pod.Annotations = map[string]string{} + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} } data, err := json.Marshal(status) if err != nil { return err } - pod.Annotations[AnnotationResourceStatus] = string(data) + annotations[AnnotationResourceStatus] = string(data) + obj.SetAnnotations(annotations) return nil } diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go index b1316e79c..cc16bfc1b 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go @@ -26,6 +26,29 @@ import ( "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) +func takePreferredCPUs( + topology *CPUTopology, + maxRefCount int, + availableCPUs cpuset.CPUSet, + preferredCPUs cpuset.CPUSet, + allocatedCPUs CPUDetails, + numCPUsNeeded int, + cpuBindPolicy schedulingconfig.CPUBindPolicy, + cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, + numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy, +) (cpuset.CPUSet, error) { + preferredCPUs = availableCPUs.Intersection(preferredCPUs) + size := preferredCPUs.Size() + if size == 0 { + return cpuset.CPUSet{}, nil + } + if numCPUsNeeded > size { + numCPUsNeeded = size + } + + return takeCPUs(topology, maxRefCount, preferredCPUs, allocatedCPUs, numCPUsNeeded, cpuBindPolicy, cpuExclusivePolicy, numaAllocatedStrategy) +} + func takeCPUs( topology *CPUTopology, maxRefCount int, diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go index fc4851cba..a98e6e278 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go @@ -569,7 +569,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) { // first pod request 4 CPUs podUID := uuid.NewUUID() - availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, 
allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err := takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -579,7 +579,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) { // second pod request 5 CPUs podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 5, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -589,7 +589,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) { // third pod request 4 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -610,7 +610,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { // first pod request 16 CPUs podUID := uuid.NewUUID() - availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err := takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -620,7 +620,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { // second pod request 16 CPUs podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -630,7 +630,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { // third pod request 16 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -640,7 +640,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { // forth pod request 16 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, 
schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) @@ -648,7 +648,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) assert.Equal(t, cpuset.MustParse(""), availableCPUs) } @@ -754,3 +754,20 @@ func BenchmarkTakeCPUsWithSpread(b *testing.B) { }) } } + +func TestTakePreferredCPUs(t *testing.T) { + topology := buildCPUTopologyForTest(2, 1, 16, 2) + cpus := topology.CPUDetails.CPUs() + result, err := takeCPUs(topology, 1, cpus, nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) + assert.NoError(t, err) + assert.Equal(t, []int{0, 2}, result.ToSlice()) + + result, err = takePreferredCPUs(topology, 1, cpus, cpuset.NewCPUSet(), nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) + assert.NoError(t, err) + assert.Empty(t, result.ToSlice()) + + preferredCPUs := cpuset.NewCPUSet(11, 13, 15, 17) + result, err = takePreferredCPUs(topology, 1, cpus, preferredCPUs, nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) + assert.NoError(t, err) + assert.Equal(t, []int{11, 13}, result.ToSlice()) +} diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go index 54dafee8e..1eaafb0b9 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go @@ -45,6 +45,11 @@ func (n *cpuAllocation) updateAllocatedCPUSet(cpuTopology *CPUTopology, podUID t n.addCPUs(cpuTopology, podUID, cpuset, cpuExclusivePolicy) } +func (n *cpuAllocation) getCPUs(podUID types.UID) (cpuset.CPUSet, bool) { + cpuset, ok := n.allocatedPods[podUID] + return cpuset, ok +} + func (n *cpuAllocation) addCPUs(cpuTopology *CPUTopology, podUID types.UID, cpuset cpuset.CPUSet, exclusivePolicy schedulingconfig.CPUExclusivePolicy) { if _, ok := n.allocatedPods[podUID]; ok { return @@ -83,8 +88,21 @@ func (n *cpuAllocation) releaseCPUs(podUID types.UID) { } } -func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) { +func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs, preferredCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) { allocateInfo = n.allocatedCPUs.Clone() + if !preferredCPUs.IsEmpty() { + for _, cpuID := range preferredCPUs.ToSliceNoSort() { + cpuInfo, ok := allocateInfo[cpuID] + if ok { + cpuInfo.RefCount-- + if cpuInfo.RefCount == 0 { + delete(allocateInfo, cpuID) + } else { + allocateInfo[cpuID] = cpuInfo + } + } + } + } allocated := allocateInfo.CPUs().Filter(func(cpuID int) bool { return allocateInfo[cpuID].RefCount >= maxRefCount }) diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go index 90cdbeaf1..cd7c7dca8 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go +++ 
b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go @@ -54,7 +54,7 @@ func TestNodeAllocationStateAddCPUs(t *testing.T) { assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods) assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs) - availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) expectAvailableCPUs := cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) @@ -63,7 +63,7 @@ func TestNodeAllocationStateAddCPUs(t *testing.T) { assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods) assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) @@ -120,19 +120,39 @@ func Test_cpuAllocation_getAvailableCPUs(t *testing.T) { podUID := uuid.NewUUID() allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) expectAvailableCPUs := cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) // test with add already allocated cpu(refCount > 1 but less than maxRefCount) and another pod anotherPodUID := uuid.NewUUID() allocationState.addCPUs(cpuTopology, anotherPodUID, cpuset.MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet()) expectAvailableCPUs = cpuset.MustParse("0-1,5-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) allocationState.releaseCPUs(podUID) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet()) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet()) expectAvailableCPUs = cpuset.MustParse("0-1,6-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) } + +func Test_cpuAllocation_getAvailableCPUs_with_preferred_cpus(t *testing.T) { + cpuTopology := buildCPUTopologyForTest(2, 1, 4, 2) + for _, v := range cpuTopology.CPUDetails { + v.CoreID = v.SocketID<<16 | v.CoreID + cpuTopology.CPUDetails[v.CPUID] = v + } + + allocationState := newCPUAllocation("test-node-1") + assert.NotNil(t, allocationState) + podUID := uuid.NewUUID() + allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("0-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) + availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet()) + expectAvailableCPUs := cpuset.MustParse("5-15") + assert.Equal(t, expectAvailableCPUs, availableCPUs) + + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet(1, 2)) + expectAvailableCPUs = cpuset.MustParse("1-2,5-15") + assert.Equal(t, expectAvailableCPUs, availableCPUs) +} diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go b/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go index 
a1756851b..95f81f384 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go @@ -37,17 +37,23 @@ type CPUManager interface { node *corev1.Node, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, - cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) (cpuset.CPUSet, error) + cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, + preferredCPUs cpuset.CPUSet, + ) (cpuset.CPUSet, error) UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) + GetAllocatedCPUSet(nodeName string, podUID types.UID) (cpuset.CPUSet, bool) + Free(nodeName string, podUID types.UID) Score( node *corev1.Node, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, - cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) int64 + cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, + preferredCPUs cpuset.CPUSet, + ) int64 GetAvailableCPUs(nodeName string) (availableCPUs cpuset.CPUSet, allocated CPUDetails, err error) } @@ -112,6 +118,7 @@ func (c *cpuManagerImpl) Allocate( numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, + preferredCPUs cpuset.CPUSet, ) (cpuset.CPUSet, error) { result := cpuset.CPUSet{} // The Pod requires the CPU to be allocated according to CPUBindPolicy, @@ -131,19 +138,47 @@ func (c *cpuManagerImpl) Allocate( allocation.lock.Lock() defer allocation.lock.Unlock() - availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs) + availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs, preferredCPUs) numaAllocateStrategy := c.getNUMAAllocateStrategy(node) - result, err := takeCPUs( - cpuTopologyOptions.CPUTopology, - cpuTopologyOptions.MaxRefCount, - availableCPUs, - allocated, - numCPUsNeeded, - cpuBindPolicy, - cpuExclusivePolicy, - numaAllocateStrategy, - ) - return result, err + if !preferredCPUs.IsEmpty() { + var err error + result, err = takePreferredCPUs( + cpuTopologyOptions.CPUTopology, + cpuTopologyOptions.MaxRefCount, + availableCPUs, + preferredCPUs, + allocated, + numCPUsNeeded, + cpuBindPolicy, + cpuExclusivePolicy, + numaAllocateStrategy) + if err != nil { + return result, err + } + numCPUsNeeded -= result.Size() + availableCPUs = availableCPUs.Difference(preferredCPUs) + } + if numCPUsNeeded > 0 { + cpus, err := takeCPUs( + cpuTopologyOptions.CPUTopology, + cpuTopologyOptions.MaxRefCount, + availableCPUs, + allocated, + numCPUsNeeded, + cpuBindPolicy, + cpuExclusivePolicy, + numaAllocateStrategy, + ) + if err != nil { + return cpuset.CPUSet{}, err + } + if result.IsEmpty() { + result = cpus + } else { + result = result.Union(cpus) + } + } + return result, nil } func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) { @@ -159,6 +194,14 @@ func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID allocation.updateAllocatedCPUSet(cpuTopologyOptions.CPUTopology, podUID, cpuset, cpuExclusivePolicy) } +func (c *cpuManagerImpl) GetAllocatedCPUSet(nodeName string, podUID types.UID) (cpuset.CPUSet, bool) { + allocation := c.getOrCreateAllocation(nodeName) + allocation.lock.Lock() + defer allocation.lock.Unlock() + + return allocation.getCPUs(podUID) +} + func (c 
*cpuManagerImpl) Free(nodeName string, podUID types.UID) { allocation := c.getOrCreateAllocation(nodeName) allocation.lock.Lock() @@ -166,12 +209,7 @@ func (c *cpuManagerImpl) Free(nodeName string, podUID types.UID) { allocation.releaseCPUs(podUID) } -func (c *cpuManagerImpl) Score( - node *corev1.Node, - numCPUsNeeded int, - cpuBindPolicy schedulingconfig.CPUBindPolicy, - cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, -) int64 { +func (c *cpuManagerImpl) Score(node *corev1.Node, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, preferredCPUs cpuset.CPUSet) int64 { cpuTopologyOptions := c.topologyManager.GetCPUTopologyOptions(node.Name) if cpuTopologyOptions.CPUTopology == nil || !cpuTopologyOptions.CPUTopology.IsValid() { return 0 @@ -184,8 +222,10 @@ func (c *cpuManagerImpl) Score( allocation.lock.Lock() defer allocation.lock.Unlock() + // TODO(joseph): should support score with preferredCPUs. + cpuTopology := cpuTopologyOptions.CPUTopology - availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs) + availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs, preferredCPUs) acc := newCPUAccumulator( cpuTopology, cpuTopologyOptions.MaxRefCount, @@ -289,6 +329,7 @@ func (c *cpuManagerImpl) GetAvailableCPUs(nodeName string) (availableCPUs cpuset allocation := c.getOrCreateAllocation(nodeName) allocation.lock.Lock() defer allocation.lock.Unlock() - availableCPUs, allocated = allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, cpuTopologyOptions.ReservedCPUs) + emptyCPUs := cpuset.NewCPUSet() + availableCPUs, allocated = allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, cpuTopologyOptions.ReservedCPUs, emptyCPUs) return availableCPUs, allocated, nil } diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go index 693056c83..ce1dc3e2c 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go @@ -88,8 +88,8 @@ func TestCPUTopologyManager(t *testing.T) { assert.NotNil(t, topologyManager) extendHandle := &frameworkHandleExtender{ - Handle: suit.Handle, - Clientset: suit.NRTClientset, + ExtendedHandle: suit.Extender, + Clientset: suit.NRTClientset, } err = registerNodeResourceTopologyEventHandler(extendHandle, topologyManager) assert.NoError(t, err) diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 061cf666d..e134bbc64 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -23,15 +23,20 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" resourceapi "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/apis/extension" + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" "github.com/koordinator-sh/koordinator/pkg/util" 
"github.com/koordinator-sh/koordinator/pkg/util/cpuset" + reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation" ) const ( @@ -75,6 +80,9 @@ var ( _ framework.ScorePlugin = &Plugin{} _ framework.ReservePlugin = &Plugin{} _ framework.PreBindPlugin = &Plugin{} + + _ frameworkext.ReservationPreFilterExtension = &Plugin{} + _ frameworkext.ReservationPreBindPlugin = &Plugin{} ) type Plugin struct { @@ -174,14 +182,39 @@ type preFilterState struct { preferredCPUExclusivePolicy schedulingconfig.CPUExclusivePolicy numCPUsNeeded int allocatedCPUs cpuset.CPUSet + reservedCPUs map[string]map[types.UID]cpuset.CPUSet } func (s *preFilterState) Clone() framework.StateData { - return &preFilterState{ - skip: s.skip, - resourceSpec: s.resourceSpec, - allocatedCPUs: s.allocatedCPUs.Clone(), + ns := &preFilterState{ + skip: s.skip, + resourceSpec: s.resourceSpec, + preferredCPUBindPolicy: s.preferredCPUBindPolicy, + preferredCPUExclusivePolicy: s.preferredCPUExclusivePolicy, + numCPUsNeeded: s.numCPUsNeeded, + allocatedCPUs: s.allocatedCPUs.Clone(), + } + ns.reservedCPUs = map[string]map[types.UID]cpuset.CPUSet{} + for nodeName, reservedCPUs := range s.reservedCPUs { + reservedCPUsClone := ns.reservedCPUs[nodeName] + if reservedCPUsClone == nil { + reservedCPUsClone = map[types.UID]cpuset.CPUSet{} + ns.reservedCPUs[nodeName] = reservedCPUsClone + } + for reservationUID, cpuSet := range reservedCPUs { + reservedCPUsClone[reservationUID] = cpuSet.Clone() + } + } + return ns +} + +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, *framework.Status) { + value, err := cycleState.Read(stateKey) + if err != nil { + return nil, framework.AsStatus(err) } + state := value.(*preFilterState) + return state, nil } func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod) *framework.Status { @@ -191,7 +224,8 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState } state := &preFilterState{ - skip: true, + skip: true, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, } if AllowUseCPUSet(pod) { preferredCPUBindPolicy := resourceSpec.PreferredCPUBindPolicy @@ -224,13 +258,63 @@ func (p *Plugin) PreFilterExtensions() framework.PreFilterExtensions { return nil } -func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, *framework.Status) { - value, err := cycleState.Read(stateKey) - if err != nil { - return nil, framework.AsStatus(err) +func (p *Plugin) RemoveReservation(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, reservation *schedulingv1alpha1.Reservation, nodeInfo *framework.NodeInfo) *framework.Status { + state, status := getPreFilterState(cycleState) + if !status.IsSuccess() { + return status } - state := value.(*preFilterState) - return state, nil + if state.skip { + return nil + } + + allocatedCPUs, ok := p.cpuManager.GetAllocatedCPUSet(reservation.Status.NodeName, reservation.UID) + if !ok || allocatedCPUs.IsEmpty() { + return nil + } + + klog.V(5).Infof("NodeNUMAResource.RemoveReservation: podToSchedule %v, reservation: %v on node %s, allocatedCPUs: %v", + klog.KObj(podToSchedule), klog.KObj(reservation), nodeInfo.Node().Name, allocatedCPUs) + + reservedCPUsOnNode := state.reservedCPUs[reservation.Status.NodeName] + if reservedCPUsOnNode == nil { + reservedCPUsOnNode = map[types.UID]cpuset.CPUSet{} + state.reservedCPUs[reservation.Status.NodeName] = reservedCPUsOnNode + } + reservedCPUsOnNode[reservation.UID] = allocatedCPUs + return nil 
+} + +func (p *Plugin) AddPodInReservation(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, reservation *schedulingv1alpha1.Reservation, nodeInfo *framework.NodeInfo) *framework.Status { + state, status := getPreFilterState(cycleState) + if !status.IsSuccess() { + return status + } + if state.skip { + return nil + } + + allocatedCPUs, ok := p.cpuManager.GetAllocatedCPUSet(podInfoToAdd.Pod.Spec.NodeName, podInfoToAdd.Pod.UID) + if !ok || allocatedCPUs.IsEmpty() { + return nil + } + klog.V(5).Infof("NodeNUMAResource.AddPodInReservation: podToSchedule %v, add podInfoToAdd %v reservation %v on node %s, allocatedCPUs: %v", + klog.KObj(podToSchedule), klog.KObj(podInfoToAdd.Pod), klog.KObj(reservation), nodeInfo.Node().Name, allocatedCPUs) + + reservedCPUsOnNode := state.reservedCPUs[reservation.Status.NodeName] + if reservedCPUsOnNode != nil { + cpus := reservedCPUsOnNode[reservation.UID] + cpus = cpus.Difference(allocatedCPUs) + if !cpus.IsEmpty() { + reservedCPUsOnNode[reservation.UID] = cpus + } else { + delete(reservedCPUsOnNode, reservation.UID) + if len(reservedCPUsOnNode) == 0 { + delete(state.reservedCPUs, reservation.Status.NodeName) + } + } + } + + return nil } func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { @@ -295,7 +379,20 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po return 0, nil } - score := p.cpuManager.Score(node, state.numCPUsNeeded, preferredCPUBindPolicy, state.preferredCPUExclusivePolicy) + var score int64 + reservedCPUs := state.reservedCPUs[nodeName] + if len(reservedCPUs) > 0 { + var maxScore int64 + for _, cpus := range reservedCPUs { + s := p.cpuManager.Score(node, state.numCPUsNeeded, preferredCPUBindPolicy, state.preferredCPUExclusivePolicy, cpus) + if s > maxScore { + maxScore = s + } + } + score = maxScore + } else { + score = p.cpuManager.Score(node, state.numCPUsNeeded, preferredCPUBindPolicy, state.preferredCPUExclusivePolicy, cpuset.NewCPUSet()) + } return score, nil } @@ -325,7 +422,12 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, if err != nil { return framework.AsStatus(err) } - result, err := p.cpuManager.Allocate(node, state.numCPUsNeeded, preferredCPUBindPolicy, state.preferredCPUExclusivePolicy) + + reservationReservedCPUs, err := p.getReservationReservedCPUs(cycleState, pod, node, state) + if err != nil { + return framework.AsStatus(err) + } + result, err := p.cpuManager.Allocate(node, state.numCPUsNeeded, preferredCPUBindPolicy, state.preferredCPUExclusivePolicy, reservationReservedCPUs) if err != nil { return framework.AsStatus(err) } @@ -335,6 +437,28 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, return nil } +func (p *Plugin) getReservationReservedCPUs(cycleState *framework.CycleState, pod *corev1.Pod, node *corev1.Node, state *preFilterState) (cpuset.CPUSet, error) { + var result cpuset.CPUSet + if reservationutil.IsReservePod(pod) { + return result, nil + } + nominatedReservation := frameworkext.GetNominatedReservation(cycleState) + if nominatedReservation == nil { + return result, nil + } + + allocatedCPUs, _ := p.cpuManager.GetAllocatedCPUSet(node.Name, nominatedReservation.UID) + if allocatedCPUs.IsEmpty() { + return result, nil + } + + reservedCPUs := state.reservedCPUs[node.Name][nominatedReservation.UID] + if !reservedCPUs.IsEmpty() && 
!reservedCPUs.IsSubsetOf(allocatedCPUs) { + return result, fmt.Errorf("reservation reserved CPUs are invalid") + } + return reservedCPUs, nil +} + func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) { state, status := getPreFilterState(cycleState) if !status.IsSuccess() { @@ -347,6 +471,14 @@ func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState } func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + return p.preBindObject(ctx, cycleState, pod, nodeName) +} + +func (p *Plugin) PreBindReservation(ctx context.Context, cycleState *framework.CycleState, reservation *schedulingv1alpha1.Reservation, nodeName string) *framework.Status { + return p.preBindObject(ctx, cycleState, reservation, nodeName) +} + +func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleState, object runtime.Object, nodeName string) *framework.Status { state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status @@ -359,9 +491,10 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState, return nil } - podOriginal := pod - pod = pod.DeepCopy() - + originalObj := object + object = object.DeepCopyObject() + metaObject := object.(metav1.Object) + annotations := metaObject.GetAnnotations() // Write back ResourceSpec annotation if LSR Pod hasn't specified CPUBindPolicy if state.resourceSpec.PreferredCPUBindPolicy == "" || state.resourceSpec.PreferredCPUBindPolicy == schedulingconfig.CPUBindPolicyDefault || @@ -371,32 +504,31 @@ func (p *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState, } resourceSpecData, err := json.Marshal(resourceSpec) if err != nil { - return framework.NewStatus(framework.Error, err.Error()) + return framework.AsStatus(err) } - if pod.Annotations == nil { - pod.Annotations = make(map[string]string) + if annotations == nil { + annotations = make(map[string]string) } - pod.Annotations[extension.AnnotationResourceSpec] = string(resourceSpecData) + annotations[extension.AnnotationResourceSpec] = string(resourceSpecData) + metaObject.SetAnnotations(annotations) } resourceStatus := &extension.ResourceStatus{CPUSet: state.allocatedCPUs.String()} - err := SetResourceStatus(pod, resourceStatus) - if err != nil { - return framework.NewStatus(framework.Error, err.Error()) + if err := SetResourceStatus(metaObject, resourceStatus); err != nil { + return framework.AsStatus(err) } // patch pod or reservation (if the pod is a reserve pod) with new annotations - err = util.RetryOnConflictOrTooManyRequests(func() error { - _, err1 := util.NewPatch().WithHandle(p.handle).AddAnnotations(pod.Annotations).Patch(ctx, podOriginal) + err := util.RetryOnConflictOrTooManyRequests(func() error { + _, err1 := util.NewPatch().WithHandle(p.handle).AddAnnotations(metaObject.GetAnnotations()).Patch(ctx, originalObj.(metav1.Object)) return err1 }) if err != nil { - klog.V(3).ErrorS(err, "Failed to preBind Pod with CPUSet", - "pod", klog.KObj(pod), "CPUSet", state.allocatedCPUs, "node", nodeName) + klog.V(3).ErrorS(err, "Failed to preBind %T with CPUSet", object, klog.KObj(metaObject), "CPUSet", state.allocatedCPUs, "node", nodeName) return framework.NewStatus(framework.Error, err.Error()) } - klog.V(4).Infof("Successfully preBind Pod %s/%s with CPUSet %s", pod.Namespace, pod.Name, state.allocatedCPUs) + klog.V(4).Infof("Successfully preBind %T %v with CPUSet %s", object, 
klog.KObj(metaObject), state.allocatedCPUs) return nil } diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go index ec4be1832..405422f6a 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" @@ -38,8 +39,12 @@ import ( "k8s.io/utils/pointer" "github.com/koordinator-sh/koordinator/apis/extension" + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" "github.com/koordinator-sh/koordinator/pkg/util/cpuset" _ "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/scheme" @@ -102,22 +107,16 @@ func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { } type frameworkHandleExtender struct { - framework.Handle + frameworkext.ExtendedHandle *nrtfake.Clientset } -func proxyPluginFactory(fakeClientSet *nrtfake.Clientset, factory runtime.PluginFactory) runtime.PluginFactory { - return func(configuration apiruntime.Object, f framework.Handle) (framework.Plugin, error) { - return factory(configuration, &frameworkHandleExtender{ - Handle: f, - Clientset: fakeClientSet, - }) - } -} - type pluginTestSuit struct { framework.Handle + ExtenderFactory *frameworkext.FrameworkExtenderFactory + Extender frameworkext.FrameworkExtender NRTClientset *nrtfake.Clientset + KoordClientSet *koordfake.Clientset proxyNew runtime.PluginFactory nodeNUMAResourceArgs *schedulingconfig.NodeNUMAResourceArgs } @@ -130,12 +129,24 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) *pluginTestSuit { assert.NoError(t, err) nrtClientSet := nrtfake.NewSimpleClientset() - proxyNew := proxyPluginFactory(nrtClientSet, New) + koordClientSet := koordfake.NewSimpleClientset() + koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0) + extenderFactory, err := frameworkext.NewFrameworkExtenderFactory( + frameworkext.WithKoordinatorClientSet(koordClientSet), + frameworkext.WithKoordinatorSharedInformerFactory(koordSharedInformerFactory), + ) + assert.NoError(t, err) + + proxyNew := frameworkext.PluginFactoryProxy(extenderFactory, func(configuration apiruntime.Object, f framework.Handle) (framework.Plugin, error) { + return New(configuration, &frameworkHandleExtender{ + ExtendedHandle: f.(frameworkext.ExtendedHandle), + Clientset: nrtClientSet, + }) + }) registeredPlugins := []schedulertesting.RegisterPluginFunc{ schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - schedulertesting.RegisterPluginAsExtensions(Name, proxyNew, "PreFilter", "Filter", "Score", "Reserve", "PreBind"), } cs := kubefake.NewSimpleClientset() @@ -149,9 +160,15 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) 
*pluginTestSuit { runtime.WithSnapshotSharedLister(snapshot), ) assert.Nil(t, err) + + extender := extenderFactory.NewFrameworkExtender(fh) + return &pluginTestSuit{ Handle: fh, + ExtenderFactory: extenderFactory, + Extender: extender, NRTClientset: nrtClientSet, + KoordClientSet: koordClientSet, proxyNew: proxyNew, nodeNUMAResourceArgs: &nodeNUMAResourceArgs, } @@ -208,6 +225,7 @@ func TestPlugin_PreFilter(t *testing.T) { resourceSpec: &extension.ResourceSpec{PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs}, preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, numCPUsNeeded: 4, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { @@ -240,6 +258,7 @@ func TestPlugin_PreFilter(t *testing.T) { resourceSpec: &extension.ResourceSpec{PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs}, preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, numCPUsNeeded: 4, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { @@ -269,13 +288,15 @@ func TestPlugin_PreFilter(t *testing.T) { resourceSpec: &extension.ResourceSpec{PreferredCPUBindPolicy: extension.CPUBindPolicyDefault}, preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, numCPUsNeeded: 4, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { name: "skip cpu share pod", pod: &corev1.Pod{}, wantState: &preFilterState{ - skip: true, + skip: true, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { @@ -304,7 +325,8 @@ func TestPlugin_PreFilter(t *testing.T) { }, }, wantState: &preFilterState{ - skip: true, + skip: true, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { @@ -323,7 +345,8 @@ func TestPlugin_PreFilter(t *testing.T) { }, }, wantState: &preFilterState{ - skip: true, + skip: true, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, { @@ -369,7 +392,8 @@ func TestPlugin_PreFilter(t *testing.T) { }, }, wantState: &preFilterState{ - skip: true, + skip: true, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, }, }, } @@ -940,6 +964,26 @@ func TestPlugin_Reserve(t *testing.T) { want: nil, wantCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), }, + { + name: "succeed allocate from reservation reserved cpus", + state: &preFilterState{ + skip: false, + numCPUsNeeded: 4, + resourceSpec: &extension.ResourceSpec{ + PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, + }, + preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{ + "test-node-1": { + uuid.NewUUID(): cpuset.NewCPUSet(4, 5, 6, 7, 8, 9, 10), + }, + }, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + pod: &corev1.Pod{}, + want: nil, + wantCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -975,6 +1019,11 @@ func TestPlugin_Reserve(t *testing.T) { if len(tt.allocatedCPUs) > 0 { allocationState.addCPUs(tt.cpuTopology, uuid.NewUUID(), cpuset.NewCPUSet(tt.allocatedCPUs...), schedulingconfig.CPUExclusivePolicyNone) } + if len(tt.state.reservedCPUs) > 0 { + for reservationUID, cpus := range tt.state.reservedCPUs["test-node-1"] { + allocationState.addCPUs(tt.cpuTopology, reservationUID, cpus, schedulingconfig.CPUExclusivePolicyNone) + } + } } cpuManager := plg.cpuManager.(*cpuManagerImpl) @@ -985,6 +1034,16 @@ func TestPlugin_Reserve(t *testing.T) { cycleState := framework.NewCycleState() if tt.state != nil { cycleState.Write(stateKey, tt.state) + if len(tt.state.reservedCPUs) > 0 { + for reservationUID := range 
tt.state.reservedCPUs["test-node-1"] { + frameworkext.SetNominatedReservation(cycleState, &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + UID: reservationUID, + Name: "test-reservation", + }, + }) + } + } } nodeInfo, err := suit.Handle.SnapshotSharedLister().NodeInfos().Get("test-node-1") @@ -1139,3 +1198,125 @@ func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) { } assert.Equal(t, expectedResourceSpec, resourceSpec) } + +func TestPlugin_PreBindReservation(t *testing.T) { + suit := newPluginTestSuit(t, nil) + p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle) + assert.NotNil(t, p) + assert.Nil(t, err) + + reservation := &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "test-reservation-1", + }, + } + + _, status := suit.KoordClientSet.SchedulingV1alpha1().Reservations().Create(context.TODO(), reservation, metav1.CreateOptions{}) + assert.Nil(t, status) + + suit.start() + + plg := p.(*Plugin) + + state := &preFilterState{ + skip: false, + numCPUsNeeded: 4, + resourceSpec: &extension.ResourceSpec{ + PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, + }, + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), + } + cycleState := framework.NewCycleState() + cycleState.Write(stateKey, state) + + s := plg.PreBindReservation(context.TODO(), cycleState, reservation, "test-node-1") + assert.True(t, s.IsSuccess()) + + gotReservation, status := suit.KoordClientSet.SchedulingV1alpha1().Reservations().Get(context.TODO(), reservation.Name, metav1.GetOptions{}) + assert.Nil(t, status) + assert.NotNil(t, gotReservation) + resourceStatus, err := extension.GetResourceStatus(gotReservation.Annotations) + assert.NoError(t, err) + assert.NotNil(t, resourceStatus) + expectResourceStatus := &extension.ResourceStatus{ + CPUSet: "0-3", + } + assert.Equal(t, expectResourceStatus, resourceStatus) +} + +func TestReservationPreFilterExtension(t *testing.T) { + suit := newPluginTestSuit(t, nil) + p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle) + assert.NoError(t, err) + pl := p.(*Plugin) + cycleState := framework.NewCycleState() + state := &preFilterState{ + skip: false, + numCPUsNeeded: 4, + resourceSpec: &extension.ResourceSpec{ + PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, + }, + reservedCPUs: map[string]map[types.UID]cpuset.CPUSet{}, + } + cycleState.Write(stateKey, state) + + reservation := &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-reservation", + UID: uuid.NewUUID(), + }, + Status: schedulingv1alpha1.ReservationStatus{ + NodeName: "test-node", + }, + } + pl.topologyManager.UpdateCPUTopologyOptions("test-node", func(options *CPUTopologyOptions) { + options.CPUTopology = buildCPUTopologyForTest(1, 2, 8, 2) + options.MaxRefCount = 1 + }) + pl.cpuManager.UpdateAllocatedCPUSet("test-node", reservation.UID, cpuset.NewCPUSet(6, 7, 8, 9), schedulingconfig.CPUExclusivePolicyNone) + + podA := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "pod-a", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + podB := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "pod-b", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + pl.cpuManager.UpdateAllocatedCPUSet("test-node", podA.UID, cpuset.NewCPUSet(6, 7), schedulingconfig.CPUExclusivePolicyNone) + pl.cpuManager.UpdateAllocatedCPUSet("test-node", podB.UID, cpuset.NewCPUSet(8, 9), 
schedulingconfig.CPUExclusivePolicyNone) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + } + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + + status := pl.RemoveReservation(context.TODO(), cycleState, &corev1.Pod{}, reservation, nodeInfo) + assert.True(t, status.IsSuccess()) + assert.Equal(t, cpuset.NewCPUSet(6, 7, 8, 9), state.reservedCPUs["test-node"][reservation.UID]) + + status = pl.AddPodInReservation(context.TODO(), cycleState, &corev1.Pod{}, framework.NewPodInfo(podA), reservation, nodeInfo) + assert.True(t, status.IsSuccess()) + assert.Equal(t, cpuset.NewCPUSet(8, 9), state.reservedCPUs["test-node"][reservation.UID]) + + status = pl.AddPodInReservation(context.TODO(), cycleState, &corev1.Pod{}, framework.NewPodInfo(podB), reservation, nodeInfo) + assert.True(t, status.IsSuccess()) + assert.True(t, state.reservedCPUs["test-node"][reservation.UID].IsEmpty()) + assert.Empty(t, state.reservedCPUs) +} diff --git a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go index 01564d819..40184df6e 100644 --- a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go +++ b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go @@ -23,9 +23,11 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" "github.com/koordinator-sh/koordinator/pkg/util" "github.com/koordinator-sh/koordinator/pkg/util/cpuset" + reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation" ) type podEventHandler struct { @@ -38,6 +40,12 @@ func registerPodEventHandler(handle framework.Handle, cpuManager CPUManager) { cpuManager: cpuManager, } frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), handle.SharedInformerFactory(), podInformer, eventHandler) + extendedHandle, ok := handle.(frameworkext.ExtendedHandle) + if ok { + reservationInformer := extendedHandle.KoordinatorSharedInformerFactory().Scheduling().V1alpha1().Reservations() + reservationEventHandler := reservationutil.NewReservationToPodEventHandler(eventHandler, reservationutil.IsObjValidActiveReservation) + frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), extendedHandle.KoordinatorSharedInformerFactory(), reservationInformer.Informer(), reservationEventHandler) + } } func (c *podEventHandler) OnAdd(obj interface{}) {
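
Note on the allocation flow introduced by this patch: when a pod is nominated to a Reservation, the plugin records the reservation's allocated cpuset via the ReservationPreFilterExtension hooks, and cpuManagerImpl.Allocate then satisfies the request from those reserved CPUs first (takePreferredCPUs) before falling back to the general pool (takeCPUs). The standalone sketch below illustrates that two-phase flow only; it replaces the plugin's cpuset.CPUSet and CPUTopology types with a minimal map-based set and ignores bind/exclusive policies, NUMA allocate strategy, and ref counting, so treat it as an illustration of the idea rather than the plugin's implementation.

package main

import (
	"fmt"
	"sort"
)

// cpuSet is a minimal stand-in for the plugin's cpuset.CPUSet type.
type cpuSet map[int]struct{}

func newCPUSet(cpus ...int) cpuSet {
	s := cpuSet{}
	for _, c := range cpus {
		s[c] = struct{}{}
	}
	return s
}

func (s cpuSet) intersection(o cpuSet) cpuSet {
	r := cpuSet{}
	for c := range s {
		if _, ok := o[c]; ok {
			r[c] = struct{}{}
		}
	}
	return r
}

func (s cpuSet) difference(o cpuSet) cpuSet {
	r := cpuSet{}
	for c := range s {
		if _, ok := o[c]; !ok {
			r[c] = struct{}{}
		}
	}
	return r
}

func (s cpuSet) sorted() []int {
	r := make([]int, 0, len(s))
	for c := range s {
		r = append(r, c)
	}
	sort.Ints(r)
	return r
}

// take picks up to n CPUs from the set in ascending ID order. The real
// takeCPUs honors CPUBindPolicy, CPUExclusivePolicy, and the NUMA allocate
// strategy; that logic is omitted here.
func take(s cpuSet, n int) []int {
	ids := s.sorted()
	if n > len(ids) {
		n = len(ids)
	}
	return ids[:n]
}

// allocate mirrors the two-phase flow of cpuManagerImpl.Allocate: satisfy the
// request from the reservation's reserved (preferred) CPUs first, then fall
// back to the remaining available CPUs for whatever is still needed.
func allocate(available, preferred cpuSet, numCPUsNeeded int) []int {
	result := take(available.intersection(preferred), numCPUsNeeded)
	numCPUsNeeded -= len(result)
	if numCPUsNeeded > 0 {
		rest := take(available.difference(preferred), numCPUsNeeded)
		result = append(result, rest...)
	}
	sort.Ints(result)
	return result
}

func main() {
	available := newCPUSet(0, 1, 2, 3, 4, 5, 6, 7)
	reserved := newCPUSet(4, 5) // CPUs held by the nominated Reservation
	// Two CPUs come from the reservation, two from the general pool.
	fmt.Println(allocate(available, reserved, 4)) // [0 1 4 5]
}

Taking the intersection with the reservation's CPUs first is what keeps a reservation-matched pod inside the cpuset the reservation pinned, while the subset check in getReservationReservedCPUs guards against stale cycle state where the recorded reserved CPUs no longer belong to the reservation's current allocation.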