Skip to content

Commit

Permalink
Introduce scope configuration for internal caches (#133)
Browse files Browse the repository at this point in the history
This patch enhances the runtime internal caches by introducing a
configuration mechanism to tailor their behavior. The caches can now be
configured to watch a specific set of namespaces (instead of the
previous "all or nothing" style).

This is intended to be complete the multi-namspace-watch mode feature.
Now when a user configures a controller to watch a set of namespaces the
ACK runtime caches will also respect that scope.

In addition this commit adds `kube-node-lease` to the set of namespace
ignored by default (This namespace is dedicated to node Leases objects).

Signed-off-by: Amine Hilaly <[email protected]>

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
  • Loading branch information
a-hilaly authored Jan 19, 2024
1 parent 585594d commit 5ce3c59
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 21 deletions.
12 changes: 10 additions & 2 deletions pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func init() {
)
}

// Config is used to configure the caches.
type Config struct {
// WatchScope is a list of namespaces to watch for resources
WatchScope []string
// Ignored is a list of namespaces to ignore
Ignored []string
}

// Caches is used to interact with the different caches
type Caches struct {
// stopCh is a channel use to stop all the
Expand All @@ -72,10 +80,10 @@ type Caches struct {
}

// New instantiate a new Caches object.
func New(log logr.Logger) Caches {
func New(log logr.Logger, config Config) Caches {
return Caches{
Accounts: NewAccountCache(log),
Namespaces: NewNamespaceCache(log),
Namespaces: NewNamespaceCache(log, config.WatchScope, config.Ignored),
}
}

Expand Down
60 changes: 44 additions & 16 deletions pkg/runtime/cache/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,51 +80,79 @@ type NamespaceCache struct {
log logr.Logger
// namespaceInfos maps namespaces names to their known namespaceInfo
namespaceInfos map[string]*namespaceInfo
// watchScope is the list of namespaces we are watching
watchScope []string
// ignored is the list of namespaces we are ignoring
ignored []string
}

// NewNamespaceCache instanciate a new NamespaceCache.
func NewNamespaceCache(log logr.Logger) *NamespaceCache {
func NewNamespaceCache(log logr.Logger, watchScope []string, ignored []string) *NamespaceCache {
return &NamespaceCache{
log: log.WithName("cache.namespace"),
namespaceInfos: make(map[string]*namespaceInfo),
ignored: ignored,
watchScope: watchScope,
}
}

// isIgnoredNamespace returns true if an object is of type corev1.Namespace and
// its metadata name is the ACK system namespace, 'kube-system' or
// 'kube-public'
func isIgnoredNamespace(raw interface{}) bool {
object, ok := raw.(*corev1.Namespace)
return ok &&
(object.ObjectMeta.Name == ackSystemNamespace ||
object.ObjectMeta.Name == "kube-system" ||
object.ObjectMeta.Name == "kube-public")
// isIgnoredNamespace returns true if the namespace is ignored
func (c *NamespaceCache) isIgnoredNamespace(namespace string) bool {
for _, ns := range c.ignored {
if namespace == ns {
return true
}
}
return false
}

// inWatchScope returns true if the namespace is in the watch scope
func (c *NamespaceCache) inWatchScope(namespace string) bool {
if len(c.watchScope) == 0 {
return true
}
for _, ns := range c.watchScope {
if namespace == ns {
return true
}
}
return false
}

// approvedNamespace returns true if the namespace is not ignored and is in the watch scope
func (c *NamespaceCache) approvedNamespace(namespace string) bool {
return !c.isIgnoredNamespace(namespace) && c.inWatchScope(namespace)
}

// Run instantiate a new shared informer for namespaces and runs it to begin processing items.
func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) {
c.log.V(1).Info("Starting namespace cache", "watchScope", c.watchScope, "ignored", c.ignored)
informer := informersv1.NewNamespaceInformer(
clientSet,
informerResyncPeriod,
k8scache.Indexers{},
k8scache.Indexers{
k8scache.NamespaceIndex: k8scache.MetaNamespaceIndexFunc,
},
)
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if !isIgnoredNamespace(obj) {
ns := obj.(*corev1.Namespace)
// It is guaranteed that the object is of type corev1.Namespace
ns := obj.(*corev1.Namespace)
if c.approvedNamespace(ns.ObjectMeta.Name) {
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("created namespace", "name", ns.ObjectMeta.Name)
}
},
UpdateFunc: func(orig, desired interface{}) {
if !isIgnoredNamespace(desired) {
ns := desired.(*corev1.Namespace)
ns := desired.(*corev1.Namespace)
if c.approvedNamespace(ns.ObjectMeta.Name) {
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("updated namespace", "name", ns.ObjectMeta.Name)
}
},
DeleteFunc: func(obj interface{}) {
if !isIgnoredNamespace(obj) {
ns := obj.(*corev1.Namespace)
if c.approvedNamespace(ns.ObjectMeta.Name) {
ns := obj.(*corev1.Namespace)
c.deleteNamespaceInfo(ns.ObjectMeta.Name)
c.log.V(1).Info("deleted namespace", "name", ns.ObjectMeta.Name)
Expand Down
109 changes: 108 additions & 1 deletion pkg/runtime/cache/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cache_test

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -49,7 +50,7 @@ func TestNamespaceCache(t *testing.T) {
fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions))

// initlizing account cache
namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger)
namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger, []string{}, []string{})
stopCh := make(chan struct{})

namespaceCache.Run(k8sClient, stopCh)
Expand Down Expand Up @@ -129,3 +130,109 @@ func TestNamespaceCache(t *testing.T) {
_, ok = namespaceCache.GetDefaultRegion(testNamespace1)
require.False(t, ok)
}

func TestScopedNamespaceCache(t *testing.T) {
defaultConfig := ackrtcache.Config{
WatchScope: []string{"watch-scope", "watch-scope-2"},
Ignored: []string{"ignored", "ignored-2"},
}

testCases := []struct {
name string
createNamespace string
expectCacheHit bool
cacheConfig ackrtcache.Config
}{
{
name: "namespace in scope",
createNamespace: "watch-scope",
expectCacheHit: true,
cacheConfig: defaultConfig,
},
{
name: "namespace not in scope",
createNamespace: "watch-scope-3",
expectCacheHit: false,
cacheConfig: defaultConfig,
},
{
name: "namespace in ignored",
createNamespace: "ignored",
expectCacheHit: false,
cacheConfig: defaultConfig,
},
{
name: "namespace is nor in scope or ignored",
createNamespace: "random-penguin",
expectCacheHit: false,
cacheConfig: defaultConfig,
},
{
name: "namespace is in scope and ignored",
createNamespace: "watch-scope-2",
expectCacheHit: true,
cacheConfig: defaultConfig,
},
{
name: "cache watching all namespaces - namespace in scope",
cacheConfig: ackrtcache.Config{},
createNamespace: "watch-scope",
expectCacheHit: true,
},
{
name: "cache watching all namespaces - namespace is ignored",
cacheConfig: ackrtcache.Config{Ignored: []string{"kube-system"}},
createNamespace: "kube-system",
expectCacheHit: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create a fake k8s client and fake watcher
k8sClient := k8sfake.NewSimpleClientset()
watcher := watch.NewFake()
k8sClient.PrependWatchReactor("random-penguin", k8stesting.DefaultWatchReactor(watcher, nil))

// New logger writing to specific buffer
zapOptions := ctrlrtzap.Options{
Development: true,
Level: zapcore.InfoLevel,
DestWriter: io.Discard,
}
fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions))

// initlizing account cache
namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger, tc.cacheConfig.WatchScope, tc.cacheConfig.Ignored)
stopCh := make(chan struct{})

namespaceCache.Run(k8sClient, stopCh)

// Create namespace with name testNamespace1
_, err := k8sClient.CoreV1().Namespaces().Create(
context.Background(),
newNamespace(tc.createNamespace),
metav1.CreateOptions{},
)
require.Nil(t, err)

// Need a better way to wait for the cache to be updated
// Thinking informer.WaitForCacheSync() ~ but it's not exported
time.Sleep(time.Millisecond * 50)

_, found := namespaceCache.GetDefaultRegion(tc.createNamespace)
require.Equal(t, tc.expectCacheHit, found)
})
}
}

func newNamespace(name string) *corev1.Namespace {
return &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
ackv1alpha1.AnnotationDefaultRegion: "us-west-2",
},
},
}
}
42 changes: 40 additions & 2 deletions pkg/runtime/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package runtime

