feat: keep track of promotion state in promotion status #2604

Merged
merged 6 commits into from
Sep 30, 2024
Changes from 5 commits
631 changes: 355 additions & 276 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions api/v1alpha1/promotion_types.go
Expand Up @@ -143,6 +143,26 @@
HealthChecks []HealthCheckStep `json:"healthChecks,omitempty" protobuf:"bytes,8,rep,name=healthChecks"`
// FinishedAt is the time when the promotion was completed.
FinishedAt *metav1.Time `json:"finishedAt,omitempty" protobuf:"bytes,6,opt,name=finishedAt"`
// CurrentStep is the index of the current promotion step being executed. This
// permits steps that have already run successfully to be skipped on
// subsequent reconciliation attempts.
CurrentStep int64 `json:"currentStep,omitempty" protobuf:"varint,9,opt,name=currentStep"`
// State stores the state of the promotion process between reconciliation
// attempts.
State *apiextensionsv1.JSON `json:"state,omitempty" protobuf:"bytes,10,opt,name=state"`
}

// GetState returns the State field as unmarshalled YAML.
func (s *PromotionStatus) GetState() map[string]any {
if s.State == nil {
return nil

}

var state map[string]any
if err := yaml.Unmarshal(s.State.Raw, &state); err != nil {
return nil

}
return state

}

// HealthCheckStep describes a health check directive which can be executed by
Expand Down
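Reviewer note: the nil-handling in GetState can be sketched in isolation as below. This is a minimal sketch, not the code from this PR. The `rawJSON` and `promotionStatus` types are hypothetical stand-ins for `apiextensionsv1.JSON` and `PromotionStatus`, the `open-pr` and `prNumber` keys are made up for illustration, and `encoding/json` is swapped in for the YAML library so the example has no external dependencies.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawJSON is a hypothetical stand-in for apiextensionsv1.JSON, which wraps
// raw JSON bytes in a field named Raw.
type rawJSON struct {
	Raw []byte
}

// promotionStatus is a hypothetical, trimmed-down stand-in for
// PromotionStatus that keeps only the two fields added in this change.
type promotionStatus struct {
	CurrentStep int64
	State       *rawJSON
}

// getState follows the shape of GetState in the diff: a nil State and
// undecodable bytes both yield a nil map rather than an error.
// Assumption: encoding/json is used here instead of a YAML library.
func (s *promotionStatus) getState() map[string]any {
	if s.State == nil {
		return nil
	}
	var state map[string]any
	if err := json.Unmarshal(s.State.Raw, &state); err != nil {
		return nil
	}
	return state
}

func main() {
	empty := &promotionStatus{}
	fmt.Println(empty.getState() == nil) // true

	broken := &promotionStatus{State: &rawJSON{Raw: []byte("{not json")}}
	fmt.Println(broken.getState() == nil) // true

	// The alias and key names below are illustrative only.
	ok := &promotionStatus{
		State: &rawJSON{Raw: []byte(`{"open-pr":{"prNumber":42}}`)},
	}
	fmt.Println(ok.getState()["open-pr"])
}
```

One consequence of this shape is that a caller cannot tell "no state was stored" apart from "stored state could not be decoded"; both come back as nil.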
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions charts/kargo/resources/crds/kargo.akuity.io_promotions.yaml
Expand Up @@ -109,6 +109,13 @@ spec:
Status describes the current state of the transition represented by this
Promotion.
properties:
currentStep:
description: |-
CurrentStep is the index of the current promotion step being executed. This
permits steps that have already run successfully to be skipped on
subsequent reconciliation attempts.
format: int64
type: integer
finishedAt:
description: FinishedAt is the time when the promotion was completed.
format: date-time
Expand Down Expand Up @@ -501,6 +508,11 @@ spec:
description: Phase describes where the Promotion currently is in its
lifecycle.
type: string
state:
description: |-
State stores the state of the promotion process between reconciliation
attempts.
x-kubernetes-preserve-unknown-fields: true
type: object
required:
- spec
Expand Down
24 changes: 24 additions & 0 deletions charts/kargo/resources/crds/kargo.akuity.io_stages.yaml
Expand Up @@ -1211,6 +1211,13 @@ spec:
status:
description: Status is the (optional) status of the Promotion.
properties:
currentStep:
description: |-
CurrentStep is the index of the current promotion step being executed. This
permits steps that have already run successfully to be skipped on
subsequent reconciliation attempts.
format: int64
type: integer
finishedAt:
description: FinishedAt is the time when the promotion was
completed.
Expand Down Expand Up @@ -1611,6 +1618,11 @@ spec:
description: Phase describes where the Promotion currently
is in its lifecycle.
type: string
state:
description: |-
State stores the state of the promotion process between reconciliation
attempts.
x-kubernetes-preserve-unknown-fields: true
type: object
required:
- name
Expand Down Expand Up @@ -2067,6 +2079,13 @@ spec:
status:
description: Status is the (optional) status of the Promotion.
properties:
currentStep:
description: |-
CurrentStep is the index of the current promotion step being executed. This
permits steps that have already run successfully to be skipped on
subsequent reconciliation attempts.
format: int64
type: integer
finishedAt:
description: FinishedAt is the time when the promotion was
completed.
Expand Down Expand Up @@ -2467,6 +2486,11 @@ spec:
description: Phase describes where the Promotion currently
is in its lifecycle.
type: string
state:
description: |-
State stores the state of the promotion process between reconciliation
attempts.
x-kubernetes-preserve-unknown-fields: true
type: object
required:
- name
Expand Down
30 changes: 19 additions & 11 deletions internal/controller/promotions/promotions.go
Expand Up @@ -511,25 +511,34 @@
})
}

workDir := filepath.Join(os.TempDir(), "promotion-"+string(workingPromo.UID))
if err := os.MkdirAll(workDir, 0o700); err != nil && !os.IsExist(err) {
promoCtx := directives.PromotionContext{
WorkDir: filepath.Join(os.TempDir(), "promotion-"+string(workingPromo.UID)),
Project: stageNamespace,
Stage: stageName,
FreightRequests: stage.Spec.RequestedFreight,
Freight: *workingPromo.Status.FreightCollection.DeepCopy(),
StartFromStep: promo.Status.CurrentStep,
State: directives.State(workingPromo.Status.GetState()),

}
if err := os.Mkdir(promoCtx.WorkDir, 0o700); err == nil {

// If we're working with a fresh directory, we should start the promotion
// process again from the beginning.
promoCtx.StartFromStep = 0
promoCtx.State = nil
} else if !os.IsExist(err) {

return nil, fmt.Errorf("error creating working directory: %w", err)
}
defer func() {
if workingPromo.Status.Phase.IsTerminal() {
if err := os.RemoveAll(workDir); err != nil {
if err := os.RemoveAll(promoCtx.WorkDir); err != nil {

logger.Error(err, "could not remove working directory")
}
}
}()

res, err := r.directivesEngine.Promote(ctx, directives.PromotionContext{
WorkDir: workDir,
Project: stageNamespace,
Stage: stageName,
FreightRequests: stage.Spec.RequestedFreight,
Freight: *workingPromo.Status.FreightCollection.DeepCopy(),
}, steps)
res, err := r.directivesEngine.Promote(ctx, promoCtx, steps)
workingPromo.Status.CurrentStep = res.CurrentStep
workingPromo.Status.State = &apiextensionsv1.JSON{Raw: res.State.ToJSON()}

switch res.Status {
case directives.PromotionStatusPending:
workingPromo.Status.Phase = kargoapi.PromotionPhaseRunning
Expand All @@ -541,7 +550,6 @@
Config: &apiextensionsv1.JSON{Raw: step.Config.ToJSON()},
})
}

workingPromo.Status.Phase = kargoapi.PromotionPhaseSucceeded
workingPromo.Status.HealthChecks = healthChecks
case directives.PromotionStatusFailure:
Expand Down
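Reviewer note: the working-directory check in this hunk uses the result of `os.Mkdir` as a signal. If the directory did not exist before, nothing from an earlier attempt survives on disk, so saved progress is discarded and the promotion restarts from step 0. The sketch below isolates that decision. The helper names `prepareWorkDir`, `startStep`, and `runDemo` are hypothetical and do not appear in the PR.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// prepareWorkDir is a hypothetical helper. It reports whether dir was newly
// created. An already existing directory is not treated as an error.
func prepareWorkDir(dir string) (bool, error) {
	err := os.Mkdir(dir, 0o700)
	switch {
	case err == nil:
		return true, nil
	case os.IsExist(err):
		return false, nil
	default:
		return false, fmt.Errorf("error creating working directory: %w", err)
	}
}

// startStep is a hypothetical helper: a fresh directory means the saved
// step index cannot be trusted, so execution starts over from zero.
func startStep(fresh bool, saved int64) int64 {
	if fresh {
		return 0
	}
	return saved
}

// runDemo prepares the same directory twice inside a throwaway temp dir and
// reports whether each attempt saw a fresh directory.
func runDemo() (first, second bool, err error) {
	base, err := os.MkdirTemp("", "demo-")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(base)

	dir := filepath.Join(base, "promotion-abc")
	if first, err = prepareWorkDir(dir); err != nil {
		return false, false, err
	}
	second, err = prepareWorkDir(dir)
	return first, second, err
}

func main() {
	first, second, err := runDemo()
	if err != nil {
		panic(err)
	}
	// Pretend step index 3 was saved by an earlier reconciliation.
	fmt.Println(first, startStep(first, 3))   // true 0
	fmt.Println(second, startStep(second, 3)) // false 3
}
```

Using `os.Mkdir` rather than `os.MkdirAll` matters here, because `MkdirAll` returns nil for an existing directory and would hide the distinction.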
67 changes: 37 additions & 30 deletions internal/directives/git_pr_waiter.go
Expand Up @@ -131,35 +131,42 @@
}

func getPRNumber(sharedState State, cfg GitWaitForPRConfig) (int64, error) {
prNumber := cfg.PRNumber
if cfg.PRNumberFromOpen != "" {
stepOutput, exists := sharedState.Get(cfg.PRNumberFromOpen)
if !exists {
return 0, fmt.Errorf(
"no output found from step with alias %q",
cfg.PRNumberFromOpen,
)
}
stepOutputMap, ok := stepOutput.(map[string]any)
if !ok {
return 0, fmt.Errorf(
"output from step with alias %q is not a map[string]any",
cfg.PRNumberFromOpen,
)
}
prNumberAny, exists := stepOutputMap[prNumberKey]
if !exists {
return 0, fmt.Errorf(
"no PR number found in output from step with alias %q",
cfg.PRNumberFromOpen,
)
}
if prNumber, ok = prNumberAny.(int64); !ok {
return 0, fmt.Errorf(
"PR number in output from step with alias %q is not an int64",
cfg.PRNumberFromOpen,
)
}
if cfg.PRNumberFromOpen == "" {
return cfg.PRNumber, nil
}
stepOutput, exists := sharedState.Get(cfg.PRNumberFromOpen)
if !exists {
return 0, fmt.Errorf(
"no output found from step with alias %q",
cfg.PRNumberFromOpen,
)

}
stepOutputMap, ok := stepOutput.(map[string]any)
if !ok {
return 0, fmt.Errorf(
"output from step with alias %q is not a map[string]any",
cfg.PRNumberFromOpen,
)

}
prNumberAny, exists := stepOutputMap[prNumberKey]
if !exists {
return 0, fmt.Errorf(
"no PR number found in output from step with alias %q",
cfg.PRNumberFromOpen,
)

}
// If the state was rehydrated from PromotionStatus, which makes use of
// apiextensions.JSON, the PR number will be a float64. Otherwise, it will be
// an int64. We need to handle both cases.
switch prNumber := prNumberAny.(type) {
case int64:
return prNumber, nil
case float64:
return int64(prNumber), nil
default:
return 0, fmt.Errorf(
"PR number in output from step with alias %q is not an int64",
cfg.PRNumberFromOpen,
)

}
return prNumber, nil
}
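Reviewer note: the int64/float64 switch in getPRNumber is needed because `encoding/json` decodes every JSON number into `float64` when the target is `any`. The sketch below reproduces that with the standard library only. `toInt64` and `roundTrip` are hypothetical helpers, and the `prNumber` key is used purely for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toInt64 is a hypothetical helper that accepts both representations a PR
// number can take: int64 when set in memory, float64 after JSON decoding.
func toInt64(v any) (int64, error) {
	switch n := v.(type) {
	case int64:
		return n, nil
	case float64:
		return int64(n), nil
	default:
		return 0, fmt.Errorf("value of type %T is not a number", v)
	}
}

// roundTrip marshals state to JSON and decodes it again, which is roughly
// what happens when state is persisted and later read back.
func roundTrip(state map[string]any) (map[string]any, error) {
	b, err := json.Marshal(state)
	if err != nil {
		return nil, err
	}
	var out map[string]any
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	inMemory := map[string]any{"prNumber": int64(42)}
	rehydrated, err := roundTrip(inMemory)
	if err != nil {
		panic(err)
	}
	// prints "int64 float64"
	fmt.Printf("%T %T\n", inMemory["prNumber"], rehydrated["prNumber"])

	a, _ := toInt64(inMemory["prNumber"])
	b, _ := toInt64(rehydrated["prNumber"])
	fmt.Println(a, b) // 42 42
}
```

The float64 conversion is exact only for integers up to 2^53, which is far beyond any realistic PR number.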
11 changes: 11 additions & 0 deletions internal/directives/promotions.go
Expand Up @@ -44,6 +44,11 @@ type PromotionContext struct {
// well as any Freight that has been inherited from the target Stage's current
// state.
Freight kargoapi.FreightCollection
// StartFromStep is the index of the step from which the promotion should begin
// execution.
StartFromStep int64
// State is the current state of the promotion process.
State State
}

// PromotionStep describes a single step in a user-defined promotion process.
Expand Down Expand Up @@ -79,6 +84,12 @@ type PromotionResult struct {
// PromotionStepRunners. This configuration can later be used as input to
// health check processes.
HealthCheckSteps []HealthCheckStep
// If the promotion process remains in-progress, perhaps waiting for a change
// in some external state, the value of this field will indicate where to
// resume the process in the next reconciliation.
CurrentStep int64
// State is the current state of the promotion process.
State State
}

// PromotionStatus is a type that represents the high-level outcome of the
Expand Down
47 changes: 36 additions & 11 deletions internal/directives/simple_engine.go
Expand Up @@ -48,24 +48,40 @@
var err error
workDir, err = os.MkdirTemp("", "run-")
if err != nil {
return PromotionResult{Status: PromotionStatusFailure},
return PromotionResult{
Status: PromotionStatusFailure,
CurrentStep: 0,
},

fmt.Errorf("temporary working directory creation failed: %w", err)
}
defer os.RemoveAll(workDir)
}

// Initialize the shared state that will be passed to each step.
state := make(State)
state := promoCtx.State.DeepCopy()
if state == nil {
state = make(State)
}
var healthCheckSteps []HealthCheckStep
for _, step := range steps {

for i := promoCtx.StartFromStep; i < int64(len(steps)); i++ {
step := steps[i]
select {
case <-ctx.Done():
return PromotionResult{Status: PromotionStatusFailure}, ctx.Err()
return PromotionResult{
Status: PromotionStatusFailure,
CurrentStep: i,
State: state,
}, ctx.Err()
default:
}
reg, err := e.registry.GetPromotionStepRunnerRegistration(step.Kind)
if err != nil {
return PromotionResult{Status: PromotionStatusFailure},
return PromotionResult{
Status: PromotionStatusFailure,
CurrentStep: i,
State: state,
},
fmt.Errorf("no runner registered for step kind %q: %w", step.Kind, err)
}

Expand Down Expand Up @@ -93,17 +109,24 @@
}

result, err := reg.Runner.RunPromotionStep(ctx, stepCtx)
if step.Alias != "" {
state[step.Alias] = result.Output

}
if err != nil {
return PromotionResult{Status: PromotionStatusFailure},
return PromotionResult{
Status: PromotionStatusFailure,
CurrentStep: i,
State: state,
},
fmt.Errorf("failed to run step %q: %w", step.Kind, err)
}

if result.Status != PromotionStatusSuccess {
return PromotionResult{Status: result.Status}, nil
}

if step.Alias != "" {
state[step.Alias] = result.Output
return PromotionResult{
Status: result.Status,
CurrentStep: i,
State: state,
}, nil

}

if result.HealthCheckStep != nil {
Expand All @@ -113,6 +136,8 @@
return PromotionResult{
Status: PromotionStatusSuccess,
HealthCheckSteps: healthCheckSteps,
CurrentStep: 0,
State: state,
}, nil
}

Expand Down
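Reviewer note: the resumable loop in this hunk can be sketched as below. This is a simplified model, not the engine from the PR. The `step`, `result`, and `status` types are hypothetical, error handling and health checks are left out, and state is mutated in place where the real code works on a deep copy. As in the diff, a step's output is recorded under its alias before the status is inspected, and a non-success status returns the current index so the next call can resume there.

```go
package main

import "fmt"

type status string

const (
	statusPending status = "Pending"
	statusSuccess status = "Success"
)

// step is a hypothetical, simplified promotion step.
type step struct {
	alias string
	run   func(state map[string]any) (status, any)
}

// result is a hypothetical, simplified counterpart to PromotionResult.
type result struct {
	status      status
	currentStep int64
	state       map[string]any
}

// promote runs steps starting at startFrom. Steps before that index are
// assumed to have succeeded already and are skipped.
func promote(steps []step, startFrom int64, state map[string]any) result {
	if state == nil {
		state = map[string]any{}
	}
	for i := startFrom; i < int64(len(steps)); i++ {
		st, out := steps[i].run(state)
		if steps[i].alias != "" {
			state[steps[i].alias] = out
		}
		if st != statusSuccess {
			return result{status: st, currentStep: i, state: state}
		}
	}
	return result{status: statusSuccess, currentStep: 0, state: state}
}

// demo runs a two-step process twice: the second step is pending on the
// first attempt and succeeds on the second.
func demo() (openRuns int, first, second result) {
	merged := false
	steps := []step{
		{alias: "open", run: func(map[string]any) (status, any) {
			openRuns++
			return statusSuccess, map[string]any{"prNumber": int64(42)}
		}},
		{alias: "wait", run: func(map[string]any) (status, any) {
			if !merged {
				return statusPending, nil
			}
			return statusSuccess, nil
		}},
	}
	first = promote(steps, 0, nil)
	merged = true
	second = promote(steps, first.currentStep, first.state)
	return openRuns, first, second
}

func main() {
	openRuns, first, second := demo()
	fmt.Println(first.status, first.currentStep)   // Pending 1
	fmt.Println(second.status, second.currentStep) // Success 0
	fmt.Println(openRuns)                          // 1
}
```

The "open" step runs once across both attempts, which is the behavior the new CurrentStep field is meant to enable.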
15 changes: 14 additions & 1 deletion internal/directives/state.go
@@ -1,6 +1,10 @@
package directives

import "k8s.io/apimachinery/pkg/runtime"
import (
"encoding/json"

"k8s.io/apimachinery/pkg/runtime"
)

// State is a type that represents shared state between executions of
// PromotionSteps. It is not safe for concurrent use at present, as we expect
Expand Down Expand Up @@ -28,3 +32,12 @@
// consider writing our own implementation in the future.
return runtime.DeepCopyJSON(*s)
}

// ToJSON marshals the State to JSON.
func (s State) ToJSON() []byte {
if len(s) == 0 {
return nil

}
b, _ := json.Marshal(s)
return b

}
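Reviewer note: ToJSON above discards the error from `json.Marshal`. A possible variant that surfaces it is sketched below. This is a hypothetical alternative and not part of the PR. The lowercase `state` type stands in for `State`, and the keys are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is a hypothetical stand-in for the State type in the diff.
type state map[string]any

// toJSON is a variant of ToJSON that returns the marshalling error instead
// of dropping it. Empty state still yields nil bytes, as in the diff.
func (s state) toJSON() ([]byte, error) {
	if len(s) == 0 {
		return nil, nil
	}
	return json.Marshal(s)
}

func main() {
	empty := state{}
	b, err := empty.toJSON()
	fmt.Println(b == nil, err) // true <nil>

	full := state{"open-pr": map[string]any{"prNumber": 42}}
	b, err = full.toJSON()
	fmt.Println(string(b), err) // {"open-pr":{"prNumber":42}} <nil>

	// Channels cannot be represented in JSON, so marshalling fails.
	bad := state{"c": make(chan int)}
	_, err = bad.toJSON()
	fmt.Println(err != nil) // true
}
```

Whether the extra return value is worth the churn at call sites is a judgment call. Step outputs that are plain maps, strings, and numbers will not trigger the error path.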