Skip to content

Commit

Permalink
Simplify informer code to use common SharedIndexFactories (#1595)
Browse files Browse the repository at this point in the history
Summary: This simplfies informer code to use common
sharedIndexFactories. One set for namespaced resources and
another factory for non-namespaced resources.

We also use the same factories to do the initial list
and population of caches instead of using the bare clientset.

Type of change: /kind cleanup

Test Plan: Added logs to ensure that the order dependent stuff
still was init in order.

Signed-off-by: Vihang Mehta <[email protected]>
  • Loading branch information
vihangm authored Aug 1, 2023
1 parent 26d59e7 commit 7e4fe5a
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_library(
"@com_github_sirupsen_logrus//:logrus",
"@io_k8s_api//apps/v1:apps",
"@io_k8s_api//core/v1:core",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/labels",
"@io_k8s_apimachinery//pkg/watch",
"@io_k8s_client_go//informers",
"@io_k8s_client_go//kubernetes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -38,13 +38,6 @@ type Controller struct {
quitCh chan struct{}
updateCh chan *K8sResourceMessage
once sync.Once
watchers []watcher
}

// watcher watches a k8s resource type and forwards the updates to the given update channel.
type watcher interface {
StartWatcher(chan struct{})
InitWatcher() error
}

// NewController creates a new Controller.
Expand All @@ -65,36 +58,32 @@ func NewController(namespaces []string, updateCh chan *K8sResourceMessage) (*Con

// NewControllerWithClientSet creates a new Controller using the given Clientset.
func NewControllerWithClientSet(namespaces []string, updateCh chan *K8sResourceMessage, clientset kubernetes.Interface) (*Controller, error) {
quitCh := make(chan struct{})
mc := &Controller{quitCh: make(chan struct{}), updateCh: updateCh}
go mc.startWithClientSet(namespaces, clientset)

// Create a watcher for each resource.
// The resource types we watch the K8s API for. These types are in a specific order:
// for example, nodes and namespaces must be synced before pods, since nodes/namespaces
// contain pods.
watchers := []watcher{
nodeWatcher(namespaces, updateCh, clientset),
namespaceWatcher(namespaces, updateCh, clientset),
podWatcher(namespaces, updateCh, clientset),
endpointsWatcher(namespaces, updateCh, clientset),
serviceWatcher(namespaces, updateCh, clientset),
replicaSetWatcher(namespaces, updateCh, clientset),
deploymentWatcher(namespaces, updateCh, clientset),
}
return mc, nil
}

mc := &Controller{quitCh: quitCh, updateCh: updateCh, watchers: watchers}
// startWithClientSet starts the controller
func (mc *Controller) startWithClientSet(namespaces []string, clientset kubernetes.Interface) {
factory := informers.NewSharedInformerFactory(clientset, 12*time.Hour)

go func() {
for _, w := range mc.watchers {
err := w.InitWatcher()
if err != nil {
// Still return the informer because the rest of the system can recover from this.
log.WithError(err).Error("Failed to run watcher init")
}
go w.StartWatcher(quitCh)
}
}()
// Create a watcher for each resource.
// The resource types we watch the K8s API for.
startNodeWatcher(mc.updateCh, mc.quitCh, factory)
startNamespaceWatcher(mc.updateCh, mc.quitCh, factory)

return mc, nil
var namespacedFactories []informers.SharedInformerFactory
for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
namespacedFactories = append(namespacedFactories, factory)
}

startPodWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
startEndpointsWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
startServiceWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
startReplicaSetWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
startDeploymentWatcher(mc.updateCh, mc.quitCh, namespacedFactories)
}

// Stop stops all K8s watchers.
Expand Down
241 changes: 80 additions & 161 deletions src/vizier/services/metadata/controllers/k8smeta/k8s_metadata_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,215 +19,134 @@
package k8smeta

import (
"context"
"time"

log "github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"px.dev/pixie/src/shared/k8s"
"px.dev/pixie/src/vizier/services/metadata/storepb"
)

type informerWatcher struct {
convert func(obj interface{}) *K8sResourceMessage
ch chan *K8sResourceMessage
init func() error
informers []cache.SharedIndexInformer
}

// StartWatcher starts a watcher.
func (i *informerWatcher) StartWatcher(quitCh chan struct{}) {
for _, inf := range i.informers {
_, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
msg := i.convert(obj)
if msg != nil {
msg.EventType = watch.Added
i.ch <- msg
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
msg := i.convert(newObj)
if msg != nil {
msg.EventType = watch.Modified
i.ch <- msg
}
},
DeleteFunc: func(obj interface{}) {
msg := i.convert(obj)
if msg != nil {
msg.EventType = watch.Deleted
i.ch <- msg
}
},
})
inf.Run(quitCh)
}
}

// InitWatcher initializes a watcher, for example to perform a list.
func (i *informerWatcher) InitWatcher() error {
if i.init != nil {
return i.init()
}
return nil
}

func podWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: podConverter,
ch: ch,
}

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Core().V1().Pods().Informer()
iw.informers = append(iw.informers, inf)
}

