Skip to content

Commit

Permalink
chore: Add support for v1beta1/NodeClaim to the interruption contro…
Browse files Browse the repository at this point in the history
…ller (#4526)
  • Loading branch information
jonathan-innis authored Sep 1, 2023
1 parent 3696509 commit 5e1a81f
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 101 deletions.
84 changes: 40 additions & 44 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

sqsapi "github.com/aws/aws-sdk-go/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
Expand All @@ -31,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/utils/pretty"
"github.com/aws/karpenter/pkg/apis/settings"
"github.com/aws/karpenter/pkg/apis/v1alpha1"
Expand All @@ -40,10 +40,9 @@ import (
"github.com/aws/karpenter/pkg/controllers/interruption/messages/statechange"
"github.com/aws/karpenter/pkg/utils"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
nodeclaimutil "github.com/aws/karpenter-core/pkg/utils/nodeclaim"
)

type Action string
Expand Down Expand Up @@ -92,9 +91,9 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
if len(sqsMessages) == 0 {
return reconcile.Result{}, nil
}
machineInstanceIDMap, err := c.makeMachineInstanceIDMap(ctx)
nodeClaimInstanceIDMap, err := c.makeNodeClaimInstanceIDMap(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("making machine instance id map, %w", err)
return reconcile.Result{}, fmt.Errorf("making nodeclaim instance id map, %w", err)
}
nodeInstanceIDMap, err := c.makeNodeInstanceIDMap(ctx)
if err != nil {
Expand All @@ -109,7 +108,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
errs[i] = c.deleteMessage(ctx, sqsMessages[i])
return
}
if e = c.handleMessage(ctx, machineInstanceIDMap, nodeInstanceIDMap, msg); e != nil {
if e = c.handleMessage(ctx, nodeClaimInstanceIDMap, nodeInstanceIDMap, msg); e != nil {
errs[i] = fmt.Errorf("handling message, %w", e)
return
}
Expand Down Expand Up @@ -140,7 +139,7 @@ func (c *Controller) parseMessage(raw *sqsapi.Message) (messages.Message, error)
}

// handleMessage takes an action against every node involved in the message that is owned by a Provisioner
func (c *Controller) handleMessage(ctx context.Context, machineInstanceIDMap map[string]*v1alpha5.Machine,
func (c *Controller) handleMessage(ctx context.Context, nodeClaimInstanceIDMap map[string]*v1beta1.NodeClaim,
nodeInstanceIDMap map[string]*v1.Node, msg messages.Message) (err error) {

ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("messageKind", msg.Kind()))
Expand All @@ -150,18 +149,18 @@ func (c *Controller) handleMessage(ctx context.Context, machineInstanceIDMap map
return nil
}
for _, instanceID := range msg.EC2InstanceIDs() {
machine, ok := machineInstanceIDMap[instanceID]
nodeClaim, ok := nodeClaimInstanceIDMap[instanceID]
if !ok {
continue
}
node := nodeInstanceIDMap[instanceID]
if e := c.handleMachine(ctx, msg, machine, node); e != nil {
if e := c.handleNodeClaim(ctx, msg, nodeClaim, node); e != nil {
err = multierr.Append(err, e)
}
}
messageLatency.Observe(time.Since(msg.StartTime()).Seconds())
if err != nil {
return fmt.Errorf("acting on machines, %w", err)
return fmt.Errorf("acting on NodeClaims, %w", err)
}
return nil
}
Expand All @@ -175,91 +174,88 @@ func (c *Controller) deleteMessage(ctx context.Context, msg *sqsapi.Message) err
return nil
}

// handleMachine retrieves the action for the message and then performs the appropriate action against the node
func (c *Controller) handleMachine(ctx context.Context, msg messages.Message, machine *v1alpha5.Machine, node *v1.Node) error {
// handleNodeClaim retrieves the action for the message and then performs the appropriate action against the node
func (c *Controller) handleNodeClaim(ctx context.Context, msg messages.Message, nodeClaim *v1beta1.NodeClaim, node *v1.Node) error {
action := actionForMessage(msg)
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("machine", machine.Name))
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With(lo.Ternary(nodeClaim.IsMachine, "machine", "nodeclaim"), nodeClaim.Name))
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", string(action)))
if node != nil {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", node.Name))
}

// Record metric and event for this action
c.notifyForMessage(msg, machine, node)
c.notifyForMessage(msg, nodeClaim, node)
actionsPerformed.WithLabelValues(string(action)).Inc()

// Mark the offering as unavailable in the ICE cache since we got a spot interruption warning
if msg.Kind() == messages.SpotInterruptionKind {
zone := machine.Labels[v1.LabelTopologyZone]
instanceType := machine.Labels[v1.LabelInstanceTypeStable]
zone := nodeClaim.Labels[v1.LabelTopologyZone]
instanceType := nodeClaim.Labels[v1.LabelInstanceTypeStable]
if zone != "" && instanceType != "" {
c.unavailableOfferingsCache.MarkUnavailable(ctx, string(msg.Kind()), instanceType, zone, v1alpha1.CapacityTypeSpot)
}
}
if action != NoAction {
return c.deleteMachine(ctx, machine, node)
return c.deleteNodeClaim(ctx, nodeClaim, node)
}
return nil
}

