chore: Wait for instance termination before deleting nodeclaim
jigisha620 committed Apr 16, 2024
1 parent ed7564c commit 99f8d1d
Showing 11 changed files with 130 additions and 30 deletions.
Makefile (1 addition, 1 deletion)
@@ -75,7 +75,7 @@ e2etests: ## Run the e2e suite against your local cluster
 	go test \
 		-p 1 \
 		-count 1 \
-		-timeout 3h \
+		-timeout 5h \
 		-v \
 		./suites/$(shell echo $(TEST_SUITE) | tr A-Z a-z)/... \
 		--ginkgo.focus="${FOCUS}" \
go.mod (1 addition, 1 deletion)
@@ -29,7 +29,7 @@ require (
 	k8s.io/utils v0.0.0-20240102154912-e7106e64919e
 	knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd
 	sigs.k8s.io/controller-runtime v0.17.2
-	sigs.k8s.io/karpenter v0.36.0
+	sigs.k8s.io/karpenter v0.36.1-0.20240407055957-43da3604c140
 	sigs.k8s.io/yaml v1.4.0
 )

go.sum (2 additions, 2 deletions)
@@ -759,8 +759,8 @@
 sigs.k8s.io/controller-runtime v0.17.2 h1:FwHwD1CTUemg0pW2otk7/U5/i5m2ymzvOXdbeG
 sigs.k8s.io/controller-runtime v0.17.2/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s=
 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
-sigs.k8s.io/karpenter v0.36.0 h1:i82fOsFWKwnChedKsj0Hep2yrTkAjCek/aZPSMX2dW8=
-sigs.k8s.io/karpenter v0.36.0/go.mod h1:fieFojxOec/l0tDmFT7R+g/Y+SGQbL9VlcYO8xb3sLo=
+sigs.k8s.io/karpenter v0.36.1-0.20240407055957-43da3604c140 h1:h/BzOIL+r2MWaIOD5HZfFkSITi0DvJ9Ex1VXO0EXgEQ=
+sigs.k8s.io/karpenter v0.36.1-0.20240407055957-43da3604c140/go.mod h1:fieFojxOec/l0tDmFT7R+g/Y+SGQbL9VlcYO8xb3sLo=
 sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
 sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
 sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
pkg/controllers/nodeclaim/garbagecollection/controller.go (2 additions, 2 deletions)
@@ -90,8 +90,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
 
 func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *v1beta1.NodeClaim, nodeList *v1.NodeList) error {
 	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", nodeClaim.Status.ProviderID))
-	if err := c.cloudProvider.Delete(ctx, nodeClaim); err != nil {
-		return cloudprovider.IgnoreNodeClaimNotFoundError(err)
+	if err := c.cloudProvider.Delete(ctx, nodeClaim); cloudprovider.IgnoreNodeClaimNotFoundError(err) != nil && cloudprovider.IgnoreRetryableError(err) != nil {
+		return err
 	}
 	logging.FromContext(ctx).Debugf("garbage collected cloudprovider instance")
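Context for the change above: the garbage collector previously dropped only not-found errors from Delete; it now also drops retryable errors, so a NodeClaim whose instance is still shutting down is left in place and retried on a later reconcile instead of being cleaned up prematurely. A rough sketch of the retryable-error plumbing this relies on follows; the helper names match the calls in the diff, but the bodies are an assumption inferred from usage, not the actual sigs.k8s.io/karpenter source:

	// Hedged sketch of karpenter's retryable-error helpers (illustrative only).
	package cloudprovider

	import "errors"

	// RetryableError marks a failure that should be retried on a later
	// reconcile rather than treated as terminal.
	type RetryableError struct {
		error
	}

	func NewRetryableError(err error) *RetryableError {
		return &RetryableError{error: err}
	}

	func IsRetryableError(err error) bool {
		if err == nil {
			return false
		}
		var retryableError *RetryableError
		return errors.As(err, &retryableError)
	}

	// IgnoreRetryableError returns nil for retryable errors so callers can
	// swallow them and rely on requeueing; other errors pass through unchanged.
	func IgnoreRetryableError(err error) error {
		if IsRetryableError(err) {
			return nil
		}
		return err
	}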
pkg/providers/instance/instance.go (8 additions, 2 deletions)
@@ -160,9 +160,10 @@ func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
 }
 
 func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
-	if _, err := p.ec2Batcher.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
+	out, err := p.ec2Batcher.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
 		InstanceIds: []*string{aws.String(id)},
-	}); err != nil {
+	})
+	if err != nil {
 		if awserrors.IsNotFound(err) {
 			return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance already terminated"))
 		}
@@ -174,6 +175,11 @@ func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
 		}
 		return fmt.Errorf("terminating instance, %w", err)
 	}
+	for _, instance := range out.TerminatingInstances {
+		if *instance.CurrentState.Name != ec2.InstanceStateNameTerminated {
+			return cloudprovider.NewRetryableError(fmt.Errorf("received terminate instance call but instance is still in state %q", aws.StringValue(instance.CurrentState.Name)))
+		}
+	}
 	return nil
 }
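EC2's TerminateInstances call is asynchronous: it returns immediately, and the instance passes through the shutting-down state before reaching terminated. That is why Delete above now inspects the returned TerminatingInstances and reports a retryable error until the terminal state is observed. A hedged sketch of how a caller consumes this contract; deleteInstance is a hypothetical wrapper, not the actual controller code, though the error helpers are the ones used in this commit:

	import (
		"context"
		"fmt"
		"time"

		"sigs.k8s.io/controller-runtime/pkg/reconcile"
		"sigs.k8s.io/karpenter/pkg/cloudprovider"

		"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
	)

	// deleteInstance (hypothetical) requeues while the instance is still
	// shutting down and treats not-found as already terminated.
	func deleteInstance(ctx context.Context, provider *instance.DefaultProvider, id string) (reconcile.Result, error) {
		err := provider.Delete(ctx, id)
		switch {
		case err == nil, cloudprovider.IsNodeClaimNotFoundError(err):
			// Terminated (or already gone): safe to finish cleanup, e.g.
			// remove the NodeClaim's termination finalizer.
			return reconcile.Result{}, nil
		case cloudprovider.IsRetryableError(err):
			// Still shutting down: poll again shortly rather than failing.
			return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
		default:
			return reconcile.Result{}, fmt.Errorf("deleting instance, %w", err)
		}
	}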
pkg/providers/instance/suite_test.go (41 additions)
@@ -194,4 +194,45 @@ var _ = Describe("InstanceProvider", func() {
 		retrievedIDs := sets.New[string](lo.Map(instances, func(i *instance.Instance, _ int) string { return i.ID })...)
 		Expect(ids.Equal(retrievedIDs)).To(BeTrue())
 	})
+	It("should return retryable error when instance state is not terminated", func() {
+		instanceID := fake.InstanceID()
+		instance := &ec2.Instance{
+			State: &ec2.InstanceState{
+				Name: aws.String(ec2.InstanceStateNameRunning),
+			},
+			Tags: []*ec2.Tag{
+				{
+					Key:   aws.String(fmt.Sprintf("kubernetes.io/cluster/%s", options.FromContext(ctx).ClusterName)),
+					Value: aws.String("owned"),
+				},
+				{
+					Key:   aws.String(corev1beta1.NodePoolLabelKey),
+					Value: aws.String("default"),
+				},
+				{
+					Key:   aws.String(v1beta1.LabelNodeClass),
+					Value: aws.String("default"),
+				},
+				{
+					Key:   aws.String(corev1beta1.ManagedByAnnotationKey),
+					Value: aws.String(options.FromContext(ctx).ClusterName),
+				},
+			},
+			PrivateDnsName: aws.String(fake.PrivateDNSName()),
+			Placement: &ec2.Placement{
+				AvailabilityZone: aws.String(fake.DefaultRegion),
+			},
+			// Launch time was 1m ago
+			LaunchTime:   aws.Time(time.Now().Add(-time.Minute)),
+			InstanceId:   aws.String(instanceID),
+			InstanceType: aws.String("m5.large"),
+		}
+		awsEnv.EC2API.Instances.Store(
+			instanceID,
+			instance,
+		)
+
+		err := awsEnv.InstanceProvider.Delete(ctx, instanceID)
+		Expect(corecloudprovider.IsRetryableError(err)).To(BeTrue())
+	})
 })
test/pkg/environment/common/expectations.go (25 additions)
@@ -540,6 +540,31 @@ func (env *Environment) ConsistentlyExpectDisruptionsWithNodeCount(disruptingNod
 	return lo.ToSlicePtr(nodes)
 }
 
+// EventuallyExpectDisruptionsWithNodeCount eventually ensures that exactly disruptingNodes nodes are tainted for disruption out of totalNodes total (including replacements and existing nodes)
+func (env *Environment) EventuallyExpectDisruptionsWithNodeCount(disruptingNodes, totalNodes int) (taintedNodes []*v1.Node) {
+	GinkgoHelper()
+	nodes := []v1.Node{}
+	Eventually(func(g Gomega) {
+		// Ensure we don't change our NodeClaims
+		nodeClaimList := &corev1beta1.NodeClaimList{}
+		g.Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
+		g.Expect(nodeClaimList.Items).To(HaveLen(totalNodes))
+
+		nodeList := &v1.NodeList{}
+		g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
+		g.Expect(nodeList.Items).To(HaveLen(totalNodes))
+
+		nodes = lo.Filter(nodeList.Items, func(n v1.Node, _ int) bool {
+			_, ok := lo.Find(n.Spec.Taints, func(t v1.Taint) bool {
+				return corev1beta1.IsDisruptingTaint(t)
+			})
+			return ok
+		})
+		g.Expect(nodes).To(HaveLen(disruptingNodes))
+	}).Should(Succeed())
+	return lo.ToSlicePtr(nodes)
+}
+
 func (env *Environment) EventuallyExpectTaintedNodeCount(comparator string, count int) []*v1.Node {
 	GinkgoHelper()
 	By(fmt.Sprintf("waiting for tainted nodes to be %s to %d", comparator, count))
test/suites/consolidation/suite_test.go (1 addition, 1 deletion)
@@ -139,7 +139,7 @@ var _ = Describe("Consolidation", func() {
 
 			// This check ensures that we are consolidating nodes at the same time
 			env.EventuallyExpectTaintedNodeCount("==", 2)
-			nodes = env.ConsistentlyExpectDisruptionsWithNodeCount(2, 3, 5*time.Second)
+			nodes = env.EventuallyExpectDisruptionsWithNodeCount(2, 3)
 
 			for _, node := range nodes {
 				Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
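The switch from ConsistentlyExpectDisruptionsWithNodeCount to the new Eventually variant plausibly follows from the termination wait: NodeClaim deletion now holds until EC2 reports the instance terminated, so the disruption count can take longer to reach steady state than a fixed 5-second Consistently window allows (this motivation is inferred from the commit, not stated in it). A minimal Gomega sketch of the semantic difference, with a hypothetical countDisrupting helper standing in for the node-filtering logic in the new expectation:

	// countDisrupting is a stand-in for listing nodes and counting
	// disruption taints, as EventuallyExpectDisruptionsWithNodeCount does.
	countDisrupting := func() int { return 0 /* placeholder */ }

	// Eventually retries the assertion until it passes or the timeout
	// expires, so it tolerates slow convergence.
	Eventually(func(g Gomega) {
		g.Expect(countDisrupting()).To(Equal(2))
	}).WithTimeout(2 * time.Minute).Should(Succeed())

	// Consistently requires the assertion to hold for the entire window,
	// and fails if the cluster has not already converged when it starts.
	Consistently(func(g Gomega) {
		g.Expect(countDisrupting()).To(Equal(2))
	}).WithTimeout(5 * time.Second).Should(Succeed())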
test/suites/drift/suite_test.go (1 addition, 8 deletions)
@@ -851,7 +851,7 @@ var _ = Describe("Drift", func() {
 		env.ConsistentlyExpectNodeClaimsNotDrifted(time.Minute, nodeClaim)
 	})
 	Context("Failure", func() {
-		It("should not continue to drift if a node never registers", func() {
+		It("should not disrupt existing nodes if after drift new nodes fail to register", func() {
 			// launch a new nodeClaim
 			var numPods int32 = 2
 			dep := coretest.Deployment(coretest.DeploymentOptions{
@@ -888,13 +888,6 @@
 			// TODO: reduce timeouts when disruption waits are factored out
 			env.EventuallyExpectNodesUntaintedWithTimeout(11*time.Minute, taintedNodes...)
 
-			// We give another 6 minutes here to handle the deletion at the 15m registration timeout
-			Eventually(func(g Gomega) {
-				nodeClaims := &corev1beta1.NodeClaimList{}
-				g.Expect(env.Client.List(env, nodeClaims, client.HasLabels{coretest.DiscoveryLabel})).To(Succeed())
-				g.Expect(nodeClaims.Items).To(HaveLen(int(numPods)))
-			}).WithTimeout(6 * time.Minute).Should(Succeed())
-
 			// Expect all the NodeClaims that existed on the initial provisioning loop are not removed
 			Consistently(func(g Gomega) {
 				nodeClaims := &corev1beta1.NodeClaimList{}
test/suites/expiration/suite_test.go (7 deletions)
@@ -614,13 +614,6 @@ var _ = Describe("Expiration", func() {
 			// TODO: reduce timeouts when deprovisioning waits are factored out
 			env.EventuallyExpectNodesUntaintedWithTimeout(11*time.Minute, taintedNodes...)
 
-			// The nodeclaims that never registers will be removed
-			Eventually(func(g Gomega) {
-				nodeClaims := &corev1beta1.NodeClaimList{}
-				g.Expect(env.Client.List(env, nodeClaims, client.HasLabels{coretest.DiscoveryLabel})).To(Succeed())
-				g.Expect(len(nodeClaims.Items)).To(BeNumerically("==", int(numPods)))
-			}).WithTimeout(6 * time.Minute).Should(Succeed())
-
 			// Expect all the NodeClaims that existed on the initial provisioning loop are not removed
 			Consistently(func(g Gomega) {
 				nodeClaims := &corev1beta1.NodeClaimList{}
test/suites/nodeclaim/nodeclaim_test.go (48 additions, 6 deletions)
@@ -20,15 +20,15 @@ import (
 	"os"
 	"time"
 
-	"github.com/samber/lo"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/karpenter/pkg/utils/resources"
 
+	"github.com/samber/lo"
+	v1 "k8s.io/api/core/v1"
 	corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
 	"sigs.k8s.io/karpenter/pkg/test"
+	"sigs.k8s.io/karpenter/pkg/utils/resources"
 
 	"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
 
@@ -226,7 +226,7 @@ var _ = Describe("StandaloneNodeClaim", func() {
 		env.EventuallyExpectNotFound(nodeClaim, node)
 
 		Eventually(func(g Gomega) {
-			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("shutting-down"))
+			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("terminated"))
 		}, time.Second*10).Should(Succeed())
 	})
 	It("should delete a NodeClaim from the node termination finalizer", func() {
@@ -260,12 +260,12 @@
 		instanceID := env.ExpectParsedProviderID(node.Spec.ProviderID)
 		env.GetInstance(node.Name)
 
-		// Delete the node and expect both the node and nodeClaim to be gone as well as the instance to be shutting-down
+		// Delete the node and expect both the node and nodeClaim to be gone as well as the instance to be terminated
 		env.ExpectDeleted(node)
 		env.EventuallyExpectNotFound(nodeClaim, node)
 
 		Eventually(func(g Gomega) {
-			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("shutting-down"))
+			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("terminated"))
 		}, time.Second*10).Should(Succeed())
 	})
 	It("should create a NodeClaim with custom labels passed through the userData", func() {
@@ -377,4 +377,46 @@
 		// Expect that the nodeClaim is eventually de-provisioned due to the registration timeout
 		env.EventuallyExpectNotFoundAssertion(nodeClaim).WithTimeout(time.Minute * 20).Should(Succeed())
 	})
+	It("should wait for instance termination before removing finalizer from nodeClaim", func() {
+		nodeClaim := test.NodeClaim(corev1beta1.NodeClaim{
+			Spec: corev1beta1.NodeClaimSpec{
+				Requirements: []corev1beta1.NodeSelectorRequirementWithMinValues{
+					{
+						NodeSelectorRequirement: v1.NodeSelectorRequirement{
+							Key:      v1beta1.LabelInstanceCategory,
+							Operator: v1.NodeSelectorOpIn,
+							Values:   []string{"c"},
+						},
+					},
+					{
+						NodeSelectorRequirement: v1.NodeSelectorRequirement{
+							Key:      corev1beta1.CapacityTypeLabelKey,
+							Operator: v1.NodeSelectorOpIn,
+							Values:   []string{corev1beta1.CapacityTypeOnDemand},
+						},
+					},
+				},
+				NodeClassRef: &corev1beta1.NodeClassReference{
+					Name: nodeClass.Name,
+				},
+			},
+		})
+		env.ExpectCreated(nodeClass, nodeClaim)
+		node := env.EventuallyExpectInitializedNodeCount("==", 1)[0]
+		nodeClaim = env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0]
+
+		instanceID := env.ExpectParsedProviderID(node.Spec.ProviderID)
+		env.GetInstance(node.Name)
+
+		env.ExpectDeleted(nodeClaim)
+		Eventually(func(g Gomega) {
+			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("shutting-down"))
+		}, time.Second*10).Should(Succeed())
+
+		Expect(nodeClaim.Finalizers).Should(ContainElement(corev1beta1.TerminationFinalizer))
+		env.EventuallyExpectNotFound(nodeClaim, node)
+		Eventually(func(g Gomega) {
+			g.Expect(lo.FromPtr(env.GetInstanceByID(instanceID).State.Name)).To(Equal("terminated"))
+		}, time.Second*10).Should(Succeed())
+	})
 })
