Skip to content

Commit

Permalink
atlas/schema: support URL as desired state
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Oct 1, 2024
1 parent a739b98 commit a96376d
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 53 deletions.
37 changes: 26 additions & 11 deletions api/v1alpha1/atlasschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"path/filepath"
"strings"

Expand Down Expand Up @@ -84,8 +85,10 @@ type (
}
// Schema defines the desired state of the target database schema in plain SQL or HCL.
Schema struct {
SQL string `json:"sql,omitempty"`
HCL string `json:"hcl,omitempty"`
SQL string `json:"sql,omitempty"`
HCL string `json:"hcl,omitempty"`
URL string `json:"url,omitempty"`

ConfigMapKeyRef *corev1.ConfigMapKeySelector `json:"configMapKeyRef,omitempty"`
}
// Policy defines the policies to apply when managing the schema change lifecycle.
Expand Down Expand Up @@ -200,29 +203,41 @@ func (sc *AtlasSchema) SetNotReady(reason, msg string) {
})
}

// Content returns the desired schema of the AtlasSchema.
func (s Schema) Content(ctx context.Context, r client.Reader, ns string) ([]byte, string, error) {
// Schema reader types (URL schemes).
const (
SchemaTypeAtlas = "atlas"
SchemaTypeFile = "file"
)

// Desired returns the desired schema of the AtlasSchema.
func (s Schema) DesiredState(ctx context.Context, r client.Reader, ns string) (*url.URL, []byte, error) {
switch ref := s.ConfigMapKeyRef; {
case ref != nil:
val := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: ns}, val)
if err != nil {
return nil, "", err
return nil, nil, err
}
// Guess the schema file format based on the key's extension.
ext := strings.ToLower(filepath.Ext(ref.Key))
switch desired, ok := val.Data[ref.Key]; {
case !ok:
return nil, "", fmt.Errorf("configmaps %s/%s does not contain key %q", ns, ref.Name, ref.Key)
return nil, nil, fmt.Errorf("configmaps %s/%s does not contain key %q", ns, ref.Name, ref.Key)
case ext == ".hcl" || ext == ".sql":
return []byte(desired), ext[1:], nil
return &url.URL{Scheme: SchemaTypeFile, Path: "schema" + ext}, []byte(desired), nil
default:
return nil, "", fmt.Errorf("configmaps key %q must be ending with .sql or .hcl, received %q", ref.Key, ext)
return nil, nil, fmt.Errorf("configmaps key %q must be ending with .sql or .hcl, received %q", ref.Key, ext)
}
case s.HCL != "":
return []byte(s.HCL), "hcl", nil
return &url.URL{Scheme: SchemaTypeFile, Path: "schema.hcl"}, []byte(s.HCL), nil
case s.SQL != "":
return []byte(s.SQL), "sql", nil
return &url.URL{Scheme: SchemaTypeFile, Path: "schema.sql"}, []byte(s.SQL), nil
case s.URL != "":
u, err := url.Parse(s.URL)
if err == nil && u.Scheme != SchemaTypeAtlas {
return nil, nil, fmt.Errorf("unsupported URL schema %q", u.Scheme)
}
return u, nil, err
}
return nil, "", fmt.Errorf("no desired schema specified")
return nil, nil, fmt.Errorf("no desired state specified")
}
22 changes: 11 additions & 11 deletions api/v1alpha1/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,19 @@ func TestSchema_Content(t *testing.T) {
Build()
)
// error
_, _, err := sch.Content(ctx, client, "default")
require.ErrorContains(t, err, "no desired schema specified")
_, _, err := sch.DesiredState(ctx, client, "default")
require.ErrorContains(t, err, "no desired state specified")

sch.SQL = "bar"
data, ext, err := sch.Content(ctx, client, "default")
u, data, err := sch.DesiredState(ctx, client, "default")
require.NoError(t, err)
require.Equal(t, "sql", ext)
require.Equal(t, "file://schema.sql", u.String())
require.Equal(t, []byte("bar"), data)

sch.HCL = "foo"
data, ext, err = sch.Content(ctx, client, "default")
u, data, err = sch.DesiredState(ctx, client, "default")
require.NoError(t, err)
require.Equal(t, "hcl", ext)
require.Equal(t, "file://schema.hcl", u.String())
require.Equal(t, []byte("foo"), data)