// deleteMachine removes the machine from the api-server
func (c *Controller) deleteMachine(ctx context.Context, machine *v1alpha5.Machine, node *v1.Node) error {
if !machine.DeletionTimestamp.IsZero() {
// deleteNodeClaim removes the NodeClaim from the api-server
func (c *Controller) deleteNodeClaim(ctx context.Context, nodeClaim *v1beta1.NodeClaim, node *v1.Node) error {
if !nodeClaim.DeletionTimestamp.IsZero() {
return nil
}
if err := c.kubeClient.Delete(ctx, machine); err != nil {
if err := nodeclaimutil.Delete(ctx, c.kubeClient, nodeClaim); err != nil {
return client.IgnoreNotFound(fmt.Errorf("deleting the node on interruption message, %w", err))
}
logging.FromContext(ctx).Infof("initiating delete for machine from interruption message")
c.recorder.Publish(interruptionevents.TerminatingOnInterruption(node, machine)...)
metrics.MachinesTerminatedCounter.With(prometheus.Labels{
metrics.ReasonLabel: terminationReasonLabel,
metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey],
}).Inc()
logging.FromContext(ctx).Infof("initiating delete from interruption message")
c.recorder.Publish(interruptionevents.TerminatingOnInterruption(node, nodeClaim)...)
nodeclaimutil.TerminatedCounter(nodeClaim, terminationReasonLabel).Inc()
return nil
}

// notifyForMessage publishes the relevant alert based on the message kind
func (c *Controller) notifyForMessage(msg messages.Message, m *v1alpha5.Machine, n *v1.Node) {
func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *v1beta1.NodeClaim, n *v1.Node) {
switch msg.Kind() {
case messages.RebalanceRecommendationKind:
c.recorder.Publish(interruptionevents.RebalanceRecommendation(n, m)...)
c.recorder.Publish(interruptionevents.RebalanceRecommendation(n, nodeClaim)...)

case messages.ScheduledChangeKind:
c.recorder.Publish(interruptionevents.Unhealthy(n, m)...)
c.recorder.Publish(interruptionevents.Unhealthy(n, nodeClaim)...)

case messages.SpotInterruptionKind:
c.recorder.Publish(interruptionevents.SpotInterrupted(n, m)...)
c.recorder.Publish(interruptionevents.SpotInterrupted(n, nodeClaim)...)

case messages.StateChangeKind:
typed := msg.(statechange.Message)
if lo.Contains([]string{"stopping", "stopped"}, typed.Detail.State) {
c.recorder.Publish(interruptionevents.Stopping(n, m)...)
c.recorder.Publish(interruptionevents.Stopping(n, nodeClaim)...)
} else {
c.recorder.Publish(interruptionevents.Terminating(n, m)...)
c.recorder.Publish(interruptionevents.Terminating(n, nodeClaim)...)
}

default:
}
}

// makeMachineInstanceIDMap builds a map between the instance id that is stored in the
// machine .status.providerID and the machine
func (c *Controller) makeMachineInstanceIDMap(ctx context.Context) (map[string]*v1alpha5.Machine, error) {
m := map[string]*v1alpha5.Machine{}
machineList := &v1alpha5.MachineList{}
if err := c.kubeClient.List(ctx, machineList); err != nil {
return nil, fmt.Errorf("listing machines, %w", err)
// makeNodeClaimInstanceIDMap builds a map between the instance id that is stored in the
// NodeClaim .status.providerID and the NodeClaim
func (c *Controller) makeNodeClaimInstanceIDMap(ctx context.Context) (map[string]*v1beta1.NodeClaim, error) {
m := map[string]*v1beta1.NodeClaim{}
nodeClaimList, err := nodeclaimutil.List(ctx, c.kubeClient)
if err != nil {
return nil, err
}
for i := range machineList.Items {
if machineList.Items[i].Status.ProviderID == "" {
for i := range nodeClaimList.Items {
if nodeClaimList.Items[i].Status.ProviderID == "" {
continue
}
id, err := utils.ParseInstanceID(machineList.Items[i].Status.ProviderID)
id, err := utils.ParseInstanceID(nodeClaimList.Items[i].Status.ProviderID)
if err != nil || id == "" {
continue
}
m[id] = &machineList.Items[i]
m[id] = &nodeClaimList.Items[i]
}
return m, nil
}
Expand Down
Loading

0 comments on commit 5e1a81f

Please sign in to comment.