From 0fc09578be7f7bd2b39c81c092841964722eae7c Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Wed, 2 Oct 2024 09:37:04 +0700 Subject: [PATCH] atlas/schema: use `URL` as desired state --- api/v1alpha1/atlasschema_types.go | 25 ++++++---- api/v1alpha1/types_test.go | 22 ++++----- internal/controller/atlasschema_controller.go | 47 ++++++++++--------- .../controller/atlasschema_controller_test.go | 7 ++- internal/controller/lint.go | 11 +++-- .../controller/templates/atlas_schema.tmpl | 1 - 6 files changed, 62 insertions(+), 51 deletions(-) diff --git a/api/v1alpha1/atlasschema_types.go b/api/v1alpha1/atlasschema_types.go index 9442479b..9991a56c 100644 --- a/api/v1alpha1/atlasschema_types.go +++ b/api/v1alpha1/atlasschema_types.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "net/url" "path/filepath" "strings" @@ -200,29 +201,35 @@ func (sc *AtlasSchema) SetNotReady(reason, msg string) { }) } -// Content returns the desired schema of the AtlasSchema. -func (s Schema) Content(ctx context.Context, r client.Reader, ns string) ([]byte, string, error) { +// Schema reader types (URL schemes). +const ( + SchemaTypeAtlas = "atlas" + SchemaTypeFile = "file" +) + +// Desired returns the desired schema of the AtlasSchema. +func (s Schema) DesiredState(ctx context.Context, r client.Reader, ns string) (*url.URL, []byte, error) { switch ref := s.ConfigMapKeyRef; { case ref != nil: val := &corev1.ConfigMap{} err := r.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: ns}, val) if err != nil { - return nil, "", err + return nil, nil, err } // Guess the schema file format based on the key's extension. ext := strings.ToLower(filepath.Ext(ref.Key)) switch desired, ok := val.Data[ref.Key]; { case !ok: - return nil, "", fmt.Errorf("configmaps %s/%s does not contain key %q", ns, ref.Name, ref.Key) + return nil, nil, fmt.Errorf("configmaps %s/%s does not contain key %q", ns, ref.Name, ref.Key) case ext == ".hcl" || ext == ".sql": - return []byte(desired), ext[1:], nil + return &url.URL{Scheme: SchemaTypeFile, Path: "schema" + ext}, []byte(desired), nil default: - return nil, "", fmt.Errorf("configmaps key %q must be ending with .sql or .hcl, received %q", ref.Key, ext) + return nil, nil, fmt.Errorf("configmaps key %q must be ending with .sql or .hcl, received %q", ref.Key, ext) } case s.HCL != "": - return []byte(s.HCL), "hcl", nil + return &url.URL{Scheme: SchemaTypeFile, Path: "schema.hcl"}, []byte(s.HCL), nil case s.SQL != "": - return []byte(s.SQL), "sql", nil + return &url.URL{Scheme: SchemaTypeFile, Path: "schema.sql"}, []byte(s.SQL), nil } - return nil, "", fmt.Errorf("no desired schema specified") + return nil, nil, fmt.Errorf("no desired state specified") } diff --git a/api/v1alpha1/types_test.go b/api/v1alpha1/types_test.go index bc5e9647..71aa4e71 100644 --- a/api/v1alpha1/types_test.go +++ b/api/v1alpha1/types_test.go @@ -128,19 +128,19 @@ func TestSchema_Content(t *testing.T) { Build() ) // error - _, _, err := sch.Content(ctx, client, "default") - require.ErrorContains(t, err, "no desired schema specified") + _, _, err := sch.DesiredState(ctx, client, "default") + require.ErrorContains(t, err, "no desired state specified") sch.SQL = "bar" - data, ext, err := sch.Content(ctx, client, "default") + u, data, err := sch.DesiredState(ctx, client, "default") require.NoError(t, err) - require.Equal(t, "sql", ext) + require.Equal(t, "file://schema.sql", u.String()) require.Equal(t, []byte("bar"), data) sch.HCL = "foo" - data, ext, err = sch.Content(ctx, client, "default") + u, data, err = sch.DesiredState(ctx, client, "default") require.NoError(t, err) - require.Equal(t, "hcl", ext) + require.Equal(t, "file://schema.hcl", u.String()) require.Equal(t, []byte("foo"), data) // Should return the content from the configmap @@ -150,21 +150,21 @@ func TestSchema_Content(t *testing.T) { }, Key: "schema.sql", } - data, ext, err = sch.Content(ctx, client, "default") + u, data, err = sch.DesiredState(ctx, client, "default") require.NoError(t, err) - require.Equal(t, "sql", ext) + require.Equal(t, "file://schema.sql", u.String()) require.Equal(t, []byte("bar"), data) sch.ConfigMapKeyRef.Key = "schema.bug" - _, _, err = sch.Content(ctx, client, "default") + _, _, err = sch.DesiredState(ctx, client, "default") require.ErrorContains(t, err, `configmaps key "schema.bug" must be ending with .sql or .hcl, received ".bug"`) sch.ConfigMapKeyRef.Key = "schema.foo" - _, _, err = sch.Content(ctx, client, "default") + _, _, err = sch.DesiredState(ctx, client, "default") require.ErrorContains(t, err, `configmaps default/test does not contain key "schema.foo"`) sch.ConfigMapKeyRef.Name = "foo" - _, _, err = sch.Content(ctx, client, "default") + _, _, err = sch.DesiredState(ctx, client, "default") require.ErrorContains(t, err, `configmaps "foo" not found`) } diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index 45797636..53f06bbf 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/url" + "path/filepath" "strings" "time" @@ -68,9 +69,9 @@ type ( Exclude []string Policy *dbv1alpha1.Policy TxMode dbv1alpha1.TransactionMode + Desired *url.URL - desired []byte - ext string + schema []byte } ) @@ -161,16 +162,20 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) } defer wd.Close() // Write the schema file to the working directory. - _, err = wd.WriteFile(data.Source(), data.desired) - if err != nil { - res.SetNotReady("CreatingSchemaFile", err.Error()) - r.recordErrEvent(res, err) - return result(err) + if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { + _, err = wd.WriteFile(filepath.Join(u.Host, u.Path), data.schema) + if err != nil { + res.SetNotReady("CreatingSchemaFile", err.Error()) + r.recordErrEvent(res, err) + return result(err) + } } switch { case res.Status.LastApplied == 0: // Verify the first run doesn't contain destructive changes. - err = r.lint(ctx, wd, data.EnvName, atlasexec.Vars{"lint_destructive": "true"}) + err = r.lint(ctx, wd, data, atlasexec.Vars2{ + "lint_destructive": "true", + }) switch d := (&destructiveErr{}); { case err == nil: case errors.As(err, &d): @@ -191,7 +196,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) } case data.shouldLint(): // Run the linting policy. - if err = r.lint(ctx, wd, data.EnvName, nil); err != nil { + if err = r.lint(ctx, wd, data, nil); err != nil { reason, msg := "LintPolicyError", err.Error() res.SetNotReady(reason, msg) r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) @@ -202,7 +207,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return result(err) } } - report, err := r.apply(ctx, wd.Path(), data.EnvName, string(data.TxMode)) + report, err := r.apply(ctx, wd.Path(), data, string(data.TxMode)) if err != nil { res.SetNotReady("ApplyingSchema", err.Error()) r.recorder.Event(res, corev1.EventTypeWarning, "ApplyingSchema", err.Error()) @@ -260,13 +265,14 @@ func (r *AtlasSchemaReconciler) watchRefs(res *dbv1alpha1.AtlasSchema) { } } -func (r *AtlasSchemaReconciler) apply(ctx context.Context, dir, envName, txMode string) (*atlasexec.SchemaApply, error) { +func (r *AtlasSchemaReconciler) apply(ctx context.Context, dir string, data *managedData, txMode string) (*atlasexec.SchemaApply, error) { cli, err := r.atlasClient(dir) if err != nil { return nil, err } return cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ - Env: envName, + Env: data.EnvName, + To: data.Desired.String(), TxMode: txMode, AutoApprove: true, }) @@ -289,7 +295,7 @@ func (r *AtlasSchemaReconciler) extractData(ctx context.Context, res *dbv1alpha1 if err != nil { return nil, transient(err) } - data.desired, data.ext, err = s.Schema.Content(ctx, r, res.Namespace) + data.Desired, data.schema, err = s.Schema.DesiredState(ctx, r, res.Namespace) if err != nil { return nil, transient(err) } @@ -312,11 +318,6 @@ func (r *AtlasSchemaReconciler) recordErrEvent(res *dbv1alpha1.AtlasSchema, err r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error())) } -// Source returns the file name of the desired schema. -func (d *managedData) Source() string { - return fmt.Sprintf("schema.%s", d.ext) -} - // ShouldLint returns true if the linting policy is set to error. func (d *managedData) shouldLint() bool { p := d.Policy @@ -329,7 +330,11 @@ func (d *managedData) shouldLint() bool { // hash returns the sha256 hash of the desired. func (d *managedData) hash() (string, error) { h := sha256.New() - h.Write([]byte(d.desired)) + if len(d.schema) > 0 { + h.Write([]byte(d.schema)) + } else { + h.Write([]byte(d.Desired.String())) + } return hex.EncodeToString(h.Sum(nil)), nil } @@ -347,8 +352,8 @@ func (d *managedData) render(w io.Writer) error { if d.DevURL == "" { return errors.New("dev url is not set") } - if d.ext == "" { - return errors.New("schema extension is not set") + if d.Desired == nil { + return errors.New("the desired state is not set") } return tmpl.ExecuteTemplate(w, "atlas_schema.tmpl", d) } diff --git a/internal/controller/atlasschema_controller_test.go b/internal/controller/atlasschema_controller_test.go index 2bdca98e..1d29fba8 100644 --- a/internal/controller/atlasschema_controller_test.go +++ b/internal/controller/atlasschema_controller_test.go @@ -136,7 +136,7 @@ func TestReconcile_Reconcile(t *testing.T) { }, }) // Third reconcile, return error for missing schema - assert(ctrl.Result{RequeueAfter: 5000000000}, false, "ReadSchema", "no desired schema specified") + assert(ctrl.Result{RequeueAfter: 5000000000}, false, "ReadSchema", "no desired state specified") // Add schema, h.patch(t, &dbv1alpha1.AtlasSchema{ ObjectMeta: meta, @@ -162,7 +162,7 @@ func TestReconcile_Reconcile(t *testing.T) { // Check the events generated by the controller require.Equal(t, []string{ "Warning TransientErr no target database defined", - "Warning TransientErr no desired schema specified", + "Warning TransientErr no desired state specified", "Normal Applied Applied schema", "Normal Applied Applied schema", }, h.events()) @@ -425,7 +425,7 @@ func TestConfigTemplate(t *testing.T) { }, }, Schemas: []string{"foo", "bar"}, - ext: "sql", + Desired: must(url.Parse("file://schema.sql")), } err := data.render(&buf) require.NoError(t, err) @@ -450,7 +450,6 @@ lint { } env { name = atlas.env - src = "schema.sql" url = "mysql://root:password@localhost:3306/test" dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo","bar"] diff --git a/internal/controller/lint.go b/internal/controller/lint.go index aed44755..c73543f4 100644 --- a/internal/controller/lint.go +++ b/internal/controller/lint.go @@ -34,21 +34,22 @@ const lintDirName = "lint-migrations" // - 1.sql: the current schema. // - 2.sql: the pending changes. // Then it runs `atlas migrate lint` in the temporary directory. -func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingDir, envName string, vars atlasexec.Vars) error { +func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingDir, data *managedData, vars atlasexec.VarArgs) error { cli, err := r.atlasClient(wd.Path()) if err != nil { return err } current, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ - Env: envName, - Format: "sql", + Env: data.EnvName, + Format: "{{ sql . }}", }) if err != nil { return err } plan, err := cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + Env: data.EnvName, + To: data.Desired.String(), DryRun: true, // Dry run to get pending changes. - Env: envName, }) if err != nil { return err @@ -72,8 +73,8 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD return err } lint, err := cli.MigrateLint(ctx, &atlasexec.MigrateLintParams{ + Env: data.EnvName, DirURL: fmt.Sprintf("file://./%s", lintDirName), - Env: envName, Latest: 1, // Only lint 2.sql, pending changes. Vars: vars, }) diff --git a/internal/controller/templates/atlas_schema.tmpl b/internal/controller/templates/atlas_schema.tmpl index 290092b0..0d87cfb9 100644 --- a/internal/controller/templates/atlas_schema.tmpl +++ b/internal/controller/templates/atlas_schema.tmpl @@ -82,7 +82,6 @@ lint { {{- end }} env { name = atlas.env - src = "{{ removeSpecialChars .Source }}" url = "{{ removeSpecialChars .URL }}" dev = "{{ removeSpecialChars .DevURL }}" schemas = {{ slides .Schemas }}