Skip to content

Commit

Permalink
Expose nodeclaim disruption candidates through nodeclaim condition, a…
Browse files Browse the repository at this point in the history
…dd eviction message from condition
  • Loading branch information
cnmcavoy committed Jul 1, 2024
1 parent b445b7f commit 3e04007
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 19 deletions.
13 changes: 7 additions & 6 deletions pkg/apis/v1beta1/nodeclaim_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
)

const (
ConditionTypeLaunched = "Launched"
ConditionTypeRegistered = "Registered"
ConditionTypeInitialized = "Initialized"
ConditionTypeEmpty = "Empty"
ConditionTypeDrifted = "Drifted"
ConditionTypeTerminating = "Terminating"
ConditionTypeLaunched = "Launched"
ConditionTypeRegistered = "Registered"
ConditionTypeInitialized = "Initialized"
ConditionTypeEmpty = "Empty"
ConditionTypeDrifted = "Drifted"
ConditionTypeTerminating = "Terminating"
ConditionTypeDisruptionCandidate = "DisruptionCandidate"
)

// NodeClaimStatus defines the observed state of NodeClaim
Expand Down
27 changes: 27 additions & 0 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,21 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

// Set the status of the nodeclaims to reflect that they are disruption candidates
err = multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
candidate.NodeClaim.StatusConditions().SetTrueWithReason(v1beta1.ConditionTypeDisruptionCandidate, v1beta1.ConditionTypeDisruptionCandidate, evictionReason(m, candidate.NodeClaim))
return c.kubeClient.Status().Update(ctx, candidate.NodeClaim)
})...)
if err != nil {
return multierr.Append(fmt.Errorf("updating nodeclaim status: %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
}

if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Type(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
err = multierr.Combine(err, multierr.Combine(lo.Map(cmd.candidates, func(candidate *Candidate, _ int) error {
return multierr.Append(candidate.NodeClaim.StatusConditions().Clear(v1beta1.ConditionTypeDisruptionCandidate), c.kubeClient.Status().Update(ctx, candidate.NodeClaim))
})...))
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
}

Expand All @@ -251,6 +263,21 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
return nil
}

func evictionReason(m Method, nodeClaim *v1beta1.NodeClaim) string {
switch m.Type() {
case metrics.DriftReason:
return fmt.Sprintf("node %s/%s drifted", nodeClaim.Name, nodeClaim.Status.NodeName)
case metrics.ExpirationReason:
return fmt.Sprintf("node %s/%s ttl expired", nodeClaim.Name, nodeClaim.Status.NodeName)
case metrics.EmptinessReason:
return fmt.Sprintf("node %s/%s was empty", nodeClaim.Name, nodeClaim.Status.NodeName)
case metrics.ConsolidationReason:
return fmt.Sprintf("node %s/%s was %s node consolidated", nodeClaim.Name, nodeClaim.Status.NodeName, m.ConsolidationType())
default:
return m.Type()
}
}

// createReplacementNodeClaims creates replacement NodeClaims
func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method, cmd Command) ([]string, error) {
reason := fmt.Sprintf("%s/%s", m.Type(), cmd.Action())
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ var _ = Describe("Disruption Taints", func() {
wg.Wait()
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNoScheduleTaint))
Expect(nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).To(BeNil())
})
It("should add and remove taints from NodeClaims that fail to disrupt", func() {
nodePool.Spec.Disruption.ConsolidationPolicy = v1beta1.ConsolidationPolicyWhenUnderutilized
Expand Down Expand Up @@ -578,6 +579,11 @@ var _ = Describe("Disruption Taints", func() {
}
node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).To(ContainElement(v1beta1.DisruptionNoScheduleTaint))
existingNodeClaim := lo.Filter(ExpectNodeClaims(ctx, env.Client), func(nc *v1beta1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
})
Expect(existingNodeClaim[0].StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).ToNot(BeNil())
Expect(existingNodeClaim[0].StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate).IsTrue()).To(BeTrue())

createdNodeClaim := lo.Reject(ExpectNodeClaims(ctx, env.Client), func(nc *v1beta1.NodeClaim, _ int) bool {
return nc.Name == nodeClaim.Name
Expand All @@ -594,6 +600,7 @@ var _ = Describe("Disruption Taints", func() {

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNoScheduleTaint))
Expect(nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)).To(BeNil())
})
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/terminator/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