import (
"fmt"
"strings"
"sync"

Expand All @@ -34,6 +35,18 @@ import (
acktypes "github.com/aws-controllers-k8s/runtime/pkg/types"
)

const (
// NamespaceKubeNodeLease is the name of the Kubernetes namespace that
// contains the kube-node-lease resources (used for node hearthbeats)
NamespaceKubeNodeLease = "kube-node-lease"
// NamespacePublic is the name of the Kubernetes namespace that contains
// the public info (ConfigMaps)
NamespaceKubePublic = "kube-public"
// NamespaceSystem is the name of the Kubernetes namespace where we place
// system components.
NamespaceKubeSystem = "kube-system"
)

// serviceController wraps a number of `controller-runtime.Reconciler` that are
// related to a specific AWS service API.
type serviceController struct {
Expand Down Expand Up @@ -187,13 +200,38 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg
c.metaLock.Lock()
defer c.metaLock.Unlock()

cache := ackrtcache.New(c.log)
if cfg.WatchNamespace == "" {
namespaces, err := cfg.GetWatchNamespaces()
if err != nil {
return fmt.Errorf("unable to get watch namespaces: %v", err)
}

cache := ackrtcache.New(c.log, ackrtcache.Config{
WatchScope: namespaces,
// Default to ignoring the kube-system, kube-public, and
// kube-node-lease namespaces.
// NOTE: Maybe we should make this configurable? It's not clear that
// we'd ever want to watch these namespaces.
Ignored: []string{
NamespaceKubeSystem,
NamespaceKubePublic,
NamespaceKubeNodeLease,
}},
)
// We want to run the caches if the length of the namespaces slice is
// either 0 (watching all namespaces) or greater than 1 (watching multiple
// namespaces).
//
// The caches are only used for cross account resource management. If the
// controller is not configured to watch multiple namespaces, then we don't
// need to run the caches.
if len(namespaces) == 0 || len(namespaces) >= 2 {
clusterConfig := mgr.GetConfig()
clientSet, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return err
}
// Run the caches. This will not block as the caches are run in
// separate goroutines.
cache.Run(clientSet)
}

Expand Down

0 comments on commit 5ce3c59

Please sign in to comment.