From 3e0400789d3ab95f1f9e9af672d3c64ae508052d Mon Sep 17 00:00:00 2001
From: Cameron McAvoy
Date: Thu, 27 Jun 2024 16:49:55 -0500
Subject: [PATCH] Expose nodeclaim disruption candidates through nodeclaim
 condition, add eviction message from condition

---
 pkg/apis/v1beta1/nodeclaim_status.go          | 13 ++++---
 pkg/controllers/disruption/controller.go      | 27 +++++++++++++
 pkg/controllers/disruption/suite_test.go      |  7 ++++
 .../termination/terminator/events/events.go   |  4 +-
 .../node/termination/terminator/eviction.go   | 38 ++++++++++++++++++-
 pkg/events/suite_test.go                      | 18 ++++-----
 pkg/operator/operator.go                      |  3 ++
 7 files changed, 91 insertions(+), 19 deletions(-)

diff --git a/pkg/apis/v1beta1/nodeclaim_status.go b/pkg/apis/v1beta1/nodeclaim_status.go
index 28d2362bd1..f9cdc10812 100644
--- a/pkg/apis/v1beta1/nodeclaim_status.go
+++ b/pkg/apis/v1beta1/nodeclaim_status.go
@@ -22,12 +22,13 @@ import (
 )
 
 const (
-	ConditionTypeLaunched    = "Launched"
-	ConditionTypeRegistered  = "Registered"
-	ConditionTypeInitialized = "Initialized"
-	ConditionTypeEmpty       = "Empty"
-	ConditionTypeDrifted     = "Drifted"
-	ConditionTypeTerminating = "Terminating"
+	ConditionTypeLaunched            = "Launched"
+	ConditionTypeRegistered          = "Registered"
+	ConditionTypeInitialized         = "Initialized"
+	ConditionTypeEmpty               = "Empty"
+	ConditionTypeDrifted             = "Drifted"
+	ConditionTypeTerminating         = "Terminating"
+	ConditionTypeDisruptionCandidate = "DisruptionCandidate"
 )
 
 // NodeClaimStatus defines the observed state of NodeClaim
diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go
index 4f1417887b..5e0516d549 100644
--- a/pkg/controllers/disruption/controller.go
+++ b/pkg/controllers/disruption/controller.go
@@ -222,9 +222,21 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
 	// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
 	c.cluster.MarkForDeletion(providerIDs...)
 
+	// Set a status condition on each candidate NodeClaim recording that it is a disruption candidate and why
+	err = multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
+		candidate.NodeClaim.StatusConditions().SetTrueWithReason(v1beta1.ConditionTypeDisruptionCandidate, v1beta1.ConditionTypeDisruptionCandidate, evictionReason(m, candidate.NodeClaim))
+		return c.kubeClient.Status().Update(ctx, candidate.NodeClaim)
+	})...)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("updating nodeclaim status, %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+
 	if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
 		lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }),
 		commandID, m.Type(), m.ConsolidationType())); err != nil {
 		c.cluster.UnmarkForDeletion(providerIDs...)
+		// The command failed to enqueue, so clear the disruption-candidate condition we just set
+		err = multierr.Append(err, multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
+			return multierr.Append(candidate.NodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeDisruptionCandidate), c.kubeClient.Status().Update(ctx, candidate.NodeClaim))
+		})...))
 		return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
 	}
@@ -251,6 +263,21 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
 	return nil
 }
 
+// evictionReason returns a human-readable explanation of why the NodeClaim was
+// selected for disruption; it is stored on the DisruptionCandidate condition
+// and later surfaced on pod eviction events.
+func evictionReason(m Method, nodeClaim *v1beta1.NodeClaim) string {
+	switch m.Type() {
+	case metrics.DriftReason:
+		return fmt.Sprintf("node %s/%s drifted", nodeClaim.Name, nodeClaim.Status.NodeName)
+	case metrics.ExpirationReason:
+		return fmt.Sprintf("node %s/%s TTL expired", nodeClaim.Name, nodeClaim.Status.NodeName)
+	case metrics.EmptinessReason:
+		return fmt.Sprintf("node %s/%s was empty", nodeClaim.Name, nodeClaim.Status.NodeName)
+	case metrics.ConsolidationReason:
+		return fmt.Sprintf("node %s/%s was disrupted via %s node consolidation", nodeClaim.Name, nodeClaim.Status.NodeName, m.ConsolidationType())
+	default:
+		return m.Type()
+	}
+}
+
 // createReplacementNodeClaims creates replacement NodeClaims
 func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method, cmd Command) ([]string, error) {
 	reason := fmt.Sprintf("%s/%s", m.Type(), cmd.Action())
diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go
index d5ac0cdf64..c6222a0723 100644
--- a/pkg/controllers/disruption/suite_test.go
+++ b/pkg/controllers/disruption/suite_test.go
@@ -542,6 +542,7 @@ var _ = Describe("Disruption Taints", func() {
 		wg.Wait()
 		node = ExpectNodeExists(ctx, env.Client, node.Name)
 		Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNoScheduleTaint))
+		Expect(nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).To(BeNil())
 	})
 	It("should add and remove taints from NodeClaims that fail to disrupt", func() {
 		nodePool.Spec.Disruption.ConsolidationPolicy = v1beta1.ConsolidationPolicyWhenUnderutilized
@@ -578,6 +579,11 @@ var _ = Describe("Disruption Taints", func() {
 		}
 		node = ExpectNodeExists(ctx, env.Client, node.Name)
 		Expect(node.Spec.Taints).To(ContainElement(v1beta1.DisruptionNoScheduleTaint))
+		existingNodeClaim := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1beta1.NodeClaim, _ int) bool {
+			return nc.Name == nodeClaim.Name
+		})
+		Expect(existingNodeClaim[0].StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).ToNot(BeNil())
+		Expect(existingNodeClaim[0].StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate).IsTrue()).To(BeTrue())
 		createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1beta1.NodeClaim, _ int) bool {
 			return nc.Name == nodeClaim.Name
@@ -594,6 +600,7 @@ var _ = Describe("Disruption Taints", func() {
 
 		node = ExpectNodeExists(ctx, env.Client, node.Name)
 		Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNoScheduleTaint))
+		Expect(nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).To(BeNil())
 	})
 })
diff --git a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go
index 84c4b07bc0..80a76edd42 100644
--- a/pkg/controllers/node/termination/terminator/events/events.go
+++ b/pkg/controllers/node/termination/terminator/events/events.go
@@ -24,12 +24,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events" ) -func EvictPod(pod *v1.Pod) events.Event { +func EvictPod(pod *v1.Pod, message string) events.Event { return events.Event{ InvolvedObject: pod, Type: v1.EventTypeNormal, Reason: "Evicted", - Message: "Evicted pod", + Message: "Evicted pod: " + message, DedupeValues: []string{pod.Name}, } } diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go index e195a99627..2a5608620a 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/terminator/eviction.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" v1 "k8s.io/api/core/v1" @@ -68,13 +70,15 @@ func IsNodeDrainError(err error) bool { type QueueKey struct { types.NamespacedName - UID types.UID + UID types.UID + nodeName string } func NewQueueKey(pod *v1.Pod) QueueKey { return QueueKey{ NamespacedName: client.ObjectKeyFromObject(pod), UID: pod.UID, + nodeName: pod.Spec.NodeName, } } @@ -159,6 +163,10 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { // Evict returns true if successful eviction call, and false if not an eviction-related error func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name))) + evictionMessage, err := evictionReason(ctx, key, q.kubeClient) + if err != nil { + log.FromContext(ctx).Error(err, "failed looking up pod eviction reason") + } if err := q.kubeClient.SubResource("eviction").Create(ctx, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}}, &policyv1.Eviction{ @@ -187,10 +195,36 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { log.FromContext(ctx).Error(err, "failed evicting pod") return false } - q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}})) + + q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage)) return true } +func getNodeClaims(ctx context.Context, key QueueKey, kubeClient client.Client) ([]*v1beta1.NodeClaim, error) { + nodeClaimList := &v1beta1.NodeClaimList{} + if err := kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.nodeName": key.nodeName}); err != nil { + return nil, fmt.Errorf("listing nodeClaims, %w", err) + } + return lo.ToSlicePtr(nodeClaimList.Items), nil +} + +func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) { + nodeClaims, err := getNodeClaims(ctx, key, kubeClient) + if err != nil { + return "", err + } + for _, nodeClaim := range nodeClaims { + terminationCondition := nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate) + if terminationCondition.IsTrue() { + return terminationCondition.Message, nil + } + if !nodeClaim.DeletionTimestamp.IsZero() { + return fmt.Sprintf("node %s/%s is marked for deletion", nodeClaim.Name, nodeClaim.Status.NodeName), nil + } + } + return "", nil +} + func (q *Queue) Reset() { q.mu.Lock() defer q.mu.Unlock() diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index bd5d6bda7d..7539071f49 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -88,8 +88,8 @@ var _ = Describe("Event Creation", func() { 
 		Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1))
 	})
 	It("should create a EvictPod event", func() {
-		eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 	})
 	It("should create a PodFailedToSchedule event", func() {
 		eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")))
@@ -105,31 +105,31 @@ var _ = Describe("Dedupe", func() {
 	It("should only create a single event when many events are created quickly", func() {
 		pod := PodWithUID()
 		for i := 0; i < 100; i++ {
-			eventRecorder.Publish(terminatorevents.EvictPod(pod))
+			eventRecorder.Publish(terminatorevents.EvictPod(pod, ""))
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 	})
 	It("should allow the dedupe timeout to be overridden", func() {
 		pod := PodWithUID()
-		evt := terminatorevents.EvictPod(pod)
+		evt := terminatorevents.EvictPod(pod, "")
 		evt.DedupeTimeout = time.Second * 2
 		// Generate a set of events within the dedupe timeout
 		for i := 0; i < 10; i++ {
 			eventRecorder.Publish(evt)
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
 		// Wait until after the overridden dedupe timeout
 		time.Sleep(time.Second * 3)
 		eventRecorder.Publish(evt)
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(2))
 	})
 	It("should allow events with different entities to be created", func() {
 		for i := 0; i < 100; i++ {
-			eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
+			eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
 		}
-		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100))
+		Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(100))
 	})
 })
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index acde5ab8b6..7fe9dbb1ea 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -194,6 +194,9 @@ func NewOperator() (context.Context, *Operator) {
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
 		return []string{o.(*v1beta1.NodeClaim).Status.ProviderID}
 	}), "failed to setup nodeclaim provider id indexer")
+	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.nodeName", func(o client.Object) []string {
+		return []string{o.(*v1beta1.NodeClaim).Status.NodeName}
+	}), "failed to setup nodeclaim nodeName indexer")
 	lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.apiVersion", func(o client.Object) []string {
 		return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.APIVersion}
 	}), "failed to setup nodeclaim nodeclassref apiversion indexer")
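
As a minimal, self-contained sketch of the round-trip this patch introduces (illustrative only, not part of the diff; the NodeClaim value and message string below are invented), the disruption controller's condition write and the eviction queue's condition read compose like this:

package main

import (
	"fmt"

	"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
)

func main() {
	nc := &v1beta1.NodeClaim{}

	// Writer side (disruption controller): record why this NodeClaim is a
	// disruption candidate. The reason mirrors the condition type; the
	// message is what later surfaces on pod eviction events.
	nc.StatusConditions().SetTrueWithReason(
		v1beta1.ConditionTypeDisruptionCandidate,
		v1beta1.ConditionTypeDisruptionCandidate,
		"node default-x7k2p/ip-10-0-0-1.example.internal drifted", // invented example message
	)

	// Reader side (eviction queue): look the condition back up and reuse its
	// message as the eviction event text, as Evict() does in the patch.
	if cond := nc.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate); cond.IsTrue() {
		fmt.Println("Evicted pod: " + cond.Message)
	}
}

In the real flow the write goes through c.kubeClient.Status().Update and the read is keyed off the status.nodeName field index registered in operator.go; the sketch elides both and exercises only the in-memory condition set.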