func EvictPod(pod *v1.Pod) events.Event {
func EvictPod(pod *v1.Pod, message string) events.Event {
return events.Event{
InvolvedObject: pod,
Type: v1.EventTypeNormal,
Reason: "Evicted",
Message: "Evicted pod",
Message: "Evicted pod: " + message,
DedupeValues: []string{pod.Name},
}
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -68,13 +70,15 @@ func IsNodeDrainError(err error) bool {

type QueueKey struct {
types.NamespacedName
UID types.UID
UID types.UID
nodeName string
}

func NewQueueKey(pod *v1.Pod) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
nodeName: pod.Spec.NodeName,
}
}

Expand Down Expand Up @@ -159,6 +163,10 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
// Evict returns true if successful eviction call, and false if not an eviction-related error
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name)))
evictionMessage, err := evictionReason(ctx, key, q.kubeClient)
if err != nil {
log.FromContext(ctx).Error(err, "failed looking up pod eviction reason")
}
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
Expand Down Expand Up @@ -187,10 +195,36 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))

q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, evictionMessage))
return true
}

func getNodeClaims(ctx context.Context, key QueueKey, kubeClient client.Client) ([]*v1beta1.NodeClaim, error) {
nodeClaimList := &v1beta1.NodeClaimList{}
if err := kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.nodeName": key.nodeName}); err != nil {
return nil, fmt.Errorf("listing nodeClaims, %w", err)
}
return lo.ToSlicePtr(nodeClaimList.Items), nil
}

func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) (string, error) {
nodeClaims, err := getNodeClaims(ctx, key, kubeClient)
if err != nil {
return "", err
}
for _, nodeClaim := range nodeClaims {
terminationCondition := nodeClaim.StatusConditions().Get(v1beta1.ConditionTypeDisruptionCandidate)
if terminationCondition.IsTrue() {
return terminationCondition.Message, nil
}
if !nodeClaim.DeletionTimestamp.IsZero() {
return fmt.Sprintf("node %s/%s is marked for deletion", nodeClaim.Name, nodeClaim.Status.NodeName), nil
}
}
return "", nil
}

func (q *Queue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
Expand Down
18 changes: 9 additions & 9 deletions pkg/events/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ var _ = Describe("Event Creation", func() {
Expect(internalRecorder.Calls(schedulingevents.NominatePodEvent(PodWithUID(), NodeWithUID(), NodeClaimWithUID()).Reason)).To(Equal(1))
})
It("should create a EvictPod event", func() {
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
})
It("should create a PodFailedToSchedule event", func() {
eventRecorder.Publish(schedulingevents.PodFailedToScheduleEvent(PodWithUID(), fmt.Errorf("")))
Expand All @@ -105,31 +105,31 @@ var _ = Describe("Dedupe", func() {
It("should only create a single event when many events are created quickly", func() {
pod := PodWithUID()
for i := 0; i < 100; i++ {
eventRecorder.Publish(terminatorevents.EvictPod(pod))
eventRecorder.Publish(terminatorevents.EvictPod(pod, ""))
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))
})
It("should allow the dedupe timeout to be overridden", func() {
pod := PodWithUID()
evt := terminatorevents.EvictPod(pod)
evt := terminatorevents.EvictPod(pod, "")
evt.DedupeTimeout = time.Second * 2

// Generate a set of events within the dedupe timeout
for i := 0; i < 10; i++ {
eventRecorder.Publish(evt)
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(1))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(1))

// Wait until after the overridden dedupe timeout
time.Sleep(time.Second * 3)
eventRecorder.Publish(evt)
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(2))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(2))
})
It("should allow events with different entities to be created", func() {
for i := 0; i < 100; i++ {
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID()))
eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID(), ""))
}
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID()).Reason)).To(Equal(100))
Expect(internalRecorder.Calls(terminatorevents.EvictPod(PodWithUID(), "").Reason)).To(Equal(100))
})
})

Expand Down
3 changes: 3 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.nodeName", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Status.NodeName}
}), "failed to setup nodeclaim nodeName indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.apiVersion", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.APIVersion}
}), "failed to setup nodeclaim nodeclassref apiversion indexer")
Expand Down

0 comments on commit 3e04007

Please sign in to comment.