diff --git a/apis/config/types.go b/apis/config/types.go index 959c80fd8..5c6778508 100644 --- a/apis/config/types.go +++ b/apis/config/types.go @@ -176,6 +176,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -192,6 +200,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/defaults.go b/apis/config/v1/defaults.go index 6f1b4df5b..175f7aeb2 100644 --- a/apis/config/v1/defaults.go +++ b/apis/config/v1/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerDedicated + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1/defaults_test.go b/apis/config/v1/defaults_test.go index f964eab8d..6f42a78db 100644 --- a/apis/config/v1/defaults_test.go +++ b/apis/config/v1/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1/types.go b/apis/config/v1/types.go index a0e92d4bb..be95eb08e 100644 --- a/apis/config/v1/types.go +++ b/apis/config/v1/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/zz_generated.conversion.go b/apis/config/v1/zz_generated.conversion.go index 4c8388b74..90f58e791 100644 --- a/apis/config/v1/zz_generated.conversion.go +++ b/apis/config/v1/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1_NetworkOverheadArgs(in *config.Net func autoConvert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in func autoConvert_config_NodeResourceTopologyCache_To_v1_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1/zz_generated.deepcopy.go b/apis/config/v1/zz_generated.deepcopy.go index be4875698..0553952b4 100644 --- a/apis/config/v1/zz_generated.deepcopy.go +++ b/apis/config/v1/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/v1beta3/defaults.go b/apis/config/v1beta3/defaults.go index eed0492ff..c2207717c 100644 --- a/apis/config/v1beta3/defaults.go +++ b/apis/config/v1beta3/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerDedicated + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1beta3/defaults_test.go b/apis/config/v1beta3/defaults_test.go index 9813b416d..90aa95df7 100644 --- a/apis/config/v1beta3/defaults_test.go +++ b/apis/config/v1beta3/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1beta3/types.go b/apis/config/v1beta3/types.go index 4c3b2fc9f..659b0241a 100644 --- a/apis/config/v1beta3/types.go +++ b/apis/config/v1beta3/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1beta3/zz_generated.conversion.go b/apis/config/v1beta3/zz_generated.conversion.go index 5ed3b6a64..2a7c9eb8b 100644 --- a/apis/config/v1beta3/zz_generated.conversion.go +++ b/apis/config/v1beta3/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1beta3_NetworkOverheadArgs(in *confi func autoConvert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCac func autoConvert_config_NodeResourceTopologyCache_To_v1beta3_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1beta3/zz_generated.deepcopy.go b/apis/config/v1beta3/zz_generated.deepcopy.go index 377aef4c9..3773eb083 100644 --- a/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/apis/config/v1beta3/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/zz_generated.deepcopy.go b/apis/config/zz_generated.deepcopy.go index a783ada93..c2db32ee0 100644 --- a/apis/config/zz_generated.deepcopy.go +++ b/apis/config/zz_generated.deepcopy.go @@ -170,6 +170,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/cmd/noderesourcetopology-plugin/main.go b/cmd/noderesourcetopology-plugin/main.go index d32e55676..372297a1b 100644 --- a/cmd/noderesourcetopology-plugin/main.go +++ b/cmd/noderesourcetopology-plugin/main.go @@ -36,7 +36,6 @@ import ( _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" knifeatures "sigs.k8s.io/scheduler-plugins/pkg-kni/features" - kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" "github.com/k8stopologyawareschedwg/podfingerprint" ) @@ -66,8 +65,6 @@ func main() { rand.Seed(time.Now().UnixNano()) - kniinformer.Setup() - // Register custom plugins to the scheduler framework. // Later they can consist of scheduler profile(s) and hence // used by various kinds of workloads. diff --git a/pkg-kni/podinformer/podinformer.go b/pkg-kni/podinformer/podinformer.go deleted file mode 100644 index 10e9bbe54..000000000 --- a/pkg-kni/podinformer/podinformer.go +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package podinformer - -import ( - "context" - "os" - "strconv" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coreinformers "k8s.io/client-go/informers/core/v1" - podlisterv1 "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" - k8scache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" -) - -const ( - nrtInformerEnvVar string = "NRT_ENABLE_INFORMER" -) - -var ( - enabled bool -) - -func IsEnabled() bool { - return enabled -} - -func Setup() { - hasNRTInf, ok := os.LookupEnv(nrtInformerEnvVar) - if !ok || hasNRTInf == "" { - klog.InfoS("NRT specific informer disabled", "variableFound", ok, "valueGiven", hasNRTInf != "") - return - } - val, err := strconv.ParseBool(hasNRTInf) - if err != nil { - klog.Error(err, "NRT specific informer disabled") - return - } - klog.InfoS("NRT specific informer status", "value", val) - enabled = val -} - -func FromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { - if !IsEnabled() { - podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut - return podHandle.Informer(), podHandle.Lister() - } - - podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil) - podLister := podlisterv1.NewPodLister(podInformer.GetIndexer()) - - klog.V(5).InfoS("Start custom pod informer") - ctx := context.Background() - go podInformer.Run(ctx.Done()) - - klog.V(5).InfoS("Syncing custom pod informer") - cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) - klog.V(5).InfoS("Synced custom pod informer") - - return podInformer, podLister -} - -func IsPodRelevantForState(pod *corev1.Pod) bool { - if pod == nil { - return false // should never happen - } - if IsEnabled() { - return true // consider all pods including ones in terminal phase - } - return pod.Status.Phase == corev1.PodRunning // we are interested only about nodes which consume resources -} diff --git a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go index 68f754f55..3c373e9f2 100644 --- a/pkg/noderesourcetopology/cache/overreserve.go +++ b/pkg/noderesourcetopology/cache/overreserve.go @@ -27,15 +27,16 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" podlisterv1 "k8s.io/client-go/listers/core/v1" - k8scache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" ) @@ -50,9 +51,10 @@ type OverReserve struct { nodesWithForeignPods counter podLister podlisterv1.PodLister resyncMethod apiconfig.CacheResyncMethod + isPodRelevant podprovider.PodFilterFunc } -func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) { +func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc) (*OverReserve, error) { if client == nil || podLister == nil { return nil, fmt.Errorf("nrtcache: received nil references") } @@ -74,6 +76,7 @@ func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient. nodesWithForeignPods: newCounter(), podLister: podLister, resyncMethod: resyncMethod, + isPodRelevant: isPodRelevant, } return obj, nil } @@ -199,7 +202,7 @@ func (ov *OverReserve) Resync() { } // node -> pod identifier (namespace, name) - nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, logID) + nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID) if err != nil { klog.ErrorS(err, "cannot find the mapping between running pods and nodes") return @@ -267,16 +270,32 @@ func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeRe } } -func InformerFromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { - podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut - return podHandle.Informer(), podHandle.Lister() -} - // to be used only in tests func (ov *OverReserve) Store() *nrtStore { return ov.nrts } +func makeNodeToPodDataMap(podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc, logID string) (map[string][]podData, error) { + nodeToObjsMap := make(map[string][]podData) + pods, err := podLister.List(labels.Everything()) + if err != nil { + return nodeToObjsMap, err + } + for _, pod := range pods { + if !isPodRelevant(pod, logID) { + continue + } + nodeObjs := nodeToObjsMap[pod.Spec.NodeName] + nodeObjs = append(nodeObjs, podData{ + Namespace: pod.Namespace, + Name: pod.Name, + HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), + }) + nodeToObjsMap[pod.Spec.NodeName] = nodeObjs + } + return nodeToObjsMap, nil +} + func logIDFromTime() string { return fmt.Sprintf("resync%v", time.Now().UnixMilli()) } diff --git a/pkg/noderesourcetopology/cache/overreserve_test.go b/pkg/noderesourcetopology/cache/overreserve_test.go index 9d0599ed8..ec48e2bae 100644 --- a/pkg/noderesourcetopology/cache/overreserve_test.go +++ b/pkg/noderesourcetopology/cache/overreserve_test.go @@ -22,6 +22,7 @@ import ( "sort" "testing" + "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "github.com/k8stopologyawareschedwg/podfingerprint" @@ -34,6 +35,7 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" tu "sigs.k8s.io/scheduler-plugins/test/util" ) @@ -102,12 +104,12 @@ func TestInitEmptyLister(t *testing.T) { fakePodLister := &fakePodLister{} - _, err = NewOverReserve(nil, nil, fakePodLister) + _, err = NewOverReserve(nil, nil, fakePodLister, podprovider.IsPodRelevantAlways) if err == nil { t.Fatalf("accepted nil lister") } - _, err = NewOverReserve(nil, fakeClient, nil) + _, err = NewOverReserve(nil, fakeClient, nil, podprovider.IsPodRelevantAlways) if err == nil { t.Fatalf("accepted nil indexer") } @@ -226,7 +228,7 @@ func TestOverreserveGetCachedNRTCopy(t *testing.T) { checkGetCachedNRTCopy( t, func(client ctrlclient.Client, podLister podlisterv1.PodLister) (Interface, error) { - return NewOverReserve(nil, client, podLister) + return NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) }, testCases..., ) @@ -724,9 +726,247 @@ func TestNodeWithForeignPods(t *testing.T) { } func mustOverReserve(t *testing.T, client ctrlclient.Client, podLister podlisterv1.PodLister) *OverReserve { - obj, err := NewOverReserve(nil, client, podLister) + obj, err := NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) if err != nil { t.Fatalf("unexpected error creating cache: %v", err) } return obj } + +func TestMakeNodeToPodDataMap(t *testing.T) { + tcases := []struct { + description string + pods []*corev1.Pod + isPodRelevant podprovider.PodFilterFunc + err error + expected map[string][]podData + expectedErr error + }{ + { + description: "empty pod list - shared", + isPodRelevant: podprovider.IsPodRelevantShared, + expected: make(map[string][]podData), + }, + { + description: "empty pod list - dedicated", + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: make(map[string][]podData), + }, + { + description: "single pod NOT running - succeeded (kubernetes jobs) - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod NOT running - failed", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod NOT running - succeeded (kubernetes jobs) - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantShared, + expected: map[string][]podData{}, + }, + { + description: "single pod NOT running - failed - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantShared, + expected: map[string][]podData{}, + }, + { + description: "single pod running - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod running - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "few pods, single node running - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod2", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod3", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + { + Namespace: "namespace2", + Name: "pod2", + }, + { + Namespace: "namespace2", + Name: "pod3", + }, + }, + }, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.description, func(t *testing.T) { + podLister := &fakePodLister{ + pods: tcase.pods, + err: tcase.err, + } + got, err := makeNodeToPodDataMap(podLister, tcase.isPodRelevant, tcase.description) + if err != tcase.expectedErr { + t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) + } + if diff := cmp.Diff(got, tcase.expected); diff != "" { + t.Errorf("unexpected result: %v", diff) + } + }) + } +} diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go index 2741017c5..8ff78b321 100644 --- a/pkg/noderesourcetopology/cache/store.go +++ b/pkg/noderesourcetopology/cache/store.go @@ -21,8 +21,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/labels" - podlisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" @@ -30,11 +28,8 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" "sigs.k8s.io/scheduler-plugins/pkg/util" - - kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" ) // nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock. @@ -247,25 +242,3 @@ func checkPodFingerprintForNode(logID string, objs []podData, nodeName, pfpExpec podfingerprint.MarkCompleted(st) return err } - -func makeNodeToPodDataMap(podLister podlisterv1.PodLister, logID string) (map[string][]podData, error) { - nodeToObjsMap := make(map[string][]podData) - pods, err := podLister.List(labels.Everything()) - if err != nil { - return nodeToObjsMap, err - } - for _, pod := range pods { - if !kniinformer.IsPodRelevantForState(pod) { - // we are interested only about nodes which consume resources - continue - } - nodeObjs := nodeToObjsMap[pod.Spec.NodeName] - nodeObjs = append(nodeObjs, podData{ - Namespace: pod.Namespace, - Name: pod.Name, - HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), - }) - nodeToObjsMap[pod.Spec.NodeName] = nodeObjs - } - return nodeToObjsMap, nil -} diff --git a/pkg/noderesourcetopology/cache/store_test.go b/pkg/noderesourcetopology/cache/store_test.go index 7f915af45..30871d450 100644 --- a/pkg/noderesourcetopology/cache/store_test.go +++ b/pkg/noderesourcetopology/cache/store_test.go @@ -23,7 +23,6 @@ import ( "sort" "testing" - "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -565,134 +564,6 @@ func TestResourceStoreUpdate(t *testing.T) { } } -func TestMakeNodeToPodDataMap(t *testing.T) { - tcases := []struct { - description string - pods []*corev1.Pod - err error - expected map[string][]podData - expectedErr error - }{ - { - description: "empty pod list", - expected: make(map[string][]podData), - }, - { - description: "single pod NOT running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - }, - expected: make(map[string][]podData), - }, - { - description: "single pod running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "few pods, single node running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod2", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod3", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - { - Namespace: "namespace2", - Name: "pod2", - }, - { - Namespace: "namespace2", - Name: "pod3", - }, - }, - }, - }, - } - - for _, tcase := range tcases { - t.Run(tcase.description, func(t *testing.T) { - podLister := &fakePodLister{ - pods: tcase.pods, - err: tcase.err, - } - got, err := makeNodeToPodDataMap(podLister, tcase.description) - if err != tcase.expectedErr { - t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) - } - if diff := cmp.Diff(got, tcase.expected); diff != "" { - t.Errorf("unexpected result: %v", diff) - } - }) - } -} - func TestCheckPodFingerprintForNode(t *testing.T) { tcases := []struct { description string diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go index 4e334fb2d..832c95433 100644 --- a/pkg/noderesourcetopology/pluginhelpers.go +++ b/pkg/noderesourcetopology/pluginhelpers.go @@ -32,9 +32,8 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" - - kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer" ) const ( @@ -56,9 +55,9 @@ func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, han return nrtcache.NewPassthrough(client), nil } - podSharedInformer, podLister := kniinformer.FromHandle(handle) + podSharedInformer, podLister, isPodRelevant := podprovider.NewFromHandle(handle, tcfg.Cache) - nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister) + nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister, isPodRelevant) if err != nil { return nil, err } diff --git a/pkg/noderesourcetopology/podprovider/podprovider.go b/pkg/noderesourcetopology/podprovider/podprovider.go new file mode 100644 index 000000000..7aa6ec208 --- /dev/null +++ b/pkg/noderesourcetopology/podprovider/podprovider.go @@ -0,0 +1,92 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprovider + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + podlisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + k8scache "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + + apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" +) + +type PodFilterFunc func(pod *corev1.Pod, logID string) bool + +func NewFromHandle(handle framework.Handle, cacheConf *apiconfig.NodeResourceTopologyCache) (k8scache.SharedIndexInformer, podlisterv1.PodLister, PodFilterFunc) { + dedicated := wantsDedicatedInformer(cacheConf) + if !dedicated { + podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut + return podHandle.Informer(), podHandle.Lister(), IsPodRelevantShared + } + + podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil) + podLister := podlisterv1.NewPodLister(podInformer.GetIndexer()) + + klog.V(5).InfoS("Start custom pod informer") + ctx := context.Background() + go podInformer.Run(ctx.Done()) + + klog.V(5).InfoS("Syncing custom pod informer") + cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced) + klog.V(5).InfoS("Synced custom pod informer") + + return podInformer, podLister, IsPodRelevantDedicated +} + +// IsPodRelevantAlways is meant to be used in test only +func IsPodRelevantAlways(pod *corev1.Pod, logID string) bool { + return true +} + +func IsPodRelevantShared(pod *corev1.Pod, logID string) bool { + // we are interested only about nodes which consume resources + return pod.Status.Phase == corev1.PodRunning +} + +func IsPodRelevantDedicated(pod *corev1.Pod, logID string) bool { + // Every other phase we're interested into (see https://github.com/kubernetes-sigs/scheduler-plugins/pull/599). + // Note PodUnknown is deprecated and reportedly no longer set since 2015 (!!) + if pod.Status.Phase == corev1.PodPending { + // this is unexpected, so we're loud about it + klog.V(2).InfoS("nrtcache: Listed pod in Pending phase, ignored", "logID", logID, "podUID", pod.UID) + return false + } + if pod.Spec.NodeName == "" { + // this is very unexpected, so we're louder about it + klog.InfoS("nrtcache: Listed pod unbound, ignored", "logID", logID, "podUID", pod.UID) + return false + } + return true +} + +func wantsDedicatedInformer(cacheConf *apiconfig.NodeResourceTopologyCache) bool { + if cacheConf == nil { + return false + } + if cacheConf.InformerMode == nil { + return false + } + infMode := *cacheConf.InformerMode + return infMode == apiconfig.CacheInformerDedicated +}