From 99a92868b9a12fbdcfadada8bc54e1393b4d7a34 Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Mon, 6 Jan 2025 13:34:15 +0700 Subject: [PATCH 1/3] internal: added helper for status update --- internal/result/result.go | 80 +++++++++++++++++++++++++++++++++++++++ internal/status/status.go | 58 ++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 internal/result/result.go create mode 100644 internal/status/status.go diff --git a/internal/result/result.go b/internal/result/result.go new file mode 100644 index 00000000..d88cf07e --- /dev/null +++ b/internal/result/result.go @@ -0,0 +1,80 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package result + +import ( + "errors" + "time" + + ctrl "sigs.k8s.io/controller-runtime" +) + +// Transient checks if the error is transient and returns a result +// that indicates whether the request should be retried. +func Transient(err error) (r ctrl.Result, _ error) { + if t := (*transientError)(nil); errors.As(err, &t) { + return Retry(t.after) + } + // Permanent errors are not returned as errors because they cause + // the controller to requeue indefinitely. Instead, they should be + // reported as a status condition. + return OK() +} + +// OK returns a successful result +func OK() (ctrl.Result, error) { + return ctrl.Result{}, nil +} + +// Failed returns a failed result +func Failed() (ctrl.Result, error) { + return Retry(0) +} + +// Retry requeues the request after the specified number of seconds +func Retry(after int) (ctrl.Result, error) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: time.Second * time.Duration(after), + }, nil +} + +// transientError is an error that should be retried. +type transientError struct { + err error + after int +} + +func (t *transientError) Error() string { return t.err.Error() } +func (t *transientError) Unwrap() error { return t.err } + +// TransientError wraps an error to indicate that it should be retried. +func TransientError(err error) error { + return TransientErrorAfter(err, 5) +} + +// TransientErrorAfter wraps an error to indicate that it should be retried after +// the given duration. +func TransientErrorAfter(err error, after int) error { + if err == nil { + return nil + } + return &transientError{err: err, after: after} +} + +func isTransient(err error) bool { + var t *transientError + return errors.As(err, &t) +} diff --git a/internal/status/status.go b/internal/status/status.go new file mode 100644 index 00000000..e6d43641 --- /dev/null +++ b/internal/status/status.go @@ -0,0 +1,58 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package status + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ( + // OptionBuilder is an interface that can be implemented + // by any type that can provide a list of options + OptionBuilder[T any] interface { + GetOptions() []Option[T] + } + // Option is an interface that can be implemented by any type + // that can apply an option to a resource and return a result + Option[T any] interface { + ApplyOption(o T) + GetResult() (ctrl.Result, error) + } +) + +// Update takes the options provided by the given option builder, applies them all and then updates the resource +func Update[T client.Object](ctx context.Context, sw client.StatusWriter, obj T, b OptionBuilder[T]) (r ctrl.Result, err error) { + opts := b.GetOptions() + for _, o := range opts { + o.ApplyOption(obj) + } + if err := sw.Update(ctx, obj); err != nil { + return ctrl.Result{}, err + } + for _, o := range opts { + if r, err = o.GetResult(); err != nil { + return r, err + } + } + for _, o := range opts { + if r, _ := o.GetResult(); r.Requeue || r.RequeueAfter > 0 { + return r, nil + } + } + return ctrl.Result{}, nil +} From 328a20fe7a95e30dd9c1ec12f4451dd65f751c0d Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Mon, 6 Jan 2025 13:35:53 +0700 Subject: [PATCH 2/3] controller: added status builder for `AtlasSchema` --- .../controller/atlasmigration_controller.go | 6 +- internal/controller/atlasschema_controller.go | 40 ++--- internal/controller/atlasschema_status.go | 152 ++++++++++++++++++ internal/controller/common.go | 2 +- internal/result/errors.go | 36 +++++ internal/result/result.go | 46 ++---- 6 files changed, 221 insertions(+), 61 deletions(-) create mode 100644 internal/controller/atlasschema_status.go create mode 100644 internal/result/errors.go diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 3f9b5acb..0f3106c3 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -134,7 +134,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque } res.SetNotReady(reason, err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -156,7 +156,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return xresult(err) } } // Reconcile given resource @@ -164,7 +164,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque if err != nil { r.recordErrEvent(res, err) } - return result(err) + return xresult(err) } func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) { diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index faa61f8e..24694170 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -134,7 +134,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("ReadSchema", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } if data.hasTargets() { res.SetNotReady("ReadSchema", "Multiple targets are not supported") @@ -144,7 +144,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("CalculatingHash", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -163,7 +163,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return xresult(err) } } opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} @@ -190,14 +190,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("CreatingWorkingDir", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } defer wd.Close() cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { res.SetNotReady("CreatingAtlasClient", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { @@ -207,12 +207,12 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = errors.New("login is required to use custom atlas.hcl config") res.SetNotReady("WhoAmI", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } case err != nil: res.SetNotReady("WhoAmI", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("LintPolicyError", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. @@ -231,7 +231,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) m.setLintReview(dbv1alpha1.LintReviewError, false) }) if err != nil { - return result(err) + return xresult(err) } params := &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -268,7 +268,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, @@ -285,7 +285,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } desiredURL = state.URL } @@ -315,7 +315,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) default: log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL) res.Status.PlanURL = plan.File.URL @@ -342,7 +342,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are multiple pending plans. This is an unexpected state. case len(plans) > 1: planURLs := make([]string, 0, len(plans)) @@ -355,7 +355,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) err = errors.New(msg) r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are no pending plans, but Atlas has been asked to review the changes ALWAYS. case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways: // Create a plan for the pending changes. @@ -390,7 +390,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = editAtlasHCL(func(m *managedData) { m.enableDestructive(true) }); err != nil { - return result(err) + return xresult(err) } err = r.lint(ctx, wd, data, data.Vars) switch d := (*destructiveErr)(nil); { @@ -408,13 +408,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // Revert the destructive linting policy back to the original value. if err = editAtlasHCL(func(m *managedData) { m.Policy.Lint.Destructive.Error = false }); err != nil { - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -433,7 +433,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -459,14 +459,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), ObservedHash: hash, } if len(reports) != 1 { - return result(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) + return xresult(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) } log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied)) // Truncate the applied and pending changes to 1024 bytes. diff --git a/internal/controller/atlasschema_status.go b/internal/controller/atlasschema_status.go new file mode 100644 index 00000000..b2b9dd51 --- /dev/null +++ b/internal/controller/atlasschema_status.go @@ -0,0 +1,152 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "fmt" + + "ariga.io/atlas-go-sdk/atlasexec" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/ariga/atlas-operator/api/v1alpha1" + "github.com/ariga/atlas-operator/internal/result" + "github.com/ariga/atlas-operator/internal/status" +) + +type optionBuilder struct { + opts []status.Option[*v1alpha1.AtlasSchema] +} + +// statusOptions returns an initialized optionBuilder +func statusOptions() *optionBuilder { + return &optionBuilder{} +} + +// GetOptions implements the OptionBuilder interface +func (o *optionBuilder) GetOptions() []status.Option[*v1alpha1.AtlasSchema] { + return o.opts +} + +func (o *optionBuilder) withCondition(condition metav1.Condition) *optionBuilder { + o.opts = append(o.opts, conditionOption{cond: condition}) + return o +} + +func (o *optionBuilder) withObservedHash(h string) *optionBuilder { + o.opts = append(o.opts, observedHashOption{hash: h}) + return o +} + +func (o *optionBuilder) withPlanFile(f *atlasexec.SchemaPlanFile) *optionBuilder { + o.opts = append(o.opts, planFileOption{file: f}) + return o +} + +func (o *optionBuilder) withReport(p *atlasexec.SchemaApply) *optionBuilder { + // Truncate the applied and pending changes to 1024 bytes. + p.Changes.Applied = truncateSQL(p.Changes.Applied, sqlLimitSize) + p.Changes.Pending = truncateSQL(p.Changes.Pending, sqlLimitSize) + o.opts = append(o.opts, reportOption{report: p}) + if plan := p.Plan; plan != nil { + return o.withPlanFile(plan.File) + } + return o +} + +type observedHashOption struct { + hash string +} + +func (o observedHashOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.ObservedHash = o.hash +} + +func (o observedHashOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type reportOption struct { + report *atlasexec.SchemaApply +} + +func (m reportOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + var msg string + if j, err := json.Marshal(m.report); err != nil { + msg = fmt.Sprintf("Error marshalling apply response: %v", err) + } else { + msg = fmt.Sprintf("The schema has been applied successfully. Apply response: %s", j) + } + obj.Status.LastApplied = m.report.End.Unix() + meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionTrue, + Reason: "Applied", + Message: msg, + }) +} + +func (m reportOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +func (o *optionBuilder) withNotReady(reason string, err error) *optionBuilder { + err = IgnoreNonTransient(err) + o.opts = append(o.opts, conditionOption{ + cond: metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + Reason: reason, + Message: err.Error(), + }, + err: err, + }) + return o +} + +type planFileOption struct { + file *atlasexec.SchemaPlanFile +} + +func (m planFileOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.PlanURL = m.file.URL + obj.Status.PlanLink = m.file.Link +} + +func (m planFileOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type conditionOption struct { + cond metav1.Condition + err error +} + +func (m conditionOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + meta.SetStatusCondition(&obj.Status.Conditions, m.cond) +} + +func (m conditionOption) GetResult() (reconcile.Result, error) { + return result.Transient(m.err) +} + +func IgnoreNonTransient(err error) error { + if err == nil || result.IsTransient(err) { + return nil + } + return err +} diff --git a/internal/controller/common.go b/internal/controller/common.go index 4c1ba892..edb9f17b 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -185,7 +185,7 @@ func isTransient(err error) bool { // Permanent errors are not returned as errors because they cause // the controller to requeue indefinitely. Instead, they should be // reported as a status condition. -func result(err error) (r ctrl.Result, _ error) { +func xresult(err error) (r ctrl.Result, _ error) { if t := (*transientError)(nil); errors.As(err, &t) { r.RequeueAfter = t.after } diff --git a/internal/result/errors.go b/internal/result/errors.go new file mode 100644 index 00000000..c7c40006 --- /dev/null +++ b/internal/result/errors.go @@ -0,0 +1,36 @@ +package result + +import ( + "errors" +) + +// TransientError is an error that should be retried. +type TransientError struct { + Err error + After int +} + +// Error implements the error interface +func (t *TransientError) Error() string { + return t.Err.Error() +} + +// Unwrap implements the errors.Wrapper interface +func (t *TransientError) Unwrap() error { + return t.Err +} + +// TransientErrorAfter wraps an error to indicate that it should be retried after +// the given duration. +func TransientErrorAfter(err error, after int) error { + if err == nil { + return nil + } + return &TransientError{Err: err, After: after} +} + +// IsTransient checks if the error is transient +func IsTransient(err error) bool { + var t *TransientError + return errors.As(err, &t) +} diff --git a/internal/result/result.go b/internal/result/result.go index d88cf07e..4136d738 100644 --- a/internal/result/result.go +++ b/internal/result/result.go @@ -21,18 +21,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -// Transient checks if the error is transient and returns a result -// that indicates whether the request should be retried. -func Transient(err error) (r ctrl.Result, _ error) { - if t := (*transientError)(nil); errors.As(err, &t) { - return Retry(t.after) - } - // Permanent errors are not returned as errors because they cause - // the controller to requeue indefinitely. Instead, they should be - // reported as a status condition. - return OK() -} - // OK returns a successful result func OK() (ctrl.Result, error) { return ctrl.Result{}, nil @@ -51,30 +39,14 @@ func Retry(after int) (ctrl.Result, error) { }, nil } -// transientError is an error that should be retried. -type transientError struct { - err error - after int -} - -func (t *transientError) Error() string { return t.err.Error() } -func (t *transientError) Unwrap() error { return t.err } - -// TransientError wraps an error to indicate that it should be retried. -func TransientError(err error) error { - return TransientErrorAfter(err, 5) -} - -// TransientErrorAfter wraps an error to indicate that it should be retried after -// the given duration. -func TransientErrorAfter(err error, after int) error { - if err == nil { - return nil +// Transient checks if the error is transient and returns a result +// that indicates whether the request should be retried. +func Transient(err error) (ctrl.Result, error) { + if t := (*TransientError)(nil); errors.As(err, &t) { + return Retry(t.After) } - return &transientError{err: err, after: after} -} - -func isTransient(err error) bool { - var t *transientError - return errors.As(err, &t) + // Permanent errors are not returned as errors because they cause + // the controller to requeue indefinitely. Instead, they should be + // reported as a status condition. + return OK() } From 91a1714bdd6287d3482b85cc7acf8140bbd668a9 Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Wed, 8 Jan 2025 12:59:00 +0700 Subject: [PATCH 3/3] a --- internal/controller/atlasschema_controller.go | 60 +++++++++---------- internal/controller/atlasschema_status.go | 13 ++++ 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index 24694170..f3a50443 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -30,6 +30,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -43,6 +44,8 @@ import ( "github.com/ariga/atlas-operator/api/v1alpha1" dbv1alpha1 "github.com/ariga/atlas-operator/api/v1alpha1" "github.com/ariga/atlas-operator/internal/controller/watch" + "github.com/ariga/atlas-operator/internal/result" + "github.com/ariga/atlas-operator/internal/status" "github.com/hashicorp/hcl/v2/hclwrite" "github.com/zclconf/go-cty/cty" ) @@ -108,7 +111,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) res = &dbv1alpha1.AtlasSchema{} ) if err = r.Get(ctx, req.NamespacedName, res); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) + if apierrors.IsNotFound(err) { + return result.OK() + } + return result.Failed() } defer func() { // At the end of reconcile, update the status of the resource base on the error @@ -127,31 +133,29 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }() // When the resource is first created, create the "Ready" condition. if len(res.Status.Conditions) == 0 { - res.SetNotReady("Reconciling", "Reconciling") - return ctrl.Result{Requeue: true}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withReconciling("Reconciling")) } data, err := r.extractData(ctx, res) if err != nil { - res.SetNotReady("ReadSchema", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("ReadSchema", err)) } if data.hasTargets() { - res.SetNotReady("ReadSchema", "Multiple targets are not supported") - return ctrl.Result{}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("ReadSchema", errors.New("multiple targets are not supported"))) } hash, err := data.hash() if err != nil { - res.SetNotReady("CalculatingHash", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CalculatingHash", err)) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. // This is to ensure that other tools know we are still applying the changes. if res.IsReady() && res.IsHashModified(hash) { - res.SetNotReady("Reconciling", "current schema does not match last applied") - return ctrl.Result{Requeue: true}, nil + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withReconciling("current schema does not match last applied")) } // ==================================================== // Starting area to handle the heavy jobs. @@ -162,8 +166,8 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // spin up a dev-db and get the connection string. data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { - res.SetNotReady("GettingDevDB", err.Error()) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("GettingDevDB", err)) } } opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} @@ -188,40 +192,34 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return err } if err != nil { - res.SetNotReady("CreatingWorkingDir", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CreatingWorkingDir", err)) } defer wd.Close() cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - res.SetNotReady("CreatingAtlasClient", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("CreatingAtlasClient", err)) } var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { case errors.Is(err, atlasexec.ErrRequireLogin): log.Info("the resource is not connected to Atlas Cloud") if data.Config != nil { - err = errors.New("login is required to use custom atlas.hcl config") - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("WhoAmI", errors.New("login is required to use custom atlas.hcl config"))) } case err != nil: - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("WhoAmI", err)) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } var reports []*atlasexec.SchemaApply shouldLint, err := data.shouldLint() if err != nil { - res.SetNotReady("LintPolicyError", err.Error()) - r.recordErrEvent(res, err) - return xresult(err) + return status.Update(ctx, r.Client.Status(), res, statusOptions(). + withNotReady("LintPolicyError", err)) } switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. diff --git a/internal/controller/atlasschema_status.go b/internal/controller/atlasschema_status.go index b2b9dd51..46df9dd3 100644 --- a/internal/controller/atlasschema_status.go +++ b/internal/controller/atlasschema_status.go @@ -104,6 +104,19 @@ func (m reportOption) GetResult() (reconcile.Result, error) { return result.OK() } +func (o *optionBuilder) withReconciling(message string) *optionBuilder { + o.opts = append(o.opts, conditionOption{ + cond: metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + Reason: "Reconciling", + Message: message, + }, + err: &result.TransientError{}, + }) + return o +} + func (o *optionBuilder) withNotReady(reason string, err error) *optionBuilder { err = IgnoreNonTransient(err) o.opts = append(o.opts, conditionOption{