diff --git a/apis/placement/v1/binding_types.go b/apis/placement/v1/binding_types.go index 65df2b6c4..14f2f31a7 100644 --- a/apis/placement/v1/binding_types.go +++ b/apis/placement/v1/binding_types.go @@ -164,6 +164,11 @@ func (m *ClusterResourceBinding) SetConditions(conditions ...metav1.Condition) { } } +// RemoveCondition removes the condition of the given ClusterResourceBinding. +func (m *ClusterResourceBinding) RemoveCondition(conditionType string) { + meta.RemoveStatusCondition(&m.Status.Conditions, conditionType) +} + // GetCondition returns the condition of the given ClusterResourceBinding. func (m *ClusterResourceBinding) GetCondition(conditionType string) *metav1.Condition { return meta.FindStatusCondition(m.Status.Conditions, conditionType) diff --git a/apis/placement/v1beta1/binding_types.go b/apis/placement/v1beta1/binding_types.go index b889c1c42..1beda3f9f 100644 --- a/apis/placement/v1beta1/binding_types.go +++ b/apis/placement/v1beta1/binding_types.go @@ -164,6 +164,11 @@ func (b *ClusterResourceBinding) SetConditions(conditions ...metav1.Condition) { } } +// RemoveCondition removes the condition of the given ClusterResourceBinding. +func (b *ClusterResourceBinding) RemoveCondition(conditionType string) { + meta.RemoveStatusCondition(&b.Status.Conditions, conditionType) +} + // GetCondition returns the condition of the given ClusterResourceBinding. func (b *ClusterResourceBinding) GetCondition(conditionType string) *metav1.Condition { return meta.FindStatusCondition(b.Status.Conditions, conditionType) diff --git a/pkg/controllers/clusterresourcebindingwatcher/watcher.go b/pkg/controllers/clusterresourcebindingwatcher/watcher.go index e226fb70c..0b4f2c90e 100644 --- a/pkg/controllers/clusterresourcebindingwatcher/watcher.go +++ b/pkg/controllers/clusterresourcebindingwatcher/watcher.go @@ -115,7 +115,7 @@ func isBindingStatusUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResource oldCond := oldBinding.GetCondition(string(i.ResourceBindingConditionType())) newCond := newBinding.GetCondition(string(i.ResourceBindingConditionType())) if !condition.EqualCondition(oldCond, newCond) { - klog.V(2).InfoS("The binding condition has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding)) + klog.V(2).InfoS("The binding condition has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding), "type", i.ResourceBindingConditionType()) return true } } diff --git a/pkg/controllers/workgenerator/controller.go b/pkg/controllers/workgenerator/controller.go index 3ce4d8f13..4a7402080 100644 --- a/pkg/controllers/workgenerator/controller.go +++ b/pkg/controllers/workgenerator/controller.go @@ -120,6 +120,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques workUpdated := false overrideSucceeded := false + // Reset the conditions and failed placements. 
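+ // The work generator owns every binding condition from Overridden onward, so values left over
+ // from a previous sync are cleared up front; RolloutStarted is left untouched because the
+ // rollout controller owns it.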
+ for i := condition.OverriddenCondition; i < condition.TotalCondition; i++ {
+ resourceBinding.RemoveCondition(string(i.ResourceBindingConditionType()))
+ }
+ resourceBinding.Status.FailedPlacements = nil
 // list all the corresponding works
 works, syncErr := r.listAllWorksAssociated(ctx, &resourceBinding)
 if syncErr == nil {
@@ -152,8 +157,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques
 if err := errors.Unwrap(syncErr); err != nil && len(err.Error()) > 2 {
 errorMessage = errorMessage[len(err.Error())+2:]
 }
- // remove all the failedPlacement as it does not reflect the latest status
- resourceBinding.Status.FailedPlacements = nil
 if !overrideSucceeded {
 resourceBinding.SetConditions(metav1.Condition{
 Status: metav1.ConditionFalse,
@@ -180,8 +183,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques
 Message: "All of the works are synchronized to the latest",
 })
 if workUpdated {
- // revert the applied condition and failedPlacement if we made any changes to the work
- resourceBinding.Status.FailedPlacements = nil
+ // revert the applied condition if we made any changes to the work
 resourceBinding.SetConditions(metav1.Condition{
 Status: metav1.ConditionFalse,
 Type: string(fleetv1beta1.ResourceBindingApplied),
@@ -195,9 +197,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques
 }
 // update the resource binding status
- if updateErr := r.Client.Status().Update(ctx, &resourceBinding); updateErr != nil {
- klog.ErrorS(updateErr, "Failed to update the resourceBinding status", "resourceBinding", bindingRef)
- return controllerruntime.Result{}, controller.NewUpdateIgnoreConflictError(updateErr)
+ if updateErr := r.updateBindingStatusWithRetry(ctx, &resourceBinding); updateErr != nil {
+ return controllerruntime.Result{}, updateErr
 }
 if errors.Is(syncErr, controller.ErrUserError) {
 // Stop retry when the error is caused by user error
@@ -220,6 +221,50 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques
 return controllerruntime.Result{}, syncErr
 }
+// updateBindingStatusWithRetry sends the status update request to the API server and retries on conflict.
+func (r *Reconciler) updateBindingStatusWithRetry(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding) error {
+ // Try the update once; fall back to the conflict-aware retry below only if it fails.
+ err := r.Client.Status().Update(ctx, resourceBinding)
+ if err != nil {
+ klog.ErrorS(err, "Failed to update the resourceBinding status, will retry", "resourceBinding", klog.KObj(resourceBinding), "resourceBindingStatus", resourceBinding.Status)
+ errAfterRetries := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+ var latestBinding fleetv1beta1.ClusterResourceBinding
+ if err := r.Client.Get(ctx, client.ObjectKeyFromObject(resourceBinding), &latestBinding); err != nil {
+ return err
+ }
+ // The work generator is the only controller that updates binding conditions, except for RolloutStarted, which is owned by the rollout controller.
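+ // Carry the RolloutStarted condition from the freshly fetched binding into the local copy so
+ // that this status update does not overwrite what the rollout controller has written.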
+ if rolloutCond := latestBinding.GetCondition(string(fleetv1beta1.ResourceBindingRolloutStarted)); rolloutCond != nil { + found := false + for i := range resourceBinding.Status.Conditions { + if resourceBinding.Status.Conditions[i].Type == rolloutCond.Type { + // Replace the existing condition + resourceBinding.Status.Conditions[i] = *rolloutCond + found = true + break + } + } + if !found { + // Prepend the new condition if it wasn't found + resourceBinding.Status.Conditions = append([]metav1.Condition{*rolloutCond}, resourceBinding.Status.Conditions...) + } + } + + if err := r.Client.Status().Update(ctx, resourceBinding); err != nil { + klog.ErrorS(err, "Failed to update the resourceBinding status on retry", "resourceBinding", klog.KObj(resourceBinding), "resourceBindingStatus", resourceBinding.Status) + return err + } + klog.V(2).InfoS("Successfully updated the resourceBinding status", "resourceBinding", klog.KObj(resourceBinding), "resourceBindingStatus", resourceBinding.Status) + return nil + }) + if errAfterRetries != nil { + klog.ErrorS(errAfterRetries, "Failed to update binding status after retries", "resourceBinding", klog.KObj(resourceBinding)) + return errAfterRetries + } + return nil + } + return err +} + // handleDelete handle a deleting binding func (r *Reconciler) handleDelete(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding) (controllerruntime.Result, error) { klog.V(4).InfoS("Start to handle deleting resource binding", "resourceBinding", klog.KObj(resourceBinding)) diff --git a/pkg/controllers/workgenerator/controller_integration_test.go b/pkg/controllers/workgenerator/controller_integration_test.go index 90b813036..fdf8dfa49 100644 --- a/pkg/controllers/workgenerator/controller_integration_test.go +++ b/pkg/controllers/workgenerator/controller_integration_test.go @@ -47,7 +47,8 @@ var ( validResourceOverrideSnapshot placementv1alpha1.ResourceOverrideSnapshot invalidClusterResourceOverrideSnapshot placementv1alpha1.ClusterResourceOverrideSnapshot - cmpConditionOption = cmp.Options{cmpopts.SortSlices(utils.LessFuncFailedResourcePlacements), utils.IgnoreConditionLTTAndMessageFields, cmpopts.EquateEmpty()} + cmpConditionOption = cmp.Options{cmpopts.SortSlices(utils.LessFuncFailedResourcePlacements), utils.IgnoreConditionLTTAndMessageFields, cmpopts.EquateEmpty()} + cmpConditionOptionWithLTT = cmp.Options{cmpopts.SortSlices(utils.LessFuncFailedResourcePlacements), cmpopts.EquateEmpty()} fakeFailedAppliedReason = "fakeApplyFailureReason" fakeFailedAppliedMessage = "fake apply failure message" @@ -1308,6 +1309,124 @@ var _ = Describe("Test Work Generator Controller", func() { }, timeout, interval).Should(BeEmpty(), fmt.Sprintf("binding(%s) mismatch (-want +got)", binding.Name)) }) }) + + Context("Should not touch/reset RolloutStarted condition when the binding is updated", func() { + var masterSnapshot *placementv1beta1.ClusterResourceSnapshot + + BeforeEach(func() { + masterSnapshot = generateResourceSnapshot(1, 1, 0, [][]byte{ + testResourceCRD, testNameSpace, testResource, + }) + Expect(k8sClient.Create(ctx, masterSnapshot)).Should(Succeed()) + By(fmt.Sprintf("master resource snapshot %s created", masterSnapshot.Name)) + spec := placementv1beta1.ResourceBindingSpec{ + State: placementv1beta1.BindingStateBound, + ResourceSnapshotName: masterSnapshot.Name, + TargetCluster: memberClusterName, + } + binding = generateClusterResourceBinding(spec) + Expect(k8sClient.Create(ctx, binding)).Should(Succeed()) + By(fmt.Sprintf("resource binding %s created", 
binding.Name)) + }) + + AfterEach(func() { + By("Deleting master clusterResourceSnapshot") + Expect(k8sClient.Delete(ctx, masterSnapshot)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + }) + + It("Should create all the work in the target namespace after the resource snapshot is created", func() { + // check the binding status till the bound condition is true + Eventually(func() bool { + if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name}, binding); err != nil { + return false + } + // only check the work created status as the applied status reason changes depends on where the reconcile logic is + return condition.IsConditionStatusTrue( + meta.FindStatusCondition(binding.Status.Conditions, string(placementv1beta1.ResourceBindingWorkSynchronized)), binding.GetGeneration()) + }, timeout, interval).Should(BeTrue(), fmt.Sprintf("binding(%s) condition should be true", binding.Name)) + // check the work is created by now + work := placementv1beta1.Work{} + Eventually(func() error { + return k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(placementv1beta1.FirstWorkNameFmt, testCRPName), Namespace: memberClusterNamespaceName}, &work) + }, timeout, interval).Should(Succeed(), "Failed to get the expected work in hub cluster") + By(fmt.Sprintf("work %s is created in %s", work.Name, work.Namespace)) + //inspect the work + wantWork := placementv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(placementv1beta1.FirstWorkNameFmt, testCRPName), + Namespace: memberClusterNamespaceName, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: placementv1beta1.GroupVersion.String(), + Kind: "ClusterResourceBinding", + Name: binding.Name, + UID: binding.UID, + BlockOwnerDeletion: ptr.To(true), + }, + }, + Labels: map[string]string{ + placementv1beta1.CRPTrackingLabel: testCRPName, + placementv1beta1.ParentBindingLabel: binding.Name, + placementv1beta1.ParentResourceSnapshotIndexLabel: "1", + }, + }, + Spec: placementv1beta1.WorkSpec{ + Workload: placementv1beta1.WorkloadTemplate{ + Manifests: []placementv1beta1.Manifest{ + {RawExtension: runtime.RawExtension{Raw: testResourceCRD}}, + {RawExtension: runtime.RawExtension{Raw: testNameSpace}}, + {RawExtension: runtime.RawExtension{Raw: testResource}}, + }, + }, + }, + } + diff := cmp.Diff(wantWork, work, ignoreWorkOption, ignoreTypeMeta) + Expect(diff).Should(BeEmpty(), fmt.Sprintf("work(%s) mismatch (-want +got):\n%s", work.Name, diff)) + // check the binding status that it should be marked as work not applied eventually + verifyBindingStatusSyncedNotApplied(binding, false, true) + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name}, binding)).Should(Succeed()) + rolloutCond := binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted)) + // mark the work applied + markWorkApplied(&work) + // check the binding status that it should be marked as applied true eventually + verifyBindStatusAppliedNotAvailable(binding, false) + checkRolloutStartedNotUpdated(rolloutCond, binding) + // mark the work available + markWorkAvailable(&work) + // check the binding status that it should be marked as available true eventually + verifyBindStatusAvail(binding, false) + checkRolloutStartedNotUpdated(rolloutCond, binding) + }) + + It("Should treat the unscheduled binding as bound and not remove work", func() { + // check the work is created + work := placementv1beta1.Work{} + Eventually(func() error { + return k8sClient.Get(ctx, types.NamespacedName{Name: 
fmt.Sprintf(placementv1beta1.FirstWorkNameFmt, testCRPName), Namespace: memberClusterNamespaceName}, &work) + }, timeout, interval).Should(Succeed(), "Failed to get the expected work in hub cluster") + By(fmt.Sprintf("work %s is created in %s", work.Name, work.Namespace)) + // update binding to be unscheduled + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name}, binding)).Should(Succeed()) + rolloutCond := binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted)) + binding.Spec.State = placementv1beta1.BindingStateUnscheduled + Expect(k8sClient.Update(ctx, binding)).Should(Succeed()) + By(fmt.Sprintf("resource binding %s updated to be unscheduled", binding.Name)) + Consistently(func() error { + return k8sClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(placementv1beta1.FirstWorkNameFmt, testCRPName), Namespace: memberClusterNamespaceName}, &work) + }, duration, interval).Should(Succeed(), "controller should not remove work in hub cluster for unscheduled binding") + //inspect the work manifest to make sure it still has the same content + expectedManifest := []placementv1beta1.Manifest{ + {RawExtension: runtime.RawExtension{Raw: testResourceCRD}}, + {RawExtension: runtime.RawExtension{Raw: testNameSpace}}, + {RawExtension: runtime.RawExtension{Raw: testResource}}, + } + diff := cmp.Diff(expectedManifest, work.Spec.Workload.Manifests) + Expect(diff).Should(BeEmpty(), fmt.Sprintf("work manifest(%s) mismatch (-want +got):\n%s", work.Name, diff)) + // check the binding status + verifyBindingStatusSyncedNotApplied(binding, false, false) + checkRolloutStartedNotUpdated(rolloutCond, binding) + }) + }) }) Context("Test Bound ClusterResourceBinding with not found cluster", func() { @@ -1946,3 +2065,9 @@ func markOneManifestAvailable(work *placementv1beta1.Work) { Expect(k8sClient.Status().Update(ctx, work)).Should(Succeed()) By(fmt.Sprintf("resource work `%s` is marked as available", work.Name)) } + +func checkRolloutStartedNotUpdated(rolloutCond *metav1.Condition, binding *placementv1beta1.ClusterResourceBinding) { + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name}, binding)).Should(Succeed()) + diff := cmp.Diff(rolloutCond, binding.GetCondition(string(placementv1beta1.ResourceBindingRolloutStarted)), cmpConditionOptionWithLTT) + Expect(diff).Should(BeEmpty(), fmt.Sprintf("binding(%s) mismatch (-want +got)", binding.Name), diff) +} diff --git a/pkg/controllers/workgenerator/controller_test.go b/pkg/controllers/workgenerator/controller_test.go index 0eb1fdf44..1bd75dc7a 100644 --- a/pkg/controllers/workgenerator/controller_test.go +++ b/pkg/controllers/workgenerator/controller_test.go @@ -6,19 +6,30 @@ Licensed under the MIT license. 
package workgenerator import ( + "context" "errors" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/controllers/work" "go.goms.io/fleet/pkg/utils/condition" "go.goms.io/fleet/pkg/utils/controller" + "go.goms.io/fleet/test/utils/informer" +) + +var ( + ignoreOption = cmpopts.IgnoreFields(metav1.Condition{}, "Message", "LastTransitionTime") ) func TestGetWorkNamePrefixFromSnapshotName(t *testing.T) { @@ -1484,3 +1495,280 @@ func TestExtractFailedResourcePlacementsFromWork(t *testing.T) { }) } } + +func TestUpdateBindingStatusWithRetry(t *testing.T) { + bindingName := "test-binding" + lastTransitionTime := metav1.NewTime(time.Now()) + tests := []struct { + name string + latestBinding *fleetv1beta1.ClusterResourceBinding + resourceBinding *fleetv1beta1.ClusterResourceBinding + conflictCount int + expectError bool + }{ + { + name: "updates status successfully", + latestBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.RolloutStartedReason, + LastTransitionTime: lastTransitionTime, + }, + }, + }, + }, + resourceBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingOverridden), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.OverriddenSucceededReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingWorkSynchronized), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkSyncedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingApplied), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAppliedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingAvailable), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAvailableReason, + }, + }, + }, + }, + conflictCount: 0, + expectError: false, + }, + { + name: "updates status after conflict", + latestBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.RolloutStartedReason, + 
LastTransitionTime: lastTransitionTime, + }, + }, + }, + }, + resourceBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingOverridden), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.OverriddenSucceededReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingWorkSynchronized), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkSyncedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingApplied), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAppliedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingAvailable), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAvailableReason, + }, + }, + }, + }, + conflictCount: 1, + expectError: false, + }, + { + name: "does not update status", + latestBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.RolloutStartedReason, + LastTransitionTime: lastTransitionTime, + }, + }, + }, + }, + resourceBinding: &fleetv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName, + Generation: 1, + }, + Spec: fleetv1beta1.ResourceBindingSpec{ + State: fleetv1beta1.BindingStateBound, + TargetCluster: "cluster-1", + ResourceSnapshotName: "snapshot-1", + }, + Status: fleetv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(fleetv1beta1.ResourceBindingOverridden), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.OverriddenSucceededReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingWorkSynchronized), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkSyncedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingApplied), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAppliedReason, + }, + { + Type: string(fleetv1beta1.ResourceBindingAvailable), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AllWorkAvailableReason, + }, + }, + }, + }, + conflictCount: 10, + expectError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + scheme := serviceScheme(t) + objects := []client.Object{tt.latestBinding} + fakeClient := fake.NewClientBuilder(). + WithStatusSubresource(objects...). + WithScheme(scheme). + WithObjects(objects...). 
+ Build() + + conflictClient := &conflictClient{ + Client: fakeClient, + conflictCount: tt.conflictCount, + } + + // Create reconciler with custom client + r := &Reconciler{ + Client: conflictClient, + recorder: record.NewFakeRecorder(10), + InformerManager: &informer.FakeManager{}, + } + err := r.updateBindingStatusWithRetry(ctx, tt.resourceBinding) + if (err != nil) != tt.expectError { + t.Errorf("updateBindingStatusWithRetry() error = %v, wantErr %v", err, tt.expectError) + } + + binding := &fleetv1beta1.ClusterResourceBinding{} + if err := r.Client.Get(ctx, client.ObjectKeyFromObject(tt.resourceBinding), binding); err != nil { + t.Errorf("updateBindingStatusWithRetry() error = %v, wantErr %v", err, nil) + } + + if tt.conflictCount > 0 { + latestRollout := tt.latestBinding.GetCondition(string(fleetv1beta1.ResourceBindingRolloutStarted)) + rollout := tt.resourceBinding.GetCondition(string(fleetv1beta1.ResourceBindingRolloutStarted)) + if diff := cmp.Diff(latestRollout, rollout, ignoreOption); diff != "" { + t.Errorf("updateBindingStatusWithRetry() ResourceBindingRolloutStarted Condition got = %v, want %v", rollout, latestRollout) + } + } + }) + } +} + +type conflictClient struct { + client.Client + conflictCount int +} + +func (c *conflictClient) Status() client.StatusWriter { + return &conflictStatusWriter{ + StatusWriter: c.Client.Status(), + conflictClient: c, + } +} + +type conflictStatusWriter struct { + client.StatusWriter + conflictClient *conflictClient +} + +func (s *conflictStatusWriter) Update(_ context.Context, _ client.Object, _ ...client.SubResourceUpdateOption) error { + if s.conflictClient.conflictCount > 0 { + s.conflictClient.conflictCount-- + return k8serrors.NewConflict(schema.GroupResource{Resource: "ClusterResourceBinding"}, "test-binding", errors.New("conflict")) + } + return nil +} diff --git a/pkg/controllers/workgenerator/override_test.go b/pkg/controllers/workgenerator/override_test.go index ec44da9cd..7e63497b8 100644 --- a/pkg/controllers/workgenerator/override_test.go +++ b/pkg/controllers/workgenerator/override_test.go @@ -36,6 +36,9 @@ func serviceScheme(t *testing.T) *runtime.Scheme { if err := placementv1alpha1.AddToScheme(scheme); err != nil { t.Fatalf("Failed to add v1alpha1 scheme: %v", err) } + if err := placementv1beta1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add v1beta1 scheme: %v", err) + } return scheme } diff --git a/test/e2e/rollout_test.go b/test/e2e/rollout_test.go index d853a3be9..1db441dfa 100644 --- a/test/e2e/rollout_test.go +++ b/test/e2e/rollout_test.go @@ -9,26 +9,31 @@ import ( "encoding/json" "errors" "fmt" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils" + testv1alpha1 "go.goms.io/fleet/test/apis/v1alpha1" "go.goms.io/fleet/test/e2e/framework" "go.goms.io/fleet/test/utils/controller" ) const ( - randomImageName = "random-image-name" + randomImageName = "random-image-name" + testResourceCRDName = "testresources.test.kubernetes-fleet.io" ) // Note that this container will run in parallel with other containers. 
@@ -542,6 +547,241 @@ var _ = Describe("placing wrapped resources using a CRP", Ordered, func() { ensureCRPAndRelatedResourcesDeletion(crpName, allMemberClusters) }) }) + + Context("Test a CRP place custom resource successfully, should wait to update resource", Ordered, func() { + crpName := fmt.Sprintf(crpNameTemplate, GinkgoParallelProcess()) + workNamespace := appNamespace() + var wantSelectedResources []placementv1beta1.ResourceIdentifier + var testCustomResource testv1alpha1.TestResource + var crp *placementv1beta1.ClusterResourcePlacement + + BeforeAll(func() { + // Create the test resources. + readTestCustomResource(&testCustomResource) + testCustomResource.Namespace = workNamespace.Name + wantSelectedResources = []placementv1beta1.ResourceIdentifier{ + { + Kind: utils.NamespaceKind, + Name: workNamespace.Name, + Version: corev1.SchemeGroupVersion.Version, + }, + { + Group: testv1alpha1.GroupVersion.Group, + Kind: testCustomResource.Kind, + Name: testCustomResource.Name, + Version: testv1alpha1.GroupVersion.Version, + Namespace: workNamespace.Name, + }, + { + Group: utils.CRDMetaGVK.Group, + Kind: utils.CRDMetaGVK.Kind, + Name: testResourceCRDName, + Version: utils.CRDMetaGVK.Version, + }, + } + }) + + It("create the resources", func() { + Expect(hubClient.Create(ctx, &workNamespace)).To(Succeed(), "Failed to create namespace %s", workNamespace.Name) + Expect(hubClient.Create(ctx, &testCustomResource)).To(Succeed(), "Failed to create test custom resource %s", testCustomResource.GetName()) + }) + + It("create the CRP that select the namespace and CRD", func() { + crp = buildCRPForSafeRollout() + crdClusterResourceSelector := placementv1beta1.ClusterResourceSelector{ + Group: utils.CRDMetaGVK.Group, + Kind: utils.CRDMetaGVK.Kind, + Version: utils.CRDMetaGVK.Version, + Name: testResourceCRDName, + } + crp.Spec.ResourceSelectors = append(crp.Spec.ResourceSelectors, crdClusterResourceSelector) + crp.Spec.Policy = &placementv1beta1.PlacementPolicy{ + PlacementType: placementv1beta1.PickFixedPlacementType, + ClusterNames: []string{ + memberCluster1EastProdName, + }, + } + crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds = ptr.To(60) + Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") + }) + + It("should update CRP status as expected", func() { + crpStatusUpdatedActual := customizedCRPStatusUpdatedActual(crpName, wantSelectedResources, []string{memberCluster1EastProdName}, nil, "0", false) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected") + }) + + It("should place the resources on member cluster", func() { + workResourcesPlacedActual := waitForTestResourceToBePlaced(memberCluster1EastProd, &testCustomResource) + Eventually(workResourcesPlacedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to place work resources on member cluster %s", memberCluster1EastProd.ClusterName) + }) + + It("update the custom resource", func() { + Eventually(func() error { + var cr testv1alpha1.TestResource + err := hubClient.Get(ctx, types.NamespacedName{Name: testCustomResource.Name, Namespace: workNamespace.Name}, &cr) + if err != nil { + return err + } + cr.Spec.Foo = "bar1" // Previously was "foo1" + return hubClient.Update(ctx, &cr) + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update custom resource") + }) + + It("should not update the resource on member cluster before the unavailable second", func() { + // subtracting 5 seconds because 
transition between IT takes ~1 second + unavailablePeriod := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds)*time.Second - (5 * time.Second) + Consistently(func() bool { + var cr testv1alpha1.TestResource + err := memberCluster1EastProd.KubeClient.Get(ctx, types.NamespacedName{Name: testCustomResource.Name, Namespace: workNamespace.Name}, &cr) + if err != nil { + klog.Errorf("Failed to get custom resource %s/%s: %v", workNamespace.Name, testCustomResource.Name, err) + return false + } + if cr.Spec.Foo == "foo1" { // Previously was "foo1" + return true + } + return false + }, unavailablePeriod, consistentlyInterval).Should(BeTrue(), "Test resource was updated when it shouldn't be") + }) + + It("should update CRP status as expected", func() { + crpStatusUpdatedActual := customizedCRPStatusUpdatedActual(crpName, wantSelectedResources, []string{memberCluster1EastProdName}, nil, "1", false) + Eventually(crpStatusUpdatedActual, longEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the CRP. + ensureCRPAndRelatedResourcesDeletion(crpName, []*framework.Cluster{memberCluster1EastProd}) + }) + }) + + Context("Test a CRP place custom resource successfully, should wait to update resource on multiple member clusters", Ordered, func() { + crpName := fmt.Sprintf(crpNameTemplate, GinkgoParallelProcess()) + workNamespace := appNamespace() + var wantSelectedResources []placementv1beta1.ResourceIdentifier + var testCustomResource testv1alpha1.TestResource + var crp *placementv1beta1.ClusterResourcePlacement + + BeforeAll(func() { + // Create the test resources. + readTestCustomResource(&testCustomResource) + testCustomResource.Namespace = workNamespace.Name + wantSelectedResources = []placementv1beta1.ResourceIdentifier{ + { + Kind: utils.NamespaceKind, + Name: workNamespace.Name, + Version: corev1.SchemeGroupVersion.Version, + }, + { + Group: testv1alpha1.GroupVersion.Group, + Kind: testCustomResource.Kind, + Name: testCustomResource.Name, + Version: testv1alpha1.GroupVersion.Version, + Namespace: workNamespace.Name, + }, + { + Group: utils.CRDMetaGVK.Group, + Kind: utils.CRDMetaGVK.Kind, + Name: testResourceCRDName, + Version: utils.CRDMetaGVK.Version, + }, + } + }) + + It("create the resources", func() { + Expect(hubClient.Create(ctx, &workNamespace)).To(Succeed(), "Failed to create namespace %s", workNamespace.Name) + Expect(hubClient.Create(ctx, &testCustomResource)).To(Succeed(), "Failed to create test custom resource %s", testCustomResource.GetName()) + }) + + It("create the CRP that select the namespace and CRD", func() { + crp = buildCRPForSafeRollout() + crdClusterResourceSelector := placementv1beta1.ClusterResourceSelector{ + Group: utils.CRDMetaGVK.Group, + Kind: utils.CRDMetaGVK.Kind, + Version: utils.CRDMetaGVK.Version, + Name: testResourceCRDName, + } + crp.Spec.ResourceSelectors = append(crp.Spec.ResourceSelectors, crdClusterResourceSelector) + crp.Spec.Policy = &placementv1beta1.PlacementPolicy{ + PlacementType: placementv1beta1.PickAllPlacementType, + } + crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds = ptr.To(60) + Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") + }) + + It("should update CRP status as expected", func() { + crpStatusUpdatedActual := customizedCRPStatusUpdatedActual(crpName, wantSelectedResources, allMemberClusterNames, nil, "0", false) + Eventually(crpStatusUpdatedActual, 
longEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
+ })
+
+ It("should place the resources on member clusters", func() {
+ for idx := range allMemberClusters {
+ memberCluster := allMemberClusters[idx]
+ workResourcesPlacedActual := waitForTestResourceToBePlaced(memberCluster, &testCustomResource)
+ Eventually(workResourcesPlacedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to place work resources on member cluster %s", memberCluster.ClusterName)
+ }
+ })
+
+ It("update the custom resource", func() {
+ Eventually(func() error {
+ var cr testv1alpha1.TestResource
+ err := hubClient.Get(ctx, types.NamespacedName{Name: testCustomResource.Name, Namespace: workNamespace.Name}, &cr)
+ if err != nil {
+ return err
+ }
+ cr.Spec.Foo = "bar1" // Previously was "foo1"
+ return hubClient.Update(ctx, &cr)
+ }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update custom resource")
+ })
+
+ It("should update one member cluster", func() {
+ // adding a buffer of 5 seconds
+ unavailablePeriod := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds)*time.Second + (5 * time.Second)
+ Eventually(func() bool {
+ // Check the number of clusters meeting the condition
+ countClustersMeetingCondition := func() int {
+ count := 0
+ for _, cluster := range allMemberClusters {
+ if !checkCluster(cluster, testCustomResource.Name, workNamespace.Name) {
+ // the resource field was updated to "bar1" on this cluster
+ count++
+ }
+ }
+ return count
+ }
+ return countClustersMeetingCondition() == 1
+ }, unavailablePeriod, eventuallyInterval).Should(BeTrue(), "Test resource should be updated on exactly one member cluster within the unavailable period")
+ })
+
+ It("should not roll out the update to the next member cluster before the unavailable period ends", func() {
+ // subtracting a buffer of 5 seconds
+ unavailablePeriod := time.Duration(*crp.Spec.Strategy.RollingUpdate.UnavailablePeriodSeconds)*time.Second - (5 * time.Second)
+ Consistently(func() bool {
+ // Check the number of clusters meeting the condition
+ countClustersMeetingCondition := func() int {
+ count := 0
+ for _, cluster := range allMemberClusters {
+ if !checkCluster(cluster, testCustomResource.Name, workNamespace.Name) {
+ // the resource field was updated to "bar1" on this cluster
+ count++
+ }
+ }
+ return count
+ }
+ return countClustersMeetingCondition() == 1
+ }, unavailablePeriod, consistentlyInterval).Should(BeTrue(), "Test resource was rolled out to another member cluster before the unavailable period ended")
+ })
+
+ It("should update CRP status as expected", func() {
+ crpStatusUpdatedActual := customizedCRPStatusUpdatedActual(crpName, wantSelectedResources, allMemberClusterNames, nil, "1", false)
+ Eventually(crpStatusUpdatedActual, longEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
+ })
+
+ AfterAll(func() {
+ // Remove the custom deletion blocker finalizer from the CRP.
+ ensureCRPAndRelatedResourcesDeletion(crpName, allMemberClusters)
+ })
+ })
 })
 // createWrappedResourcesForRollout creates an enveloped resource on the hub cluster with a workload object for testing purposes.
@@ -552,6 +792,16 @@ func createWrappedResourcesForRollout(testEnvelopeObj *corev1.ConfigMap, obj met Expect(hubClient.Create(ctx, testEnvelopeObj)).To(Succeed(), "Failed to create testEnvelop object %s containing %s", testEnvelopeObj.Name, kind) } +func checkCluster(cluster *framework.Cluster, name, namespace string) bool { + var cr testv1alpha1.TestResource + err := cluster.KubeClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, &cr) + if err != nil { + klog.Errorf("Failed to get custom resource %s/%s: %v", namespace, name, err) + return false + } + return cr.Spec.Foo == "foo1" // Check that the resource's field is as expected +} + func waitForDeploymentPlacementToReady(memberCluster *framework.Cluster, testDeployment *appv1.Deployment) func() error { return func() error { if err := validateWorkNamespaceOnCluster(memberCluster, types.NamespacedName{Name: testDeployment.Namespace}); err != nil { @@ -645,6 +895,39 @@ func waitForJobToBePlaced(memberCluster *framework.Cluster, testJob *batchv1.Job } } +func waitForTestResourceToBePlaced(memberCluster *framework.Cluster, testResource *testv1alpha1.TestResource) func() error { + return func() error { + if err := validateWorkNamespaceOnCluster(memberCluster, types.NamespacedName{Name: testResource.Namespace}); err != nil { + return err + } + By("check the placedTestResource") + return memberCluster.KubeClient.Get(ctx, types.NamespacedName{Namespace: testResource.Namespace, Name: testResource.Name}, &testv1alpha1.TestResource{}) + } +} + +func waitForCRDToBeReady(crdName string) { + Eventually(func() error { // wait for CRD to be created + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := hubClient.Get(ctx, types.NamespacedName{Name: crdName}, crd); err != nil { + return err + } + if crd.Status.Conditions == nil { + return fmt.Errorf("CRD status conditions are nil for %s", crdName) + } + + for _, cond := range crd.Status.Conditions { + if cond.Type == apiextensionsv1.Established && cond.Status != apiextensionsv1.ConditionTrue { + return fmt.Errorf("CRD is not established: %s", crdName) + } + if cond.Type == apiextensionsv1.NamesAccepted && cond.Status != apiextensionsv1.ConditionTrue { + return fmt.Errorf("CRD names are not accepted: %s", crdName) + } + } + + return nil + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "CRD failed to be ready %s", crdName) +} + func buildCRPForSafeRollout() *placementv1beta1.ClusterResourcePlacement { return &placementv1beta1.ClusterResourcePlacement{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/e2e/setup_test.go b/test/e2e/setup_test.go index 7cb2b82b0..136409529 100644 --- a/test/e2e/setup_test.go +++ b/test/e2e/setup_test.go @@ -35,6 +35,7 @@ import ( placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/propertyprovider/azure/trackers" "go.goms.io/fleet/pkg/utils" + testv1alpha1 "go.goms.io/fleet/test/apis/v1alpha1" "go.goms.io/fleet/test/e2e/framework" ) @@ -226,8 +227,8 @@ func TestMain(m *testing.M) { if err := fleetnetworkingv1alpha1.AddToScheme(scheme); err != nil { log.Fatalf("failed to add custom APIs (networking) to the runtime scheme: %v", err) } - if err := placementv1alpha1.AddToScheme(scheme); err != nil { - log.Fatalf("failed to add custom APIs (placement) to the runtime scheme: %v", err) + if err := testv1alpha1.AddToScheme(scheme); err != nil { + log.Fatalf("failed to add custom APIs (test) to the runtime scheme: %v", err) } // Add built-in APIs and extensions to the scheme. 
@@ -321,12 +322,14 @@ func beforeSuiteForProcess1() { // Note that these clusters are not real kind clusters. setupInvalidClusters() createResourcesForFleetGuardRail() + createTestResourceCRD() } var _ = SynchronizedBeforeSuite(beforeSuiteForProcess1, beforeSuiteForAllProcesses) var _ = SynchronizedAfterSuite(func() {}, func() { deleteResourcesForFleetGuardRail() + deleteTestResourceCRD() setAllMemberClustersToLeave() checkIfAllMemberClustersHaveLeft() cleanupInvalidClusters() diff --git a/test/e2e/utils_test.go b/test/e2e/utils_test.go index 4fa95ce1e..99f33e997 100644 --- a/test/e2e/utils_test.go +++ b/test/e2e/utils_test.go @@ -20,6 +20,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -39,6 +40,7 @@ import ( "go.goms.io/fleet/pkg/propertyprovider/azure/trackers" "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/condition" + testv1alpha1 "go.goms.io/fleet/test/apis/v1alpha1" "go.goms.io/fleet/test/e2e/framework" ) @@ -543,6 +545,22 @@ func deleteResourcesForFleetGuardRail() { Expect(hubClient.Delete(ctx, &cr)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) } +func deleteTestResourceCRD() { + crd := apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testresources.test.kubernetes-fleet.io", + }, + } + Expect(hubClient.Delete(ctx, &crd)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) +} + +func createTestResourceCRD() { + var crd apiextensionsv1.CustomResourceDefinition + readTestCustomResourceDefinition(&crd) + Expect(hubClient.Create(ctx, &crd)).To(Succeed(), "Failed to create test custom resource definition %s", crd.Name) + waitForCRDToBeReady(crd.Name) +} + // cleanupMemberCluster removes finalizers (if any) from the member cluster, and // wait until its final removal. func cleanupMemberCluster(memberClusterName string) { @@ -1113,6 +1131,19 @@ func checkIfOverrideAnnotationsOnAllMemberClusters(includeNamespace bool, wantAn } } +func readTestCustomResource(customResource *testv1alpha1.TestResource) { + By("Read the custom resource") + err := utils.GetObjectFromManifest("../manifests/test-resource.yaml", customResource) + customResource.Name = fmt.Sprintf("%s-%d", customResource.Name, GinkgoParallelProcess()) + Expect(err).Should(Succeed()) +} + +func readTestCustomResourceDefinition(crd *apiextensionsv1.CustomResourceDefinition) { + By("Read the custom resource definition") + err := utils.GetObjectFromManifest("../manifests/test_testresources_crd.yaml", crd) + Expect(err).Should(Succeed()) +} + func readDeploymentTestManifest(testDeployment *appsv1.Deployment) { By("Read the deployment resource") err := utils.GetObjectFromManifest("resources/test-deployment.yaml", testDeployment)