diff --git a/canary-checker.properties b/canary-checker.properties
index c52672742..18c689bcc 100644
--- a/canary-checker.properties
+++ b/canary-checker.properties
@@ -2,4 +2,6 @@
 
 # check.disabled.dns=false
 # check.disabled.s3=false
-# check.disabled.tcp=false
\ No newline at end of file
+# check.disabled.tcp=false
+
+# topology.runNow=true
diff --git a/checks/kubernetes_resource.go b/checks/kubernetes_resource.go
index b42981425..716c7f37b 100644
--- a/checks/kubernetes_resource.go
+++ b/checks/kubernetes_resource.go
@@ -16,11 +16,12 @@ import (
     apiErrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-    cliresource "k8s.io/cli-runtime/pkg/resource"
+    "k8s.io/client-go/dynamic"
 
     "github.com/flanksource/canary-checker/api/context"
     v1 "github.com/flanksource/canary-checker/api/v1"
     "github.com/flanksource/canary-checker/pkg"
+    "github.com/flanksource/commons/collections"
     "github.com/flanksource/commons/utils"
     "github.com/flanksource/duty/types"
 )
@@ -32,10 +33,12 @@ const (
     resourceWaitTimeoutDefault  = time.Minute * 10
     resourceWaitIntervalDefault = time.Second * 5
     waitForExprDefault          = `dyn(resources).all(r, k8s.isHealthy(r))`
-
-    annotationkey = "flanksource.canary-checker/kubernetes-resource-canary"
 )
 
+func resourceLabelKey(key string) string {
+    return fmt.Sprintf("canaries.flanksource.com/%s", key)
+}
+
 type KubernetesResourceChecker struct{}
 
 func (c *KubernetesResourceChecker) Type() string {
@@ -67,16 +70,12 @@ func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.Kuberne
         }
     }
 
-    if err := c.templateResources(&check); err != nil {
+    if err := c.templateResources(ctx, &check); err != nil {
         return results.Failf("templating error: %v", err)
     }
 
     for i := range check.StaticResources {
         resource := check.StaticResources[i]
-
-        // annotate the resource with the canary ID so we can easily clean it up later
-        // TODO: see if this is actually needed
-        resource.SetAnnotations(map[string]string{annotationkey: ctx.Canary.ID()})
         if err := ctx.Kommons().ApplyUnstructured(utils.Coalesce(resource.GetNamespace(), ctx.Namespace), &resource); err != nil {
             return results.Failf("failed to apply static resource %s: %v", resource.GetName(), err)
         }
@@ -86,20 +85,19 @@ func (c *KubernetesResourceChecker) Check(ctx *context.Context, check v1.Kuberne
     // so we can delete them together instead of deleting them one by one.
     var createdResources []unstructured.Unstructured
     defer func() {
-        if err := deleteResources(ctx, check.WaitFor.Delete, createdResources...); err != nil {
+        if err := DeleteResources(ctx, check, createdResources...); err != nil {
             results.Failf(err.Error())
         }
     }()
 
     if check.ClearResources {
-        if err := deleteResources(ctx, true, check.Resources...); err != nil {
+        if err := DeleteResources(ctx, check, check.Resources...); err != nil {
             results.Failf(err.Error())
         }
     }
 
     for i := range check.Resources {
         resource := check.Resources[i]
-        resource.SetAnnotations(map[string]string{annotationkey: ctx.Canary.ID()})
         if err := ctx.Kommons().ApplyUnstructured(utils.Coalesce(resource.GetNamespace(), ctx.Namespace), &resource); err != nil {
             return results.Failf("failed to apply resource (%s/%s/%s): %v", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
         }
@@ -285,36 +283,53 @@ func (c *KubernetesResourceChecker) validate(ctx *context.Context, check v1.Kube
     maxResourcesAllowed := ctx.Properties().Int("checks.kubernetesResource.maxResources", defaultMaxResourcesAllowed)
     if check.TotalResources() > maxResourcesAllowed {
-        return fmt.Errorf("too many resources (%d). only %d allowed", check.TotalResources(), maxResourcesAllowed)
+        return fmt.Errorf("too many resources (%d). only %d supported", check.TotalResources(), maxResourcesAllowed)
     }
 
     return nil
 }
 
-func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unstructured.Unstructured) error {
-    ctx.Logger.V(4).Infof("deleting %d resources", len(resources))
+func DeleteResources(ctx *context.Context, check v1.KubernetesResourceCheck, resources ...unstructured.Unstructured) error {
+    ctx.Logger.V(4).Infof("deleting resources")
 
-    // cache dynamic clients
+    // firstly, cache dynamic clients by gvk
     clients := sync.Map{}
+    for i := range resources {
+        resource := resources[i]
+
+        gvk := resource.GetObjectKind().GroupVersionKind()
+        if _, ok := clients.Load(gvk); ok {
+            continue // client already cached
+        }
+
+        namespace := utils.Coalesce(resource.GetNamespace(), ctx.Namespace)
+        rc, _, _, err := ctx.Kommons().GetDynamicClientFor(namespace, &resource)
+        if err != nil {
+            return fmt.Errorf("failed to get rest client for (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
+        }
+        clients.Store(gvk, rc)
+    }
 
     eg, _ := errgroup.WithContext(ctx)
     for i := range resources {
         resource := resources[i]
         eg.Go(func() error {
-            rc, err := ctx.Kommons().GetRestClient(resource)
-            if err != nil {
-                return fmt.Errorf("failed to get rest client for (%s/%s/%s): %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
-            }
-            gvk := resource.GetObjectKind().GroupVersionKind()
-            clients.Store(gvk, rc)
+            cachedClient, _ := clients.Load(resource.GetObjectKind().GroupVersionKind())
+            rc := cachedClient.(dynamic.ResourceInterface)
 
-            namespace := utils.Coalesce(resource.GetNamespace(), ctx.Namespace)
-            deleteOpt := &metav1.DeleteOptions{
+            deleteOpt := metav1.DeleteOptions{
                 GracePeriodSeconds: lo.ToPtr(int64(0)),
                 PropagationPolicy:  lo.ToPtr(metav1.DeletePropagationOrphan),
             }
-            if _, err := rc.DeleteWithOptions(namespace, resource.GetName(), deleteOpt); err != nil {
+            listOpt := metav1.ListOptions{
+                LabelSelector: fmt.Sprintf("%s=%s,%s=%s,!%s",
+                    resourceLabelKey("check-id"), ctx.Canary.GetCheckID(check.GetName()),
+                    resourceLabelKey("canary-id"), ctx.Canary.GetPersistedID(),
+                    resourceLabelKey("is-static"),
+                ),
+            }
+            if err := rc.DeleteCollection(ctx, deleteOpt, listOpt); err != nil {
                 var statusErr *apiErrors.StatusError
                 if errors.As(err, &statusErr) {
                     switch statusErr.ErrStatus.Code {
@@ -333,7 +348,7 @@ func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unst
         return err
     }
 
-    if !waitForDelete {
+    if !check.WaitFor.Delete {
         return nil
     }
 
@@ -351,17 +366,17 @@ func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unst
             deleted := make(map[string]struct{})
             for _, resource := range resources {
                 cachedClient, _ := clients.Load(resource.GetObjectKind().GroupVersionKind())
-                rc := cachedClient.(*cliresource.Helper)
+                rc := cachedClient.(dynamic.ResourceInterface)
 
-                if _, err := rc.Get(resource.GetNamespace(), resource.GetName()); err != nil {
+                if _, err := rc.Get(ctx, resource.GetName(), metav1.GetOptions{}); err != nil {
                     if !apiErrors.IsNotFound(err) {
                         return fmt.Errorf("error getting resource (%s/%s/%s) while polling: %w", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
                     }
 
                     deleted[string(resource.GetUID())] = struct{}{}
-                    ctx.Logger.V(5).Infof("(%s/%s/%s) has been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
+                    ctx.Logger.V(4).Infof("(%s/%s/%s) has been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
                 } else {
-                    ctx.Logger.V(5).Infof("(%s/%s/%s) has not been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
+                    ctx.Logger.V(4).Infof("(%s/%s/%s) has not been deleted", resource.GetKind(), resource.GetNamespace(), resource.GetName())
                 }
             }
 
@@ -378,12 +393,7 @@ func deleteResources(ctx *context.Context, waitForDelete bool, resources ...unst
     }
 }
 
-func (c *KubernetesResourceChecker) templateResources(check *v1.KubernetesResourceCheck) error {
-    // TODO: Find a better way to delete the resources
-    // because the templating can generate a random resource name.
-    // If the program crashes in the midst of the check, then we do not have
-    // a pointer back to the generated resources to clear them.
-
+func (c *KubernetesResourceChecker) templateResources(ctx *context.Context, check *v1.KubernetesResourceCheck) error {
     templater := gomplate.StructTemplater{
         ValueFunctions: true,
         DelimSets: []gomplate.Delims{
@@ -393,24 +403,37 @@ func (c *KubernetesResourceChecker) templateResour
     }
 
     for i, r := range check.Resources {
+        namespace, kind := r.GetNamespace(), r.GetKind()
         if err := templater.Walk(&r); err != nil {
             return fmt.Errorf("error templating resource: %w", err)
         }
+
+        if r.GetNamespace() != namespace || r.GetKind() != kind {
+            return fmt.Errorf("templating the namespace/kind of a resource is not allowed")
+        }
+
+        r.SetLabels(collections.MergeMap(r.GetLabels(), map[string]string{
+            resourceLabelKey("canary-id"): ctx.Canary.GetPersistedID(),
+            resourceLabelKey("check-id"):  ctx.Canary.GetCheckID(check.GetName()),
+        }))
         check.Resources[i] = r
     }
 
-    // For the reasons mentioned above, we do not allow changing the
-    // name, namespace & kind of static resources.
     for i, r := range check.StaticResources {
-        name, namespace, kind := r.GetName(), r.GetNamespace(), r.GetKind()
+        namespace, kind := r.GetNamespace(), r.GetKind()
         if err := templater.Walk(&r); err != nil {
             return fmt.Errorf("error templating resource: %w", err)
         }
 
-        if r.GetName() != name || r.GetNamespace() != namespace || r.GetKind() != kind {
-            return fmt.Errorf("templating the name/namespace/kind of a static resource is not allowed")
+        if r.GetNamespace() != namespace || r.GetKind() != kind {
+            return fmt.Errorf("templating the namespace/kind of a static resource is not allowed")
        }
 
+        r.SetLabels(collections.MergeMap(r.GetLabels(), map[string]string{
+            resourceLabelKey("canary-id"): ctx.Canary.GetPersistedID(),
+            resourceLabelKey("check-id"):  ctx.Canary.GetCheckID(check.GetName()),
+            resourceLabelKey("is-static"): "true",
+        }))
        check.StaticResources[i] = r
    }
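
Reviewer note: the cleanup contract in this patch hinges on the label selector built in DeleteResources. The standalone sketch below (not part of the patch; it assumes only k8s.io/apimachinery, with "c1"/"k1" as placeholder IDs) demonstrates why the "!is-static" clause keeps DeleteCollection away from static resources while still sweeping up templated ones.

// Sketch: selector semantics behind DeleteResources.
// Label keys mirror resourceLabelKey(); IDs are placeholders.
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Same shape as the selector built in DeleteResources.
    selector, err := labels.Parse(
        "canaries.flanksource.com/check-id=c1," +
            "canaries.flanksource.com/canary-id=k1," +
            "!canaries.flanksource.com/is-static")
    if err != nil {
        panic(err)
    }

    // Labels that templateResources applies to a templated resource.
    templated := labels.Set{
        "canaries.flanksource.com/check-id":  "c1",
        "canaries.flanksource.com/canary-id": "k1",
    }
    // Static resources additionally carry is-static=true.
    static := labels.Set{
        "canaries.flanksource.com/check-id":  "c1",
        "canaries.flanksource.com/canary-id": "k1",
        "canaries.flanksource.com/is-static": "true",
    }

    fmt.Println(selector.Matches(templated)) // true  -> swept up by DeleteCollection
    fmt.Println(selector.Matches(static))    // false -> left alone
}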
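Since DeleteCollection replaces the old per-object DeleteWithOptions call, here is a minimal sketch of that call path against a plain client-go dynamic client. It assumes a local kubeconfig and placeholder canary/check IDs; the patch itself resolves the client per GVK through Kommons().GetDynamicClientFor.

// Sketch: label-scoped cleanup with client-go's dynamic client.
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumes ~/.kube/config; the check caches one client per GVK instead.
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    dyn, err := dynamic.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
    selector := fmt.Sprintf("%s=%s,%s=%s,!%s",
        "canaries.flanksource.com/check-id", "c1", // placeholder check ID
        "canaries.flanksource.com/canary-id", "k1", // placeholder canary ID
        "canaries.flanksource.com/is-static",
    )

    // One call deletes every matching pod in the namespace, instead of
    // issuing a delete per tracked object.
    if err := dyn.Resource(pods).Namespace("default").DeleteCollection(
        context.Background(), metav1.DeleteOptions{},
        metav1.ListOptions{LabelSelector: selector},
    ); err != nil {
        panic(err)
    }
}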