diff --git a/src/vizier/services/metadata/controllers/k8smeta/BUILD.bazel b/src/vizier/services/metadata/controllers/k8smeta/BUILD.bazel
index 6b386e1d5b7..ba6f5c8498b 100644
--- a/src/vizier/services/metadata/controllers/k8smeta/BUILD.bazel
+++ b/src/vizier/services/metadata/controllers/k8smeta/BUILD.bazel
@@ -44,7 +44,7 @@ go_library(
         "@com_github_sirupsen_logrus//:logrus",
         "@io_k8s_api//apps/v1:apps",
         "@io_k8s_api//core/v1:core",
-        "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+        "@io_k8s_apimachinery//pkg/labels",
         "@io_k8s_apimachinery//pkg/watch",
         "@io_k8s_client_go//informers",
         "@io_k8s_client_go//kubernetes",
diff --git a/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_controller.go b/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_controller.go
index 04484340bfe..08724209c7a 100644
--- a/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_controller.go
+++ b/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_controller.go
@@ -22,7 +22,7 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/sirupsen/logrus"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
@@ -38,13 +38,6 @@ type Controller struct {
 	quitCh   chan struct{}
 	updateCh chan *K8sResourceMessage
 	once     sync.Once
-	watchers []watcher
-}
-
-// watcher watches a k8s resource type and forwards the updates to the given update channel.
-type watcher interface {
-	StartWatcher(chan struct{})
-	InitWatcher() error
 }
 
 // NewController creates a new Controller.
@@ -65,36 +58,32 @@ func NewController(namespaces []string, updateCh chan *K8sResourceMessage) (*Con
 
 // NewControllerWithClientSet creates a new Controller using the given Clientset.
 func NewControllerWithClientSet(namespaces []string, updateCh chan *K8sResourceMessage, clientset kubernetes.Interface) (*Controller, error) {
-	quitCh := make(chan struct{})
+	mc := &Controller{quitCh: make(chan struct{}), updateCh: updateCh}
+	go mc.startWithClientSet(namespaces, clientset)
 
-	// Create a watcher for each resource.
-	// The resource types we watch the K8s API for. These types are in a specific order:
-	// for example, nodes and namespaces must be synced before pods, since nodes/namespaces
-	// contain pods.
-	watchers := []watcher{
-		nodeWatcher(namespaces, updateCh, clientset),
-		namespaceWatcher(namespaces, updateCh, clientset),
-		podWatcher(namespaces, updateCh, clientset),
-		endpointsWatcher(namespaces, updateCh, clientset),
-		serviceWatcher(namespaces, updateCh, clientset),
-		replicaSetWatcher(namespaces, updateCh, clientset),
-		deploymentWatcher(namespaces, updateCh, clientset),
-	}
+	return mc, nil
+}
 
-	mc := &Controller{quitCh: quitCh, updateCh: updateCh, watchers: watchers}
+// startWithClientSet starts the controller's watchers using the given Clientset.
+func (mc *Controller) startWithClientSet(namespaces []string, clientset kubernetes.Interface) {
+	factory := informers.NewSharedInformerFactory(clientset, 12*time.Hour)
 
-	go func() {
-		for _, w := range mc.watchers {
-			err := w.InitWatcher()
-			if err != nil {
-				// Still return the informer because the rest of the system can recover from this.
-				log.WithError(err).Error("Failed to run watcher init")
-			}
-			go w.StartWatcher(quitCh)
-		}
-	}()
+	// Create a watcher for each resource.
+	// The resource types we watch the K8s API for. These types are in a specific order:
+	// for example, nodes and namespaces must be synced before pods, since nodes/namespaces
+	// contain pods.
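+	// Nodes and namespaces are cluster-scoped, so they are served by the single
+	// cluster-wide factory created above; the namespaced resources below each get
+	// one factory per watched namespace instead.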
+	startNodeWatcher(mc.updateCh, mc.quitCh, factory)
+	startNamespaceWatcher(mc.updateCh, mc.quitCh, factory)
 
-	return mc, nil
+	var namespacedFactories []informers.SharedInformerFactory
+	for _, ns := range namespaces {
+		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
+		namespacedFactories = append(namespacedFactories, factory)
+	}
+
+	startPodWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
+	startEndpointsWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
+	startServiceWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
+	startReplicaSetWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
+	startDeploymentWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
 }
 
 // Stop stops all K8s watchers.
diff --git a/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_utils.go b/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_utils.go
index cfadd2d2588..1c21e315055 100644
--- a/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_utils.go
+++ b/src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_utils.go
@@ -19,215 +19,134 @@ package k8smeta
 
 import (
-	"context"
-	"time"
-
 	log "github.com/sirupsen/logrus"
 	apps "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/informers"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 
 	"px.dev/pixie/src/shared/k8s"
 	"px.dev/pixie/src/vizier/services/metadata/storepb"
 )
 
-type informerWatcher struct {
-	convert   func(obj interface{}) *K8sResourceMessage
-	ch        chan *K8sResourceMessage
-	init      func() error
-	informers []cache.SharedIndexInformer
-}
-
-// StartWatcher starts a watcher.
-func (i *informerWatcher) StartWatcher(quitCh chan struct{}) {
-	for _, inf := range i.informers {
-		_, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				msg := i.convert(obj)
-				if msg != nil {
-					msg.EventType = watch.Added
-					i.ch <- msg
-				}
-			},
-			UpdateFunc: func(oldObj, newObj interface{}) {
-				msg := i.convert(newObj)
-				if msg != nil {
-					msg.EventType = watch.Modified
-					i.ch <- msg
-				}
-			},
-			DeleteFunc: func(obj interface{}) {
-				msg := i.convert(obj)
-				if msg != nil {
-					msg.EventType = watch.Deleted
-					i.ch <- msg
-				}
-			},
-		})
-		inf.Run(quitCh)
-	}
-}
-
-// InitWatcher initializes a watcher, for example to perform a list.
-func (i *informerWatcher) InitWatcher() error {
-	if i.init != nil {
-		return i.init()
-	}
-	return nil
-}
-
-func podWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: podConverter,
-		ch:      ch,
-	}
-
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Core().V1().Pods().Informer()
-		iw.informers = append(iw.informers, inf)
-	}
-
-	init := func() error {
-		var podList []v1.Pod
-		// We initialize ch with the current Pods to handle cold start race conditions.
-		for _, ns := range namespaces {
-			list, err := clientset.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{})
-			if err != nil {
-				log.WithError(err).Errorf("Failed to init pods in %s namespace.", ns)
-				return err
-			}
-			podList = append(podList, list.Items...)
-		}
-
-		for _, obj := range podList {
-			item := obj
-			msg := iw.convert(&item)
+func createHandlers(convert func(obj interface{}) *K8sResourceMessage, ch chan *K8sResourceMessage) cache.ResourceEventHandler {
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			msg := convert(obj)
 			if msg != nil {
 				msg.EventType = watch.Added
-				iw.ch <- msg
+				ch <- msg
 			}
-		}
-		return nil
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			msg := convert(newObj)
+			if msg != nil {
+				msg.EventType = watch.Modified
+				ch <- msg
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			msg := convert(obj)
+			if msg != nil {
+				msg.EventType = watch.Deleted
+				ch <- msg
+			}
+		},
 	}
-
-	iw.init = init
-
-	return iw
 }
 
-func serviceWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: serviceConverter,
-		ch:      ch,
-	}
+func startNodeWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factory informers.SharedInformerFactory) {
+	nodes := factory.Core().V1().Nodes()
 
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Core().V1().Services().Informer()
-		iw.informers = append(iw.informers, inf)
-	}
+	inf := nodes.Informer()
+	_, _ = inf.AddEventHandler(createHandlers(nodeConverter, ch))
+	go inf.Run(quitCh)
 
-	return iw
-}
+	cache.WaitForCacheSync(quitCh, inf.HasSynced)
 
-func namespaceWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: namespaceConverter,
-		ch:      ch,
+	// A cache sync doesn't guarantee that the handlers have been called,
+	// so instead we manually list and call the Add handlers since subsequent
+	// resources depend on these.
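+	// The lister reads from the informer's local store, which is fully populated
+	// once the cache sync above completes, so this replays every known node as
+	// an Added event before the pod watchers are started.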
+	list, err := nodes.Lister().List(labels.Everything())
+	if err != nil {
+		log.WithError(err).Errorf("Failed to init nodes")
 	}
 
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Core().V1().Namespaces().Informer()
-		iw.informers = append(iw.informers, inf)
+	for i := range list {
+		msg := nodeConverter(list[i])
+		if msg != nil {
+			msg.EventType = watch.Added
+			ch <- msg
+		}
 	}
-
-	return iw
 }
 
-func endpointsWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: endpointsConverter,
-		ch:      ch,
-	}
-
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Core().V1().Endpoints().Informer()
-		iw.informers = append(iw.informers, inf)
-	}
-
-	return iw
+func startNamespaceWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factory informers.SharedInformerFactory) {
+	inf := factory.Core().V1().Namespaces().Informer()
+	_, _ = inf.AddEventHandler(createHandlers(namespaceConverter, ch))
+	go inf.Run(quitCh)
 }
 
-func nodeWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: nodeConverter,
-		ch:      ch,
-	}
+func startPodWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
+	for _, factory := range factories {
+		pods := factory.Core().V1().Pods()
 
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Core().V1().Nodes().Informer()
-		iw.informers = append(iw.informers, inf)
-	}
+		inf := pods.Informer()
+		_, _ = inf.AddEventHandler(createHandlers(podConverter, ch))
+		go inf.Run(quitCh)
 
-	init := func() error {
-		// We initialize ch with the current nodes to handle cold start race conditions.
-		list, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+		cache.WaitForCacheSync(quitCh, inf.HasSynced)
+
+		// A cache sync doesn't guarantee that the handlers have been called,
+		// so instead we manually list and call the Add handlers since subsequent
+		// resources depend on these.
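+		// As with nodes, the replay completes before the endpoints/service
+		// watchers below are started, so consumers presumably see every existing
+		// pod before any updates that reference those pods arrive.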
+		list, err := pods.Lister().List(labels.Everything())
 		if err != nil {
-			return err
+			log.WithError(err).Errorf("Failed to init pods")
 		}
-		for _, obj := range list.Items {
-			item := obj
-			msg := iw.convert(&item)
+		for i := range list {
+			msg := podConverter(list[i])
 			if msg != nil {
 				msg.EventType = watch.Added
-				iw.ch <- msg
+				ch <- msg
 			}
 		}
-		return nil
 	}
-
-	iw.init = init
-
-	return iw
 }
 
-func replicaSetWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: replicaSetConverter,
-		ch:      ch,
+func startServiceWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
+	for _, factory := range factories {
+		inf := factory.Core().V1().Services().Informer()
+		_, _ = inf.AddEventHandler(createHandlers(serviceConverter, ch))
+		go inf.Run(quitCh)
 	}
+}
 
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
-		inf := factory.Apps().V1().ReplicaSets().Informer()
-		iw.informers = append(iw.informers, inf)
+func startEndpointsWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
+	for _, factory := range factories {
+		inf := factory.Core().V1().Endpoints().Informer()
+		_, _ = inf.AddEventHandler(createHandlers(endpointsConverter, ch))
+		go inf.Run(quitCh)
 	}
-
-	return iw
 }
 
-func deploymentWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
-	iw := &informerWatcher{
-		convert: deploymentConverter,
-		ch:      ch,
+func startReplicaSetWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
+	for _, factory := range factories {
+		inf := factory.Apps().V1().ReplicaSets().Informer()
+		_, _ = inf.AddEventHandler(createHandlers(replicaSetConverter, ch))
+		go inf.Run(quitCh)
 	}
+}
 
-	for _, ns := range namespaces {
-		factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
+func startDeploymentWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
+	for _, factory := range factories {
 		inf := factory.Apps().V1().Deployments().Informer()
-		iw.informers = append(iw.informers, inf)
+		_, _ = inf.AddEventHandler(createHandlers(deploymentConverter, ch))
+		go inf.Run(quitCh)
 	}
-
-	return iw
 }
 
 func podConverter(obj interface{}) *K8sResourceMessage {
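
For reference, the following is a minimal, self-contained Go sketch (not part of the patch) of the informer pattern the new watchers share: register handlers, run the informer, wait for its cache to sync, then replay the synced store as Added events. The fake clientset and the handle callback are illustrative stand-ins rather than Vizier code; the real controller uses an in-cluster clientset and forwards K8sResourceMessage values on a channel, as shown above.

// Sketch only: fake clientset and the handle callback are hypothetical stand-ins.
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Seed a fake cluster with one node so the replay below emits something.
	clientset := fake.NewSimpleClientset(
		&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}},
	)
	factory := informers.NewSharedInformerFactory(clientset, 12*time.Hour)
	nodes := factory.Core().V1().Nodes()

	handle := func(event, name string) { fmt.Println(event, name) }

	inf := nodes.Informer()
	_, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { handle("Added", obj.(*v1.Node).Name) },
		UpdateFunc: func(_, obj interface{}) { handle("Modified", obj.(*v1.Node).Name) },
		DeleteFunc: func(obj interface{}) { handle("Deleted", obj.(*v1.Node).Name) },
	})

	quitCh := make(chan struct{})
	defer close(quitCh)
	go inf.Run(quitCh)

	// The sync only guarantees the local store is populated, not that the
	// handlers above have fired, hence the explicit replay from the lister.
	if !cache.WaitForCacheSync(quitCh, inf.HasSynced) {
		return
	}
	list, _ := nodes.Lister().List(labels.Everything())
	for i := range list {
		handle("Added", list[i].Name)
	}
}

The replay can re-deliver objects whose Added events the handlers already emitted, since the sync guarantee covers only the store. The patch limits the replay to nodes and pods, the resources that subsequent watchers depend on, and downstream consumers presumably treat repeated Added messages as idempotent upserts.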