diff --git a/.golangci.yml b/.golangci.yml index 2d9bbc302..179df3820 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -35,6 +35,8 @@ linters-settings: local-prefixes: github.com/apache/yunikorn govet: check-shadowing: true + goconst: + min-occurrences: 5 funlen: lines: 120 statements: 80 diff --git a/pkg/admission/conf/am_conf.go b/pkg/admission/conf/am_conf.go index 154df9ced..f375af5e3 100644 --- a/pkg/admission/conf/am_conf.go +++ b/pkg/admission/conf/am_conf.go @@ -117,8 +117,13 @@ func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionController return acc } -func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) { - configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc}) +func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error { + _, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc}) + if err != nil { + return fmt.Errorf("failed to create register handlers: %w", err) + } + + return nil } func (acc *AdmissionControllerConf) GetNamespace() string { diff --git a/pkg/admission/namespace_cache.go b/pkg/admission/namespace_cache.go index c5ff80654..963ffc27b 100644 --- a/pkg/admission/namespace_cache.go +++ b/pkg/admission/namespace_cache.go @@ -19,6 +19,8 @@ package admission import ( + "fmt" + v1 "k8s.io/api/core/v1" informersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" @@ -53,14 +55,17 @@ type nsFlags struct { } // NewNamespaceCache creates a new cache and registers the handler for the cache with the Informer. -func NewNamespaceCache(namespaces informersv1.NamespaceInformer) *NamespaceCache { +func NewNamespaceCache(namespaces informersv1.NamespaceInformer) (*NamespaceCache, error) { nsc := &NamespaceCache{ nameSpaces: make(map[string]nsFlags), } if namespaces != nil { - namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc}) + _, err := namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc}) + if err != nil { + return nil, fmt.Errorf("failed to create namespace cache: %w", err) + } } - return nsc + return nsc, nil } // enableYuniKorn returns the value for the enableYuniKorn flag (tri-state UNSET, TRUE or FALSE) for the namespace. diff --git a/pkg/admission/namespace_cache_test.go b/pkg/admission/namespace_cache_test.go index 792a2fcdc..5d9e2bcd1 100644 --- a/pkg/admission/namespace_cache_test.go +++ b/pkg/admission/namespace_cache_test.go @@ -35,7 +35,8 @@ import ( const testNS = "test-ns" func TestFlags(t *testing.T) { - cache := NewNamespaceCache(nil) + cache, nsErr := NewNamespaceCache(nil) + assert.NilError(t, nsErr) cache.nameSpaces["notset"] = nsFlags{ enableYuniKorn: UNSET, generateAppID: UNSET, @@ -69,7 +70,8 @@ func TestNamespaceHandlers(t *testing.T) { kubeClient := client.NewKubeClientMock(false) informers := NewInformers(kubeClient, "default") - cache := NewNamespaceCache(informers.Namespace) + cache, nsErr := NewNamespaceCache(informers.Namespace) + assert.NilError(t, nsErr) informers.Start() defer informers.Stop() diff --git a/pkg/admission/priority_class_cache.go b/pkg/admission/priority_class_cache.go index b640ce834..87a4dd873 100644 --- a/pkg/admission/priority_class_cache.go +++ b/pkg/admission/priority_class_cache.go @@ -19,6 +19,8 @@ package admission import ( + "fmt" + schedulingv1 "k8s.io/api/scheduling/v1" informersv1 "k8s.io/client-go/informers/scheduling/v1" "k8s.io/client-go/tools/cache" @@ -36,14 +38,17 @@ type PriorityClassCache struct { } // NewPriorityClassCache creates a new cache and registers the handler for the cache with the Informer. -func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) *PriorityClassCache { +func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) (*PriorityClassCache, error) { pcc := &PriorityClassCache{ priorityClasses: make(map[string]bool), } if priorityClasses != nil { - priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc}) + _, err := priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc}) + if err != nil { + return nil, fmt.Errorf("failed to create a new cache and register the handler: %w", err) + } } - return pcc + return pcc, nil } // isPreemptSelfAllowed returns the preemption value. Only returns false if configured. diff --git a/pkg/admission/priority_class_cache_test.go b/pkg/admission/priority_class_cache_test.go index 0fe287afd..de755eb5c 100644 --- a/pkg/admission/priority_class_cache_test.go +++ b/pkg/admission/priority_class_cache_test.go @@ -35,7 +35,8 @@ import ( const testPC = "test-pc" func TestIsPreemptSelfAllowed(t *testing.T) { - cache := NewPriorityClassCache(nil) + cache, pcErr := NewPriorityClassCache(nil) + assert.NilError(t, pcErr) cache.priorityClasses["yes"] = true cache.priorityClasses["no"] = false @@ -49,7 +50,8 @@ func TestPriorityClassHandlers(t *testing.T) { kubeClient := client.NewKubeClientMock(false) informers := NewInformers(kubeClient, "default") - cache := NewPriorityClassCache(informers.PriorityClass) + cache, pcErr := NewPriorityClassCache(informers.PriorityClass) + assert.NilError(t, pcErr) informers.Start() defer informers.Stop() diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 20036eec2..a59b224eb 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -111,33 +111,50 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM return ctx } -func (ctx *Context) AddSchedulingEventHandlers() { - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ +func (ctx *Context) AddSchedulingEventHandlers() error { + err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.ConfigMapInformerHandlers, FilterFn: ctx.filterConfigMaps, AddFn: ctx.addConfigMaps, UpdateFn: ctx.updateConfigMaps, DeleteFn: ctx.deleteConfigMaps, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.PriorityClassInformerHandlers, FilterFn: ctx.filterPriorityClasses, AddFn: ctx.addPriorityClass, UpdateFn: ctx.updatePriorityClass, DeleteFn: ctx.deletePriorityClass, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.NodeInformerHandlers, AddFn: ctx.addNode, UpdateFn: ctx.updateNode, DeleteFn: ctx.deleteNode, }) - ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ + if err != nil { + return err + } + + err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{ Type: client.PodInformerHandlers, AddFn: ctx.AddPod, UpdateFn: ctx.UpdatePod, DeleteFn: ctx.DeletePod, }) + if err != nil { + return err + } + + return nil } func (ctx *Context) IsPluginMode() bool { @@ -1449,7 +1466,11 @@ func (ctx *Context) InitializeState() error { // Step 5: Start scheduling event handlers. At this point, initialization is mostly complete, and any existing // objects will show up as newly added objects. Since the add/update event handlers are idempotent, this is fine. - ctx.AddSchedulingEventHandlers() + err = ctx.AddSchedulingEventHandlers() + if err != nil { + log.Log(log.Admission).Error("failed to add scheduling event handlers", zap.Error(err)) + return err + } // Step 6: Finalize priority classes. Between the start of initialization and when the informer event handlers are // registered, it is possible that a priority class object was deleted. Process them again and remove diff --git a/pkg/client/apifactory.go b/pkg/client/apifactory.go index 62bdcb203..beedfbd91 100644 --- a/pkg/client/apifactory.go +++ b/pkg/client/apifactory.go @@ -19,6 +19,7 @@ package client import ( + "fmt" "time" "go.uber.org/zap" @@ -53,7 +54,7 @@ func (t Type) String() string { type APIProvider interface { GetAPIs() *Clients - AddEventHandler(handlers *ResourceEventHandlers) + AddEventHandler(handlers *ResourceEventHandlers) error Start() Stop() WaitForSync() @@ -143,7 +144,7 @@ func (s *APIFactory) IsTestingMode() bool { return s.testMode } -func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) { +func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) error { s.lock.Lock() defer s.lock.Unlock() // register all handlers @@ -166,34 +167,43 @@ func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) { } log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.Type)) - s.addEventHandlers(handlers.Type, h, 0) + if err := s.addEventHandlers(handlers.Type, h, 0); err != nil { + return fmt.Errorf("failed to initialize event handlers: %w", err) + } + return nil } func (s *APIFactory) addEventHandlers( - handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) error { + var err error switch handlerType { case PodInformerHandlers: - s.GetAPIs().PodInformer.Informer(). + _, err = s.GetAPIs().PodInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case NodeInformerHandlers: - s.GetAPIs().NodeInformer.Informer(). + _, err = s.GetAPIs().NodeInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case ConfigMapInformerHandlers: - s.GetAPIs().ConfigMapInformer.Informer(). + _, err = s.GetAPIs().ConfigMapInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case StorageInformerHandlers: - s.GetAPIs().StorageInformer.Informer(). + _, err = s.GetAPIs().StorageInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PVInformerHandlers: - s.GetAPIs().PVInformer.Informer(). + _, err = s.GetAPIs().PVInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PVCInformerHandlers: - s.GetAPIs().PVCInformer.Informer(). + _, err = s.GetAPIs().PVCInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) case PriorityClassInformerHandlers: - s.GetAPIs().PriorityClassInformer.Informer(). + _, err = s.GetAPIs().PriorityClassInformer.Informer(). AddEventHandlerWithResyncPeriod(handler, resyncPeriod) } + + if err != nil { + return fmt.Errorf("failed to add event handlers: %w", err) + } + return nil } func (s *APIFactory) WaitForSync() { diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go index 733923901..fee2d2579 100644 --- a/pkg/client/apifactory_mock.go +++ b/pkg/client/apifactory_mock.go @@ -19,6 +19,7 @@ package client import ( + "fmt" "time" "go.uber.org/zap" @@ -219,16 +220,18 @@ func (m *MockedAPIProvider) IsTestingMode() bool { return true } -func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) { +func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) error { m.Lock() defer m.Unlock() if !m.running { - return + return fmt.Errorf("mocked API provider is not running") } m.eventHandler <- handlers log.Log(log.Test).Info("registering event handler", zap.Stringer("type", handlers.Type)) + + return nil } func (m *MockedAPIProvider) RunEventHandler() { diff --git a/pkg/cmd/admissioncontroller/main.go b/pkg/cmd/admissioncontroller/main.go index 815f92947..08f2771e2 100644 --- a/pkg/cmd/admissioncontroller/main.go +++ b/pkg/cmd/admissioncontroller/main.go @@ -62,9 +62,21 @@ func main() { kubeClient := client.NewKubeClient(amConf.GetKubeConfig()) informers := admission.NewInformers(kubeClient, amConf.GetNamespace()) - amConf.RegisterHandlers(informers.ConfigMap) - pcCache := admission.NewPriorityClassCache(informers.PriorityClass) - nsCache := admission.NewNamespaceCache(informers.Namespace) + + if hadlerErr := amConf.RegisterHandlers(informers.ConfigMap); hadlerErr != nil { + log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(hadlerErr)) + return + } + pcCache, pcErr := admission.NewPriorityClassCache(informers.PriorityClass) + if pcErr != nil { + log.Log(log.Admission).Fatal("Failed to create new priority class cache", zap.Error(pcErr)) + return + } + nsCache, nsErr := admission.NewNamespaceCache(informers.Namespace) + if nsErr != nil { + log.Log(log.Admission).Fatal("Failed to create namespace cache", zap.Error(nsErr)) + return + } informers.Start() wm, err := admission.NewWebhookManager(amConf) diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index 6b6b9af64..1bbe5f02c 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -162,10 +162,6 @@ func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expe } } -func (fc *MockScheduler) removeApplication(appId string) error { - return fc.context.RemoveApplication(appId) -} - func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) { app := fc.context.GetApplication(appID) assert.Equal(t, app != nil, true) diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index 92fb07a83..77a472f57 100644 --- a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -32,6 +32,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -58,6 +59,7 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/locking" + "github.com/apache/yunikorn-k8shim/pkg/log" "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" ) @@ -716,7 +718,11 @@ func (k *KubeCtl) StartConfigMapInformer(namespace string, stopChan <-chan struc informerFactory := informers.NewSharedInformerFactoryWithOptions(k.clientSet, 0, informers.WithNamespace(namespace)) informerFactory.Start(stopChan) configMapInformer := informerFactory.Core().V1().ConfigMaps() - configMapInformer.Informer().AddEventHandler(eventHandler) + _, err := configMapInformer.Informer().AddEventHandler(eventHandler) + if err != nil { + log.Log(log.AdmissionConf).Error("Error adding event handler", zap.Error(err)) + return err + } go configMapInformer.Informer().Run(stopChan) if err := utils.WaitForCondition(func() bool { return configMapInformer.Informer().HasSynced()