Expose nodeclaim disruption candidates through nodeclaim condition, add eviction message from condition
cnmcavoy committed Jul 12, 2024
1 parent 56f0708 commit ef86002
Showing 13 changed files with 93 additions and 13 deletions.
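For context, the change works in two halves: the disruption controller marks each candidate NodeClaim with a DisruptionCandidate status condition whose message is the method-specific eviction reason, and the eviction queue later looks that condition up by node name so the pod's Evicted event can carry the reason. A rough sketch of how a consumer could read the condition back (illustrative only, not part of this commit; it assumes the sigs.k8s.io/karpenter/pkg/apis/v1 types, a controller-runtime client, and the nil-safe Get().IsTrue() pattern the commit itself relies on in eviction.go):

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// disruptionReason reports whether the named NodeClaim is currently a disruption
// candidate and, if so, returns the reason recorded by the disruption method.
func disruptionReason(ctx context.Context, kubeClient client.Client, nodeClaimName string) (string, bool, error) {
	nc := &v1.NodeClaim{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodeClaimName}, nc); err != nil {
		return "", false, err
	}
	cond := nc.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)
	if cond.IsTrue() { // Get returns nil when the condition is unset; IsTrue is nil-safe
		return cond.Message, true, nil
	}
	return "", false, nil
}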
1 change: 1 addition & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -29,6 +29,7 @@ const (
ConditionTypeDrifted = "Drifted"
ConditionTypeTerminating = "Terminating"
ConditionTypeConsistentStateFound = "ConsistentStateFound"
ConditionTypeDisruptionCandidate = "DisruptionCandidate"
)

// NodeClaimStatus defines the observed state of NodeClaim
12 changes: 12 additions & 0 deletions pkg/controllers/disruption/controller.go
@@ -222,9 +222,21 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

// Set the status of the nodeclaims to reflect that they are disruption candidates
err = multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
candidate.NodeClaim.StatusConditions().SetTrueWithReason(v1.ConditionTypeDisruptionCandidate, v1.ConditionTypeDisruptionCandidate, m.EvictionReason(candidate.NodeClaim))
return c.kubeClient.Status().Update(ctx, candidate.NodeClaim)
})...)
if err != nil {
return multierr.Append(fmt.Errorf("updating nodeclaim status: %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
}

if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Type(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
err = multierr.Combine(err, multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
return multierr.Append(candidate.NodeClaim.StatusConditions().Clear(v1.ConditionTypeDisruptionCandidate), c.kubeClient.Status().Update(ctx, candidate.NodeClaim))
})...))
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
}

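The SetTrueWithReason call above records a condition whose type and reason are both DisruptionCandidate and whose message is the method's eviction reason; if enqueueing the orchestration command fails, the condition is cleared again and the no-schedule taint is removed. Roughly what the stored condition carries, as a sketch with assumed values for a drifted node (timestamps and observedGeneration are filled in by the status helpers and omitted here):

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Approximate shape of the condition written by executeCommand above;
// the node name in the message is a hypothetical example.
var disruptionCandidateCondition = metav1.Condition{
	Type:    "DisruptionCandidate", // v1.ConditionTypeDisruptionCandidate
	Status:  metav1.ConditionTrue,
	Reason:  "DisruptionCandidate", // SetTrueWithReason is passed the condition type as the reason
	Message: "node node-1 drifted", // EvictionReason from the Drift method
}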
5 changes: 5 additions & 0 deletions pkg/controllers/disruption/drift.go
@@ -19,6 +19,7 @@ package disruption
import (
"context"
"errors"
"fmt"
"sort"

"sigs.k8s.io/controller-runtime/pkg/client"
@@ -120,3 +121,7 @@ func (d *Drift) Type() string {
func (d *Drift) ConsolidationType() string {
return ""
}

func (d *Drift) EvictionReason(nodeClaim *v1.NodeClaim) string {
return fmt.Sprintf("node %s drifted", nodeClaim.Status.NodeName)
}
4 changes: 4 additions & 0 deletions pkg/controllers/disruption/emptiness.go
@@ -85,3 +85,7 @@ func (e *Emptiness) Type() string {
func (e *Emptiness) ConsolidationType() string {
return ""
}

func (e *Emptiness) EvictionReason(nodeClaim *v1.NodeClaim) string {
return fmt.Sprintf("node %s was empty", nodeClaim.Status.NodeName)
}
4 changes: 4 additions & 0 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -115,3 +115,7 @@ func (c *EmptyNodeConsolidation) Type() string {
func (c *EmptyNodeConsolidation) ConsolidationType() string {
return "empty"
}

func (c *EmptyNodeConsolidation) EvictionReason(nodeClaim *v1.NodeClaim) string {
return fmt.Sprintf("node %s was %s node consolidated", nodeClaim.Status.NodeName, c.ConsolidationType())
}
4 changes: 4 additions & 0 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -221,3 +221,7 @@ func (m *MultiNodeConsolidation) Type() string {
func (m *MultiNodeConsolidation) ConsolidationType() string {
return "multi"
}

func (m *MultiNodeConsolidation) EvictionReason(nodeClaim *v1.NodeClaim) string {
return fmt.Sprintf("node %s was %s node consolidated", nodeClaim.Status.NodeName, m.ConsolidationType())
}
4 changes: 4 additions & 0 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -106,3 +106,7 @@ func (s *SingleNodeConsolidation) Type() string {
func (s *SingleNodeConsolidation) ConsolidationType() string {
return "single"
}

func (s *SingleNodeConsolidation) EvictionReason(nodeClaim *v1.NodeClaim) string {
return fmt.Sprintf("node %s was %s node consolidated", nodeClaim.Status.NodeName, s.ConsolidationType())
}
7 changes: 7 additions & 0 deletions pkg/controllers/disruption/suite_test.go
@@ -544,6 +544,7 @@ var _ = Describe("Disruption Taints", func() {
wg.Wait()
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptionNoScheduleTaint))
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).To(BeNil())
})
It("should add and remove taints from NodeClaims that fail to disrupt", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = v1.ConsolidationPolicyWhenUnderutilized
@@ -580,6 +581,11 @@ var _ = Describe("Disruption Taints", func() {
}
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).To(ContainElement(v1.DisruptionNoScheduleTaint))
existingNodeClaim := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
})
Expect(existingNodeClaim[0].StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).ToNot(BeNil())
Expect(existingNodeClaim[0].StatusConditions().Get(v1.ConditionTypeDisruptionCandidate).IsTrue()).To(BeTrue())

createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
@@ -596,6 +602,7 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptionNoScheduleTaint))
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)).To(BeNil())
})
})

1 change: 1 addition & 0 deletions pkg/controllers/disruption/types.go
@@ -44,6 +44,7 @@ type Method interface {
ComputeCommand(context.Context, map[string]map[v1.DisruptionReason]int, ...*Candidate) (Command, scheduling.Results, error)
Type() string
ConsolidationType() string
EvictionReason(nodeClaim *v1.NodeClaim) string
}

type CandidateFilter func(context.Context, *Candidate) bool
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/terminator/events/events.go
@@ -24,12 +24,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

func EvictPod(pod *corev1.Pod) events.Event {
func EvictPod(pod *corev1.Pod, message string) events.Event {
return events.Event{
InvolvedObject: pod,
Type: corev1.EventTypeNormal,
Reason: "Evicted",
Message: "Evicted pod",
Message: "Evicted pod: " + message,
DedupeValues: []string{pod.Name},
}
}
38 changes: 36 additions & 2 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
@@ -39,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/operator/injection"

terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
@@ -68,13 +71,15 @@ func IsNodeDrainError(err error) bool {

type QueueKey struct {
types.NamespacedName
UID types.UID
UID types.UID
nodeName string
}

func NewQueueKey(pod *corev1.Pod) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
nodeName: pod.Spec.NodeName,
}
}

@@ -159,6 +164,10 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if not an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
log.FromContext(ctx).Error(err, "failed looking up pod eviction reason")
}
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
@@ -187,10 +196,35 @@
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
return true
}

func getNodeClaims(ctx context.Context, key QueueKey, kubeClient client.Client) ([]*v1beta1.NodeClaim, error) {
nodeClaimList := &v1beta1.NodeClaimList{}
if err := kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.nodeName": key.nodeName}); err != nil {
return nil, fmt.Errorf("listing nodeClaims, %w", err)
}
return lo.ToSlicePtr(nodeClaimList.Items), nil
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
nodeClaims, err := getNodeClaims(ctx, key, kubeClient)
if err != nil {
return "", err
}
for _, nodeClaim := range nodeClaims {
terminationCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeDisruptionCandidate)
if terminationCondition.IsTrue() {
return terminationCondition.Message, nil
}
if !nodeClaim.DeletionTimestamp.IsZero() {
return fmt.Sprintf("node %s/%s is marked for deletion", nodeClaim.Name, nodeClaim.Status.NodeName), nil
}
}
return "", nil
}

func (q *Queue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
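End to end, the reason surfaces in the pod's Evicted event. A small sketch using the updated EvictPod signature from this commit (the pod name and message are hypothetical; in practice the message is the evictionMessage read from the NodeClaim's DisruptionCandidate condition or its deletion state above):

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events"
)

func printEvictEvent() {
	evt := terminatorevents.EvictPod(
		&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "default"}},
		"node node-1 drifted", // hypothetical eviction message
	)
	fmt.Println(evt.Message) // prints: Evicted pod: node node-1 drifted
}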
18 changes: 9 additions & 9 deletions pkg/events/suite_test.go
@@ -88,8 +88,8 @@ var _ = Describe("Event Creation", func() {
Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1))
})
It("should create a EvictPod event", func() {
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
})
It("should create a PodFailedToSchedule event", func() {
eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")))
@@ -105,31 +105,31 @@ var _ = Describe("Dedupe", func() {
It("should only create a single event when many events are created quickly", func() {
pod := PodWithUID()
for i := 0; i < 100; i++ {
eventRecorder.Publish(terminatorevents.EvictPod(pod))
eventRecorder.Publish(terminatorevents.EvictPod(pod, ""))
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
})
It("should allow the dedupe timeout to be overridden", func() {
pod := PodWithUID()
evt := terminatorevents.EvictPod(pod)
evt := terminatorevents.EvictPod(pod, "")
evt.DedupeTimeout = time.Second * 2

// Generate a set of events within the dedupe timeout
for i := 0; i < 10; i++ {
eventRecorder.Publish(evt)
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))

// Wait until after the overridden dedupe timeout
time.Sleep(time.Second * 3)
eventRecorder.Publish(evt)
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(2))
})
It("should allow events with different entities to be created", func() {
for i := 0; i < 100; i++ {
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(100))
})
})

4 changes: 4 additions & 0 deletions pkg/operator/operator.go
@@ -23,6 +23,7 @@ import (
"net/http/pprof"
"runtime"
"runtime/debug"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sync"
"time"

@@ -200,6 +201,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.nodeName", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Status.NodeName}
}), "failed to setup nodeclaim nodeName indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.group", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Group}
}), "failed to setup nodeclaim nodeclassref apiversion indexer")
