From 11088788f267d02cf32ad177467312a11c5692ec Mon Sep 17 00:00:00 2001 From: Carter McKinnon Date: Mon, 27 Nov 2023 21:12:19 +0000 Subject: [PATCH] Drop Node events when EC2 instance does not exist and node is not new --- pkg/controllers/tagging/tagging_controller.go | 19 ++++++++++ .../tagging/tagging_controller_test.go | 37 +++++++++++++++++-- pkg/providers/v1/aws.go | 5 ++- pkg/providers/v1/aws_fakes.go | 26 +++++++++++++ pkg/providers/v1/tags.go | 2 +- 5 files changed, 82 insertions(+), 7 deletions(-) diff --git a/pkg/controllers/tagging/tagging_controller.go b/pkg/controllers/tagging/tagging_controller.go index 3620212406..89affd4d77 100644 --- a/pkg/controllers/tagging/tagging_controller.go +++ b/pkg/controllers/tagging/tagging_controller.go @@ -63,6 +63,9 @@ const ( // The label for depicting total number of errors a work item encounter and fail errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted" + + // The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API. + newNodeEventualConsistencyGracePeriod = time.Minute * 5 ) // Controller is the controller implementation for tagging cluster resources. @@ -292,6 +295,18 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error { err := tc.cloud.TagResource(string(instanceID), tc.tags) if err != nil { + if awsv1.IsAWSErrorInstanceNotFound(err) { + // This can happen for two reasons. + // 1. The CreateTags API is eventually consistent. In rare cases, a newly-created instance may not be taggable for a short period. + // We will re-queue the event and retry. + if isNodeWithinEventualConsistencyGracePeriod(node) { + return fmt.Errorf("EC2 instance %s for node %s does not exist, but node is within eventual consistency grace period", instanceID, node.GetName()) + } + // 2. The event in our workQueue is stale, and the instance no longer exists. + // Tagging will never succeed, and the event should not be re-queued. + klog.Infof("Skip tagging since EC2 instance %s for node %s does not exist", instanceID, node.GetName()) + return nil + } klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err) return err } @@ -380,3 +395,7 @@ func (tc *Controller) getChecksumOfTags() string { sort.Strings(tags) return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(tags, ",")))) } + +func isNodeWithinEventualConsistencyGracePeriod(node *v1.Node) bool { + return time.Since(node.CreationTimestamp.Time) < newNodeEventualConsistencyGracePeriod +} diff --git a/pkg/controllers/tagging/tagging_controller_test.go b/pkg/controllers/tagging/tagging_controller_test.go index 4e7a3cdaa6..a96b4135f5 100644 --- a/pkg/controllers/tagging/tagging_controller_test.go +++ b/pkg/controllers/tagging/tagging_controller_test.go @@ -17,6 +17,11 @@ import ( "bytes" "context" "flag" + "os" + "strings" + "testing" + "time" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -25,10 +30,6 @@ import ( "k8s.io/client-go/util/workqueue" awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" "k8s.io/klog/v2" - "os" - "strings" - "testing" - "time" ) const TestClusterID = "clusterid.test" @@ -162,6 +163,34 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { toBeTagged: false, expectedMessages: []string{"Successfully untagged i-0001"}, }, + { + name: "node0 is recently created and the instance is not found the first 3 CreateTags attempts", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Now(), + }, + Spec: v1.NodeSpec{ + ProviderID: "i-not-found-count-3-0001", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Successfully tagged i-not-found-count-3-0001", "node is within eventual consistency grace period"}, + }, + { + name: "node0 is not recently created and the instance is not found", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: v1.NodeSpec{ + ProviderID: "i-not-found", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Skip tagging since EC2 instance i-not-found for node node0 does not exist"}, + }, } awsServices := awsv1.NewFakeAWSServices(TestClusterID) diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index e2212cd105..2adc93ce9b 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -1707,7 +1707,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin instances, err := c.ec2.DescribeInstances(request) if err != nil { // if err is InstanceNotFound, return false with no error - if isAWSErrorInstanceNotFound(err) { + if IsAWSErrorInstanceNotFound(err) { return false, nil } return false, err @@ -1946,7 +1946,8 @@ func (c *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) } -func isAWSErrorInstanceNotFound(err error) bool { +// IsAWSErrorInstanceNotFound returns true if the specified error is an awserr.Error with the code `InvalidInstanceId.NotFound`. +func IsAWSErrorInstanceNotFound(err error) bool { if err == nil { return false } diff --git a/pkg/providers/v1/aws_fakes.go b/pkg/providers/v1/aws_fakes.go index 84737b1417..a4e45a1f29 100644 --- a/pkg/providers/v1/aws_fakes.go +++ b/pkg/providers/v1/aws_fakes.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "sort" + "strconv" "strings" "github.com/aws/aws-sdk-go/aws" @@ -47,6 +48,8 @@ type FakeAWSServices struct { asg *FakeASG metadata *FakeMetadata kms *FakeKMS + + callCounts map[string]int } // NewFakeAWSServices creates a new FakeAWSServices @@ -79,6 +82,8 @@ func NewFakeAWSServices(clusterID string) *FakeAWSServices { tag.Value = aws.String(clusterID) selfInstance.Tags = []*ec2.Tag{&tag} + s.callCounts = make(map[string]int) + return s } @@ -97,6 +102,15 @@ func (s *FakeAWSServices) WithRegion(region string) *FakeAWSServices { return s } +// countCall increments the counter for the given service, api, and resourceID and returns the resulting call count +func (s *FakeAWSServices) countCall(service string, api string, resourceID string) int { + key := fmt.Sprintf("%s:%s:%s", service, api, resourceID) + s.callCounts[key]++ + count := s.callCounts[key] + klog.Warningf("call count: %s:%d", key, count) + return count +} + // Compute returns a fake EC2 client func (s *FakeAWSServices) Compute(region string) (EC2, error) { return s.ec2, nil @@ -295,6 +309,7 @@ func (ec2i *FakeEC2Impl) DescribeAvailabilityZones(request *ec2.DescribeAvailabi // CreateTags is a mock for CreateTags from EC2 func (ec2i *FakeEC2Impl) CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) { for _, id := range input.Resources { + callCount := ec2i.aws.countCall("ec2", "CreateTags", *id) if *id == "i-error" { return nil, errors.New("Unable to tag") } @@ -302,6 +317,17 @@ func (ec2i *FakeEC2Impl) CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTags if *id == "i-not-found" { return nil, awserr.New("InvalidInstanceID.NotFound", "Instance not found", nil) } + // return an Instance not found error for the first `n` calls + // instance ID should be of the format `i-not-found-count-$N-$SUFFIX` + if strings.HasPrefix(*id, "i-not-found-count-") { + notFoundCount, err := strconv.Atoi(strings.Split(*id, "-")[4]) + if err != nil { + panic(err) + } + if callCount < notFoundCount { + return nil, awserr.New("InvalidInstanceID.NotFound", "Instance not found", nil) + } + } } return &ec2.CreateTagsOutput{}, nil } diff --git a/pkg/providers/v1/tags.go b/pkg/providers/v1/tags.go index beaaf0ca7d..9b82436097 100644 --- a/pkg/providers/v1/tags.go +++ b/pkg/providers/v1/tags.go @@ -344,7 +344,7 @@ func (c *Cloud) UntagResource(resourceID string, tags map[string]string) error { if err != nil { // An instance not found should not fail the untagging workflow as it // would for tagging, since the target state is already reached. - if isAWSErrorInstanceNotFound(err) { + if IsAWSErrorInstanceNotFound(err) { klog.Infof("Couldn't find resource when trying to untag it hence skipping it, %v", err) return nil }