From c74f7823e34ece5270c8347011e4760f7ce936ed Mon Sep 17 00:00:00 2001
From: Drew Sirenko <68304519+AndrewSirenko@users.noreply.github.com>
Date: Fri, 5 Jul 2024 15:58:15 -0400
Subject: [PATCH] fix: Ensure volumes are detached before deleting node

---
 kwok/charts/templates/clusterrole.yaml |  2 +-
 pkg/controllers/controllers.go         |  2 +-
 .../node/termination/controller.go     | 83 +++++++++++++++++--
 .../node/termination/suite_test.go     |  4 +-
 pkg/operator/operator.go               |  4 +
 pkg/test/environment.go                |  9 ++
 pkg/utils/node/node.go                 | 12 +++
 pkg/utils/pod/scheduling.go            | 14 +++-
 8 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/kwok/charts/templates/clusterrole.yaml b/kwok/charts/templates/clusterrole.yaml
index 199951021b..5e1caa370a 100644
--- a/kwok/charts/templates/clusterrole.yaml
+++ b/kwok/charts/templates/clusterrole.yaml
@@ -36,7 +36,7 @@ rules:
     resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
     verbs: ["get", "list", "watch"]
   - apiGroups: ["storage.k8s.io"]
-    resources: ["storageclasses", "csinodes"]
+    resources: ["storageclasses", "csinodes", "volumeattachments"]
     verbs: ["get", "watch", "list"]
   - apiGroups: ["apps"]
     resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index e85c777ebc..d425eca64f 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -77,7 +77,7 @@ func NewControllers(
     informer.NewPodController(kubeClient, cluster),
     informer.NewNodePoolController(kubeClient, cluster),
     informer.NewNodeClaimController(kubeClient, cluster),
-    termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
+    termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
     metricspod.NewController(kubeClient),
     metricsnodepool.NewController(kubeClient),
     metricsnode.NewController(cluster),
diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go
index fdb01340d6..289d9dd58d 100644
--- a/pkg/controllers/node/termination/controller.go
+++ b/pkg/controllers/node/termination/controller.go
@@ -21,15 +21,16 @@ import (
   "fmt"
   "time"
 
-  "k8s.io/apimachinery/pkg/api/errors"
-
-  "sigs.k8s.io/karpenter/pkg/utils/termination"
-
   "github.com/prometheus/client_golang/prometheus"
+  "github.com/samber/lo"
   "golang.org/x/time/rate"
   corev1 "k8s.io/api/core/v1"
+  storagev1 "k8s.io/api/storage/v1"
   "k8s.io/apimachinery/pkg/api/equality"
+  "k8s.io/apimachinery/pkg/api/errors"
+  "k8s.io/apimachinery/pkg/util/sets"
   "k8s.io/client-go/util/workqueue"
+  "k8s.io/utils/clock"
   controllerruntime "sigs.k8s.io/controller-runtime"
   "sigs.k8s.io/controller-runtime/pkg/client"
   "sigs.k8s.io/controller-runtime/pkg/controller"
@@ -38,19 +39,22 @@ import (
   "sigs.k8s.io/controller-runtime/pkg/manager"
   "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
-  "sigs.k8s.io/karpenter/pkg/operator/injection"
-
   v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
   "sigs.k8s.io/karpenter/pkg/cloudprovider"
   "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
   terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
   "sigs.k8s.io/karpenter/pkg/events"
   "sigs.k8s.io/karpenter/pkg/metrics"
+  "sigs.k8s.io/karpenter/pkg/operator/injection"
   nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
+  "sigs.k8s.io/karpenter/pkg/utils/pod"
"sigs.k8s.io/karpenter/pkg/utils/termination" + volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume" ) // Controller for the resource type Controller struct { + clock clock.Clock kubeClient client.Client cloudProvider cloudprovider.CloudProvider terminator *terminator.Terminator @@ -58,8 +62,9 @@ type Controller struct { } // NewController constructs a controller instance -func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { +func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller { return &Controller{ + clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider, terminator: terminator, @@ -119,6 +124,18 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } + // In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait + // for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer. + // However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting. + if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) { + areVolumesDetached, err := c.ensureVolumesDetached(ctx, node) + if err != nil { + return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err) + } + if !areVolumesDetached { + return reconcile.Result{RequeueAfter: 1 * time.Second}, nil + } + } nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient) if err != nil { return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err) @@ -158,6 +175,58 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1. 
   return nil
 }
 
+func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
+  volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
+  if err != nil {
+    return false, err
+  }
+  // Filter out VolumeAttachments associated with not drain-able Pods
+  filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
+  if err != nil {
+    return false, err
+  }
+  return len(filteredVolumeAttachments) == 0, nil
+}
+
+// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
+// of the passed corev1.Node
+func filterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node, volumeAttachments []*storagev1.VolumeAttachment, clk clock.Clock) ([]*storagev1.VolumeAttachment, error) {
+  // No need to filter empty VolumeAttachments list
+  if len(volumeAttachments) == 0 {
+    return volumeAttachments, nil
+  }
+  // Create list of non-drain-able Pods associated with Node
+  pods, err := nodeutils.GetPods(ctx, kubeClient, node)
+  if err != nil {
+    return nil, err
+  }
+  unDrainablePods := lo.Reject(pods, func(p *corev1.Pod, _ int) bool {
+    return pod.IsDrainable(p, clk)
+  })
+  // Filter out VolumeAttachments associated with non-drain-able Pods
+  // Match on Pod -> PersistentVolumeClaim -> PersistentVolume Name <- VolumeAttachment
+  shouldFilterOutVolume := sets.New[string]()
+  for _, p := range unDrainablePods {
+    for _, v := range p.Spec.Volumes {
+      pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
+      if errors.IsNotFound(err) {
+        continue
+      }
+      if err != nil {
+        return nil, err
+      }
+      if pvc != nil {
+        shouldFilterOutVolume.Insert(pvc.Spec.VolumeName)
+      }
+    }
+  }
+  filteredVolumeAttachments := lo.Reject(volumeAttachments, func(v *storagev1.VolumeAttachment, _ int) bool {
+    pvName := v.Spec.Source.PersistentVolumeName
+    return pvName == nil || shouldFilterOutVolume.Has(*pvName)
+  })
+  return filteredVolumeAttachments, nil
+}
+
 func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error {
   stored := n.DeepCopy()
   controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go
index 1b760f5e2f..61861fa1cb 100644
--- a/pkg/controllers/node/termination/suite_test.go
+++ b/pkg/controllers/node/termination/suite_test.go
@@ -64,12 +64,12 @@ func TestAPIs(t *testing.T) {
 
 var _ = BeforeSuite(func() {
   fakeClock = clock.NewFakeClock(time.Now())
-  env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx)))
+  env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx)))
   cloudProvider = fake.NewCloudProvider()
   recorder = test.NewEventRecorder()
   queue = terminator.NewQueue(env.Client, recorder)
-  terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
+  terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
 })
 
 var _ = AfterSuite(func() {
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index a036ee4528..e238a3d200 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -35,6 +35,7 @@ import (
   "github.com/prometheus/client_golang/prometheus"
   coordinationv1 "k8s.io/api/coordination/v1"
   corev1 "k8s.io/api/core/v1"
+  storagev1 "k8s.io/api/storage/v1"
   "k8s.io/klog/v2"
   "knative.dev/pkg/changeset"
   ctrl "sigs.k8s.io/controller-runtime"
@@ -218,6 +219,9 @@ func NewOperator() (context.Context, *Operator) {
   lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
     return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
   }), "failed to setup nodepool nodeclassref name indexer")
+  lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
+    return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
+  }), "failed to setup volumeattachment indexer")
 
   lo.Must0(mgr.AddHealthzCheck("healthz", healthz.Ping))
   lo.Must0(mgr.AddReadyzCheck("readyz", healthz.Ping))
diff --git a/pkg/test/environment.go b/pkg/test/environment.go
index 715e92e8ce..f48f8ccbf2 100644
--- a/pkg/test/environment.go
+++ b/pkg/test/environment.go
@@ -25,6 +25,7 @@ import (
   "github.com/awslabs/operatorpkg/option"
   "github.com/samber/lo"
   corev1 "k8s.io/api/core/v1"
+  storagev1 "k8s.io/api/storage/v1"
   apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
   "k8s.io/apimachinery/pkg/util/version"
   "k8s.io/client-go/kubernetes"
@@ -75,6 +76,14 @@ func NodeClaimFieldIndexer(ctx context.Context) func(cache.Cache) error {
   }
 }
 
+func VolumeAttachmentFieldIndexer(ctx context.Context) func(cache.Cache) error {
+  return func(c cache.Cache) error {
+    return c.IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(obj client.Object) []string {
+      return []string{obj.(*storagev1.VolumeAttachment).Spec.NodeName}
+    })
+  }
+}
+
 func NewEnvironment(options ...option.Function[EnvironmentOptions]) *Environment {
   opts := option.Resolve(options...)
   ctx, cancel := context.WithCancel(context.Background())
diff --git a/pkg/utils/node/node.go b/pkg/utils/node/node.go
index 34b8d97086..685e990230 100644
--- a/pkg/utils/node/node.go
+++ b/pkg/utils/node/node.go
@@ -22,6 +22,7 @@ import (
   "fmt"
 
   corev1 "k8s.io/api/core/v1"
+  storagev1 "k8s.io/api/storage/v1"
 
   v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 
@@ -143,6 +144,17 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*cor
   }), nil
 }
 
+// GetVolumeAttachments grabs all volumeAttachments associated with the passed node
+func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
+  var volumeAttachmentList storagev1.VolumeAttachmentList
+  if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
+    return nil, fmt.Errorf("listing volumeattachments, %w", err)
+  }
+  return lo.FilterMap(volumeAttachmentList.Items, func(v storagev1.VolumeAttachment, _ int) (*storagev1.VolumeAttachment, bool) {
+    return &v, v.Spec.NodeName == node.Name
+  }), nil
+}
+
 func GetCondition(n *corev1.Node, match corev1.NodeConditionType) corev1.NodeCondition {
   for _, condition := range n.Status.Conditions {
     if condition.Type == match {
diff --git a/pkg/utils/pod/scheduling.go b/pkg/utils/pod/scheduling.go
index a092185ca2..139df8e634 100644
--- a/pkg/utils/pod/scheduling.go
+++ b/pkg/utils/pod/scheduling.go
@@ -62,14 +62,20 @@ func IsEvictable(pod *corev1.Pod) bool {
 
 // IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
 // - Isn't a terminal pod (Failed or Succeeded)
-// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
+// - Can be drained by Karpenter (See IsDrainable)
+func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
+  return !IsTerminal(pod) &&
+    IsDrainable(pod, clk)
+}
+
+// IsDrainable checks if a pod can be drained by Karpenter by ensuring that the pod:
 // - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
+// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
 // - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
 // Note: pods with the `karpenter.sh/do-not-disrupt` annotation are included since node drain should stall until these pods are evicted or become terminal, even though Karpenter won't orchestrate the eviction.
-func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
-  return !IsTerminal(pod) &&
+func IsDrainable(pod *corev1.Pod, clk clock.Clock) bool {
+  return !ToleratesDisruptedNoScheduleTaint(pod) &&
     !IsStuckTerminating(pod, clk) &&
-    !ToleratesDisruptedNoScheduleTaint(pod) &&
     // Mirror pods cannot be deleted through the API server since they are created and managed by kubelet
     // This means they are effectively read-only and can't be controlled by API server calls
     // https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#drain
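
Illustrative sketch (not part of the patch): the rule introduced in filterVolumeAttachments above is that a VolumeAttachment only blocks node deletion if its PersistentVolume is not consumed by a non-drainable pod. The Go program below reproduces that rule in isolation; it assumes a module with k8s.io/api, k8s.io/apimachinery, k8s.io/utils, and github.com/samber/lo available, and the function blockingAttachments, the PV names, and main are hypothetical names used only for this example.

package main

import (
	"fmt"

	"github.com/samber/lo"
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/utils/ptr"
)

// blockingAttachments returns the VolumeAttachments that should delay node deletion:
// those whose PersistentVolume is not used by a non-drainable pod. Attachments whose
// PV belongs to a non-drainable pod (e.g. a DaemonSet pod) are dropped, mirroring the
// lo.Reject call in filterVolumeAttachments.
func blockingAttachments(attachments []*storagev1.VolumeAttachment, pvsOfNonDrainablePods sets.Set[string]) []*storagev1.VolumeAttachment {
	return lo.Reject(attachments, func(v *storagev1.VolumeAttachment, _ int) bool {
		pvName := v.Spec.Source.PersistentVolumeName
		return pvName == nil || pvsOfNonDrainablePods.Has(*pvName)
	})
}

func main() {
	attachments := []*storagev1.VolumeAttachment{
		{Spec: storagev1.VolumeAttachmentSpec{Source: storagev1.VolumeAttachmentSource{PersistentVolumeName: ptr.To("pv-app-data")}}},
		{Spec: storagev1.VolumeAttachmentSpec{Source: storagev1.VolumeAttachmentSource{PersistentVolumeName: ptr.To("pv-logging-agent")}}},
	}
	// Pretend "pv-logging-agent" is mounted by a non-drainable DaemonSet pod.
	blocking := blockingAttachments(attachments, sets.New("pv-logging-agent"))
	fmt.Println(len(blocking)) // 1 -> only the drainable pod's volume still holds up termination
}

In the controller itself this filter gates removal of the termination finalizer: finalize() requeues every second until the filtered list is empty, unless the node's terminationGracePeriod has already elapsed.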