// Should return the content from the configmap
Expand All @@ -150,21 +150,21 @@ func TestSchema_Content(t *testing.T) {
},
Key: "schema.sql",
}
data, ext, err = sch.Content(ctx, client, "default")
u, data, err = sch.DesiredState(ctx, client, "default")
require.NoError(t, err)
require.Equal(t, "sql", ext)
require.Equal(t, "file://schema.sql", u.String())
require.Equal(t, []byte("bar"), data)

sch.ConfigMapKeyRef.Key = "schema.bug"
_, _, err = sch.Content(ctx, client, "default")
_, _, err = sch.DesiredState(ctx, client, "default")
require.ErrorContains(t, err, `configmaps key "schema.bug" must be ending with .sql or .hcl, received ".bug"`)

sch.ConfigMapKeyRef.Key = "schema.foo"
_, _, err = sch.Content(ctx, client, "default")
_, _, err = sch.DesiredState(ctx, client, "default")
require.ErrorContains(t, err, `configmaps default/test does not contain key "schema.foo"`)

sch.ConfigMapKeyRef.Name = "foo"
_, _, err = sch.Content(ctx, client, "default")
_, _, err = sch.DesiredState(ctx, client, "default")
require.ErrorContains(t, err, `configmaps "foo" not found`)
}

Expand Down
2 changes: 2 additions & 0 deletions charts/atlas-operator/templates/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ spec:
type: string
sql:
type: string
url:
type: string
type: object
schemas:
description: The names of the schemas (named databases) on the target
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/db.atlasgo.io_atlasschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ spec:
type: string
sql:
type: string
url:
type: string
type: object
schemas:
description: The names of the schemas (named databases) on the target
Expand Down
47 changes: 26 additions & 21 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -68,9 +69,9 @@ type (
Exclude []string
Policy *dbv1alpha1.Policy
TxMode dbv1alpha1.TransactionMode
Desired *url.URL

desired []byte
ext string
schema []byte
}
)

