Skip to content

Commit

Permalink
Fix panic in apply_presets.go
Browse files Browse the repository at this point in the history
  • Loading branch information
lennartkats-db committed Sep 27, 2024
1 parent 0cc35ca commit e952f42
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
79 changes: 58 additions & 21 deletions bundle/config/mutator/apply_presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func (m *applyPresets) Name() string {
}

func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics

if d := validatePauseStatus(b); d != nil {
return d
diags = diags.Extend(d)
}

r := b.Config.Resources
Expand All @@ -45,7 +47,11 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
tags := toTagArray(t.Tags)

// Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus
for _, j := range r.Jobs {
for key, j := range r.Jobs {
if j.JobSettings == nil {
diags = diags.Extend(diag.Errorf("job %s is not defined", key))
continue
}
j.Name = prefix + j.Name
if j.Tags == nil {
j.Tags = make(map[string]string)
Expand Down Expand Up @@ -77,20 +83,27 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}

// Pipelines presets: Prefix, PipelinesDevelopment
for i := range r.Pipelines {
r.Pipelines[i].Name = prefix + r.Pipelines[i].Name
for key, p := range r.Pipelines {
if p.PipelineSpec == nil {
diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
continue
}
p.Name = prefix + p.Name
if config.IsExplicitlyEnabled(t.PipelinesDevelopment) {
r.Pipelines[i].Development = true
p.Development = true
}
if t.TriggerPauseStatus == config.Paused {
r.Pipelines[i].Continuous = false
p.Continuous = false
}

// As of 2024-06, pipelines don't yet support tags
}

// Models presets: Prefix, Tags
for _, m := range r.Models {
for key, m := range r.Models {
if m.Model == nil {
diags = diags.Extend(diag.Errorf("model %s is not defined", key))
continue
}
m.Name = prefix + m.Name
for _, t := range tags {
exists := slices.ContainsFunc(m.Tags, func(modelTag ml.ModelTag) bool {
Expand All @@ -104,7 +117,11 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}

// Experiments presets: Prefix, Tags
for _, e := range r.Experiments {
for key, e := range r.Experiments {
if e.Experiment == nil {
diags = diags.Extend(diag.Errorf("experiment %s is not defined", key))
continue
}
filepath := e.Name
dir := path.Dir(filepath)
base := path.Base(filepath)
Expand All @@ -128,40 +145,60 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}

// Model serving endpoint presets: Prefix
for i := range r.ModelServingEndpoints {
r.ModelServingEndpoints[i].Name = normalizePrefix(prefix) + r.ModelServingEndpoints[i].Name
for key, e := range r.ModelServingEndpoints {
if e.CreateServingEndpoint == nil {
diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key))
continue
}
e.Name = normalizePrefix(prefix) + e.Name

// As of 2024-06, model serving endpoints don't yet support tags
}

// Registered models presets: Prefix
for i := range r.RegisteredModels {
r.RegisteredModels[i].Name = normalizePrefix(prefix) + r.RegisteredModels[i].Name
for key, m := range r.RegisteredModels {
if m.CreateRegisteredModelRequest == nil {
diags = diags.Extend(diag.Errorf("registered model %s is not defined", key))
continue
}
m.Name = normalizePrefix(prefix) + m.Name

// As of 2024-06, registered models don't yet support tags
}

// Quality monitors presets: Prefix
// Quality monitors presets: Schedule
if t.TriggerPauseStatus == config.Paused {
for i := range r.QualityMonitors {
for key, q := range r.QualityMonitors {
if q.CreateMonitor == nil {
diags = diags.Extend(diag.Errorf("quality monitor for %s is not defined", key))
continue
}
// Remove all schedules from monitors, since they don't support pausing/unpausing.
// Quality monitors might support the "pause" property in the future, so at the
// CLI level we do respect that property if it is set to "unpaused."
if r.QualityMonitors[i].Schedule != nil && r.QualityMonitors[i].Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused {
r.QualityMonitors[i].Schedule = nil
if q.Schedule != nil && q.Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused {
q.Schedule = nil
}
}
}

// Schemas: Prefix
for i := range r.Schemas {
r.Schemas[i].Name = normalizePrefix(prefix) + r.Schemas[i].Name
for key, s := range r.Schemas {
if s.CreateSchema == nil {
diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
continue
}
s.Name = normalizePrefix(prefix) + s.Name
// HTTP API for schemas doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API.
}

// Clusters: Prefix, Tags
for _, c := range r.Clusters {
for key, c := range r.Clusters {
if c.ClusterSpec == nil {
diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
continue
}
c.ClusterName = prefix + c.ClusterName
if c.CustomTags == nil {
c.CustomTags = make(map[string]string)
Expand All @@ -175,7 +212,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}
}

return nil
return diags
}

func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
Expand Down
20 changes: 20 additions & 0 deletions bundle/config/mutator/apply_presets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,23 @@ func TestApplyPresetsJobsMaxConcurrentRuns(t *testing.T) {
})
}
}

func TestApplyPresetsPrefixWithoutJobSettings(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {}, // no jobsettings inside
},
},
Presets: config.Presets{
NamePrefix: "prefix-",
},
},
}

ctx := context.Background()
diags := bundle.Apply(ctx, b, mutator.ApplyPresets())

require.ErrorContains(t, diags.Error(), "job job1 is not defined")
}

0 comments on commit e952f42

Please sign in to comment.