From 125464798d894b5006a6efb4807269a3fe48c84b Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Mon, 3 Jun 2024 00:34:50 -0700 Subject: [PATCH] Convert singleton reconcilers to operatorpkg --- go.mod | 2 ++ go.sum | 4 ++-- hack/code/prices_gen/main.go | 4 +--- pkg/controllers/interruption/controller.go | 17 +++++++++------ pkg/controllers/interruption/suite_test.go | 21 +++++++++---------- .../nodeclaim/garbagecollection/controller.go | 13 ++++++++---- .../nodeclaim/garbagecollection/suite_test.go | 20 ++++++++---------- .../providers/instancetype/controller.go | 13 ++++++++---- .../providers/instancetype/suite_test.go | 9 ++++---- .../providers/pricing/controller.go | 14 ++++++++----- .../providers/pricing/suite_test.go | 19 ++++++++--------- .../suites/integration/kubelet_config_test.go | 2 +- 12 files changed, 76 insertions(+), 62 deletions(-) diff --git a/go.mod b/go.mod index 9130b9a49903..cca7809f835c 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +replace sigs.k8s.io/karpenter => github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1 + require ( contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect diff --git a/go.sum b/go.sum index 598d2ae19b53..d7af65fb8bc8 100644 --- a/go.sum +++ b/go.sum @@ -222,6 +222,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1 h1:LaxX8BD9S6arcmKVv2r8HIBSs/s0Zd57qALieXR+2HU= +github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1/go.mod h1:5XYrIz9Bi7HgQyaUsx7O08ft+TJjrH+htlnPq8Sz9J8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -761,8 +763,6 @@ sigs.k8s.io/controller-runtime v0.18.3 h1:B5Wmmo8WMWK7izei+2LlXLVDGzMwAHBNLX68lw sigs.k8s.io/controller-runtime v0.18.3/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= -sigs.k8s.io/karpenter v0.37.0 h1:eUFD9hJ2mpZrw31OUYhpbxLWEDmbXT05wX27dZB2E5o= -sigs.k8s.io/karpenter v0.37.0/go.mod h1:5XYrIz9Bi7HgQyaUsx7O08ft+TJjrH+htlnPq8Sz9J8= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/hack/code/prices_gen/main.go b/hack/code/prices_gen/main.go index 1bf74fbcd76f..837eeaedb360 100644 --- a/hack/code/prices_gen/main.go +++ b/hack/code/prices_gen/main.go @@ -31,8 +31,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" ec22 "github.com/aws/aws-sdk-go/service/ec2" "github.com/samber/lo" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/operator/options" @@ -110,7 +108,7 @@ func main() { log.Println("fetching for", region) pricingProvider := pricing.NewDefaultProvider(ctx, pricing.NewAPI(sess, region), ec2, region) controller := controllerspricing.NewController(pricingProvider) - _, err := controller.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{}}) + _, err := controller.Reconcile(ctx) if err != nil { log.Fatalf("failed to initialize pricing provider %s", err) } diff --git a/pkg/controllers/interruption/controller.go b/pkg/controllers/interruption/controller.go index e5addf69f5e3..a8c0bf9ab189 100644 --- a/pkg/controllers/interruption/controller.go +++ b/pkg/controllers/interruption/controller.go @@ -20,6 +20,7 @@ import ( "time" sqsapi "github.com/aws/aws-sdk-go/service/sqs" + "github.com/awslabs/operatorpkg/singleton" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/multierr" @@ -27,11 +28,14 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/karpenter/pkg/metrics" + "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/utils/pretty" @@ -44,7 +48,6 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/utils" "sigs.k8s.io/karpenter/pkg/events" - corecontroller "sigs.k8s.io/karpenter/pkg/operator/controller" ) type Action string @@ -81,7 +84,8 @@ func NewController(kubeClient client.Client, clk clock.Clock, recorder events.Re } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "interruption") ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())) if c.cm.HasChanged(c.sqsProvider.Name(), nil) { log.FromContext(ctx).V(1).Info("watching interruption queue") @@ -91,7 +95,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", err) } if len(sqsMessages) == 0 { - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: controller.Immediately}, nil } nodeClaimInstanceIDMap, err := c.makeNodeClaimInstanceIDMap(ctx) if err != nil { @@ -119,13 +123,14 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc if err = multierr.Combine(errs...); err != nil { return reconcile.Result{}, err } - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: controller.Immediately}, nil } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return corecontroller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("interruption"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } // parseMessage parses the passed SQS message into an internal Message interface diff --git a/pkg/controllers/interruption/suite_test.go b/pkg/controllers/interruption/suite_test.go index 20d9ac2e30a1..ca786b360be3 100644 --- a/pkg/controllers/interruption/suite_test.go +++ b/pkg/controllers/interruption/suite_test.go @@ -27,7 +27,6 @@ import ( "github.com/samber/lo" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" @@ -119,7 +118,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -128,7 +127,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(scheduledChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -153,7 +152,7 @@ var _ = Describe("InterruptionHandling", func() { messages = append(messages, stateChangeMessage(instanceID, state)) } ExpectMessagesCreated(messages...) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(4)) @@ -183,7 +182,7 @@ var _ = Describe("InterruptionHandling", func() { messages = append(messages, spotInterruptionMessage(id)) } ExpectMessagesCreated(messages...) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(100)) @@ -199,7 +198,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(badMessage) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) }) @@ -207,7 +206,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(stateChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)), "creating")) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectExists(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -221,7 +220,7 @@ var _ = Describe("InterruptionHandling", func() { ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)))) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1)) ExpectNotFound(ctx, env.Client, nodeClaim) Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1)) @@ -235,15 +234,15 @@ var _ = Describe("InterruptionHandling", func() { var _ = Describe("Error Handling", func() { It("should send an error on polling when QueueNotExists", func() { sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode(servicesqs.ErrCodeQueueDoesNotExist), fake.MaxCalls(0)) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) }) It("should send an error on polling when AccessDenied", func() { sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode("AccessDenied"), fake.MaxCalls(0)) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) }) It("should not return an error when deleting a nodeClaim that is already deleted", func() { ExpectMessagesCreated(spotInterruptionMessage(fake.InstanceID())) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) }) }) diff --git a/pkg/controllers/nodeclaim/garbagecollection/controller.go b/pkg/controllers/nodeclaim/garbagecollection/controller.go index 71748eaaa02c..04b27aa88af5 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/controller.go +++ b/pkg/controllers/nodeclaim/garbagecollection/controller.go @@ -19,20 +19,22 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/operator/injection" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/operator/controller" ) type Controller struct { @@ -49,7 +51,9 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "nodeclaim.garbagecollection") + // We LIST machines on the CloudProvider BEFORE we grab Machines/Nodes on the cluster so that we make sure that, if // LISTing instances takes a long time, our information is more updated by the time we get to Machine and Node LIST // This works since our CloudProvider instances are deleted based on whether the Machine exists or not, not vise-versa @@ -105,7 +109,8 @@ func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *v1beta1.Node } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("nodeclaim.garbagecollection"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go index 0446d774da30..c8d37bcf5a48 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/suite_test.go +++ b/pkg/controllers/nodeclaim/garbagecollection/suite_test.go @@ -22,11 +22,9 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/aws/aws-sdk-go/service/ec2" "k8s.io/client-go/tools/record" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider" @@ -136,7 +134,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute)) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).To(HaveOccurred()) Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) @@ -151,7 +149,7 @@ var _ = Describe("GarbageCollection", func() { }) ExpectApplied(ctx, env.Client, node) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).To(HaveOccurred()) Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) @@ -199,7 +197,7 @@ var _ = Describe("GarbageCollection", func() { ) ids = append(ids, instanceID) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for _, id := range ids { @@ -257,7 +255,7 @@ var _ = Describe("GarbageCollection", func() { nodeClaims = append(nodeClaims, nodeClaim) ids = append(ids, instanceID) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for _, id := range ids { @@ -281,7 +279,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now()) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).NotTo(HaveOccurred()) }) @@ -295,7 +293,7 @@ var _ = Describe("GarbageCollection", func() { instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute)) awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).NotTo(HaveOccurred()) }) @@ -319,7 +317,7 @@ var _ = Describe("GarbageCollection", func() { }) ExpectApplied(ctx, env.Client, nodeClaim, node) - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) _, err := cloudProvider.Get(ctx, providerID) Expect(err).ToNot(HaveOccurred()) ExpectExists(ctx, env.Client, node) @@ -377,7 +375,7 @@ var _ = Describe("GarbageCollection", func() { ids = append(ids, instanceID) nodes = append(nodes, node) } - ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{}) + ExpectSingletonReconciled(ctx, garbageCollectionController) wg := sync.WaitGroup{} for i := range ids { diff --git a/pkg/controllers/providers/instancetype/controller.go b/pkg/controllers/providers/instancetype/controller.go index 0768c81d1abb..38537ab88007 100644 --- a/pkg/controllers/providers/instancetype/controller.go +++ b/pkg/controllers/providers/instancetype/controller.go @@ -19,11 +19,13 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" lop "github.com/samber/lo/parallel" "go.uber.org/multierr" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" ) @@ -38,7 +40,9 @@ func NewController(instancetypeProvider instancetype.Provider) *Controller { } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "providers.instancetype") + work := []func(ctx context.Context) error{ c.instancetypeProvider.UpdateInstanceTypes, c.instancetypeProvider.UpdateInstanceTypeOfferings, @@ -57,7 +61,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc func (c *Controller) Register(_ context.Context, m manager.Manager) error { // Includes a default exponential failure rate limiter of base: time.Millisecond, and max: 1000*time.Second - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("providers.instancetype"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/providers/instancetype/suite_test.go b/pkg/controllers/providers/instancetype/suite_test.go index 4a156e5d9ae6..e520d6dce547 100644 --- a/pkg/controllers/providers/instancetype/suite_test.go +++ b/pkg/controllers/providers/instancetype/suite_test.go @@ -19,7 +19,6 @@ import ( "testing" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/operator/scheme" @@ -89,7 +88,7 @@ var _ = Describe("InstanceType", func() { InstanceTypeOfferings: ec2Offerings, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{ Status: v1beta1.EC2NodeClassStatus{ Subnets: []v1beta1.Subnet{ @@ -123,7 +122,7 @@ var _ = Describe("InstanceType", func() { InstanceTypeOfferings: ec2Offerings, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{ Status: v1beta1.EC2NodeClassStatus{ Subnets: []v1beta1.Subnet{ @@ -158,14 +157,14 @@ var _ = Describe("InstanceType", func() { It("should not update instance type date with response from the DescribeInstanceTypes API is empty", func() { awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) Expect(err).ToNot(BeNil()) }) It("should not update instance type offering date with response from the DescribeInstanceTypesOfferings API", func() { awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) Expect(err).ToNot(BeNil()) }) diff --git a/pkg/controllers/providers/pricing/controller.go b/pkg/controllers/providers/pricing/controller.go index 8cd42a8fd489..24daae768bc6 100644 --- a/pkg/controllers/providers/pricing/controller.go +++ b/pkg/controllers/providers/pricing/controller.go @@ -19,12 +19,13 @@ import ( "fmt" "time" + "github.com/awslabs/operatorpkg/singleton" lop "github.com/samber/lo/parallel" "go.uber.org/multierr" + controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "sigs.k8s.io/karpenter/pkg/operator/controller" + "sigs.k8s.io/karpenter/pkg/operator/injection" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" ) @@ -39,7 +40,9 @@ func NewController(pricingProvider pricing.Provider) *Controller { } } -func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { +func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "providers.pricing") + work := []func(ctx context.Context) error{ c.pricingProvider.UpdateSpotPricing, c.pricingProvider.UpdateOnDemandPricing, @@ -57,7 +60,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc } func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controller.NewSingletonManagedBy(m). + return controllerruntime.NewControllerManagedBy(m). Named("providers.pricing"). - Complete(c) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } diff --git a/pkg/controllers/providers/pricing/suite_test.go b/pkg/controllers/providers/pricing/suite_test.go index 8941d0f85cac..ba48bccda745 100644 --- a/pkg/controllers/providers/pricing/suite_test.go +++ b/pkg/controllers/providers/pricing/suite_test.go @@ -24,7 +24,6 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" awspricing "github.com/aws/aws-sdk-go/service/pricing" "github.com/samber/lo" - "k8s.io/apimachinery/pkg/types" coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/operator/scheme" coretest "sigs.k8s.io/karpenter/pkg/test" @@ -98,14 +97,14 @@ var _ = Describe("Pricing", func() { ) It("should return static on-demand data if pricing API fails", func() { awsEnv.PricingAPI.NextError.Set(fmt.Errorf("failed")) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c5.large") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically(">", 0)) }) It("should return static spot data if EC2 describeSpotPriceHistory API fails", func() { awsEnv.PricingAPI.NextError.Set(fmt.Errorf("failed")) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c5.large", "test-zone-1a") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically(">", 0)) @@ -119,7 +118,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + _ = ExpectSingletonReconcileFailed(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c98.large") Expect(ok).To(BeTrue()) @@ -165,7 +164,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c98.large", "test-zone-1b") Expect(ok).To(BeTrue()) @@ -199,7 +198,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.SpotPrice("c98.large", "test-zone-1a") Expect(ok).To(BeTrue()) @@ -226,7 +225,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) _, ok := awsEnv.PricingProvider.SpotPrice("c99.large", "test-zone-1b") Expect(ok).To(BeFalse()) @@ -253,7 +252,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c99.large", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) inp := awsEnv.EC2API.DescribeSpotPriceHistoryInput.Clone() Expect(lo.Map(inp.ProductDescriptions, func(x *string, _ int) string { return *x })). To(ContainElements("Linux/UNIX", "Linux/UNIX (Amazon VPC)")) @@ -288,7 +287,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPrice("c5.xlarge", 1.23), }, }) - ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, controller) price, ok := awsEnv.PricingProvider.OnDemandPrice("c3.2xlarge") Expect(ok).To(BeTrue()) Expect(price).To(BeNumerically("==", 0.420000)) @@ -318,7 +317,7 @@ var _ = Describe("Pricing", func() { fake.NewOnDemandPriceWithCurrency("c99.large", 1.23, "CNY"), }, }) - ExpectReconcileSucceeded(ctx, tmpController, types.NamespacedName{}) + ExpectSingletonReconciled(ctx, tmpController) price, ok := tmpPricingProvider.OnDemandPrice("c98.large") Expect(ok).To(BeTrue()) diff --git a/test/suites/integration/kubelet_config_test.go b/test/suites/integration/kubelet_config_test.go index 7f59366245ba..5030c9496394 100644 --- a/test/suites/integration/kubelet_config_test.go +++ b/test/suites/integration/kubelet_config_test.go @@ -157,7 +157,7 @@ var _ = Describe("KubeletConfiguration Overrides", func() { // Get the DS pod count and use it to calculate the DS pod overhead dsCount := env.GetDaemonSetCount(nodePool) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ - MaxPods: lo.ToPtr(int32(1 + int32(dsCount))), + MaxPods: lo.ToPtr(1 + int32(dsCount)), } numPods := 3