From 73e50129c68efe194d660e767a04baf409a9791e Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Wed, 3 Apr 2024 08:38:12 -0400
Subject: [PATCH 1/6] Trim ManagedFields in the resource syncer to

...reduce the informer cache memory usage.

Fixes https://github.com/submariner-io/admiral/issues/859

Signed-off-by: Tom Pantelis
---
 pkg/syncer/resource_syncer.go | 6 ++++--
 test/e2e/watcher/watcher.go   | 1 +
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go
index 4fcef40b..2d0eb282 100644
--- a/pkg/syncer/resource_syncer.go
+++ b/pkg/syncer/resource_syncer.go
@@ -241,8 +241,7 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
 
 	resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace)
 
-	//nolint:wrapcheck // These are wrapper functions.
-	syncer.store, syncer.informer = cache.NewInformer(&cache.ListWatch{
+	syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.LabelSelector = config.SourceLabelSelector
 			options.FieldSelector = config.SourceFieldSelector
@@ -257,6 +256,9 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
 		AddFunc:    syncer.onCreate,
 		UpdateFunc: syncer.onUpdate,
 		DeleteFunc: syncer.onDelete,
+	}, func(obj interface{}) (interface{}, error) {
+		resourceUtil.MustToMeta(obj).SetManagedFields(nil)
+		return obj, nil
 	})
 
 	return syncer, nil
diff --git a/test/e2e/watcher/watcher.go b/test/e2e/watcher/watcher.go
index ecde608f..4a415e87 100644
--- a/test/e2e/watcher/watcher.go
+++ b/test/e2e/watcher/watcher.go
@@ -36,6 +36,7 @@ var _ = Describe("[watcher] Resource watcher tests", func() {
 	It("should notify the handler of each event", func() {
 		clusterName := framework.TestContext.ClusterIDs[framework.ClusterA]
 		toaster := util.CreateToaster(t.client, util.NewToaster("test-toaster", t.framework.Namespace), clusterName)
+		toaster.SetManagedFields(nil)
 		Eventually(t.created).Should(Receive(Equal(toaster)))
 
 		util.DeleteToaster(t.client, toaster, clusterName)

From 279c7be28b4dd5653f4290f31457c0168e0129f7 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Tue, 16 Apr 2024 08:09:21 -0400
Subject: [PATCH 2/6] Fix data race due to trimming ManagedFields in resource syncer

This was observed in a unit test elsewhere that configures a resync period.
On resync, the K8s DeltaFIFO retrieves all objects from the cache store and
re-queues them. It also invokes the transform function, which results in a
data race when the resource syncer tries to nil out the ManagedFields: the
object instance is the same one stored in the cache, previously accessed by
another thread, so mutating it without the protection of a lock is unsafe.

This is really an issue with the DeltaFIFO. It is documented that the
"TransformFunc sees the object before any other actor, and it is now safe to
mutate the object in place instead of making a copy", however this is not the
case on a resync. The DeltaFIFO should either elide the TransformFunc on
resync or make a copy of the object retrieved from the cache store.

As a workaround, the resource syncer should only set ManagedFields to nil if
it is non-nil; on a resync the object has already been trimmed, so no
mutation occurs. Added a unit test to cover this case. Also, the object
passed to the TransformFunc could be a DeletedFinalStateUnknown, so we need
to handle that as well.
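For clarity, the guarded transform described above amounts to the following sketch. The actual change is in the diff below; the standalone function and package names here are illustrative only.

package sketch

import "k8s.io/apimachinery/pkg/api/meta"

// trimManagedFields clears ManagedFields only when it is actually set, so a
// resync that replays the already-trimmed object from the cache store performs
// no write and therefore no unsynchronized mutation. meta.Accessor returns an
// error for inputs that don't implement metav1.Object, such as a
// cache.DeletedFinalStateUnknown, in which case the object is passed through
// untouched.
func trimManagedFields(obj interface{}) (interface{}, error) {
	objMeta, err := meta.Accessor(obj)
	if err == nil && len(objMeta.GetManagedFields()) > 0 {
		objMeta.SetManagedFields(nil)
	}

	return obj, nil
}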
Signed-off-by: Tom Pantelis --- pkg/syncer/resource_syncer.go | 6 +++++- pkg/syncer/resource_syncer_test.go | 34 ++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index 2d0eb282..ae05325a 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -257,7 +257,11 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { UpdateFunc: syncer.onUpdate, DeleteFunc: syncer.onDelete, }, func(obj interface{}) (interface{}, error) { - resourceUtil.MustToMeta(obj).SetManagedFields(nil) + objMeta, err := meta.Accessor(obj) + if err == nil && len(objMeta.GetManagedFields()) > 0 { + objMeta.SetManagedFields(nil) + } + return obj, nil }) diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 3fce5fa8..af492c7b 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -45,6 +45,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" + "k8s.io/utils/ptr" ) var _ = Describe("Resource Syncer", func() { @@ -69,6 +70,7 @@ var _ = Describe("Resource Syncer", func() { Context(fmt.Sprintf("Direction: %s", syncer.RemoteToLocal), testReconcileRemoteToLocal) Context(fmt.Sprintf("Direction: %s", syncer.None), testReconcileNoDirection) }) + Describe("Trim Resource Fields", testTrimResourceFields) }) func testReconcileLocalToRemote() { @@ -1168,6 +1170,37 @@ func testRequeueResource() { }) } +func testTrimResourceFields() { + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + BeforeEach(func() { + d.config.ResyncPeriod = time.Millisecond * 100 + + d.resource.SetManagedFields([]metav1.ManagedFieldsEntry{ + { + Manager: "kubectl", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "v1", + Time: ptr.To(metav1.Now()), + FieldsType: "FieldsV1", + FieldsV1: ptr.To(metav1.FieldsV1{}), + }, + }) + + d.addInitialResource(d.resource) + }) + + It("should remove ManagedFields from created resources", func() { + obj, exists, err := d.syncer.GetResource(d.resource.Name, d.resource.Namespace) + Expect(err).To(Succeed()) + Expect(exists).To(BeTrue()) + Expect(resourceutils.MustToMeta(obj).GetManagedFields()).Should(BeNil()) + + // Sleep a little so a re-sync occurs and doesn't cause a data race. + time.Sleep(200) + }) +} + func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { expSpecs := map[string]*corev1.PodSpec{} for i := range expected { @@ -1221,6 +1254,7 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. d.config.Transform = nil d.config.OnSuccessfulSync = nil d.config.ResourcesEquivalent = nil + d.config.ResyncPeriod = 0 err := corev1.AddToScheme(d.config.Scheme) Expect(err).To(Succeed()) From 3ccc1b45ecd5e2486c9bfc2114694c2b4bfc7e1a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 17 Apr 2024 08:25:22 -0400 Subject: [PATCH 3/6] Add reusable TrimManagedFields transform function Signed-off-by: Tom Pantelis --- pkg/resource/resource_test.go | 61 +++++++++++++++++++++++++++++++++++ pkg/resource/util.go | 11 +++++++ pkg/syncer/resource_syncer.go | 9 +----- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/pkg/resource/resource_test.go b/pkg/resource/resource_test.go index 38c40831..fd869b2d 100644 --- a/pkg/resource/resource_test.go +++ b/pkg/resource/resource_test.go @@ -22,6 +22,10 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/resource" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" ) var _ = Describe("EnsureValidName", func() { @@ -44,3 +48,60 @@ var _ = Describe("EnsureValidName", func() { }) }) }) + +var _ = Describe("TrimManagedFields", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{"app": "test"}, + Annotations: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "image", + Name: "httpd", + }, + }, + }, + } + + When("the object has metadata", func() { + It("should succeed and trim the ManagedFields", func() { + podWithMF := pod.DeepCopy() + podWithMF.ManagedFields = []metav1.ManagedFieldsEntry{ + { + Manager: "kubectl", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "v1", + Time: ptr.To(metav1.Now()), + FieldsType: "FieldsV1", + FieldsV1: ptr.To(metav1.FieldsV1{}), + }, + } + + retObj, err := resource.TrimManagedFields(podWithMF) + Expect(err).To(Succeed()) + Expect(podWithMF).To(Equal(pod)) + Expect(retObj).To(BeIdenticalTo(podWithMF)) + + _, err = resource.TrimManagedFields(podWithMF) + Expect(err).To(Succeed()) + Expect(podWithMF).To(Equal(pod)) + }) + }) + + When("the object does not have metadata", func() { + It("should succeed and return the object", func() { + obj := &cache.DeletedFinalStateUnknown{ + Key: "key", + Obj: pod, + } + + retObj, err := resource.TrimManagedFields(obj) + Expect(err).To(Succeed()) + Expect(retObj).To(Equal(obj)) + }) + }) +}) diff --git a/pkg/resource/util.go b/pkg/resource/util.go index 54054fcc..af861b91 100644 --- a/pkg/resource/util.go +++ b/pkg/resource/util.go @@ -125,3 +125,14 @@ func ToJSON(o any) string { out, _ := json.MarshalIndent(o, "", " ") return string(out) } + +// TrimManagedFields is a cache.TransformFunc that removes the ManagedFields metadata field if present. Note that if 'obj' does not +// implement metav1.Object then it is ignored and no error is returned. +func TrimManagedFields(obj interface{}) (interface{}, error) { + objMeta, err := meta.Accessor(obj) + if err == nil && len(objMeta.GetManagedFields()) > 0 { + objMeta.SetManagedFields(nil) + } + + return obj, nil +} diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index ae05325a..ad0f6990 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -256,14 +256,7 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { AddFunc: syncer.onCreate, UpdateFunc: syncer.onUpdate, DeleteFunc: syncer.onDelete, - }, func(obj interface{}) (interface{}, error) { - objMeta, err := meta.Accessor(obj) - if err == nil && len(objMeta.GetManagedFields()) > 0 { - objMeta.SetManagedFields(nil) - } - - return obj, nil - }) + }, resourceUtil.TrimManagedFields) return syncer, nil } From 91478a142cd5b653e2ec420f93c7410eccc22248 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 24 Apr 2024 10:19:34 -0400 Subject: [PATCH 4/6] Add support for resource syncer to use a shared informer Some users may want to configure a shared informer rather than use the internal dedicated informer for efficiency if there's multiple consumers of a resource type. 
Signed-off-by: Tom Pantelis --- pkg/syncer/resource_syncer.go | 94 ++++++++++++++++++++++++------ pkg/syncer/resource_syncer_test.go | 58 ++++++++++++++++-- pkg/syncer/syncer_suite_test.go | 5 ++ 3 files changed, 132 insertions(+), 25 deletions(-) diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index ad0f6990..d40b90d2 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -187,6 +187,7 @@ type ResourceSyncerConfig struct { type resourceSyncer struct { workQueue workqueue.Interface + hasSynced func() bool informer cache.Controller store cache.Store config ResourceSyncerConfig @@ -199,6 +200,57 @@ type resourceSyncer struct { } func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { + syncer := newResourceSyncer(config) + + rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) + if err != nil { + return nil, err //nolint:wrapcheck // OK to return the error as is. + } + + resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace) + + //nolint:wrapcheck // These are wrapper functions. + syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = config.SourceLabelSelector + options.FieldSelector = config.SourceFieldSelector + return resourceClient.List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = config.SourceLabelSelector + options.FieldSelector = config.SourceFieldSelector + return resourceClient.Watch(context.TODO(), options) + }, + }, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{ + AddFunc: syncer.onCreate, + UpdateFunc: syncer.onUpdate, + DeleteFunc: syncer.onDelete, + }, resourceUtil.TrimManagedFields) + + syncer.hasSynced = syncer.informer.HasSynced + + return syncer, nil +} + +func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer cache.SharedInformer) (Interface, error) { + syncer := newResourceSyncer(config) + syncer.store = informer.GetStore() + + reg, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: syncer.onCreate, + UpdateFunc: syncer.onUpdate, + DeleteFunc: syncer.onDelete, + }, config.ResyncPeriod) + if err != nil { + return nil, errors.Wrapf(err, "error registering even handler") + } + + syncer.hasSynced = reg.HasSynced + + return syncer, nil +} + +func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer { syncer := &resourceSyncer{ config: *config, stopped: make(chan struct{}), @@ -218,11 +270,6 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { syncer.config.WaitForCacheSync = &wait } - rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) - if err != nil { - return nil, err //nolint:wrapcheck // OK to return the error as is. - } - if syncer.config.SyncCounter != nil { syncer.syncCounter = syncer.config.SyncCounter } else if syncer.config.SyncCounterOpts != nil { @@ -239,26 +286,31 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { syncer.workQueue = workqueue.New(config.Name) + return syncer +} + +func NewSharedInformer(config *ResourceSyncerConfig) (cache.SharedInformer, error) { + rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) + if err != nil { + return nil, err //nolint:wrapcheck // OK to return the error as is. 
+ } + resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace) - syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{ + //nolint:wrapcheck // These are wrapper functions. + informer := cache.NewSharedIndexInformerWithOptions(&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = config.SourceLabelSelector - options.FieldSelector = config.SourceFieldSelector return resourceClient.List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = config.SourceLabelSelector - options.FieldSelector = config.SourceFieldSelector return resourceClient.Watch(context.TODO(), options) }, - }, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{ - AddFunc: syncer.onCreate, - UpdateFunc: syncer.onUpdate, - DeleteFunc: syncer.onDelete, - }, resourceUtil.TrimManagedFields) + }, rawType, cache.SharedIndexInformerOptions{ + ResyncPeriod: config.ResyncPeriod, + }) - return syncer, nil + //nolint:wrapcheck // OK to return the error as is. + return informer, informer.SetTransform(resourceUtil.TrimManagedFields) } func (r *resourceSyncer) Start(stopCh <-chan struct{}) error { @@ -277,13 +329,17 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error { }() defer r.workQueue.ShutDownWithDrain() - r.informer.Run(stopCh) + if r.informer != nil { + r.informer.Run(stopCh) + } else { + <-r.stopCh + } }() if *r.config.WaitForCacheSync { r.log.V(log.LIBDEBUG).Infof("Syncer %q waiting for informer cache to sync", r.config.Name) - _ = cache.WaitForCacheSync(stopCh, r.informer.HasSynced) + _ = cache.WaitForCacheSync(stopCh, r.hasSynced) } r.workQueue.Run(stopCh, r.processNextWorkItem) @@ -412,7 +468,7 @@ func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) { } func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any { - if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok { + if ok := cache.WaitForCacheSync(r.stopCh, r.hasSynced); !ok { // This means the cache was stopped. 
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name) diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index af492c7b..839e01d1 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -45,6 +45,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -71,6 +72,7 @@ var _ = Describe("Resource Syncer", func() { Context(fmt.Sprintf("Direction: %s", syncer.None), testReconcileNoDirection) }) Describe("Trim Resource Fields", testTrimResourceFields) + Describe("With SharedInformer", testWithSharedInformer) }) func testReconcileLocalToRemote() { @@ -172,6 +174,7 @@ func testReconcileNoDirection() { JustBeforeEach(func() { obj := toReconcile.DeepCopyObject() + d.syncer.Reconcile(func() []runtime.Object { return []runtime.Object{obj} }) @@ -340,18 +343,21 @@ func testTransformFunction() { BeforeEach(func() { test.SetClusterIDLabel(d.resource, "remote") atomic.StoreInt32(&invocationCount, 0) + expOperation = make(chan syncer.Operation, 20) transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed") requeue = false - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { defer GinkgoRecover() atomic.AddInt32(&invocationCount, 1) + pod, ok := from.(*corev1.Pod) Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", from) Expect(equality.Semantic.DeepDerivative(d.resource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, d.resource.Spec) expOperation <- op + return transformed, requeue } }) @@ -472,9 +478,10 @@ func testTransformFunction() { When("the transform function returns nil with no re-queue", func() { BeforeEach(func() { - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { atomic.AddInt32(&invocationCount, 1) expOperation <- op + return nil, false } }) @@ -513,8 +520,10 @@ func testTransformFunction() { BeforeEach(func() { transformFuncRet = &atomic.Value{} transformFuncRet.Store(nilResource) - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + + d.config.Transform = func(_ runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { var ret runtime.Object + v := transformFuncRet.Load() if v != nilResource { ret, _ = v.(runtime.Object) @@ -522,6 +531,7 @@ func testTransformFunction() { transformFuncRet.Store(transformed) expOperation <- op + return ret, true } }) @@ -565,10 +575,14 @@ func testOnSuccessfulSyncFunction() { BeforeEach(func() { expOperation = make(chan syncer.Operation, 20) expResource = d.resource + onSuccessfulSyncReturn.Store(false) + d.config.OnSuccessfulSync = func(synced runtime.Object, op syncer.Operation) bool { defer GinkgoRecover() + pod, ok := synced.(*corev1.Pod) + Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", synced) Expect(equality.Semantic.DeepDerivative(expResource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, expResource.Spec) @@ -625,7 +639,7 @@ func testOnSuccessfulSyncFunction() { When("a resource is successfully created in the datastore", func() { 
BeforeEach(func() { expResource = test.NewPodWithImage(d.config.SourceNamespace, "transformed") - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { return expResource, false } }) @@ -716,11 +730,14 @@ func testShouldProcessFunction() { shouldProcess = true d.config.ShouldProcess = func(obj *unstructured.Unstructured, op syncer.Operation) bool { defer GinkgoRecover() + pod := &corev1.Pod{} + Expect(d.config.Scheme.Convert(obj, pod, nil)).To(Succeed()) Expect(equality.Semantic.DeepDerivative(expResource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, expResource.Spec) expOperation <- op + return shouldProcess } }) @@ -1143,7 +1160,7 @@ func testRequeueResource() { BeforeEach(func() { transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed") - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { return transformed, false } }) @@ -1201,6 +1218,18 @@ func testTrimResourceFields() { }) } +func testWithSharedInformer() { + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + BeforeEach(func() { + d.useSharedInformer = true + }) + + When("a resource is created in the local datastore", func() { + d.verifyDistributeOnCreateTest("") + }) +} + func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { expSpecs := map[string]*corev1.PodSpec{} for i := range expected { @@ -1220,6 +1249,7 @@ func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { type testDriver struct { config syncer.ResourceSyncerConfig + useSharedInformer bool syncer syncer.Interface sourceClient dynamic.ResourceInterface federator *fake.Federator @@ -1255,6 +1285,7 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. d.config.OnSuccessfulSync = nil d.config.ResourcesEquivalent = nil d.config.ResyncPeriod = 0 + d.useSharedInformer = false err := corev1.AddToScheme(d.config.Scheme) Expect(err).To(Succeed()) @@ -1271,7 +1302,22 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. d.sourceClient = d.config.SourceClient.Resource(*gvr).Namespace(d.config.SourceNamespace) var err error - d.syncer, err = syncer.NewResourceSyncer(&d.config) + + if d.useSharedInformer { + var sharedInformer cache.SharedInformer + + sharedInformer, err = syncer.NewSharedInformer(&d.config) + Expect(err).To(Succeed()) + + d.syncer, err = syncer.NewResourceSyncerWithSharedInformer(&d.config, sharedInformer) + + go func() { + sharedInformer.Run(d.stopCh) + }() + } else { + d.syncer, err = syncer.NewResourceSyncer(&d.config) + } + Expect(err).To(Succeed()) utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { diff --git a/pkg/syncer/syncer_suite_test.go b/pkg/syncer/syncer_suite_test.go index f40cc084..ff446a0e 100644 --- a/pkg/syncer/syncer_suite_test.go +++ b/pkg/syncer/syncer_suite_test.go @@ -18,6 +18,7 @@ limitations under the License. package syncer_test import ( + "flag" "testing" . 
"github.com/onsi/ginkgo/v2" @@ -26,6 +27,10 @@ import ( ) func init() { + flags := flag.NewFlagSet("kzerolog", flag.ExitOnError) + kzerolog.AddFlags(flags) + _ = flags.Parse([]string{"-v=1"}) + kzerolog.AddFlags(nil) } From 5c3e672952fb419e43a34517695883f6f8046e50 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 13 May 2024 19:19:25 -0400 Subject: [PATCH 5/6] Use informer to retry failed resources due to missing namespace ...rather than exponential requeuing which is problematic at scale. The user can specify a SharedInformer that is used to requeue resources that had previously failed when a missing namespace is later created. Signed-off-by: Tom Pantelis --- pkg/federate/fake/federator.go | 51 +++++++++---- pkg/syncer/broker/syncer.go | 5 ++ pkg/syncer/resource_syncer.go | 111 +++++++++++++++++++++++------ pkg/syncer/resource_syncer_test.go | 86 +++++++++++++++++++--- 4 files changed, 209 insertions(+), 44 deletions(-) diff --git a/pkg/federate/fake/federator.go b/pkg/federate/fake/federator.go index e5f5990e..057724a2 100644 --- a/pkg/federate/fake/federator.go +++ b/pkg/federate/fake/federator.go @@ -21,6 +21,8 @@ package fake import ( "context" + "sync" + "sync/atomic" "time" . "github.com/onsi/gomega" @@ -30,26 +32,46 @@ import ( ) type Federator struct { + lock sync.Mutex distribute chan *unstructured.Unstructured delete chan *unstructured.Unstructured - FailOnDistribute error - FailOnDelete error - ResetOnFailure bool + failOnDistribute error + failOnDelete error + ResetOnFailure atomic.Bool } func New() *Federator { - return &Federator{ - distribute: make(chan *unstructured.Unstructured, 100), - delete: make(chan *unstructured.Unstructured, 100), - ResetOnFailure: true, + f := &Federator{ + distribute: make(chan *unstructured.Unstructured, 100), + delete: make(chan *unstructured.Unstructured, 100), } + f.ResetOnFailure.Store(true) + + return f +} + +func (f *Federator) FailOnDistribute(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.failOnDistribute = err +} + +func (f *Federator) FailOnDelete(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.failOnDelete = err } func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { - err := f.FailOnDistribute + f.lock.Lock() + defer f.lock.Unlock() + + err := f.failOnDistribute if err != nil { - if f.ResetOnFailure { - f.FailOnDistribute = nil + if f.ResetOnFailure.Load() { + f.failOnDistribute = nil } return err @@ -61,10 +83,13 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { } func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { - err := f.FailOnDelete + f.lock.Lock() + defer f.lock.Unlock() + + err := f.failOnDelete if err != nil { - if f.ResetOnFailure { - f.FailOnDelete = nil + if f.ResetOnFailure.Load() { + f.failOnDelete = nil } return err diff --git a/pkg/syncer/broker/syncer.go b/pkg/syncer/broker/syncer.go index 5ba930bc..408f0de9 100644 --- a/pkg/syncer/broker/syncer.go +++ b/pkg/syncer/broker/syncer.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -134,6 +135,9 @@ type SyncerConfig struct { // Scheme used to convert resource objects. By default the global k8s Scheme is used. Scheme *runtime.Scheme + + // NamespaceInformer if specified, used to retry local resources that initially failed due to missing namespace. 
+ NamespaceInformer cache.SharedInformer } type Syncer struct { @@ -254,6 +258,7 @@ func NewSyncer(config SyncerConfig) (*Syncer, error) { //nolint:gocritic // Mini Scheme: config.Scheme, ResyncPeriod: rc.BrokerResyncPeriod, SyncCounter: syncCounter, + NamespaceInformer: config.NamespaceInformer, }) if err != nil { return nil, errors.Wrap(err, "error creating remote resource syncer") diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index d40b90d2..283710dd 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -43,10 +43,14 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "k8s.io/utils/set" logf "sigs.k8s.io/controller-runtime/pkg/log" ) -const OrigNamespaceLabelKey = "submariner-io/originatingNamespace" +const ( + OrigNamespaceLabelKey = "submariner-io/originatingNamespace" + namespaceKey = "$namespace$" +) type SyncDirection int @@ -183,24 +187,31 @@ type ResourceSyncerConfig struct { // SyncCounter if specified, used to record counter metrics. SyncCounter *prometheus.GaugeVec + + // NamespaceInformer if specified, used to retry resources that initially failed due to missing namespace. + NamespaceInformer cache.SharedInformer } type resourceSyncer struct { - workQueue workqueue.Interface - hasSynced func() bool - informer cache.Controller - store cache.Store - config ResourceSyncerConfig - deleted sync.Map - created sync.Map - stopped chan struct{} - syncCounter *prometheus.GaugeVec - stopCh <-chan struct{} - log log.Logger + workQueue workqueue.Interface + hasSynced func() bool + informer cache.Controller + store cache.Store + config ResourceSyncerConfig + deleted sync.Map + created sync.Map + stopped chan struct{} + syncCounter *prometheus.GaugeVec + stopCh <-chan struct{} + log log.Logger + missingNamespaces map[string]set.Set[string] } func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { - syncer := newResourceSyncer(config) + syncer, err := newResourceSyncer(config) + if err != nil { + return nil, err + } rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) if err != nil { @@ -233,7 +244,11 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { } func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer cache.SharedInformer) (Interface, error) { - syncer := newResourceSyncer(config) + syncer, err := newResourceSyncer(config) + if err != nil { + return nil, err + } + syncer.store = informer.GetStore() reg, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ @@ -242,7 +257,7 @@ func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer DeleteFunc: syncer.onDelete, }, config.ResyncPeriod) if err != nil { - return nil, errors.Wrapf(err, "error registering even handler") + return nil, errors.Wrapf(err, "error registering event handler") } syncer.hasSynced = reg.HasSynced @@ -250,11 +265,12 @@ func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer return syncer, nil } -func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer { +func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { syncer := &resourceSyncer{ - config: *config, - stopped: make(chan struct{}), - log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")}, + config: *config, + stopped: make(chan struct{}), + log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")}, + missingNamespaces: 
map[string]set.Set[string]{}, } if syncer.config.Scheme == nil { @@ -286,7 +302,18 @@ func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer { syncer.workQueue = workqueue.New(config.Name) - return syncer + if config.NamespaceInformer != nil { + _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, _ bool) { + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String())) + }, + }) + if err != nil { + return nil, errors.Wrapf(err, "error registering namespace handler") + } + } + + return syncer, nil } func NewSharedInformer(config *ResourceSyncerConfig) (cache.SharedInformer, error) { @@ -478,7 +505,12 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any return run() } -func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) { +func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) { + if ns == namespaceKey { + r.handleNamespaceAdded(name) + return false, nil + } + obj, exists, err := r.store.GetByKey(key) if err != nil { return true, errors.Wrapf(err, "error retrieving resource %q", key) @@ -515,6 +547,13 @@ func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) { err = r.config.Federator.Distribute(context.Background(), resource) if err != nil || r.onSuccessfulSync(resource, transformed, op) { + missing, namespace := resourceUtil.IsMissingNamespaceErr(err) + if missing { + r.handleMissingNamespace(key, namespace) + + return false, nil + } + return true, errors.Wrapf(err, "error distributing resource %q", key) } @@ -730,6 +769,36 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr return u } +func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { + r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace) + + if r.config.NamespaceInformer == nil { + return + } + + keys, ok := r.missingNamespaces[namespace] + if !ok { + keys = set.New[string]() + r.missingNamespaces[namespace] = keys + } + + keys.Insert(key) +} + +func (r *resourceSyncer) handleNamespaceAdded(namespace string) { + keys, ok := r.missingNamespaces[namespace] + if ok { + r.log.V(log.LIBDEBUG).Infof("Syncer %q: namespace %q created - re-queueing %d resources", r.config.Name, namespace, keys.Len()) + + for _, k := range keys.UnsortedList() { + ns, name, _ := cache.SplitMetaNamespaceKey(k) + r.RequeueResource(name, ns) + } + + delete(r.missingNamespaces, namespace) + } +} + func getClusterIDLabel(resource runtime.Object) (string, bool) { clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey] return clusterID, found diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 839e01d1..0066d82a 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -45,6 +45,9 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + fakeK8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -73,6 +76,7 @@ var _ = Describe("Resource Syncer", func() { }) Describe("Trim Resource Fields", testTrimResourceFields) Describe("With SharedInformer", testWithSharedInformer) + Describe("With missing namespace", 
testWithMissingNamespace) }) func testReconcileLocalToRemote() { @@ -448,7 +452,7 @@ func testTransformFunction() { When("deletion of the transformed resource initially fails", func() { BeforeEach(func() { - d.federator.FailOnDelete = errors.New("fake error") + d.federator.FailOnDelete(errors.New("fake error")) d.addInitialResource(d.resource) }) @@ -465,7 +469,7 @@ func testTransformFunction() { When("distribute for the transformed resource initially fails", func() { JustBeforeEach(func() { - d.federator.FailOnDistribute = errors.New("fake error") + d.federator.FailOnDistribute(errors.New("fake error")) }) It("retry until it succeeds", func() { @@ -654,8 +658,8 @@ func testOnSuccessfulSyncFunction() { When("distribute fails", func() { BeforeEach(func() { - d.federator.FailOnDistribute = errors.New("fake error") - d.federator.ResetOnFailure = false + d.federator.FailOnDistribute(errors.New("fake error")) + d.federator.ResetOnFailure.Store(false) }) It("should not invoke the OnSuccessfulSync function", func() { @@ -672,8 +676,8 @@ func testOnSuccessfulSyncFunction() { Context("with a general error", func() { BeforeEach(func() { - d.federator.FailOnDelete = errors.New("fake error") - d.federator.ResetOnFailure = false + d.federator.FailOnDelete(errors.New("fake error")) + d.federator.ResetOnFailure.Store(false) }) It("should not invoke the OnSuccessfulSync function", func() { @@ -684,7 +688,7 @@ func testOnSuccessfulSyncFunction() { Context("with a NotFound error", func() { BeforeEach(func() { - d.federator.FailOnDelete = apierrors.NewNotFound(schema.GroupResource{}, "") + d.federator.FailOnDelete(apierrors.NewNotFound(schema.GroupResource{}, "")) }) It("should invoke the OnSuccessfulSync function", func() { @@ -842,7 +846,7 @@ func testSyncErrors() { When("distribute initially fails", func() { BeforeEach(func() { - d.federator.FailOnDistribute = expectedErr + d.federator.FailOnDistribute(expectedErr) }) It("should log the error and retry until it succeeds", func() { @@ -853,7 +857,7 @@ func testSyncErrors() { When("delete initially fails", func() { BeforeEach(func() { - d.federator.FailOnDelete = expectedErr + d.federator.FailOnDelete(expectedErr) d.addInitialResource(d.resource) }) @@ -869,7 +873,7 @@ func testSyncErrors() { When("delete fails with not found", func() { BeforeEach(func() { - d.federator.FailOnDelete = apierrors.NewNotFound(schema.GroupResource{}, "not found") + d.federator.FailOnDelete(apierrors.NewNotFound(schema.GroupResource{}, "not found")) d.addInitialResource(d.resource) }) @@ -1230,6 +1234,68 @@ func testWithSharedInformer() { }) } +func testWithMissingNamespace() { + const transformedNamespace = "transformed-ns" + + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + var ( + k8sClient kubernetes.Interface + nsInformerFactory informers.SharedInformerFactory + ) + + BeforeEach(func() { + d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { + obj = obj.DeepCopyObject() + resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + + return obj, false + } + + d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{ + Resource: "namespaces", + }, transformedNamespace)) + + k8sClient = fakeK8s.NewSimpleClientset() + nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0) + d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer() + }) + + JustBeforeEach(func() { + nsInformerFactory.Start(d.stopCh) + }) + + Specify("distribute should eventually succeed 
when the namespace is created", func() { + resource := test.CreateResource(d.sourceClient, d.resource) + d.federator.VerifyNoDistribute() + + d.federator.FailOnDistribute(nil) + + By("Creating namespace") + + _, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: transformedNamespace, + }, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + resource.SetNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + }) + + Context("and no namespace informer specified", func() { + BeforeEach(func() { + d.config.NamespaceInformer = nil + }) + + It("should not retry", func() { + test.CreateResource(d.sourceClient, d.resource) + d.federator.VerifyNoDistribute() + }) + }) +} + func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { expSpecs := map[string]*corev1.PodSpec{} for i := range expected { From 022fa4b5a8ba428f26cc9debdab1552b9cfeb6c1 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 13 May 2024 19:19:56 -0400 Subject: [PATCH 6/6] Add fake reactor to verify namespace on create Signed-off-by: Tom Pantelis --- pkg/fake/basic_reactors_test.go | 26 +++++++++++++++ pkg/fake/verify_namespace_reactor.go | 50 ++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 pkg/fake/verify_namespace_reactor.go diff --git a/pkg/fake/basic_reactors_test.go b/pkg/fake/basic_reactors_test.go index 83f780fc..f9dfd0f9 100644 --- a/pkg/fake/basic_reactors_test.go +++ b/pkg/fake/basic_reactors_test.go @@ -74,6 +74,32 @@ var _ = Describe("Create", func() { Expect(err).To(HaveOccurred()) }) }) + + Context("with namespace verification", func() { + BeforeEach(func() { + fake.AddVerifyNamespaceReactor(&t.client.Fake, "pods") + }) + + When("the namespace does not exist", func() { + It("should return an error", func() { + _, err := t.doCreate(t.pod) + missing, ns := resource.IsMissingNamespaceErr(err) + Expect(missing).To(BeTrue()) + Expect(ns).To(Equal(testNamespace)) + }) + }) + + When("the namespace does exist", func() { + It("should succeed", func() { + _, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: testNamespace}, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + t.assertCreateSuccess(t.pod) + }) + }) + }) }) var _ = Describe("Update", func() { diff --git a/pkg/fake/verify_namespace_reactor.go b/pkg/fake/verify_namespace_reactor.go new file mode 100644 index 00000000..e3960b7b --- /dev/null +++ b/pkg/fake/verify_namespace_reactor.go @@ -0,0 +1,50 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fake + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/testing" +) + +func AddVerifyNamespaceReactor(f *testing.Fake, resources ...string) { + f.Lock() + defer f.Unlock() + + reactors := f.ReactionChain[0:] + + react := func(a testing.Action) (bool, runtime.Object, error) { + action := a.(testing.CreateAction) + + namespace := action.GetNamespace() + + _, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace), + reactors) + if err != nil { + return true, nil, err + } + + return false, nil, nil + } + + for _, res := range resources { + f.PrependReactor("create", res, react) + } +}