Skip to content

Commit

Permalink
feat: new implementation for wait for
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Apr 5, 2024
1 parent 95a52e2 commit 56fb315
Show file tree
Hide file tree
Showing 19 changed files with 322 additions and 98 deletions.
67 changes: 65 additions & 2 deletions api/v1/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/flanksource/canary-checker/api/external"
"github.com/flanksource/commons/duration"
"github.com/flanksource/duty"
"github.com/flanksource/duty/connection"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
Expand Down Expand Up @@ -785,6 +788,63 @@ type KubernetesResourceChecks struct {
CanarySpec `yaml:",inline" json:",inline"`
}

// KubernetesResourceCheckWaitFor configures how the kubernetes resource check
// waits for its resources to reach their desired state before running checks
// on them.
type KubernetesResourceCheckWaitFor struct {
	// Expr is a cel expression that determines whether all the resources
	// are in their desired state before running checks on them.
	// 	Default: `dyn(resources).all(r, k8s.isHealthy(r))`
	Expr string `json:"expr,omitempty"`

	// Disable waiting for resources to get to their desired state.
	Disable bool `json:"disable,omitempty"`

	// Timeout to wait for all static & non-static resources to be ready.
	// 	Default: 10m
	Timeout string `json:"timeout,omitempty"`

	// Interval to check if all static & non-static resources are ready.
	// 	Default: 5s
	Interval string `json:"interval,omitempty"`

	// parsedTimeout caches the parsed value of Timeout (see GetTimeout).
	// It carries an explicit `json:"-"` tag because the code generator
	// rejects struct fields that have no JSON tag.
	parsedTimeout *time.Duration `json:"-"`

	// parsedInterval caches the parsed value of Interval (see GetInterval).
	// It carries an explicit `json:"-"` tag for the same reason as parsedTimeout.
	parsedInterval *time.Duration `json:"-"`
}

// GetTimeout returns the Timeout field parsed as a duration.
//
// A previously parsed value is returned from the cache. An empty Timeout
// yields a zero duration and no error. A parse failure is returned as-is
// and nothing is cached.
func (t *KubernetesResourceCheckWaitFor) GetTimeout() (time.Duration, error) {
	switch {
	case t.parsedTimeout != nil:
		return *t.parsedTimeout, nil
	case t.Timeout == "":
		return 0, nil
	}

	parsed, err := duration.ParseDuration(t.Timeout)
	if err != nil {
		return 0, err
	}

	// Remember the parsed value so later calls skip the parsing.
	timeout := time.Duration(parsed)
	t.parsedTimeout = &timeout
	return timeout, nil
}

// GetInterval returns the Interval field parsed as a duration.
//
// A previously parsed value is returned from the cache. An empty Interval
// yields a zero duration and no error. A parse failure is returned as-is
// and nothing is cached.
func (t *KubernetesResourceCheckWaitFor) GetInterval() (time.Duration, error) {
	switch {
	case t.parsedInterval != nil:
		return *t.parsedInterval, nil
	case t.Interval == "":
		return 0, nil
	}

	parsed, err := duration.ParseDuration(t.Interval)
	if err != nil {
		return 0, err
	}

	// Remember the parsed value so later calls skip the parsing.
	interval := time.Duration(parsed)
	t.parsedInterval = &interval
	return interval, nil
}

