feat: template resources in kubernetes resource check (#1803)
* feat: template resources in kubernetes resource check

* use isReady in the fixture and bump gomplate

* feat: delete by labels instead of names because names can be templated

* handle deleting of static resources

move all polling and deletion tasks to use namespace and GVK; the name shouldn't be used.

* chore: upgrade kubernetes version in CI

* chore: delete services by name

add a log line when the waitFor expression fails

* chore: address review comments
adityathebe authored Apr 26, 2024
1 parent 95df56b commit 234e3cf
Showing 12 changed files with 201 additions and 79 deletions.
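
Before the per-file diffs, a quick illustration of the headline change: resource specs in a kubernetesResource check are now run through a gomplate templating pass ({{ }} and $( ) delimiters), so fields like the resource name can differ on every run — which is exactly why the cleanup logic below moves from delete-by-name to delete-by-label. A minimal, hypothetical fixture sketch (the check name, template expression, and image are illustrative, not taken from this commit):

apiVersion: canaries.flanksource.com/v1
kind: Canary
metadata:
  name: templated-resource-example
spec:
  interval: 30
  kubernetesResource:
    - name: templated-pod
      namespace: default
      description: "pod with a templated name, cleaned up by label selector"
      resources:
        - apiVersion: v1
          kind: Pod
          metadata:
            # rendered per run by the templater, so the final name isn't
            # known up front -- deletion therefore goes by labels, not names
            name: hello-{{ random.Alpha 6 }}
            namespace: default
          spec:
            restartPolicy: Never
            containers:
              - name: hello
                image: hello-world
      waitFor:
        expr: 'dyn(resources).all(r, k8s.isReady(r))'
        interval: 2s
        timeout: 5m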
4 changes: 3 additions & 1 deletion canary-checker.properties
@@ -2,4 +2,6 @@
# check.disabled.dns=false
# check.disabled.s3=false

# check.disabled.tcp=false

# topology.runNow=true
228 changes: 167 additions & 61 deletions checks/kubernetes_resource.go
@@ -10,17 +10,21 @@ import (
"time"

"github.com/flanksource/gomplate/v3"
"github.com/flanksource/is-healthy/pkg/health"
"github.com/flanksource/is-healthy/pkg/lua"
"github.com/samber/lo"
"github.com/sethvargo/go-retry"
"golang.org/x/sync/errgroup"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
cliresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/utils"
"github.com/flanksource/duty/types"
)
@@ -31,11 +35,13 @@ const (

resourceWaitTimeoutDefault = time.Minute * 10
resourceWaitIntervalDefault = time.Second * 5
waitForExprDefault = `dyn(resources).all(r, k8s.isHealthy(r))`

annotationkey = "flanksource.canary-checker/kubernetes-resource-canary"
waitForExprDefault = `dyn(resources).all(r, k8s.isReady(r))`
)

func resourceLabelKey(key string) string {
return fmt.Sprintf("canaries.flanksource.com/%s", key)
}

type KubernetesResourceChecker struct{}

func (c *KubernetesResourceChecker) Type() string {
@@ -67,41 +73,34 @@ func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.Kuberne
}
}

if err := templateKubernetesResourceCheck(ctx.Canary.GetPersistedID(), ctx.Canary.GetCheckID(check.GetName()), &check); err != nil {
return results.Failf("templating error: %v", err)
}

for i := range check.StaticResources {
resource := check.StaticResources[i]

// annotate the resource with the canary ID so we can easily clean it up later
// TODO: see if this is actually needed
resource.SetAnnotations(map[string]string{annotationkey: ctx.Canary.ID()})
if err := ctx.Kommons().ApplyUnstructured(utils.Coalesce(resource.GetNamespace(), ctx.Namespace), &resource); err != nil {
return results.Failf("failed to apply static resource %s: %v", resource.GetName(), err)
}
}

// Keep track of all the created resources
// so we can delete them together instead of deleting them one by one.
var createdResources []unstructured.Unstructured
defer func() {
if err := deleteResources(ctx, check.WaitFor.Delete, createdResources...); err != nil {
if err := DeleteResources(ctx, check, false); err != nil {
results.Failf(err.Error())
}
}()

if check.ClearResources {
if err := deleteResources(ctx, true, check.Resources...); err != nil {
if err := DeleteResources(ctx, check, false); err != nil {
results.Failf(err.Error())
}
}

for i := range check.Resources {
resource := check.Resources[i]

resource.SetAnnotations(map[string]string{annotationkey: ctx.Canary.ID()})
if err := ctx.Kommons().ApplyUnstructured(utils.Coalesce(resource.GetNamespace(), ctx.Namespace), &resource); err != nil {
return results.Failf("failed to apply resource (%s/%s/%s): %v", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}

createdResources = append(createdResources, resource)
}

if !check.WaitFor.Disable {
@@ -197,33 +196,40 @@ func (c *KubernetesResourceChecker) evalWaitFor(ctx *context.Context, check v1.K
retryErr := retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
ctx = _ctx.(*context.Context)
attempts++
ctx.Logger.V(5).Infof("waiting for %d resources to be ready. (attempts: %d)", check.TotalResources(), attempts)

var templateVar = map[string]any{}
if response, err := kClient.FetchResources(ctx, append(check.StaticResources, check.Resources...)...); err != nil {
ctx.Logger.V(4).Infof("waiting for %d resources to be in the desired state. (attempts: %d)", check.TotalResources(), attempts)

resourceObjs, err := kClient.FetchResources(ctx, append(check.StaticResources, check.Resources...)...)
if err != nil {
return fmt.Errorf("wait for evaluation. fetching resources: %w", err)
} else if len(response) != check.TotalResources() {
} else if len(resourceObjs) != check.TotalResources() {
var got []string
for _, r := range response {
for _, r := range resourceObjs {
got = append(got, fmt.Sprintf("%s/%s/%s", r.GetKind(), r.GetNamespace(), r.GetName()))
}

return fmt.Errorf("unxpected error. expected %d resources, got %d (%s)", check.TotalResources(), len(response), strings.Join(got, ","))
} else {
templateVar["resources"] = response
return fmt.Errorf("unxpected error. expected %d resources, got %d (%s)", check.TotalResources(), len(resourceObjs), strings.Join(got, ","))
}

waitForExpr := check.WaitFor.Expr
if waitForExpr == "" {
waitForExpr = waitForExprDefault
}

var templateVar = map[string]any{
"resources": resourceObjs,
}
if response, err := gomplate.RunTemplate(templateVar, gomplate.Template{Expression: waitForExpr}); err != nil {
return fmt.Errorf("wait for expression evaluation: %w", err)
} else if parsed, err := strconv.ParseBool(response); err != nil {
return fmt.Errorf("wait for expression (%q) didn't evaluate to a boolean", check.WaitFor.Expr)
} else if !parsed {
return retry.RetryableError(fmt.Errorf("not all resources are ready"))
for _, r := range resourceObjs {
rh, _ := health.GetResourceHealth(&r, lua.ResourceHealthOverrides{})
ctx.Logger.V(4).Infof("health for (namespace:%s gvk:%v) = %+v", r.GetNamespace(), r.GetObjectKind().GroupVersionKind(), rh)
}

return retry.RetryableError(fmt.Errorf("not all resources are in their desired state"))
}

return nil
@@ -282,45 +288,92 @@ func (c *KubernetesResourceChecker) validate(ctx *context.Context, check v1.Kube

maxResourcesAllowed := ctx.Properties().Int("checks.kubernetesResource.maxResources", defaultMaxResourcesAllowed)
if check.TotalResources() > maxResourcesAllowed {
return fmt.Errorf("too many resources (%d). only %d allowed", check.TotalResources(), maxResourcesAllowed)
return fmt.Errorf("too many resources (%d). only %d supported", check.TotalResources(), maxResourcesAllowed)
}

return nil
}

func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unstructured.Unstructured) error {
ctx.Logger.V(4).Infof("deleting %d resources", len(resources))
func DeleteResources(ctx *context.Context, check v1.KubernetesResourceCheck, deleteStatic bool) error {
ctx.Logger.V(4).Infof("deleting resources")

resources := check.Resources
if deleteStatic {
resources = append(resources, check.StaticResources...)
}

// cache dynamic clients
// firstly, cache dynamic clients by gvk
clients := sync.Map{}
for i := range resources {
resource := resources[i]

gvk := resource.GetObjectKind().GroupVersionKind()
if _, ok := clients.Load(gvk); ok {
continue // client already cached
}

namespace := utils.Coalesce(resource.GetNamespace(), ctx.Namespace)
rc, _, _, err := ctx.Kommons().GetDynamicClientFor(namespace, &resource)
if err != nil {
return fmt.Errorf("failed to get rest client for (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}
clients.Store(gvk, rc)
}

eg, _ := errgroup.WithContext(ctx)
for i := range resources {
resource := resources[i]

eg.Go(func() error {
rc, err := ctx.Kommons().GetRestClient(resource)
if err != nil {
return fmt.Errorf("failed to get rest client for (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}
gvk := resource.GetObjectKind().GroupVersionKind()
clients.Store(gvk, rc)
cachedClient, _ := clients.Load(resource.GetObjectKind().GroupVersionKind())
rc := cachedClient.(dynamic.ResourceInterface)

namespace := utils.Coalesce(resource.GetNamespace(), ctx.Namespace)
deleteOpt := &metav1.DeleteOptions{
deleteOpt := metav1.DeleteOptions{
GracePeriodSeconds: lo.ToPtr(int64(0)),
PropagationPolicy: lo.ToPtr(metav1.DeletePropagationOrphan),
PropagationPolicy: lo.ToPtr(lo.Ternary(check.WaitFor.Delete, metav1.DeletePropagationForeground, metav1.DeletePropagationBackground)),
}
if _, err := rc.DeleteWithOptions(namespace, resource.GetName(), deleteOpt); err != nil {
var statusErr *apiErrors.StatusError
if errors.As(err, &statusErr) {
switch statusErr.ErrStatus.Code {
case 404:
return nil

switch resource.GetKind() {
case "Namespace", "Service":
// NOTE: namespace cannot be deleted with `.DeleteCollection()`
//
// FIXME: Even though Service can be deleted with `.DeleteCollection()`
// it failed on the CI. It's probably due to an older kubernetes
// version we're using on the CI (v1.20.7).
// Delete it by name for now while we wait to upgrade the kubernetes version
// on our CI.
if err := rc.Delete(ctx, resource.GetName(), deleteOpt); err != nil {
var statusErr *apiErrors.StatusError
if errors.As(err, &statusErr) {
switch statusErr.ErrStatus.Code {
case 404:
return nil
}
}

return fmt.Errorf("failed to delete resource (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}

return fmt.Errorf("failed to delete resource (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
default:
labelSelector := fmt.Sprintf("%s=%s", resourceLabelKey("canary-id"), ctx.Canary.GetPersistedID())
if checkID := ctx.Canary.GetCheckID(check.GetName()); checkID != "" {
labelSelector += fmt.Sprintf(",%s=%s", resourceLabelKey("check-id"), checkID)
}
if !deleteStatic {
labelSelector += fmt.Sprintf(",!%s", resourceLabelKey("is-static"))
}

if err := rc.DeleteCollection(ctx, deleteOpt, metav1.ListOptions{LabelSelector: labelSelector}); err != nil {
var statusErr *apiErrors.StatusError
if errors.As(err, &statusErr) {
switch statusErr.ErrStatus.Code {
case 404:
return nil
}
}

return fmt.Errorf("failed to delete resource (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}
}

return nil
@@ -330,7 +383,7 @@ func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unst
return err
}

if !waitForDelete {
if !check.WaitFor.Delete {
return nil
}

@@ -341,36 +394,89 @@ func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unst
select {
case <-ticker.C:
if len(resources) == 0 {
ctx.Logger.V(5).Infof("all the resources have been deleted")
ctx.Logger.V(4).Infof("all the resources have been deleted")
return nil
}

deleted := make(map[string]struct{})
deleted := make(map[schema.GroupVersionKind]struct{})
for _, resource := range resources {
cachedClient, _ := clients.Load(resource.GetObjectKind().GroupVersionKind())
rc := cachedClient.(*cliresource.Helper)
gvk := resource.GetObjectKind().GroupVersionKind()
cachedClient, _ := clients.Load(gvk)
rc := cachedClient.(dynamic.ResourceInterface)

if _, err := rc.Get(resource.GetNamespace(), resource.GetName()); err != nil {
if !apiErrors.IsNotFound(err) {
return fmt.Errorf("error getting resource (%s/%s/%s) while polling: %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
}
labelSelector := fmt.Sprintf("%s=%s", resourceLabelKey("canary-id"), ctx.Canary.GetPersistedID())
if checkID := ctx.Canary.GetCheckID(check.GetName()); checkID != "" {
labelSelector += fmt.Sprintf(",%s=%s", resourceLabelKey("check-id"), checkID)
}
if !deleteStatic {
labelSelector += fmt.Sprintf(",!%s", resourceLabelKey("is-static"))
}

deleted[string(resource.GetUID())] = struct{}{}
ctx.Logger.V(5).Infof("(%s/%s/%s) has been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
if listResponse, err := rc.List(ctx, metav1.ListOptions{LabelSelector: labelSelector}); err != nil {
return fmt.Errorf("error getting resource (%s/%s/%s) while polling: %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
} else if listResponse == nil || len(listResponse.Items) == 0 {
ctx.Logger.V(4).Infof("all (%v) have been deleted", gvk)
deleted[gvk] = struct{}{}
} else {
ctx.Logger.V(5).Infof("(%s/%s/%s) has not been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
ctx.Logger.V(4).Infof("all (%v) have not been deleted", gvk)
}
}

resources = lo.Filter(resources, func(item unstructured.Unstructured, _ int) bool {
_, ok := deleted[string(item.GetUID())]
_, ok := deleted[item.GetObjectKind().GroupVersionKind()]
return !ok
})

break

case <-ctx.Done():
return ctx.Err()
}
}
}

func templateKubernetesResourceCheck(canaryID, checkID string, check *v1.KubernetesResourceCheck) error {
templater := gomplate.StructTemplater{
ValueFunctions: true,
DelimSets: []gomplate.Delims{
{Left: "{{", Right: "}}"},
{Left: "$(", Right: ")"},
},
}

for i, r := range check.Resources {
namespace, kind := r.GetNamespace(), r.GetObjectKind().GroupVersionKind()
if err := templater.Walk(&r); err != nil {
return fmt.Errorf("error templating resource: %w", err)
}

if r.GetNamespace() != namespace || r.GetObjectKind().GroupVersionKind() != kind {
return fmt.Errorf("templating the namespace or group/version/kind of a resource is not allowed")
}
newLabels := collections.MergeMap(r.GetLabels(), map[string]string{
resourceLabelKey("canary-id"): canaryID,
resourceLabelKey("check-id"): checkID,
})
r.SetLabels(newLabels)
check.Resources[i] = r
}

for i, r := range check.StaticResources {
namespace, kind := r.GetNamespace(), r.GetKind()
if err := templater.Walk(&r); err != nil {
return fmt.Errorf("error templating resource: %w", err)
}

if r.GetNamespace() != namespace || r.GetKind() != kind {
return fmt.Errorf("templating the namespace or group/version/kind of a resource is not allowed")
}

newLabels := collections.MergeMap(r.GetLabels(), map[string]string{
resourceLabelKey("canary-id"): canaryID,
resourceLabelKey("check-id"): checkID,
resourceLabelKey("is-static"): "true",
})
r.SetLabels(newLabels)
check.StaticResources[i] = r
}

return nil
}
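
A note to tie the pieces above together: templateKubernetesResourceCheck stamps every resource (templated and static) with canary/check identity labels, and DeleteResources then removes them via DeleteCollection with a label selector — except Namespace and Service, which are still deleted by name. In sketch form, with placeholder UUIDs, these are the labels a created resource ends up carrying and the selector that matches them:

metadata:
  labels:
    canaries.flanksource.com/canary-id: <canary-uuid>    # ctx.Canary.GetPersistedID()
    canaries.flanksource.com/check-id: <check-uuid>      # ctx.Canary.GetCheckID(check.GetName())
    canaries.flanksource.com/is-static: "true"           # only stamped on staticResources

# DeleteCollection selector when deleteStatic is false:
#   canaries.flanksource.com/canary-id=<canary-uuid>,canaries.flanksource.com/check-id=<check-uuid>,!canaries.flanksource.com/is-static
# when deleteStatic is true, the trailing !is-static exclusion is dropped.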
1 change: 1 addition & 0 deletions fixtures/k8s/kubernetes_resource_ingress_pass.yaml
@@ -13,6 +13,7 @@ spec:
namespace: default
description: "deploy httpbin & check that it's accessible via ingress"
waitFor:
expr: 'dyn(resources).all(r, k8s.isReady(r))'
interval: 2s
timeout: 5m
staticResources:
2 changes: 1 addition & 1 deletion fixtures/k8s/kubernetes_resource_pod_exit_code_pass.yaml
@@ -24,7 +24,7 @@ spec:
- name: hello-world
image: hello-world
waitFor:
expr: 'dyn(resources).all(r, k8s.getHealth(r).status == "Healthy")'
expr: 'dyn(resources).all(r, k8s.isHealthy(r))'
interval: "1s"
timeout: "20s"
checkRetries:
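
The expression swap in this fixture is part of a broader rename in the commit: the built-in default (used when expr is omitted) changes from k8s.isHealthy to the looser k8s.isReady, while fixtures that want the strict health check now spell it with the k8s.isHealthy shorthand, which this commit swaps in for the older k8s.getHealth(r).status == "Healthy" form. Side by side, in sketch form:

waitFor:
  # new default when expr is omitted:
  expr: 'dyn(resources).all(r, k8s.isReady(r))'

  # strict form this fixture opts into -- shorthand for the older
  # k8s.getHealth(r).status == "Healthy" expression:
  # expr: 'dyn(resources).all(r, k8s.isHealthy(r))'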
3 changes: 3 additions & 0 deletions fixtures/k8s/kubernetes_resource_service_fail.yaml
@@ -12,6 +12,9 @@ spec:
- name: invalid service configuration
namespace: default
description: "deploy httpbin & check that it's accessible via service"
waitFor:
interval: 5s
timeout: 30s
resources:
- apiVersion: v1
kind: Pod
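
Since this fixture gains a waitFor block with no expr, the new default expression applies; effectively it now runs with (sketch):

waitFor:
  expr: 'dyn(resources).all(r, k8s.isReady(r))'   # implicit default, not written in the fixture
  interval: 5s
  timeout: 30s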
