Skip to content

Commit

Permalink
[YUNIKORN-2651] Update the unchecked error for make lint warnings (#850)
Browse files Browse the repository at this point in the history
Closes: #850

Signed-off-by: Chia-Ping Tsai <[email protected]>
  • Loading branch information
SophieTech88 authored and chia7712 committed Jun 4, 2024
1 parent 58adfe9 commit 3ce745e
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ linters-settings:
local-prefixes: github.com/apache/yunikorn
govet:
check-shadowing: true
goconst:
min-occurrences: 5
funlen:
lines: 120
statements: 80
Expand Down
9 changes: 7 additions & 2 deletions pkg/admission/conf/am_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionController
return acc
}

func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) {
configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc})
func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error {
_, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc})
if err != nil {
return fmt.Errorf("failed to create register handlers: %w", err)
}

return nil
}

func (acc *AdmissionControllerConf) GetNamespace() string {
Expand Down
11 changes: 8 additions & 3 deletions pkg/admission/namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package admission

import (
"fmt"

v1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -53,14 +55,17 @@ type nsFlags struct {
}

// NewNamespaceCache creates a new cache and registers the handler for the cache with the Informer.
func NewNamespaceCache(namespaces informersv1.NamespaceInformer) *NamespaceCache {
func NewNamespaceCache(namespaces informersv1.NamespaceInformer) (*NamespaceCache, error) {
nsc := &NamespaceCache{
nameSpaces: make(map[string]nsFlags),
}
if namespaces != nil {
namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc})
_, err := namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc})
if err != nil {
return nil, fmt.Errorf("failed to create namespace cache: %w", err)
}
}
return nsc
return nsc, nil
}

// enableYuniKorn returns the value for the enableYuniKorn flag (tri-state UNSET, TRUE or FALSE) for the namespace.
Expand Down
6 changes: 4 additions & 2 deletions pkg/admission/namespace_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
const testNS = "test-ns"

func TestFlags(t *testing.T) {
cache := NewNamespaceCache(nil)
cache, nsErr := NewNamespaceCache(nil)
assert.NilError(t, nsErr)
cache.nameSpaces["notset"] = nsFlags{
enableYuniKorn: UNSET,
generateAppID: UNSET,
Expand Down Expand Up @@ -69,7 +70,8 @@ func TestNamespaceHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
cache := NewNamespaceCache(informers.Namespace)
cache, nsErr := NewNamespaceCache(informers.Namespace)
assert.NilError(t, nsErr)
informers.Start()
defer informers.Stop()

Expand Down
11 changes: 8 additions & 3 deletions pkg/admission/priority_class_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package admission

import (
"fmt"

schedulingv1 "k8s.io/api/scheduling/v1"
informersv1 "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -36,14 +38,17 @@ type PriorityClassCache struct {
}

// NewPriorityClassCache creates a new cache and registers the handler for the cache with the Informer.
func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) *PriorityClassCache {
func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) (*PriorityClassCache, error) {
pcc := &PriorityClassCache{
priorityClasses: make(map[string]bool),
}
if priorityClasses != nil {
priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc})
_, err := priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc})
if err != nil {
return nil, fmt.Errorf("failed to create a new cache and register the handler: %w", err)
}
}
return pcc
return pcc, nil
}

// isPreemptSelfAllowed returns the preemption value. Only returns false if configured.
Expand Down
6 changes: 4 additions & 2 deletions pkg/admission/priority_class_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
const testPC = "test-pc"

func TestIsPreemptSelfAllowed(t *testing.T) {
cache := NewPriorityClassCache(nil)
cache, pcErr := NewPriorityClassCache(nil)
assert.NilError(t, pcErr)
cache.priorityClasses["yes"] = true
cache.priorityClasses["no"] = false

Expand All @@ -49,7 +50,8 @@ func TestPriorityClassHandlers(t *testing.T) {
kubeClient := client.NewKubeClientMock(false)

informers := NewInformers(kubeClient, "default")
cache := NewPriorityClassCache(informers.PriorityClass)
cache, pcErr := NewPriorityClassCache(informers.PriorityClass)
assert.NilError(t, pcErr)
informers.Start()
defer informers.Stop()

Expand Down
33 changes: 27 additions & 6 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,50 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM
return ctx
}

func (ctx *Context) AddSchedulingEventHandlers() {
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
func (ctx *Context) AddSchedulingEventHandlers() error {
err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.ConfigMapInformerHandlers,
FilterFn: ctx.filterConfigMaps,
AddFn: ctx.addConfigMaps,
UpdateFn: ctx.updateConfigMaps,
DeleteFn: ctx.deleteConfigMaps,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PriorityClassInformerHandlers,
FilterFn: ctx.filterPriorityClasses,
AddFn: ctx.addPriorityClass,
UpdateFn: ctx.updatePriorityClass,
DeleteFn: ctx.deletePriorityClass,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.NodeInformerHandlers,
AddFn: ctx.addNode,
UpdateFn: ctx.updateNode,
DeleteFn: ctx.deleteNode,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
AddFn: ctx.AddPod,
UpdateFn: ctx.UpdatePod,
DeleteFn: ctx.DeletePod,
})
if err != nil {
return err
}

return nil
}