init := func() error {
var podList []v1.Pod
// We initialize ch with the current Pods to handle cold start race conditions.
for _, ns := range namespaces {
list, err := clientset.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.WithError(err).Errorf("Failed to init pods in %s namespace.", ns)
return err
}
podList = append(podList, list.Items...)
}

for _, obj := range podList {
item := obj
msg := iw.convert(&item)
func createHandlers(convert func(obj interface{}) *K8sResourceMessage, ch chan *K8sResourceMessage) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
msg := convert(obj)
if msg != nil {
msg.EventType = watch.Added
iw.ch <- msg
ch <- msg
}
}
return nil
},
UpdateFunc: func(oldObj, newObj interface{}) {
msg := convert(newObj)
if msg != nil {
msg.EventType = watch.Modified
ch <- msg
}
},
DeleteFunc: func(obj interface{}) {
msg := convert(obj)
if msg != nil {
msg.EventType = watch.Deleted
ch <- msg
}
},
}

iw.init = init

return iw
}

func serviceWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: serviceConverter,
ch: ch,
}
func startNodeWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factory informers.SharedInformerFactory) {
nodes := factory.Core().V1().Nodes()

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Core().V1().Services().Informer()
iw.informers = append(iw.informers, inf)
}
inf := nodes.Informer()
_, _ = inf.AddEventHandler(createHandlers(nodeConverter, ch))
go inf.Run(quitCh)

return iw
}
cache.WaitForCacheSync(quitCh, inf.HasSynced)

func namespaceWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: namespaceConverter,
ch: ch,
// A cache sync doesn't guarantee that the handlers have been called,
// so instead we manually list and call the Add handlers since subsequent
// resources depend on these.
list, err := nodes.Lister().List(labels.Everything())
if err != nil {
log.WithError(err).Errorf("Failed to init nodes")
}

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Core().V1().Namespaces().Informer()
iw.informers = append(iw.informers, inf)
for i := range list {
msg := nodeConverter(list[i])
if msg != nil {
msg.EventType = watch.Added
ch <- msg
}
}

return iw
}

func endpointsWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: endpointsConverter,
ch: ch,
}

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Core().V1().Endpoints().Informer()
iw.informers = append(iw.informers, inf)
}

return iw
func startNamespaceWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factory informers.SharedInformerFactory) {
inf := factory.Core().V1().Namespaces().Informer()
_, _ = inf.AddEventHandler(createHandlers(namespaceConverter, ch))
go inf.Run(quitCh)
}

func nodeWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: nodeConverter,
ch: ch,
}
func startPodWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
for _, factory := range factories {
pods := factory.Core().V1().Pods()

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Core().V1().Nodes().Informer()
iw.informers = append(iw.informers, inf)
}
inf := pods.Informer()
_, _ = inf.AddEventHandler(createHandlers(podConverter, ch))
go inf.Run(quitCh)

init := func() error {
// We initialize ch with the current nodes to handle cold start race conditions.
list, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
cache.WaitForCacheSync(quitCh, inf.HasSynced)

// A cache sync doesn't guarantee that the handlers have been called,
// so instead we manually list and call the Add handlers since subsequent
// resources depend on these.
list, err := pods.Lister().List(labels.Everything())
if err != nil {
return err
log.WithError(err).Errorf("Failed to init pods")
}

for _, obj := range list.Items {
item := obj
msg := iw.convert(&item)
for i := range list {
msg := podConverter(list[i])
if msg != nil {
msg.EventType = watch.Added
iw.ch <- msg
ch <- msg
}
}
return nil
}

iw.init = init

return iw
}

func replicaSetWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: replicaSetConverter,
ch: ch,
func startServiceWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
for _, factory := range factories {
inf := factory.Core().V1().Services().Informer()
_, _ = inf.AddEventHandler(createHandlers(serviceConverter, ch))
go inf.Run(quitCh)
}
}

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
inf := factory.Apps().V1().ReplicaSets().Informer()
iw.informers = append(iw.informers, inf)
func startEndpointsWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
for _, factory := range factories {
inf := factory.Core().V1().Endpoints().Informer()
_, _ = inf.AddEventHandler(createHandlers(endpointsConverter, ch))
go inf.Run(quitCh)
}

return iw
}

func deploymentWatcher(namespaces []string, ch chan *K8sResourceMessage, clientset kubernetes.Interface) *informerWatcher {
iw := &informerWatcher{
convert: deploymentConverter,
ch: ch,
func startReplicaSetWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
for _, factory := range factories {
inf := factory.Apps().V1().ReplicaSets().Informer()
_, _ = inf.AddEventHandler(createHandlers(replicaSetConverter, ch))
go inf.Run(quitCh)
}
}

for _, ns := range namespaces {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 12*time.Hour, informers.WithNamespace(ns))
func startDeploymentWatcher(ch chan *K8sResourceMessage, quitCh <-chan struct{}, factories []informers.SharedInformerFactory) {
for _, factory := range factories {
inf := factory.Apps().V1().Deployments().Informer()
iw.informers = append(iw.informers, inf)
_, _ = inf.AddEventHandler(createHandlers(deploymentConverter, ch))
go inf.Run(quitCh)
}

return iw
}

func podConverter(obj interface{}) *K8sResourceMessage {
Expand Down

0 comments on commit 7e4fe5a

Please sign in to comment.