fix: Ensure persistent volumes are detached before deleting node #1294

Merged · 3 commits · Aug 2, 2024
2 changes: 1 addition & 1 deletion kwok/charts/templates/clusterrole.yaml
@@ -36,7 +36,7 @@ rules:
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses", "csinodes"]
resources: ["storageclasses", "csinodes", "volumeattachments"]
verbs: ["get", "watch", "list"]
- apiGroups: ["apps"]
resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
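For context, the new `volumeattachments` entry exists because the termination controller now lists VolumeAttachments scoped to the terminating node. Below is a minimal sketch of that kind of cache-backed list call; it mirrors the GetVolumeAttachments helper added later in this diff and assumes the "spec.nodeName" field index registered in pkg/operator/operator.go. The package and function names here are illustrative only.

```go
package sketch

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listNodeVolumeAttachments lists the VolumeAttachments bound to a node. With a cached
// controller-runtime client, this needs get/list/watch on storage.k8s.io/volumeattachments
// (the ClusterRole rule added above) plus a "spec.nodeName" field index.
func listNodeVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]storagev1.VolumeAttachment, error) {
	vaList := &storagev1.VolumeAttachmentList{}
	if err := kubeClient.List(ctx, vaList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
		return nil, fmt.Errorf("listing volumeattachments, %w", err)
	}
	return vaList.Items, nil
}
```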
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
@@ -77,7 +77,7 @@ func NewControllers(
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnode.NewController(cluster),
83 changes: 76 additions & 7 deletions pkg/controllers/node/termination/controller.go
@@ -21,15 +21,16 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"sigs.k8s.io/karpenter/pkg/utils/termination"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -38,28 +39,32 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/operator/injection"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/injection"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
"sigs.k8s.io/karpenter/pkg/utils/pod"
"sigs.k8s.io/karpenter/pkg/utils/termination"
volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
)

// Controller for the resource
type Controller struct {
clock clock.Clock
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
terminator *terminator.Terminator
recorder events.Recorder
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller {
func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, terminator *terminator.Terminator, recorder events.Recorder) *Controller {
return &Controller{
clock: clk,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
terminator: terminator,
@@ -119,6 +124,18 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
}
nodeClaims, err = nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
@@ -158,6 +175,58 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.
return nil
}

func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
volumeAttachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
if err != nil {
return false, err
}
// Filter out VolumeAttachments associated with not drain-able Pods
filteredVolumeAttachments, err := filterVolumeAttachments(ctx, c.kubeClient, node, volumeAttachments, c.clock)
if err != nil {
return false, err
}
return len(filteredVolumeAttachments) == 0, nil
}

// filterVolumeAttachments filters out storagev1.VolumeAttachments that should not block the termination
// of the passed corev1.Node
func filterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node, volumeAttachments []*storagev1.VolumeAttachment, clk clock.Clock) ([]*storagev1.VolumeAttachment, error) {
// No need to filter empty VolumeAttachments list
if len(volumeAttachments) == 0 {
return volumeAttachments, nil
}
// Create list of non-drain-able Pods associated with Node
pods, err := nodeutils.GetPods(ctx, kubeClient, node)
if err != nil {
return nil, err
}
unDrainablePods := lo.Reject(pods, func(p *corev1.Pod, _ int) bool {
return pod.IsDrainable(p, clk)
})
// Filter out VolumeAttachments associated with non-drain-able Pods
// Match on Pod -> PersistentVolumeClaim -> PersistentVolume Name <- VolumeAttachment
shouldFilterOutVolume := sets.New[string]()
for _, p := range unDrainablePods {
for _, v := range p.Spec.Volumes {
pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, p, v)
if errors.IsNotFound(err) {
continue
}
if err != nil {
return nil, err
}
if pvc != nil {
shouldFilterOutVolume.Insert(pvc.Spec.VolumeName)
}
}
}
filteredVolumeAttachments := lo.Reject(volumeAttachments, func(v *storagev1.VolumeAttachment, _ int) bool {
pvName := v.Spec.Source.PersistentVolumeName
return pvName == nil || shouldFilterOutVolume.Has(*pvName)
})
return filteredVolumeAttachments, nil
}

func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error {
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
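The `nodeTerminationTime` guard in `finalize` above is what lets a configured TerminationGracePeriod cut the volume-detachment wait short; the grace-period test further down exercises it through the NodeClaim termination-timestamp annotation. A hedged sketch of how such a deadline can be recovered from that annotation; the helper name is illustrative and not part of this PR.

```go
package sketch

import (
	"fmt"
	"time"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// terminationTimeFromAnnotation reads the RFC3339 termination timestamp that the grace-period
// test below sets via v1.NodeClaimTerminationTimestampAnnotationKey. A nil result means no
// grace period is configured, in which case the controller keeps waiting for volumes to detach.
func terminationTimeFromAnnotation(nc *v1.NodeClaim) (*time.Time, error) {
	raw, ok := nc.Annotations[v1.NodeClaimTerminationTimestampAnnotationKey]
	if !ok {
		return nil, nil
	}
	t, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return nil, fmt.Errorf("parsing termination timestamp annotation, %w", err)
	}
	return &t, nil
}
```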
78 changes: 76 additions & 2 deletions pkg/controllers/node/termination/suite_test.go
@@ -64,12 +64,12 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx)))
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx)))

cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.Client, recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
})

var _ = AfterSuite(func() {
@@ -763,6 +763,80 @@ var _ = Describe("Termination", func() {
ExpectSingletonReconciled(ctx, queue)
ExpectDeleted(ctx, env.Client, pod)
})
Context("VolumeAttachments", func() {
It("should wait for volume attachments", func() {
va := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

ExpectDeleted(ctx, env.Client, va)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should only wait for volume attachments associated with drainable pods", func() {
vaDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
vaNonDrainable := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "bar",
})
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
VolumeName: "bar",
})
pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: defaultOwnerRefs,
},
Tolerations: []corev1.Toleration{{
Key: v1.DisruptedTaintKey,
Operator: corev1.TolerationOpExists,
}},
PersistentVolumeClaims: []string{pvc.Name},
})
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, vaDrainable, vaNonDrainable, pod, pvc)
ExpectManualBinding(ctx, env.Client, pod, node)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

ExpectDeleted(ctx, env.Client, vaDrainable)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should wait for volume attachments until the nodeclaim's termination grace period expires", func() {
va := test.VolumeAttachment(test.VolumeAttachmentOptions{
NodeName: node.Name,
VolumeName: "foo",
})
nodeClaim.Annotations = map[string]string{
v1.NodeClaimTerminationTimestampAnnotationKey: fakeClock.Now().Add(time.Minute).Format(time.RFC3339),
}
ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, va)
Expect(env.Client.Delete(ctx, node)).To(Succeed())

ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)

fakeClock.Step(5 * time.Minute)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
})
})
Context("Metrics", func() {
It("should fire the terminationSummary metric when deleting nodes", func() {
4 changes: 4 additions & 0 deletions pkg/operator/operator.go
@@ -35,6 +35,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/klog/v2"
"knative.dev/pkg/changeset"
ctrl "sigs.k8s.io/controller-runtime"
@@ -218,6 +219,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
}), "failed to setup nodepool nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

lo.Must0(mgr.AddHealthzCheck("healthz", healthz.Ping))
lo.Must0(mgr.AddReadyzCheck("readyz", healthz.Ping))
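This indexer and the test-environment indexer in pkg/test/environment.go below register the same "spec.nodeName" key with the same extractor; a small shared helper could keep the two from drifting apart. A hedged sketch under that assumption (the function name is illustrative, not part of the PR).

```go
package sketch

import (
	storagev1 "k8s.io/api/storage/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// volumeAttachmentNodeNameIndexer is the extractor both the operator and the test environment
// could register under the "spec.nodeName" key, so the index definition lives in one place.
func volumeAttachmentNodeNameIndexer(o client.Object) []string {
	return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}
```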
9 changes: 9 additions & 0 deletions pkg/test/environment.go
@@ -25,6 +25,7 @@ import (
"github.com/awslabs/operatorpkg/option"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
@@ -75,6 +76,14 @@ func NodeClaimFieldIndexer(ctx context.Context) func(cache.Cache) error {
}
}

func VolumeAttachmentFieldIndexer(ctx context.Context) func(cache.Cache) error {
return func(c cache.Cache) error {
return c.IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(obj client.Object) []string {
return []string{obj.(*storagev1.VolumeAttachment).Spec.NodeName}
})
}
}

func NewEnvironment(options ...option.Function[EnvironmentOptions]) *Environment {
opts := option.Resolve(options...)
ctx, cancel := context.WithCancel(context.Background())
25 changes: 25 additions & 0 deletions pkg/test/storage.go
@@ -153,3 +153,28 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass {
VolumeBindingMode: options.VolumeBindingMode,
}
}

type VolumeAttachmentOptions struct {
metav1.ObjectMeta
NodeName string
VolumeName string
}

func VolumeAttachment(overrides ...VolumeAttachmentOptions) *storagev1.VolumeAttachment {
options := VolumeAttachmentOptions{}
for _, opts := range overrides {
if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil {
panic(fmt.Sprintf("Failed to merge options: %s", err))
}
}
return &storagev1.VolumeAttachment{
ObjectMeta: ObjectMeta(options.ObjectMeta),
Spec: storagev1.VolumeAttachmentSpec{
NodeName: options.NodeName,
Attacher: "fake-csi",
Source: storagev1.VolumeAttachmentSource{
PersistentVolumeName: lo.ToPtr(options.VolumeName),
},
},
}
}
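A brief usage sketch of the new fixture, following the same mergo-based override pattern as the other helpers in this package; the names and values are illustrative only.

```go
package sketch

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/karpenter/pkg/test"
)

// exampleVolumeAttachment builds a fake attachment binding PV "pv-1" to node "node-1",
// the same shape the termination suite tests above construct.
func exampleVolumeAttachment() *storagev1.VolumeAttachment {
	return test.VolumeAttachment(test.VolumeAttachmentOptions{
		ObjectMeta: metav1.ObjectMeta{Name: "va-example"},
		NodeName:   "node-1",
		VolumeName: "pv-1",
	})
}
```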
10 changes: 10 additions & 0 deletions pkg/utils/node/node.go
@@ -22,6 +22,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"

@@ -143,6 +144,15 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*cor
}), nil
}

// GetVolumeAttachments grabs all volumeAttachments associated with the passed node
func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *corev1.Node) ([]*storagev1.VolumeAttachment, error) {
var volumeAttachmentList storagev1.VolumeAttachmentList
if err := kubeClient.List(ctx, &volumeAttachmentList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
return nil, fmt.Errorf("listing volumeAttachments, %w", err)
}
return lo.ToSlicePtr(volumeAttachmentList.Items), nil
}

func GetCondition(n *corev1.Node, match corev1.NodeConditionType) corev1.NodeCondition {
for _, condition := range n.Status.Conditions {
if condition.Type == match {
14 changes: 10 additions & 4 deletions pkg/utils/pod/scheduling.go
@@ -62,14 +62,20 @@ func IsEvictable(pod *corev1.Pod) bool {

// IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
// - Isn't a terminal pod (Failed or Succeeded)
// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
// - Can be drained by Karpenter (See IsDrainable)
func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
return !IsTerminal(pod) &&
IsDrainable(pod, clk)
}

// IsDrainable checks if a pod can be drained by Karpenter by ensuring that the pod:
// - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
// - Isn't a pod that has been terminating past its terminationGracePeriodSeconds
// - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
// Note: pods with the `karpenter.sh/do-not-disrupt` annotation are included since node drain should stall until these pods are evicted or become terminal, even though Karpenter won't orchestrate the eviction.
func IsWaitingEviction(pod *corev1.Pod, clk clock.Clock) bool {
return !IsTerminal(pod) &&
func IsDrainable(pod *corev1.Pod, clk clock.Clock) bool {
return !ToleratesDisruptedNoScheduleTaint(pod) &&
!IsStuckTerminating(pod, clk) &&
!ToleratesDisruptedNoScheduleTaint(pod) &&
// Mirror pods cannot be deleted through the API server since they are created and managed by kubelet
// This means they are effectively read-only and can't be controlled by API server calls
// https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#drain
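To make the refactor concrete: under the new split, a pod that tolerates the disrupted taint is classified as non-drainable by IsDrainable, so filterVolumeAttachments in the termination controller drops its VolumeAttachments from the wait, exactly as the "non-drainable" pod in the suite tests above shows. A hedged illustration, not part of the diff; the package and function names are made up.

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// nonDrainablePod returns a pod that tolerates the disrupted:NoSchedule taint, mirroring the
// non-drainable pod in the suite tests above; its volumes should not block node termination.
func nonDrainablePod() *corev1.Pod {
	return &corev1.Pod{
		Spec: corev1.PodSpec{
			Tolerations: []corev1.Toleration{{
				Key:      v1.DisruptedTaintKey,
				Operator: corev1.TolerationOpExists,
			}},
		},
	}
}
```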