type KubernetesResourceCheck struct {
Description `yaml:",inline" json:",inline"`
Templatable `yaml:",inline" json:",inline"`
Expand All @@ -808,8 +868,11 @@ type KubernetesResourceCheck struct {
// Kubeconfig is the kubeconfig or the path to the kubeconfig file.
Kubeconfig *types.EnvVar `yaml:"kubeconfig,omitempty" json:"kubeconfig,omitempty"`

WaitForReady bool `json:"waitForReady,omitempty"`
Timeout string `json:"timeout,omitempty"`
WaitFor KubernetesResourceCheckWaitFor `json:"waitFor,omitempty"`
}

// TotalResources returns the combined number of static and non-static
// resources declared on the check.
func (c KubernetesResourceCheck) TotalResources() int {
	total := len(c.StaticResources)
	total += len(c.Resources)
	return total
}

func (c KubernetesResourceCheck) GetType() string {
Expand Down
27 changes: 27 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 74 additions & 45 deletions checks/kubernetes_resource.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package checks

import (
gocontext "context"
"fmt"
"strconv"
"strings"
"time"

"github.com/flanksource/gomplate/v3"
"github.com/flanksource/is-healthy/pkg/health"
"golang.org/x/sync/errgroup"
"github.com/sethvargo/go-retry"

"github.com/flanksource/commons/duration"
"github.com/flanksource/commons/logger"
"github.com/flanksource/commons/utils"
"github.com/flanksource/duty/types"
Expand All @@ -23,9 +23,9 @@ const (
// maximum number of static & non static resources a canary can have
defaultMaxResourcesAllowed = 10

// resourceWaitTimeout is the default timeout to wait for all resources
// to be ready. Timeout on the spec will take precedence over this.
resourceWaitTimeout = time.Minute * 10
resourceWaitTimeoutDefault = time.Minute * 10
resourceWaitIntervalDefault = time.Second * 5
waitForExprDefault = `dyn(resources).all(r, k8s.isHealthy(r))`

annotationkey = "flanksource.canary-checker/kubernetes-resource-canary"
)
Expand Down Expand Up @@ -71,27 +71,31 @@ func (c *KubernetesResourceChecker) applyKubeconfig(ctx *context.Context, kubeCo
return ctx, nil
}

// validate verifies the check's configuration before it is run.
//
// It returns an error when the wait-for timeout or the wait-for interval
// cannot be parsed, or when the total number of resources exceeds the limit
// read from the "checks.kubernetesResource.maxResources" property
// (defaultMaxResourcesAllowed when the property isn't set).
func (c *KubernetesResourceChecker) validate(ctx *context.Context, check v1.KubernetesResourceCheck) error {
	if _, err := check.WaitFor.GetTimeout(); err != nil {
		return fmt.Errorf("failed to parse wait for timeout(%s): %w", check.WaitFor.Timeout, err)
	}

	// The interval must be validated as well; previously the timeout was
	// checked twice and an invalid interval went unnoticed.
	if _, err := check.WaitFor.GetInterval(); err != nil {
		return fmt.Errorf("failed to parse wait for interval(%s): %w", check.WaitFor.Interval, err)
	}

	maxResourcesAllowed := ctx.Properties().Int("checks.kubernetesResource.maxResources", defaultMaxResourcesAllowed)
	if total := check.TotalResources(); total > maxResourcesAllowed {
		return fmt.Errorf("too many resources (%d). only %d allowed", total, maxResourcesAllowed)
	}

	return nil
}

func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.KubernetesResourceCheck) pkg.Results {
result := pkg.Success(check, ctx.Canary)
var err error
var results pkg.Results
results = append(results, result)

if check.Timeout != "" {
if d, err := duration.ParseDuration(check.Timeout); err != nil {
return results.Failf("failed to parse timeout: %v", err)
} else {
ctx2, cancel := ctx.WithTimeout(time.Duration(d))
defer cancel()

ctx = ctx.WithDutyContext(ctx2)
}
}

totalResources := len(check.StaticResources) + len(check.Resources)
maxResourcesAllowed := ctx.Properties().Int("checks.kubernetesResource.maxResources", defaultMaxResourcesAllowed)
if totalResources > maxResourcesAllowed {
return results.Failf("too many resources (%d). only %d allowed", totalResources, maxResourcesAllowed)
if err := c.validate(ctx, check); err != nil {
return results.Failf("validation: %v", err)
}

if check.Kubeconfig != nil {
Expand Down Expand Up @@ -127,30 +131,8 @@ func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.Kuberne
}()
}

if check.WaitForReady {
timeout := resourceWaitTimeout
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}

logger.Debugf("waiting for %s for %d resources to be ready.", timeout, totalResources)

kClient := pkg.NewKubeClient(ctx.Kommons().GetRESTConfig)
errG, _ := errgroup.WithContext(ctx)
for _, r := range append(check.StaticResources, check.Resources...) {
r := r
errG.Go(func() error {
if status, err := kClient.WaitForResource(ctx, r.GetKind(), r.GetNamespace(), r.GetName()); err != nil {
return fmt.Errorf("error waiting for resource(%s/%s/%s) to be ready: %w", r.GetKind(), r.GetNamespace(), r.GetName(), err)
} else if status.Status != health.HealthStatusHealthy {
return fmt.Errorf("resource(%s/%s/%s) didn't become healthy. message (%s)", r.GetKind(), r.GetNamespace(), r.GetName(), status.Message)
}

return nil
})
}

if err := errG.Wait(); err != nil {
if !check.WaitFor.Disable {
if err := c.evalWaitFor(ctx, check); err != nil {
return results.Failf("%v", err)
}
}
Expand Down Expand Up @@ -192,3 +174,50 @@ func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.Kuberne

return results
}

// evalWaitFor repeatedly evaluates the wait-for expression against the
// check's static & non-static resources until it evaluates to true or the
// retry budget runs out.
//
// The timeout, interval and expression fall back to
// resourceWaitTimeoutDefault, resourceWaitIntervalDefault and
// waitForExprDefault respectively when they aren't set on the check.
//
// Only a "false" evaluation is marked as retryable. A failure to fetch the
// resources, a resource count mismatch, an expression evaluation error or a
// non-boolean result is returned as a plain (non-retryable) error.
func (c *KubernetesResourceChecker) evalWaitFor(ctx *context.Context, check v1.KubernetesResourceCheck) error {
	waitTimeout := resourceWaitTimeoutDefault
	if wt, err := check.WaitFor.GetTimeout(); err != nil {
		return fmt.Errorf("failed to parse wait for timeout(%s): %w", check.WaitFor.Timeout, err)
	} else if wt > 0 {
		waitTimeout = wt
	}

	waitInterval := resourceWaitIntervalDefault
	if wi, err := check.WaitFor.GetInterval(); err != nil {
		return fmt.Errorf("failed to parse wait for interval(%s): %w", check.WaitFor.Interval, err)
	} else if wi > 0 {
		waitInterval = wi
	}

	// Resolved once up front so that error messages report the expression
	// that was actually evaluated, even when the default is in use.
	waitForExpr := check.WaitFor.Expr
	if waitForExpr == "" {
		waitForExpr = waitForExprDefault
	}

	// The client doesn't change between attempts, so build it only once.
	kClient := pkg.NewKubeClient(ctx.Kommons().GetRESTConfig)

	var attempts int
	backoff := retry.WithMaxDuration(waitTimeout, retry.NewConstant(waitInterval))
	return retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
		// Prefer the context handed over by the retrier, but don't panic
		// if it isn't the expected concrete type.
		if retryCtx, ok := _ctx.(*context.Context); ok {
			ctx = retryCtx
		}

		attempts++
		ctx.Tracef("waiting for %d resources to be ready. (attempts: %d)", check.TotalResources(), attempts)

		resources, err := kClient.FetchResources(ctx, append(check.StaticResources, check.Resources...)...)
		if err != nil {
			return fmt.Errorf("wait for evaluation. fetching resources: %w", err)
		}
		if len(resources) != check.TotalResources() {
			return fmt.Errorf("unexpected error. expected %d resources, got %d", check.TotalResources(), len(resources))
		}

		templateVar := map[string]any{"resources": resources}
		output, err := gomplate.RunTemplate(templateVar, gomplate.Template{Expression: waitForExpr})
		if err != nil {
			return fmt.Errorf("wait for expression evaluation: %w", err)
		}

		ready, err := strconv.ParseBool(output)
		if err != nil {
			return fmt.Errorf("wait for expression (%q) didn't evaluate to a boolean: %w", waitForExpr, err)
		}
		if !ready {
			// Only this case is worth another attempt.
			return retry.RetryableError(fmt.Errorf("not all resources are ready"))
		}

		return nil
	})
}
2 changes: 1 addition & 1 deletion checks/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewNamespaceChecker() *NamespaceChecker {
// Run: Check every entry from config according to Checker interface
// Returns check result and metrics
func (c *NamespaceChecker) Run(ctx *context.Context) pkg.Results {
logger.Warnf("namespace check is deprecated. Please use the kubernetes resource check")
var err error
var results pkg.Results
for _, conf := range ctx.Canary.Spec.Namespace {
Expand Down Expand Up @@ -107,7 +108,6 @@ func (c *NamespaceChecker) getConditionTimes(ns *v1.Namespace, pod *v1.Pod) (tim
func (c *NamespaceChecker) Check(ctx *context.Context, extConfig external.Check) pkg.Results {
check := extConfig.(canaryv1.NamespaceCheck)
result := pkg.Success(check, ctx.Canary)
result.AppendMsgf("namespace check is deprecated. Please use the kubernetes resource check")
var results pkg.Results
results = append(results, result)
if !c.lock.TryAcquire(1) {
Expand Down
4 changes: 3 additions & 1 deletion checks/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
gocontext "context"

"github.com/flanksource/canary-checker/api/context"
"github.com/flanksource/commons/logger"

"github.com/flanksource/canary-checker/api/external"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -58,6 +59,8 @@ func NewPodChecker() *PodChecker {
// Run: Check every entry from config according to Checker interface
// Returns check result and metrics
func (c *PodChecker) Run(ctx *context.Context) pkg.Results {
logger.Warnf("pod check is deprecated. Please use the kubernetes resource check")

var results pkg.Results
if len(ctx.Canary.Spec.Pod) > 0 {
if c.k8s == nil {
Expand Down Expand Up @@ -141,7 +144,6 @@ func (c *PodChecker) Check(ctx *context.Context, extConfig external.Check) pkg.R
}

result := pkg.Success(podCheck, ctx.Canary)
result.AppendMsgf("pod check is deprecated. Please use the kubernetes resource check")
var results pkg.Results
results = append(results, result)
startTimer := NewTimer()
Expand Down
26 changes: 22 additions & 4 deletions config/deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5110,8 +5110,6 @@ spec:
template:
type: string
type: object
timeout:
type: string
transform:
properties:
expr:
Expand All @@ -5126,8 +5124,28 @@ spec:
transformDeleteStrategy:
description: Transformed checks have a delete strategy on deletion they can either be marked healthy, unhealthy or left as is
type: string
waitForReady:
type: boolean
waitFor:
properties:
disable:
description: Disable waiting for resources to get to their desired state.
type: boolean
expr:
description: |-
Expr is a cel expression that determines whether all the resources
are in their desired state before running checks on them.
Default: `dyn(resources).all(r, k8s.isHealthy(r))`
type: string
interval:
description: |-
Interval to check if all static & non-static resources are ready.
Default: 5s
type: string
timeout:
description: |-
Timeout to wait for all static & non-static resources to be ready.
Default: 10m
type: string
type: object
required:
- name
- resources
Expand Down
Loading

0 comments on commit 56fb315

Please sign in to comment.