Expand Down Expand Up @@ -161,16 +162,20 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
defer wd.Close()
// Write the schema file to the working directory.
_, err = wd.WriteFile(data.Source(), data.desired)
if err != nil {
res.SetNotReady("CreatingSchemaFile", err.Error())
r.recordErrEvent(res, err)
return result(err)
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
_, err = wd.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
if err != nil {
res.SetNotReady("CreatingSchemaFile", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
}
switch {
case res.Status.LastApplied == 0:
// Verify the first run doesn't contain destructive changes.
err = r.lint(ctx, wd, data.EnvName, atlasexec.Vars{"lint_destructive": "true"})
err = r.lint(ctx, wd, data, atlasexec.Vars2{
"lint_destructive": "true",
})
switch d := (&destructiveErr{}); {
case err == nil:
case errors.As(err, &d):
Expand All @@ -191,7 +196,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
case data.shouldLint():
// Run the linting policy.
if err = r.lint(ctx, wd, data.EnvName, nil); err != nil {
if err = r.lint(ctx, wd, data, nil); err != nil {
reason, msg := "LintPolicyError", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
Expand All @@ -202,7 +207,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return result(err)
}
}
report, err := r.apply(ctx, wd.Path(), data.EnvName, string(data.TxMode))
report, err := r.apply(ctx, wd.Path(), data, string(data.TxMode))
if err != nil {
res.SetNotReady("ApplyingSchema", err.Error())
r.recorder.Event(res, corev1.EventTypeWarning, "ApplyingSchema", err.Error())
Expand Down Expand Up @@ -260,13 +265,14 @@ func (r *AtlasSchemaReconciler) watchRefs(res *dbv1alpha1.AtlasSchema) {
}
}

func (r *AtlasSchemaReconciler) apply(ctx context.Context, dir, envName, txMode string) (*atlasexec.SchemaApply, error) {
func (r *AtlasSchemaReconciler) apply(ctx context.Context, dir string, data *managedData, txMode string) (*atlasexec.SchemaApply, error) {
cli, err := r.atlasClient(dir)
if err != nil {
return nil, err
}
return cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: envName,
Env: data.EnvName,
To: data.Desired.String(),
TxMode: txMode,
AutoApprove: true,
})
Expand All @@ -289,7 +295,7 @@ func (r *AtlasSchemaReconciler) extractData(ctx context.Context, res *dbv1alpha1
if err != nil {
return nil, transient(err)
}
data.desired, data.ext, err = s.Schema.Content(ctx, r, res.Namespace)
data.Desired, data.schema, err = s.Schema.DesiredState(ctx, r, res.Namespace)
if err != nil {
return nil, transient(err)
}
Expand All @@ -312,11 +318,6 @@ func (r *AtlasSchemaReconciler) recordErrEvent(res *dbv1alpha1.AtlasSchema, err
r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error()))
}

// Source returns the file name of the desired schema.
func (d *managedData) Source() string {
return fmt.Sprintf("schema.%s", d.ext)
}

// ShouldLint returns true if the linting policy is set to error.
func (d *managedData) shouldLint() bool {
p := d.Policy
Expand All @@ -329,7 +330,11 @@ func (d *managedData) shouldLint() bool {
// hash returns the sha256 hash of the desired.
func (d *managedData) hash() (string, error) {
h := sha256.New()
h.Write([]byte(d.desired))
if len(d.schema) > 0 {
h.Write([]byte(d.schema))
} else {
h.Write([]byte(d.Desired.String()))
}
return hex.EncodeToString(h.Sum(nil)), nil
}

Expand All @@ -347,8 +352,8 @@ func (d *managedData) render(w io.Writer) error {
if d.DevURL == "" {
return errors.New("dev url is not set")
}
if d.ext == "" {
return errors.New("schema extension is not set")
if d.Desired == nil {
return errors.New("the desired state is not set")
}
return tmpl.ExecuteTemplate(w, "atlas_schema.tmpl", d)
}
Expand Down
7 changes: 3 additions & 4 deletions internal/controller/atlasschema_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestReconcile_Reconcile(t *testing.T) {
},
})
// Third reconcile, return error for missing schema
assert(ctrl.Result{RequeueAfter: 5000000000}, false, "ReadSchema", "no desired schema specified")
assert(ctrl.Result{RequeueAfter: 5000000000}, false, "ReadSchema", "no desired state specified")
// Add schema,
h.patch(t, &dbv1alpha1.AtlasSchema{
ObjectMeta: meta,
Expand All @@ -162,7 +162,7 @@ func TestReconcile_Reconcile(t *testing.T) {
// Check the events generated by the controller
require.Equal(t, []string{
"Warning TransientErr no target database defined",
"Warning TransientErr no desired schema specified",
"Warning TransientErr no desired state specified",
"Normal Applied Applied schema",
"Normal Applied Applied schema",
}, h.events())
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestConfigTemplate(t *testing.T) {
},
},
Schemas: []string{"foo", "bar"},
ext: "sql",
Desired: must(url.Parse("file://schema.sql")),
}
err := data.render(&buf)
require.NoError(t, err)
Expand All @@ -450,7 +450,6 @@ lint {
}
env {
name = atlas.env
src = "schema.sql"
url = "mysql://root:password@localhost:3306/test"
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo","bar"]
Expand Down
11 changes: 6 additions & 5 deletions internal/controller/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,22 @@ const lintDirName = "lint-migrations"
// - 1.sql: the current schema.
// - 2.sql: the pending changes.
// Then it runs `atlas migrate lint` in the temporary directory.
func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingDir, envName string, vars atlasexec.Vars) error {
func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingDir, data *managedData, vars atlasexec.VarArgs) error {
cli, err := r.atlasClient(wd.Path())
if err != nil {
return err
}
current, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: envName,
Format: "sql",
Env: data.EnvName,
Format: "{{ sql . }}",
})
if err != nil {
return err
}
plan, err := cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: data.Desired.String(),
DryRun: true, // Dry run to get pending changes.
Env: envName,
})
if err != nil {
return err
Expand All @@ -72,8 +73,8 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD
return err
}
lint, err := cli.MigrateLint(ctx, &atlasexec.MigrateLintParams{
Env: data.EnvName,
DirURL: fmt.Sprintf("file://./%s", lintDirName),
Env: envName,
Latest: 1, // Only lint 2.sql, pending changes.
Vars: vars,
})
Expand Down
1 change: 0 additions & 1 deletion internal/controller/templates/atlas_schema.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ lint {
{{- end }}
env {
name = atlas.env
src = "{{ removeSpecialChars .Source }}"
url = "{{ removeSpecialChars .URL }}"
dev = "{{ removeSpecialChars .DevURL }}"
schemas = {{ slides .Schemas }}
Expand Down
4 changes: 4 additions & 0 deletions schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
create table users2 (
id int not null,
primary key (id)
);
Loading

0 comments on commit a96376d

Please sign in to comment.