diff --git a/apis/config/types.go b/apis/config/types.go index 5c6778508..959c80fd8 100644 --- a/apis/config/types.go +++ b/apis/config/types.go @@ -176,14 +176,6 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) -// CacheInformerMode is a "string" type -type CacheInformerMode string - -const ( - CacheInformerShared CacheInformerMode = "Shared" - CacheInformerDedicated CacheInformerMode = "Dedicated" -) - // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -200,11 +192,6 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod - // InformerMode controls the channel the cache uses to get updates about pods. - // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is - // guaranteed to best suit the cache needs, at cost of one extra connection. - // If unspecified, default is "Dedicated" - InformerMode *CacheInformerMode } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/defaults.go b/apis/config/v1/defaults.go index 175f7aeb2..6f1b4df5b 100644 --- a/apis/config/v1/defaults.go +++ b/apis/config/v1/defaults.go @@ -89,8 +89,6 @@ var ( defaultResyncMethod = CacheResyncAutodetect - defaultInformerMode = CacheInformerDedicated - // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -202,9 +200,6 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } - if obj.Cache.InformerMode == nil { - obj.Cache.InformerMode = &defaultInformerMode - } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1/defaults_test.go b/apis/config/v1/defaults_test.go index 6f42a78db..f964eab8d 100644 --- a/apis/config/v1/defaults_test.go +++ b/apis/config/v1/defaults_test.go @@ -205,7 +205,6 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, - InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1/types.go b/apis/config/v1/types.go index be95eb08e..a0e92d4bb 100644 --- a/apis/config/v1/types.go +++ b/apis/config/v1/types.go @@ -174,14 +174,6 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) -// CacheInformerMode is a "string" type -type CacheInformerMode string - -const ( - CacheInformerShared CacheInformerMode = "Shared" - CacheInformerDedicated CacheInformerMode = "Dedicated" -) - // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -198,11 +190,6 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` - // InformerMode controls the channel the cache uses to get updates about pods. - // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is - // guaranteed to best suit the cache needs, at cost of one extra connection. - // If unspecified, default is "Dedicated" - InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/zz_generated.conversion.go b/apis/config/v1/zz_generated.conversion.go index 90f58e791..4c8388b74 100644 --- a/apis/config/v1/zz_generated.conversion.go +++ b/apis/config/v1/zz_generated.conversion.go @@ -344,7 +344,6 @@ func Convert_config_NetworkOverheadArgs_To_v1_NetworkOverheadArgs(in *config.Net func autoConvert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) - out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -356,7 +355,6 @@ func Convert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in func autoConvert_config_NodeResourceTopologyCache_To_v1_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) - out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1/zz_generated.deepcopy.go b/apis/config/v1/zz_generated.deepcopy.go index 0553952b4..be4875698 100644 --- a/apis/config/v1/zz_generated.deepcopy.go +++ b/apis/config/v1/zz_generated.deepcopy.go @@ -220,11 +220,6 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } - if in.InformerMode != nil { - in, out := &in.InformerMode, &out.InformerMode - *out = new(CacheInformerMode) - **out = **in - } return } diff --git a/apis/config/v1beta3/defaults.go b/apis/config/v1beta3/defaults.go index c2207717c..eed0492ff 100644 --- a/apis/config/v1beta3/defaults.go +++ b/apis/config/v1beta3/defaults.go @@ -89,8 +89,6 @@ var ( defaultResyncMethod = CacheResyncAutodetect - defaultInformerMode = CacheInformerDedicated - // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -202,9 +200,6 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } - if obj.Cache.InformerMode == nil { - obj.Cache.InformerMode = &defaultInformerMode - } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1beta3/defaults_test.go b/apis/config/v1beta3/defaults_test.go index 90aa95df7..9813b416d 100644 --- a/apis/config/v1beta3/defaults_test.go +++ b/apis/config/v1beta3/defaults_test.go @@ -205,7 +205,6 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, - InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1beta3/types.go b/apis/config/v1beta3/types.go index 659b0241a..4c3b2fc9f 100644 --- a/apis/config/v1beta3/types.go +++ b/apis/config/v1beta3/types.go @@ -174,14 +174,6 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) -// CacheInformerMode is a "string" type -type CacheInformerMode string - -const ( - CacheInformerShared CacheInformerMode = "Shared" - CacheInformerDedicated CacheInformerMode = "Dedicated" -) - // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -198,11 +190,6 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` - // InformerMode controls the channel the cache uses to get updates about pods. - // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is - // guaranteed to best suit the cache needs, at cost of one extra connection. - // If unspecified, default is "Dedicated" - InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1beta3/zz_generated.conversion.go b/apis/config/v1beta3/zz_generated.conversion.go index 2a7c9eb8b..5ed3b6a64 100644 --- a/apis/config/v1beta3/zz_generated.conversion.go +++ b/apis/config/v1beta3/zz_generated.conversion.go @@ -344,7 +344,6 @@ func Convert_config_NetworkOverheadArgs_To_v1beta3_NetworkOverheadArgs(in *confi func autoConvert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) - out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -356,7 +355,6 @@ func Convert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCac func autoConvert_config_NodeResourceTopologyCache_To_v1beta3_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) - out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1beta3/zz_generated.deepcopy.go b/apis/config/v1beta3/zz_generated.deepcopy.go index 3773eb083..377aef4c9 100644 --- a/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/apis/config/v1beta3/zz_generated.deepcopy.go @@ -220,11 +220,6 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } - if in.InformerMode != nil { - in, out := &in.InformerMode, &out.InformerMode - *out = new(CacheInformerMode) - **out = **in - } return } diff --git a/apis/config/zz_generated.deepcopy.go b/apis/config/zz_generated.deepcopy.go index c2db32ee0..a783ada93 100644 --- a/apis/config/zz_generated.deepcopy.go +++ b/apis/config/zz_generated.deepcopy.go @@ -170,11 +170,6 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } - if in.InformerMode != nil { - in, out := &in.InformerMode, &out.InformerMode - *out = new(CacheInformerMode) - **out = **in - } return } diff --git a/cmd/noderesourcetopology-plugin/main.go b/cmd/noderesourcetopology-plugin/main.go index 372297a1b..d32e55676 100644 --- a/cmd/noderesourcetopology-plugin/main.go +++ b/cmd/noderesourcetopology-plugin/main.go @@ -36,6 +36,7 @@ import ( _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" knifeatures "sigs.k8s.io/scheduler-plugins/pkg-kni/features" + kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" "github.com/k8stopologyawareschedwg/podfingerprint" ) @@ -65,6 +66,8 @@ func main() { rand.Seed(time.Now().UnixNano()) + kniinformer.Setup() + // Register custom plugins to the scheduler framework. // Later they can consist of scheduler profile(s) and hence // used by various kinds of workloads. diff --git a/pkg-kni/podinformer/podinformer.go b/pkg-kni/podinformer/podinformer.go new file mode 100644 index 000000000..10e9bbe54 --- /dev/null +++ b/pkg-kni/podinformer/podinformer.go @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package podinformer + +import ( + "context" + "os" + "strconv" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + podlisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + k8scache "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const ( + nrtInformerEnvVar string = "NRT_ENABLE_INFORMER" +) + +var ( + enabled bool +) + +func IsEnabled() bool { + return enabled +} + +func Setup() { + hasNRTInf, ok := os.LookupEnv(nrtInformerEnvVar) + if !ok || hasNRTInf == "" { + klog.InfoS("NRT specific informer disabled", "variableFound", ok, "valueGiven", hasNRTInf != "") + return + } + val, err := strconv.ParseBool(hasNRTInf) + if err != nil { + klog.Error(err, "NRT specific informer disabled") + return + } + klog.InfoS("NRT specific informer status", "value", val) + enabled = val +} + +func FromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { + if !IsEnabled() { + podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut + return podHandle.Informer(), podHandle.Lister() + } + + podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil) + podLister := podlisterv1.NewPodLister(podInformer.GetIndexer()) + + klog.V(5).InfoS("Start custom pod informer") + ctx := context.Background() + go podInformer.Run(ctx.Done()) + + klog.V(5).InfoS("Syncing custom pod informer") + cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) + klog.V(5).InfoS("Synced custom pod informer") + + return podInformer, podLister +} + +func IsPodRelevantForState(pod *corev1.Pod) bool { + if pod == nil { + return false // should never happen + } + if IsEnabled() { + return true // consider all pods including ones in terminal phase + } + return pod.Status.Phase == corev1.PodRunning // we are interested only about nodes which consume resources +} diff --git a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go index 3c373e9f2..68f754f55 100644 --- a/pkg/noderesourcetopology/cache/overreserve.go +++ b/pkg/noderesourcetopology/cache/overreserve.go @@ -27,16 +27,15 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" podlisterv1 "k8s.io/client-go/listers/core/v1" + k8scache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" ) @@ -51,10 +50,9 @@ type OverReserve struct { nodesWithForeignPods counter podLister podlisterv1.PodLister resyncMethod apiconfig.CacheResyncMethod - isPodRelevant podprovider.PodFilterFunc } -func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc) (*OverReserve, error) { +func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) { if client == nil || podLister == nil { return nil, fmt.Errorf("nrtcache: received nil references") } @@ -76,7 +74,6 @@ func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient. nodesWithForeignPods: newCounter(), podLister: podLister, resyncMethod: resyncMethod, - isPodRelevant: isPodRelevant, } return obj, nil } @@ -202,7 +199,7 @@ func (ov *OverReserve) Resync() { } // node -> pod identifier (namespace, name) - nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID) + nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, logID) if err != nil { klog.ErrorS(err, "cannot find the mapping between running pods and nodes") return @@ -270,32 +267,16 @@ func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeRe } } +func InformerFromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { + podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut + return podHandle.Informer(), podHandle.Lister() +} + // to be used only in tests func (ov *OverReserve) Store() *nrtStore { return ov.nrts } -func makeNodeToPodDataMap(podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc, logID string) (map[string][]podData, error) { - nodeToObjsMap := make(map[string][]podData) - pods, err := podLister.List(labels.Everything()) - if err != nil { - return nodeToObjsMap, err - } - for _, pod := range pods { - if !isPodRelevant(pod, logID) { - continue - } - nodeObjs := nodeToObjsMap[pod.Spec.NodeName] - nodeObjs = append(nodeObjs, podData{ - Namespace: pod.Namespace, - Name: pod.Name, - HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), - }) - nodeToObjsMap[pod.Spec.NodeName] = nodeObjs - } - return nodeToObjsMap, nil -} - func logIDFromTime() string { return fmt.Sprintf("resync%v", time.Now().UnixMilli()) } diff --git a/pkg/noderesourcetopology/cache/overreserve_test.go b/pkg/noderesourcetopology/cache/overreserve_test.go index ec48e2bae..9d0599ed8 100644 --- a/pkg/noderesourcetopology/cache/overreserve_test.go +++ b/pkg/noderesourcetopology/cache/overreserve_test.go @@ -22,7 +22,6 @@ import ( "sort" "testing" - "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "github.com/k8stopologyawareschedwg/podfingerprint" @@ -35,7 +34,6 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" tu "sigs.k8s.io/scheduler-plugins/test/util" ) @@ -104,12 +102,12 @@ func TestInitEmptyLister(t *testing.T) { fakePodLister := &fakePodLister{} - _, err = NewOverReserve(nil, nil, fakePodLister, podprovider.IsPodRelevantAlways) + _, err = NewOverReserve(nil, nil, fakePodLister) if err == nil { t.Fatalf("accepted nil lister") } - _, err = NewOverReserve(nil, fakeClient, nil, podprovider.IsPodRelevantAlways) + _, err = NewOverReserve(nil, fakeClient, nil) if err == nil { t.Fatalf("accepted nil indexer") } @@ -228,7 +226,7 @@ func TestOverreserveGetCachedNRTCopy(t *testing.T) { checkGetCachedNRTCopy( t, func(client ctrlclient.Client, podLister podlisterv1.PodLister) (Interface, error) { - return NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) + return NewOverReserve(nil, client, podLister) }, testCases..., ) @@ -726,247 +724,9 @@ func TestNodeWithForeignPods(t *testing.T) { } func mustOverReserve(t *testing.T, client ctrlclient.Client, podLister podlisterv1.PodLister) *OverReserve { - obj, err := NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) + obj, err := NewOverReserve(nil, client, podLister) if err != nil { t.Fatalf("unexpected error creating cache: %v", err) } return obj } - -func TestMakeNodeToPodDataMap(t *testing.T) { - tcases := []struct { - description string - pods []*corev1.Pod - isPodRelevant podprovider.PodFilterFunc - err error - expected map[string][]podData - expectedErr error - }{ - { - description: "empty pod list - shared", - isPodRelevant: podprovider.IsPodRelevantShared, - expected: make(map[string][]podData), - }, - { - description: "empty pod list - dedicated", - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: make(map[string][]podData), - }, - { - description: "single pod NOT running - succeeded (kubernetes jobs) - dedicated", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodSucceeded, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "single pod NOT running - failed", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodFailed, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "single pod NOT running - succeeded (kubernetes jobs) - shared", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodSucceeded, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantShared, - expected: map[string][]podData{}, - }, - { - description: "single pod NOT running - failed - shared", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodFailed, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantShared, - expected: map[string][]podData{}, - }, - { - description: "single pod running - dedicated", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "single pod running - shared", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "few pods, single node running - dedicated", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod2", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod3", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - isPodRelevant: podprovider.IsPodRelevantDedicated, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - { - Namespace: "namespace2", - Name: "pod2", - }, - { - Namespace: "namespace2", - Name: "pod3", - }, - }, - }, - }, - } - - for _, tcase := range tcases { - t.Run(tcase.description, func(t *testing.T) { - podLister := &fakePodLister{ - pods: tcase.pods, - err: tcase.err, - } - got, err := makeNodeToPodDataMap(podLister, tcase.isPodRelevant, tcase.description) - if err != tcase.expectedErr { - t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) - } - if diff := cmp.Diff(got, tcase.expected); diff != "" { - t.Errorf("unexpected result: %v", diff) - } - }) - } -} diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go index 8ff78b321..2741017c5 100644 --- a/pkg/noderesourcetopology/cache/store.go +++ b/pkg/noderesourcetopology/cache/store.go @@ -21,6 +21,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + podlisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" @@ -28,8 +30,11 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" "sigs.k8s.io/scheduler-plugins/pkg/util" + + kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" ) // nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock. @@ -242,3 +247,25 @@ func checkPodFingerprintForNode(logID string, objs []podData, nodeName, pfpExpec podfingerprint.MarkCompleted(st) return err } + +func makeNodeToPodDataMap(podLister podlisterv1.PodLister, logID string) (map[string][]podData, error) { + nodeToObjsMap := make(map[string][]podData) + pods, err := podLister.List(labels.Everything()) + if err != nil { + return nodeToObjsMap, err + } + for _, pod := range pods { + if !kniinformer.IsPodRelevantForState(pod) { + // we are interested only about nodes which consume resources + continue + } + nodeObjs := nodeToObjsMap[pod.Spec.NodeName] + nodeObjs = append(nodeObjs, podData{ + Namespace: pod.Namespace, + Name: pod.Name, + HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), + }) + nodeToObjsMap[pod.Spec.NodeName] = nodeObjs + } + return nodeToObjsMap, nil +} diff --git a/pkg/noderesourcetopology/cache/store_test.go b/pkg/noderesourcetopology/cache/store_test.go index 30871d450..7f915af45 100644 --- a/pkg/noderesourcetopology/cache/store_test.go +++ b/pkg/noderesourcetopology/cache/store_test.go @@ -23,6 +23,7 @@ import ( "sort" "testing" + "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -564,6 +565,134 @@ func TestResourceStoreUpdate(t *testing.T) { } } +func TestMakeNodeToPodDataMap(t *testing.T) { + tcases := []struct { + description string + pods []*corev1.Pod + err error + expected map[string][]podData + expectedErr error + }{ + { + description: "empty pod list", + expected: make(map[string][]podData), + }, + { + description: "single pod NOT running", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + }, + }, + expected: make(map[string][]podData), + }, + { + description: "single pod running", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "few pods, single node running", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod2", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod3", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + { + Namespace: "namespace2", + Name: "pod2", + }, + { + Namespace: "namespace2", + Name: "pod3", + }, + }, + }, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.description, func(t *testing.T) { + podLister := &fakePodLister{ + pods: tcase.pods, + err: tcase.err, + } + got, err := makeNodeToPodDataMap(podLister, tcase.description) + if err != tcase.expectedErr { + t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) + } + if diff := cmp.Diff(got, tcase.expected); diff != "" { + t.Errorf("unexpected result: %v", diff) + } + }) + } +} + func TestCheckPodFingerprintForNode(t *testing.T) { tcases := []struct { description string diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go index 832c95433..4e334fb2d 100644 --- a/pkg/noderesourcetopology/pluginhelpers.go +++ b/pkg/noderesourcetopology/pluginhelpers.go @@ -32,8 +32,9 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" + + kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" ) const ( @@ -55,9 +56,9 @@ func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, han return nrtcache.NewPassthrough(client), nil } - podSharedInformer, podLister, isPodRelevant := podprovider.NewFromHandle(handle, tcfg.Cache) + podSharedInformer, podLister := kniinformer.FromHandle(handle) - nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister, isPodRelevant) + nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister) if err != nil { return nil, err } diff --git a/pkg/noderesourcetopology/podprovider/podprovider.go b/pkg/noderesourcetopology/podprovider/podprovider.go deleted file mode 100644 index 7aa6ec208..000000000 --- a/pkg/noderesourcetopology/podprovider/podprovider.go +++ /dev/null @@ -1,92 +0,0 @@ -/* -Copyright 2023 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package podprovider - -import ( - "context" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coreinformers "k8s.io/client-go/informers/core/v1" - podlisterv1 "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" - k8scache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" - - apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" -) - -type PodFilterFunc func(pod *corev1.Pod, logID string) bool - -func NewFromHandle(handle framework.Handle, cacheConf *apiconfig.NodeResourceTopologyCache) (k8scache.SharedIndexInformer, podlisterv1.PodLister, PodFilterFunc) { - dedicated := wantsDedicatedInformer(cacheConf) - if !dedicated { - podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut - return podHandle.Informer(), podHandle.Lister(), IsPodRelevantShared - } - - podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil) - podLister := podlisterv1.NewPodLister(podInformer.GetIndexer()) - - klog.V(5).InfoS("Start custom pod informer") - ctx := context.Background() - go podInformer.Run(ctx.Done()) - - klog.V(5).InfoS("Syncing custom pod informer") - cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) - klog.V(5).InfoS("Synced custom pod informer") - - return podInformer, podLister, IsPodRelevantDedicated -} - -// IsPodRelevantAlways is meant to be used in test only -func IsPodRelevantAlways(pod *corev1.Pod, logID string) bool { - return true -} - -func IsPodRelevantShared(pod *corev1.Pod, logID string) bool { - // we are interested only about nodes which consume resources - return pod.Status.Phase == corev1.PodRunning -} - -func IsPodRelevantDedicated(pod *corev1.Pod, logID string) bool { - // Every other phase we're interested into (see https://github.com/kubernetes-sigs/scheduler-plugins/pull/599). - // Note PodUnknown is deprecated and reportedly no longer set since 2015 (!!) - if pod.Status.Phase == corev1.PodPending { - // this is unexpected, so we're loud about it - klog.V(2).InfoS("nrtcache: Listed pod in Pending phase, ignored", "logID", logID, "podUID", pod.UID) - return false - } - if pod.Spec.NodeName == "" { - // this is very unexpected, so we're louder about it - klog.InfoS("nrtcache: Listed pod unbound, ignored", "logID", logID, "podUID", pod.UID) - return false - } - return true -} - -func wantsDedicatedInformer(cacheConf *apiconfig.NodeResourceTopologyCache) bool { - if cacheConf == nil { - return false - } - if cacheConf.InformerMode == nil { - return false - } - infMode := *cacheConf.InformerMode - return infMode == apiconfig.CacheInformerDedicated -}