func (ctx *Context) IsPluginMode() bool {
Expand Down Expand Up @@ -1449,7 +1466,11 @@ func (ctx *Context) InitializeState() error {

// Step 5: Start scheduling event handlers. At this point, initialization is mostly complete, and any existing
// objects will show up as newly added objects. Since the add/update event handlers are idempotent, this is fine.
ctx.AddSchedulingEventHandlers()
err = ctx.AddSchedulingEventHandlers()
if err != nil {
log.Log(log.Admission).Error("failed to add scheduling event handlers", zap.Error(err))
return err
}

// Step 6: Finalize priority classes. Between the start of initialization and when the informer event handlers are
// registered, it is possible that a priority class object was deleted. Process them again and remove
Expand Down
32 changes: 21 additions & 11 deletions pkg/client/apifactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package client

import (
"fmt"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (t Type) String() string {

type APIProvider interface {
GetAPIs() *Clients
AddEventHandler(handlers *ResourceEventHandlers)
AddEventHandler(handlers *ResourceEventHandlers) error
Start()
Stop()
WaitForSync()
Expand Down Expand Up @@ -143,7 +144,7 @@ func (s *APIFactory) IsTestingMode() bool {
return s.testMode
}

func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) {
func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) error {
s.lock.Lock()
defer s.lock.Unlock()
// register all handlers
Expand All @@ -166,34 +167,43 @@ func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) {
}

log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.Type))
s.addEventHandlers(handlers.Type, h, 0)
if err := s.addEventHandlers(handlers.Type, h, 0); err != nil {
return fmt.Errorf("failed to initialize event handlers: %w", err)
}
return nil
}

func (s *APIFactory) addEventHandlers(
handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) error {
var err error
switch handlerType {
case PodInformerHandlers:
s.GetAPIs().PodInformer.Informer().
_, err = s.GetAPIs().PodInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case NodeInformerHandlers:
s.GetAPIs().NodeInformer.Informer().
_, err = s.GetAPIs().NodeInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case ConfigMapInformerHandlers:
s.GetAPIs().ConfigMapInformer.Informer().
_, err = s.GetAPIs().ConfigMapInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case StorageInformerHandlers:
s.GetAPIs().StorageInformer.Informer().
_, err = s.GetAPIs().StorageInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case PVInformerHandlers:
s.GetAPIs().PVInformer.Informer().
_, err = s.GetAPIs().PVInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case PVCInformerHandlers:
s.GetAPIs().PVCInformer.Informer().
_, err = s.GetAPIs().PVCInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
case PriorityClassInformerHandlers:
s.GetAPIs().PriorityClassInformer.Informer().
_, err = s.GetAPIs().PriorityClassInformer.Informer().
AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}

if err != nil {
return fmt.Errorf("failed to add event handlers: %w", err)
}
return nil
}

func (s *APIFactory) WaitForSync() {
Expand Down
7 changes: 5 additions & 2 deletions pkg/client/apifactory_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package client

import (
"fmt"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -219,16 +220,18 @@ func (m *MockedAPIProvider) IsTestingMode() bool {
return true
}

func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) {
func (m *MockedAPIProvider) AddEventHandler(handlers *ResourceEventHandlers) error {
m.Lock()
defer m.Unlock()

if !m.running {
return
return fmt.Errorf("mocked API provider is not running")
}

m.eventHandler <- handlers
log.Log(log.Test).Info("registering event handler", zap.Stringer("type", handlers.Type))

return nil
}

func (m *MockedAPIProvider) RunEventHandler() {
Expand Down
18 changes: 15 additions & 3 deletions pkg/cmd/admissioncontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,21 @@ func main() {
kubeClient := client.NewKubeClient(amConf.GetKubeConfig())

informers := admission.NewInformers(kubeClient, amConf.GetNamespace())
amConf.RegisterHandlers(informers.ConfigMap)
pcCache := admission.NewPriorityClassCache(informers.PriorityClass)
nsCache := admission.NewNamespaceCache(informers.Namespace)

if hadlerErr := amConf.RegisterHandlers(informers.ConfigMap); hadlerErr != nil {
log.Log(log.Admission).Fatal("Failed to register handlers", zap.Error(hadlerErr))
return
}
pcCache, pcErr := admission.NewPriorityClassCache(informers.PriorityClass)
if pcErr != nil {
log.Log(log.Admission).Fatal("Failed to create new priority class cache", zap.Error(pcErr))
return
}
nsCache, nsErr := admission.NewNamespaceCache(informers.Namespace)
if nsErr != nil {
log.Log(log.Admission).Fatal("Failed to create namespace cache", zap.Error(nsErr))
return
}
informers.Start()

wm, err := admission.NewWebhookManager(amConf)
Expand Down
4 changes: 0 additions & 4 deletions pkg/shim/scheduler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expe
}
}

func (fc *MockScheduler) removeApplication(appId string) error {
return fc.context.RemoveApplication(appId)
}

func (fc *MockScheduler) waitAndAssertTaskState(t *testing.T, appID, taskID, expectedState string) {
app := fc.context.GetApplication(appID)
assert.Equal(t, app != nil, true)
Expand Down
8 changes: 7 additions & 1 deletion test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -58,6 +59,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
)
Expand Down Expand Up @@ -716,7 +718,11 @@ func (k *KubeCtl) StartConfigMapInformer(namespace string, stopChan <-chan struc
informerFactory := informers.NewSharedInformerFactoryWithOptions(k.clientSet, 0, informers.WithNamespace(namespace))
informerFactory.Start(stopChan)
configMapInformer := informerFactory.Core().V1().ConfigMaps()
configMapInformer.Informer().AddEventHandler(eventHandler)
_, err := configMapInformer.Informer().AddEventHandler(eventHandler)
if err != nil {
log.Log(log.AdmissionConf).Error("Error adding event handler", zap.Error(err))
return err
}
go configMapInformer.Informer().Run(stopChan)
if err := utils.WaitForCondition(func() bool {
return configMapInformer.Informer().HasSynced()
Expand Down

0 comments on commit 3ce745e

Please sign in to comment.