diff --git a/pkg/fake/basic_reactors_test.go b/pkg/fake/basic_reactors_test.go index 83f780fc..f9dfd0f9 100644 --- a/pkg/fake/basic_reactors_test.go +++ b/pkg/fake/basic_reactors_test.go @@ -74,6 +74,32 @@ var _ = Describe("Create", func() { Expect(err).To(HaveOccurred()) }) }) + + Context("with namespace verification", func() { + BeforeEach(func() { + fake.AddVerifyNamespaceReactor(&t.client.Fake, "pods") + }) + + When("the namespace does not exist", func() { + It("should return an error", func() { + _, err := t.doCreate(t.pod) + missing, ns := resource.IsMissingNamespaceErr(err) + Expect(missing).To(BeTrue()) + Expect(ns).To(Equal(testNamespace)) + }) + }) + + When("the namespace does exist", func() { + It("should succeed", func() { + _, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: testNamespace}, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + t.assertCreateSuccess(t.pod) + }) + }) + }) }) var _ = Describe("Update", func() { diff --git a/pkg/fake/verify_namespace_reactor.go b/pkg/fake/verify_namespace_reactor.go new file mode 100644 index 00000000..e3960b7b --- /dev/null +++ b/pkg/fake/verify_namespace_reactor.go @@ -0,0 +1,50 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/testing" +) + +func AddVerifyNamespaceReactor(f *testing.Fake, resources ...string) { + f.Lock() + defer f.Unlock() + + reactors := f.ReactionChain[0:] + + react := func(a testing.Action) (bool, runtime.Object, error) { + action := a.(testing.CreateAction) + + namespace := action.GetNamespace() + + _, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace), + reactors) + if err != nil { + return true, nil, err + } + + return false, nil, nil + } + + for _, res := range resources { + f.PrependReactor("create", res, react) + } +} diff --git a/pkg/federate/fake/federator.go b/pkg/federate/fake/federator.go index e5f5990e..057724a2 100644 --- a/pkg/federate/fake/federator.go +++ b/pkg/federate/fake/federator.go @@ -21,6 +21,8 @@ package fake import ( "context" + "sync" + "sync/atomic" "time" . 
"github.com/onsi/gomega" @@ -30,26 +32,46 @@ import ( ) type Federator struct { + lock sync.Mutex distribute chan *unstructured.Unstructured delete chan *unstructured.Unstructured - FailOnDistribute error - FailOnDelete error - ResetOnFailure bool + failOnDistribute error + failOnDelete error + ResetOnFailure atomic.Bool } func New() *Federator { - return &Federator{ - distribute: make(chan *unstructured.Unstructured, 100), - delete: make(chan *unstructured.Unstructured, 100), - ResetOnFailure: true, + f := &Federator{ + distribute: make(chan *unstructured.Unstructured, 100), + delete: make(chan *unstructured.Unstructured, 100), } + f.ResetOnFailure.Store(true) + + return f +} + +func (f *Federator) FailOnDistribute(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.failOnDistribute = err +} + +func (f *Federator) FailOnDelete(err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.failOnDelete = err } func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { - err := f.FailOnDistribute + f.lock.Lock() + defer f.lock.Unlock() + + err := f.failOnDistribute if err != nil { - if f.ResetOnFailure { - f.FailOnDistribute = nil + if f.ResetOnFailure.Load() { + f.failOnDistribute = nil } return err @@ -61,10 +83,13 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { } func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { - err := f.FailOnDelete + f.lock.Lock() + defer f.lock.Unlock() + + err := f.failOnDelete if err != nil { - if f.ResetOnFailure { - f.FailOnDelete = nil + if f.ResetOnFailure.Load() { + f.failOnDelete = nil } return err diff --git a/pkg/resource/resource_test.go b/pkg/resource/resource_test.go index 38c40831..fd869b2d 100644 --- a/pkg/resource/resource_test.go +++ b/pkg/resource/resource_test.go @@ -22,6 +22,10 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/resource" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" ) var _ = Describe("EnsureValidName", func() { @@ -44,3 +48,60 @@ var _ = Describe("EnsureValidName", func() { }) }) }) + +var _ = Describe("TrimManagedFields", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{"app": "test"}, + Annotations: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "image", + Name: "httpd", + }, + }, + }, + } + + When("the object has metadata", func() { + It("should succeed and trim the ManagedFields", func() { + podWithMF := pod.DeepCopy() + podWithMF.ManagedFields = []metav1.ManagedFieldsEntry{ + { + Manager: "kubectl", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "v1", + Time: ptr.To(metav1.Now()), + FieldsType: "FieldsV1", + FieldsV1: ptr.To(metav1.FieldsV1{}), + }, + } + + retObj, err := resource.TrimManagedFields(podWithMF) + Expect(err).To(Succeed()) + Expect(podWithMF).To(Equal(pod)) + Expect(retObj).To(BeIdenticalTo(podWithMF)) + + _, err = resource.TrimManagedFields(podWithMF) + Expect(err).To(Succeed()) + Expect(podWithMF).To(Equal(pod)) + }) + }) + + When("the object does not have metadata", func() { + It("should succeed and return the object", func() { + obj := &cache.DeletedFinalStateUnknown{ + Key: "key", + Obj: pod, + } + + retObj, err := resource.TrimManagedFields(obj) + Expect(err).To(Succeed()) + Expect(retObj).To(Equal(obj)) + }) + }) +}) diff --git a/pkg/resource/util.go b/pkg/resource/util.go index 54054fcc..af861b91 100644 --- a/pkg/resource/util.go +++ b/pkg/resource/util.go @@ -125,3 +125,14 @@ func ToJSON(o any) string { out, _ := json.MarshalIndent(o, "", " ") return string(out) } + +// TrimManagedFields is a cache.TransformFunc that removes the ManagedFields metadata field if present. Note that if 'obj' does not +// implement metav1.Object then it is ignored and no error is returned. +func TrimManagedFields(obj interface{}) (interface{}, error) { + objMeta, err := meta.Accessor(obj) + if err == nil && len(objMeta.GetManagedFields()) > 0 { + objMeta.SetManagedFields(nil) + } + + return obj, nil +} diff --git a/pkg/syncer/broker/syncer.go b/pkg/syncer/broker/syncer.go index 5ba930bc..408f0de9 100644 --- a/pkg/syncer/broker/syncer.go +++ b/pkg/syncer/broker/syncer.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -134,6 +135,9 @@ type SyncerConfig struct { // Scheme used to convert resource objects. By default the global k8s Scheme is used. Scheme *runtime.Scheme + + // NamespaceInformer if specified, used to retry local resources that initially failed due to missing namespace. 
+ NamespaceInformer cache.SharedInformer } type Syncer struct { @@ -254,6 +258,7 @@ func NewSyncer(config SyncerConfig) (*Syncer, error) { //nolint:gocritic // Mini Scheme: config.Scheme, ResyncPeriod: rc.BrokerResyncPeriod, SyncCounter: syncCounter, + NamespaceInformer: config.NamespaceInformer, }) if err != nil { return nil, errors.Wrap(err, "error creating remote resource syncer") diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index 4fcef40b..283710dd 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -43,10 +43,14 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "k8s.io/utils/set" logf "sigs.k8s.io/controller-runtime/pkg/log" ) -const OrigNamespaceLabelKey = "submariner-io/originatingNamespace" +const ( + OrigNamespaceLabelKey = "submariner-io/originatingNamespace" + namespaceKey = "$namespace$" +) type SyncDirection int @@ -183,26 +187,90 @@ type ResourceSyncerConfig struct { // SyncCounter if specified, used to record counter metrics. SyncCounter *prometheus.GaugeVec + + // NamespaceInformer if specified, used to retry resources that initially failed due to missing namespace. + NamespaceInformer cache.SharedInformer } type resourceSyncer struct { - workQueue workqueue.Interface - informer cache.Controller - store cache.Store - config ResourceSyncerConfig - deleted sync.Map - created sync.Map - stopped chan struct{} - syncCounter *prometheus.GaugeVec - stopCh <-chan struct{} - log log.Logger + workQueue workqueue.Interface + hasSynced func() bool + informer cache.Controller + store cache.Store + config ResourceSyncerConfig + deleted sync.Map + created sync.Map + stopped chan struct{} + syncCounter *prometheus.GaugeVec + stopCh <-chan struct{} + log log.Logger + missingNamespaces map[string]set.Set[string] } func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { + syncer, err := newResourceSyncer(config) + if err != nil { + return nil, err + } + + rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) + if err != nil { + return nil, err //nolint:wrapcheck // OK to return the error as is. + } + + resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace) + + //nolint:wrapcheck // These are wrapper functions. 
+ syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = config.SourceLabelSelector + options.FieldSelector = config.SourceFieldSelector + return resourceClient.List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = config.SourceLabelSelector + options.FieldSelector = config.SourceFieldSelector + return resourceClient.Watch(context.TODO(), options) + }, + }, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{ + AddFunc: syncer.onCreate, + UpdateFunc: syncer.onUpdate, + DeleteFunc: syncer.onDelete, + }, resourceUtil.TrimManagedFields) + + syncer.hasSynced = syncer.informer.HasSynced + + return syncer, nil +} + +func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer cache.SharedInformer) (Interface, error) { + syncer, err := newResourceSyncer(config) + if err != nil { + return nil, err + } + + syncer.store = informer.GetStore() + + reg, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: syncer.onCreate, + UpdateFunc: syncer.onUpdate, + DeleteFunc: syncer.onDelete, + }, config.ResyncPeriod) + if err != nil { + return nil, errors.Wrapf(err, "error registering event handler") + } + + syncer.hasSynced = reg.HasSynced + + return syncer, nil +} + +func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { syncer := &resourceSyncer{ - config: *config, - stopped: make(chan struct{}), - log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")}, + config: *config, + stopped: make(chan struct{}), + log: log.Logger{Logger: logf.Log.WithName("ResourceSyncer")}, + missingNamespaces: map[string]set.Set[string]{}, } if syncer.config.Scheme == nil { @@ -218,11 +286,6 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { syncer.config.WaitForCacheSync = &wait } - rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) - if err != nil { - return nil, err //nolint:wrapcheck // OK to return the error as is. - } - if syncer.config.SyncCounter != nil { syncer.syncCounter = syncer.config.SyncCounter } else if syncer.config.SyncCounterOpts != nil { @@ -239,27 +302,42 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) { syncer.workQueue = workqueue.New(config.Name) + if config.NamespaceInformer != nil { + _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, _ bool) { + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String())) + }, + }) + if err != nil { + return nil, errors.Wrapf(err, "error registering namespace handler") + } + } + + return syncer, nil +} + +func NewSharedInformer(config *ResourceSyncerConfig) (cache.SharedInformer, error) { + rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper) + if err != nil { + return nil, err //nolint:wrapcheck // OK to return the error as is. + } + resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace) //nolint:wrapcheck // These are wrapper functions. 
- syncer.store, syncer.informer = cache.NewInformer(&cache.ListWatch{ + informer := cache.NewSharedIndexInformerWithOptions(&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.LabelSelector = config.SourceLabelSelector - options.FieldSelector = config.SourceFieldSelector return resourceClient.List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.LabelSelector = config.SourceLabelSelector - options.FieldSelector = config.SourceFieldSelector return resourceClient.Watch(context.TODO(), options) }, - }, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{ - AddFunc: syncer.onCreate, - UpdateFunc: syncer.onUpdate, - DeleteFunc: syncer.onDelete, + }, rawType, cache.SharedIndexInformerOptions{ + ResyncPeriod: config.ResyncPeriod, }) - return syncer, nil + //nolint:wrapcheck // OK to return the error as is. + return informer, informer.SetTransform(resourceUtil.TrimManagedFields) } func (r *resourceSyncer) Start(stopCh <-chan struct{}) error { @@ -278,13 +356,17 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error { }() defer r.workQueue.ShutDownWithDrain() - r.informer.Run(stopCh) + if r.informer != nil { + r.informer.Run(stopCh) + } else { + <-r.stopCh + } }() if *r.config.WaitForCacheSync { r.log.V(log.LIBDEBUG).Infof("Syncer %q waiting for informer cache to sync", r.config.Name) - _ = cache.WaitForCacheSync(stopCh, r.informer.HasSynced) + _ = cache.WaitForCacheSync(stopCh, r.hasSynced) } r.workQueue.Run(stopCh, r.processNextWorkItem) @@ -413,7 +495,7 @@ func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) { } func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any { - if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok { + if ok := cache.WaitForCacheSync(r.stopCh, r.hasSynced); !ok { // This means the cache was stopped. 
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name) @@ -423,7 +505,12 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any return run() } -func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) { +func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) { + if ns == namespaceKey { + r.handleNamespaceAdded(name) + return false, nil + } + obj, exists, err := r.store.GetByKey(key) if err != nil { return true, errors.Wrapf(err, "error retrieving resource %q", key) @@ -460,6 +547,13 @@ func (r *resourceSyncer) processNextWorkItem(key, _, _ string) (bool, error) { err = r.config.Federator.Distribute(context.Background(), resource) if err != nil || r.onSuccessfulSync(resource, transformed, op) { + missing, namespace := resourceUtil.IsMissingNamespaceErr(err) + if missing { + r.handleMissingNamespace(key, namespace) + + return false, nil + } + return true, errors.Wrapf(err, "error distributing resource %q", key) } @@ -675,6 +769,36 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr return u } +func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { + r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace) + + if r.config.NamespaceInformer == nil { + return + } + + keys, ok := r.missingNamespaces[namespace] + if !ok { + keys = set.New[string]() + r.missingNamespaces[namespace] = keys + } + + keys.Insert(key) +} + +func (r *resourceSyncer) handleNamespaceAdded(namespace string) { + keys, ok := r.missingNamespaces[namespace] + if ok { + r.log.V(log.LIBDEBUG).Infof("Syncer %q: namespace %q created - re-queueing %d resources", r.config.Name, namespace, keys.Len()) + + for _, k := range keys.UnsortedList() { + ns, name, _ := cache.SplitMetaNamespaceKey(k) + r.RequeueResource(name, ns) + } + + delete(r.missingNamespaces, namespace) + } +} + func getClusterIDLabel(resource runtime.Object) (string, bool) { clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey] return clusterID, found diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 3fce5fa8..0066d82a 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -45,6 +45,11 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + fakeK8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" ) var _ = Describe("Resource Syncer", func() { @@ -69,6 +74,9 @@ var _ = Describe("Resource Syncer", func() { Context(fmt.Sprintf("Direction: %s", syncer.RemoteToLocal), testReconcileRemoteToLocal) Context(fmt.Sprintf("Direction: %s", syncer.None), testReconcileNoDirection) }) + Describe("Trim Resource Fields", testTrimResourceFields) + Describe("With SharedInformer", testWithSharedInformer) + Describe("With missing namespace", testWithMissingNamespace) }) func testReconcileLocalToRemote() { @@ -170,6 +178,7 @@ func testReconcileNoDirection() { JustBeforeEach(func() { obj := toReconcile.DeepCopyObject() + d.syncer.Reconcile(func() []runtime.Object { return []runtime.Object{obj} }) @@ -338,18 +347,21 @@ func testTransformFunction() { BeforeEach(func() { test.SetClusterIDLabel(d.resource, "remote") atomic.StoreInt32(&invocationCount, 0) + expOperation = make(chan 
syncer.Operation, 20) transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed") requeue = false - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(from runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { defer GinkgoRecover() atomic.AddInt32(&invocationCount, 1) + pod, ok := from.(*corev1.Pod) Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", from) Expect(equality.Semantic.DeepDerivative(d.resource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, d.resource.Spec) expOperation <- op + return transformed, requeue } }) @@ -440,7 +452,7 @@ func testTransformFunction() { When("deletion of the transformed resource initially fails", func() { BeforeEach(func() { - d.federator.FailOnDelete = errors.New("fake error") + d.federator.FailOnDelete(errors.New("fake error")) d.addInitialResource(d.resource) }) @@ -457,7 +469,7 @@ func testTransformFunction() { When("distribute for the transformed resource initially fails", func() { JustBeforeEach(func() { - d.federator.FailOnDistribute = errors.New("fake error") + d.federator.FailOnDistribute(errors.New("fake error")) }) It("retry until it succeeds", func() { @@ -470,9 +482,10 @@ func testTransformFunction() { When("the transform function returns nil with no re-queue", func() { BeforeEach(func() { - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { atomic.AddInt32(&invocationCount, 1) expOperation <- op + return nil, false } }) @@ -511,8 +524,10 @@ func testTransformFunction() { BeforeEach(func() { transformFuncRet = &atomic.Value{} transformFuncRet.Store(nilResource) - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + + d.config.Transform = func(_ runtime.Object, _ int, op syncer.Operation) (runtime.Object, bool) { var ret runtime.Object + v := transformFuncRet.Load() if v != nilResource { ret, _ = v.(runtime.Object) @@ -520,6 +535,7 @@ func testTransformFunction() { transformFuncRet.Store(transformed) expOperation <- op + return ret, true } }) @@ -563,10 +579,14 @@ func testOnSuccessfulSyncFunction() { BeforeEach(func() { expOperation = make(chan syncer.Operation, 20) expResource = d.resource + onSuccessfulSyncReturn.Store(false) + d.config.OnSuccessfulSync = func(synced runtime.Object, op syncer.Operation) bool { defer GinkgoRecover() + pod, ok := synced.(*corev1.Pod) + Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", synced) Expect(equality.Semantic.DeepDerivative(expResource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, expResource.Spec) @@ -623,7 +643,7 @@ func testOnSuccessfulSyncFunction() { When("a resource is successfully created in the datastore", func() { BeforeEach(func() { expResource = test.NewPodWithImage(d.config.SourceNamespace, "transformed") - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { return expResource, false } }) @@ -638,8 +658,8 @@ func testOnSuccessfulSyncFunction() { When("distribute fails", func() { BeforeEach(func() { - d.federator.FailOnDistribute = errors.New("fake error") - d.federator.ResetOnFailure = false + 
d.federator.FailOnDistribute(errors.New("fake error")) + d.federator.ResetOnFailure.Store(false) }) It("should not invoke the OnSuccessfulSync function", func() { @@ -656,8 +676,8 @@ func testOnSuccessfulSyncFunction() { Context("with a general error", func() { BeforeEach(func() { - d.federator.FailOnDelete = errors.New("fake error") - d.federator.ResetOnFailure = false + d.federator.FailOnDelete(errors.New("fake error")) + d.federator.ResetOnFailure.Store(false) }) It("should not invoke the OnSuccessfulSync function", func() { @@ -668,7 +688,7 @@ func testOnSuccessfulSyncFunction() { Context("with a NotFound error", func() { BeforeEach(func() { - d.federator.FailOnDelete = apierrors.NewNotFound(schema.GroupResource{}, "") + d.federator.FailOnDelete(apierrors.NewNotFound(schema.GroupResource{}, "")) }) It("should invoke the OnSuccessfulSync function", func() { @@ -714,11 +734,14 @@ func testShouldProcessFunction() { shouldProcess = true d.config.ShouldProcess = func(obj *unstructured.Unstructured, op syncer.Operation) bool { defer GinkgoRecover() + pod := &corev1.Pod{} + Expect(d.config.Scheme.Convert(obj, pod, nil)).To(Succeed()) Expect(equality.Semantic.DeepDerivative(expResource.Spec, pod.Spec)).To(BeTrue(), "Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, expResource.Spec) expOperation <- op + return shouldProcess } }) @@ -823,7 +846,7 @@ func testSyncErrors() { When("distribute initially fails", func() { BeforeEach(func() { - d.federator.FailOnDistribute = expectedErr + d.federator.FailOnDistribute(expectedErr) }) It("should log the error and retry until it succeeds", func() { @@ -834,7 +857,7 @@ func testSyncErrors() { When("delete initially fails", func() { BeforeEach(func() { - d.federator.FailOnDelete = expectedErr + d.federator.FailOnDelete(expectedErr) d.addInitialResource(d.resource) }) @@ -850,7 +873,7 @@ func testSyncErrors() { When("delete fails with not found", func() { BeforeEach(func() { - d.federator.FailOnDelete = apierrors.NewNotFound(schema.GroupResource{}, "not found") + d.federator.FailOnDelete(apierrors.NewNotFound(schema.GroupResource{}, "not found")) d.addInitialResource(d.resource) }) @@ -1141,7 +1164,7 @@ func testRequeueResource() { BeforeEach(func() { transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed") - d.config.Transform = func(from runtime.Object, numRequeues int, op syncer.Operation) (runtime.Object, bool) { + d.config.Transform = func(_ runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { return transformed, false } }) @@ -1168,6 +1191,111 @@ func testRequeueResource() { }) } +func testTrimResourceFields() { + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + BeforeEach(func() { + d.config.ResyncPeriod = time.Millisecond * 100 + + d.resource.SetManagedFields([]metav1.ManagedFieldsEntry{ + { + Manager: "kubectl", + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: "v1", + Time: ptr.To(metav1.Now()), + FieldsType: "FieldsV1", + FieldsV1: ptr.To(metav1.FieldsV1{}), + }, + }) + + d.addInitialResource(d.resource) + }) + + It("should remove ManagedFields from created resources", func() { + obj, exists, err := d.syncer.GetResource(d.resource.Name, d.resource.Namespace) + Expect(err).To(Succeed()) + Expect(exists).To(BeTrue()) + Expect(resourceutils.MustToMeta(obj).GetManagedFields()).Should(BeNil()) + + // Sleep a little so a re-sync occurs and doesn't cause a data race. 
+ time.Sleep(200) + }) +} + +func testWithSharedInformer() { + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + BeforeEach(func() { + d.useSharedInformer = true + }) + + When("a resource is created in the local datastore", func() { + d.verifyDistributeOnCreateTest("") + }) +} + +func testWithMissingNamespace() { + const transformedNamespace = "transformed-ns" + + d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) + + var ( + k8sClient kubernetes.Interface + nsInformerFactory informers.SharedInformerFactory + ) + + BeforeEach(func() { + d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { + obj = obj.DeepCopyObject() + resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + + return obj, false + } + + d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{ + Resource: "namespaces", + }, transformedNamespace)) + + k8sClient = fakeK8s.NewSimpleClientset() + nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0) + d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer() + }) + + JustBeforeEach(func() { + nsInformerFactory.Start(d.stopCh) + }) + + Specify("distribute should eventually succeed when the namespace is created", func() { + resource := test.CreateResource(d.sourceClient, d.resource) + d.federator.VerifyNoDistribute() + + d.federator.FailOnDistribute(nil) + + By("Creating namespace") + + _, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: transformedNamespace, + }, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + resource.SetNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + }) + + Context("and no namespace informer specified", func() { + BeforeEach(func() { + d.config.NamespaceInformer = nil + }) + + It("should not retry", func() { + test.CreateResource(d.sourceClient, d.resource) + d.federator.VerifyNoDistribute() + }) + }) +} + func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { expSpecs := map[string]*corev1.PodSpec{} for i := range expected { @@ -1187,6 +1315,7 @@ func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) { type testDriver struct { config syncer.ResourceSyncerConfig + useSharedInformer bool syncer syncer.Interface sourceClient dynamic.ResourceInterface federator *fake.Federator @@ -1221,6 +1350,8 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. d.config.Transform = nil d.config.OnSuccessfulSync = nil d.config.ResourcesEquivalent = nil + d.config.ResyncPeriod = 0 + d.useSharedInformer = false err := corev1.AddToScheme(d.config.Scheme) Expect(err).To(Succeed()) @@ -1237,7 +1368,22 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer. 
d.sourceClient = d.config.SourceClient.Resource(*gvr).Namespace(d.config.SourceNamespace) var err error - d.syncer, err = syncer.NewResourceSyncer(&d.config) + + if d.useSharedInformer { + var sharedInformer cache.SharedInformer + + sharedInformer, err = syncer.NewSharedInformer(&d.config) + Expect(err).To(Succeed()) + + d.syncer, err = syncer.NewResourceSyncerWithSharedInformer(&d.config, sharedInformer) + + go func() { + sharedInformer.Run(d.stopCh) + }() + } else { + d.syncer, err = syncer.NewResourceSyncer(&d.config) + } + Expect(err).To(Succeed()) utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) { diff --git a/pkg/syncer/syncer_suite_test.go b/pkg/syncer/syncer_suite_test.go index f40cc084..ff446a0e 100644 --- a/pkg/syncer/syncer_suite_test.go +++ b/pkg/syncer/syncer_suite_test.go @@ -18,6 +18,7 @@ limitations under the License. package syncer_test import ( + "flag" "testing" . "github.com/onsi/ginkgo/v2" @@ -26,6 +27,10 @@ import ( ) func init() { + flags := flag.NewFlagSet("kzerolog", flag.ExitOnError) + kzerolog.AddFlags(flags) + _ = flags.Parse([]string{"-v=1"}) + kzerolog.AddFlags(nil) } diff --git a/test/e2e/watcher/watcher.go b/test/e2e/watcher/watcher.go index ecde608f..4a415e87 100644 --- a/test/e2e/watcher/watcher.go +++ b/test/e2e/watcher/watcher.go @@ -36,6 +36,7 @@ var _ = Describe("[watcher] Resource watcher tests", func() { It("should notify the handler of each event", func() { clusterName := framework.TestContext.ClusterIDs[framework.ClusterA] toaster := util.CreateToaster(t.client, util.NewToaster("test-toaster", t.framework.Namespace), clusterName) + toaster.SetManagedFields(nil) Eventually(t.created).Should(Receive(Equal(toaster))) util.DeleteToaster(t.client, toaster, clusterName)
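Usage sketch for the new AddVerifyNamespaceReactor outside this suite — a minimal plain go-test example (the Ginkgo style above is the project's own), assuming the admiral import paths shown in this diff; the test name and the "missing-ns" namespace are illustrative.

package example_test

import (
	"context"
	"testing"

	"github.com/submariner-io/admiral/pkg/fake"
	"github.com/submariner-io/admiral/pkg/resource"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sfake "k8s.io/client-go/kubernetes/fake"
)

// TestCreateRequiresNamespace shows the reactor rejecting a Pod create when the
// target namespace has not been created in the fake clientset.
func TestCreateRequiresNamespace(t *testing.T) {
	client := k8sfake.NewSimpleClientset()
	fake.AddVerifyNamespaceReactor(&client.Fake, "pods")

	_, err := client.CoreV1().Pods("missing-ns").Create(context.TODO(), &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "test-pod"},
	}, metav1.CreateOptions{})

	if missing, ns := resource.IsMissingNamespaceErr(err); !missing || ns != "missing-ns" {
		t.Fatalf("expected a missing-namespace error for %q, got: %v", "missing-ns", err)
	}
}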
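resource.TrimManagedFields also works as a transform on any client-go informer, not just the syncer-internal ones — a small sketch assuming a standard shared informer factory; the 30-second resync period and the helper name are arbitrary.

package example

import (
	"time"

	"github.com/submariner-io/admiral/pkg/resource"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newTrimmedPodInformer returns a Pod informer whose cached objects have their
// managedFields stripped, keeping the cache smaller.
func newTrimmedPodInformer(client kubernetes.Interface) (cache.SharedIndexInformer, error) {
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	informer := factory.Core().V1().Pods().Informer()

	// SetTransform must be called before the informer is started.
	if err := informer.SetTransform(resource.TrimManagedFields); err != nil {
		return nil, err
	}

	return informer, nil
}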
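One plausible way to populate the new SyncerConfig.NamespaceInformer field — sketched under the assumption that the caller has a kubernetes.Interface for the cluster the broker syncer writes into, so that resources rejected with a missing-namespace error are re-queued once the namespace appears; the helper name and the zero resync period are illustrative.

package example

import (
	"github.com/submariner-io/admiral/pkg/syncer/broker"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newBrokerSyncerWithNSRetry builds a namespace informer and hands it to the
// broker syncer config before creating the syncer.
func newBrokerSyncerWithNSRetry(cfg broker.SyncerConfig, k8sClient kubernetes.Interface,
	stopCh <-chan struct{},
) (*broker.Syncer, error) {
	factory := informers.NewSharedInformerFactory(k8sClient, 0)
	cfg.NamespaceInformer = factory.Core().V1().Namespaces().Informer()

	// Start the factory so the namespace informer begins delivering Add events.
	factory.Start(stopCh)

	return broker.NewSyncer(cfg)
}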
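The NewSharedInformer / NewResourceSyncerWithSharedInformer pair added here lets several syncers reuse a single list/watch. A rough sketch, assuming both syncers can share one ResourceSyncerConfig apart from the name; note that the shared informer applies no label or field selectors, and error handling is kept minimal.

package example

import (
	"github.com/submariner-io/admiral/pkg/syncer"
)

// startSyncersOnSharedInformer runs two resource syncers backed by the same
// shared informer so the source resource is listed and watched only once.
func startSyncersOnSharedInformer(config *syncer.ResourceSyncerConfig, stopCh <-chan struct{}) error {
	sharedInformer, err := syncer.NewSharedInformer(config)
	if err != nil {
		return err
	}

	first, err := syncer.NewResourceSyncerWithSharedInformer(config, sharedInformer)
	if err != nil {
		return err
	}

	secondConfig := *config
	secondConfig.Name = config.Name + "-secondary"

	second, err := syncer.NewResourceSyncerWithSharedInformer(&secondConfig, sharedInformer)
	if err != nil {
		return err
	}

	go sharedInformer.Run(stopCh)

	if err := first.Start(stopCh); err != nil {
		return err
	}

	return second.Start(stopCh)
}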