koord-scheduler: support Reservation reserved CPU Cores (#1140)
Signed-off-by: Joseph <[email protected]>
eahydra committed Apr 7, 2023
1 parent 832ecd0 commit 2d6ad79
Showing 10 changed files with 530 additions and 87 deletions.
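
Only six of the ten changed files are rendered below; the remaining files, where the Reservation plugin itself is wired up, did not load on this page. At a high level, the commit threads a new preferredCPUs argument through the NUMA-aware CPU allocator so that CPU cores already held by a matched Reservation are offered back to the pod that consumes it. The sketch below only illustrates the intended call pattern against the CPUManager interface changed in this commit; the surrounding variables (cpuManager, nodeName, reservationUID, node, numCPUsNeeded and the two policies) are placeholders, and the assumption that a Reservation's allocation is recorded under its own UID is mine, not taken from this page.

```go
// Hypothetical wiring (sketch only): fetch the CPUs the Reservation already
// holds on this node and pass them to Allocate as the preferred set.
preferredCPUs := cpuset.NewCPUSet()
if reserved, ok := cpuManager.GetAllocatedCPUSet(nodeName, reservationUID); ok {
	preferredCPUs = reserved
}
// Allocate tries the preferred CPUs first and tops up from the free pool.
allocatedCPUs, err := cpuManager.Allocate(node, numCPUsNeeded, cpuBindPolicy, cpuExclusivePolicy, preferredCPUs)
if err != nil {
	return err
}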
13 changes: 8 additions & 5 deletions apis/extension/resource.go
@@ -20,6 +20,7 @@ import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
@@ -155,18 +156,20 @@ func GetResourceStatus(annotations map[string]string) (*ResourceStatus, error) {
return resourceStatus, nil
}

-func SetResourceStatus(pod *corev1.Pod, status *ResourceStatus) error {
-if pod == nil {
+func SetResourceStatus(obj metav1.Object, status *ResourceStatus) error {
+if obj == nil {
return nil
}
-if pod.Annotations == nil {
-pod.Annotations = map[string]string{}
+annotations := obj.GetAnnotations()
+if annotations == nil {
+annotations = map[string]string{}
}
data, err := json.Marshal(status)
if err != nil {
return err
}
-pod.Annotations[AnnotationResourceStatus] = string(data)
+annotations[AnnotationResourceStatus] = string(data)
+obj.SetAnnotations(annotations)
return nil
}

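The only change in this file generalizes SetResourceStatus from *corev1.Pod to any metav1.Object, which is what lets the same resource-status annotation be written onto a Reservation. A minimal sketch of the resulting usage follows; the helper function and the Reservation import path are illustrative assumptions, not part of this diff.

```go
package example

import (
	corev1 "k8s.io/api/core/v1"

	"github.com/koordinator-sh/koordinator/apis/extension"
	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// annotateResourceStatus is a hypothetical helper: after this commit the same
// call works for Pods and for Reservations, because SetResourceStatus only
// needs GetAnnotations/SetAnnotations from metav1.Object.
func annotateResourceStatus(pod *corev1.Pod, r *schedulingv1alpha1.Reservation, status *extension.ResourceStatus) error {
	if err := extension.SetResourceStatus(pod, status); err != nil {
		return err
	}
	return extension.SetResourceStatus(r, status)
}
```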
23 changes: 23 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go
@@ -26,6 +26,29 @@ import (
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
)

func takePreferredCPUs(
topology *CPUTopology,
maxRefCount int,
availableCPUs cpuset.CPUSet,
preferredCPUs cpuset.CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy,
) (cpuset.CPUSet, error) {
preferredCPUs = availableCPUs.Intersection(preferredCPUs)
size := preferredCPUs.Size()
if size == 0 {
return cpuset.CPUSet{}, nil
}
if numCPUsNeeded > size {
numCPUsNeeded = size
}

return takeCPUs(topology, maxRefCount, preferredCPUs, allocatedCPUs, numCPUsNeeded, cpuBindPolicy, cpuExclusivePolicy, numaAllocatedStrategy)
}

func takeCPUs(
topology *CPUTopology,
maxRefCount int,
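The new takePreferredCPUs helper above only considers preferred CPUs that are still available and silently caps the request to that size; the caller is expected to top up the remainder with a regular takeCPUs call. A small illustration of the capping step using the repo's cpuset package (a sketch of the behavior, not upstream code):

```go
available := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7)
preferred := cpuset.NewCPUSet(6, 7, 8, 9) // 8 and 9 are no longer available

usable := available.Intersection(preferred) // {6, 7}
numCPUsNeeded := 4
if numCPUsNeeded > usable.Size() {
	numCPUsNeeded = usable.Size() // capped to 2; the remaining 2 CPUs come from takeCPUs
}
```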
33 changes: 25 additions & 8 deletions pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go
@@ -569,7 +569,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) {

// first pod request 4 CPUs
podUID := uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err := takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -579,7 +579,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) {

// second pod request 5 CPUs
podUID = uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err = takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
5, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -589,7 +589,7 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) {

// third pod request 4 cpus
podUID = uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err = takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -610,7 +610,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) {

// first pod request 16 CPUs
podUID := uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err := takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -620,7 +620,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) {

// second pod request 16 CPUs
podUID = uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err = takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
16, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -630,7 +630,7 @@ func TestTakeCPUsSortByRefCount(t *testing.T) {

// third pod request 16 cpus
podUID = uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err = takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
@@ -640,15 +640,15 @@ func TestTakeCPUsSortByRefCount(t *testing.T) {

// forth pod request 16 cpus
podUID = uuid.NewUUID()
-availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
result, err = takeCPUs(
cpuTopology, 2, availableCPUs, allocatedCPUsDetails,
16, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
assert.True(t, result.Equals(cpuset.MustParse("16-31")))
assert.NoError(t, err)
allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel)

-availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
assert.Equal(t, cpuset.MustParse(""), availableCPUs)
}

@@ -754,3 +754,20 @@ func BenchmarkTakeCPUsWithSpread(b *testing.B) {
})
}
}

func TestTakePreferredCPUs(t *testing.T) {
topology := buildCPUTopologyForTest(2, 1, 16, 2)
cpus := topology.CPUDetails.CPUs()
result, err := takeCPUs(topology, 1, cpus, nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
assert.NoError(t, err)
assert.Equal(t, []int{0, 2}, result.ToSlice())

result, err = takePreferredCPUs(topology, 1, cpus, cpuset.NewCPUSet(), nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
assert.NoError(t, err)
assert.Empty(t, result.ToSlice())

preferredCPUs := cpuset.NewCPUSet(11, 13, 15, 17)
result, err = takePreferredCPUs(topology, 1, cpus, preferredCPUs, nil, 2, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
assert.NoError(t, err)
assert.Equal(t, []int{11, 13}, result.ToSlice())
}
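
Putting the two helpers together, a pod that needs more CPUs than its Reservation holds gets the reserved CPUs first and the rest from the remaining pool. The test-style sketch below follows the conventions of cpu_accumulator_test.go above and reuses its helpers; it is an illustration I added, not part of the commit, and since the exact CPU IDs returned depend on the test topology, only sizes are asserted.

```go
func TestAllocateTopsUpAfterPreferredCPUs(t *testing.T) {
	topology := buildCPUTopologyForTest(2, 1, 16, 2)
	available := topology.CPUDetails.CPUs()
	preferred := cpuset.NewCPUSet(0, 1, 2, 3) // CPUs assumed to be held by a Reservation

	// Phase 1: the request for 6 CPUs is capped to the 4 preferred ones.
	fromReservation, err := takePreferredCPUs(topology, 1, available, preferred, nil, 6,
		schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
	assert.NoError(t, err)
	assert.Equal(t, 4, fromReservation.Size())

	// Phase 2: the missing CPUs are taken from what is left of the pool.
	remaining, err := takeCPUs(topology, 1, available.Difference(preferred), nil, 6-fromReservation.Size(),
		schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated)
	assert.NoError(t, err)
	assert.Equal(t, 6, fromReservation.Union(remaining).Size())
}
```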
20 changes: 19 additions & 1 deletion pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go
@@ -45,6 +45,11 @@ func (n *cpuAllocation) updateAllocatedCPUSet(cpuTopology *CPUTopology, podUID t
n.addCPUs(cpuTopology, podUID, cpuset, cpuExclusivePolicy)
}

func (n *cpuAllocation) getCPUs(podUID types.UID) (cpuset.CPUSet, bool) {
cpuset, ok := n.allocatedPods[podUID]
return cpuset, ok
}

func (n *cpuAllocation) addCPUs(cpuTopology *CPUTopology, podUID types.UID, cpuset cpuset.CPUSet, exclusivePolicy schedulingconfig.CPUExclusivePolicy) {
if _, ok := n.allocatedPods[podUID]; ok {
return
@@ -83,8 +88,21 @@ func (n *cpuAllocation) releaseCPUs(podUID types.UID) {
}
}

-func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) {
+func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs, preferredCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) {
allocateInfo = n.allocatedCPUs.Clone()
if !preferredCPUs.IsEmpty() {
for _, cpuID := range preferredCPUs.ToSliceNoSort() {
cpuInfo, ok := allocateInfo[cpuID]
if ok {
cpuInfo.RefCount--
if cpuInfo.RefCount == 0 {
delete(allocateInfo, cpuID)
} else {
allocateInfo[cpuID] = cpuInfo
}
}
}
}
allocated := allocateInfo.CPUs().Filter(func(cpuID int) bool {
return allocateInfo[cpuID].RefCount >= maxRefCount
})
30 changes: 25 additions & 5 deletions pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go
@@ -54,7 +54,7 @@ func TestNodeAllocationStateAddCPUs(t *testing.T) {
assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods)
assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs)

-availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
expectAvailableCPUs := cpuset.MustParse("0-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)

@@ -63,7 +63,7 @@ func TestNodeAllocationStateAddCPUs(t *testing.T) {
assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods)
assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs)

-availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
cpuset.MustParse("0-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)

@@ -120,19 +120,39 @@ func Test_cpuAllocation_getAvailableCPUs(t *testing.T) {
podUID := uuid.NewUUID()
allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel)

-availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
expectAvailableCPUs := cpuset.MustParse("0-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)

// test with add already allocated cpu(refCount > 1 but less than maxRefCount) and another pod
anotherPodUID := uuid.NewUUID()
allocationState.addCPUs(cpuTopology, anotherPodUID, cpuset.MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel)
-availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet())
+availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet(), cpuset.NewCPUSet())
expectAvailableCPUs = cpuset.MustParse("0-1,5-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)

allocationState.releaseCPUs(podUID)
-availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet())
+availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet())
expectAvailableCPUs = cpuset.MustParse("0-1,6-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)
}

func Test_cpuAllocation_getAvailableCPUs_with_preferred_cpus(t *testing.T) {
cpuTopology := buildCPUTopologyForTest(2, 1, 4, 2)
for _, v := range cpuTopology.CPUDetails {
v.CoreID = v.SocketID<<16 | v.CoreID
cpuTopology.CPUDetails[v.CPUID] = v
}

allocationState := newCPUAllocation("test-node-1")
assert.NotNil(t, allocationState)
podUID := uuid.NewUUID()
allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("0-4"), schedulingconfig.CPUExclusivePolicyPCPULevel)
availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet())
expectAvailableCPUs := cpuset.MustParse("5-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)

availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet(), cpuset.NewCPUSet(1, 2))
expectAvailableCPUs = cpuset.MustParse("1-2,5-15")
assert.Equal(t, expectAvailableCPUs, availableCPUs)
}
85 changes: 63 additions & 22 deletions pkg/scheduler/plugins/nodenumaresource/cpu_manager.go
@@ -37,17 +37,23 @@ type CPUManager interface {
node *corev1.Node,
numCPUsNeeded int,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
-cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) (cpuset.CPUSet, error)
+cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
+preferredCPUs cpuset.CPUSet,
+) (cpuset.CPUSet, error)

UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy)

GetAllocatedCPUSet(nodeName string, podUID types.UID) (cpuset.CPUSet, bool)

Free(nodeName string, podUID types.UID)

Score(
node *corev1.Node,
numCPUsNeeded int,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
-cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) int64
+cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
+preferredCPUs cpuset.CPUSet,
+) int64

GetAvailableCPUs(nodeName string) (availableCPUs cpuset.CPUSet, allocated CPUDetails, err error)
}
@@ -112,6 +118,7 @@ func (c *cpuManagerImpl) Allocate(
numCPUsNeeded int,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
preferredCPUs cpuset.CPUSet,
) (cpuset.CPUSet, error) {
result := cpuset.CPUSet{}
// The Pod requires the CPU to be allocated according to CPUBindPolicy,
@@ -131,19 +138,47 @@ func (c *cpuManagerImpl) Allocate(
allocation.lock.Lock()
defer allocation.lock.Unlock()

-availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs)
+availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs, preferredCPUs)
numaAllocateStrategy := c.getNUMAAllocateStrategy(node)
-result, err := takeCPUs(
-cpuTopologyOptions.CPUTopology,
-cpuTopologyOptions.MaxRefCount,
-availableCPUs,
-allocated,
-numCPUsNeeded,
-cpuBindPolicy,
-cpuExclusivePolicy,
-numaAllocateStrategy,
-)
-return result, err
+if !preferredCPUs.IsEmpty() {
+var err error
+result, err = takePreferredCPUs(
+cpuTopologyOptions.CPUTopology,
+cpuTopologyOptions.MaxRefCount,
+availableCPUs,
+preferredCPUs,
+allocated,
+numCPUsNeeded,
+cpuBindPolicy,
+cpuExclusivePolicy,
+numaAllocateStrategy)
+if err != nil {
+return result, err
+}
+numCPUsNeeded -= result.Size()
+availableCPUs = availableCPUs.Difference(preferredCPUs)
+}
+if numCPUsNeeded > 0 {
+cpus, err := takeCPUs(
+cpuTopologyOptions.CPUTopology,
+cpuTopologyOptions.MaxRefCount,
+availableCPUs,
+allocated,
+numCPUsNeeded,
+cpuBindPolicy,
+cpuExclusivePolicy,
+numaAllocateStrategy,
+)
+if err != nil {
+return cpuset.CPUSet{}, err
+}
+if result.IsEmpty() {
+result = cpus
+} else {
+result = result.Union(cpus)
+}
+}
+return result, nil
}

func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) {
@@ -159,19 +194,22 @@ func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID
allocation.updateAllocatedCPUSet(cpuTopologyOptions.CPUTopology, podUID, cpuset, cpuExclusivePolicy)
}

func (c *cpuManagerImpl) GetAllocatedCPUSet(nodeName string, podUID types.UID) (cpuset.CPUSet, bool) {
allocation := c.getOrCreateAllocation(nodeName)
allocation.lock.Lock()
defer allocation.lock.Unlock()

return allocation.getCPUs(podUID)
}

func (c *cpuManagerImpl) Free(nodeName string, podUID types.UID) {
allocation := c.getOrCreateAllocation(nodeName)
allocation.lock.Lock()
defer allocation.lock.Unlock()
allocation.releaseCPUs(podUID)
}

-func (c *cpuManagerImpl) Score(
-node *corev1.Node,
-numCPUsNeeded int,
-cpuBindPolicy schedulingconfig.CPUBindPolicy,
-cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
-) int64 {
+func (c *cpuManagerImpl) Score(node *corev1.Node, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, preferredCPUs cpuset.CPUSet) int64 {
cpuTopologyOptions := c.topologyManager.GetCPUTopologyOptions(node.Name)
if cpuTopologyOptions.CPUTopology == nil || !cpuTopologyOptions.CPUTopology.IsValid() {
return 0
@@ -184,8 +222,10 @@ func (c *cpuManagerImpl) Score(
allocation.lock.Lock()
defer allocation.lock.Unlock()

// TODO(joseph): should support score with preferredCPUs.

cpuTopology := cpuTopologyOptions.CPUTopology
-availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs)
+availableCPUs, allocated := allocation.getAvailableCPUs(cpuTopology, cpuTopologyOptions.MaxRefCount, reservedCPUs, preferredCPUs)
acc := newCPUAccumulator(
cpuTopology,
cpuTopologyOptions.MaxRefCount,
@@ -289,6 +329,7 @@ func (c *cpuManagerImpl) GetAvailableCPUs(nodeName string) (availableCPUs cpuset
allocation := c.getOrCreateAllocation(nodeName)
allocation.lock.Lock()
defer allocation.lock.Unlock()
-availableCPUs, allocated = allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, cpuTopologyOptions.ReservedCPUs)
+emptyCPUs := cpuset.NewCPUSet()
+availableCPUs, allocated = allocation.getAvailableCPUs(cpuTopologyOptions.CPUTopology, cpuTopologyOptions.MaxRefCount, cpuTopologyOptions.ReservedCPUs, emptyCPUs)
return availableCPUs, allocated, nil
}