From f724f6eda71f1dd1c9d7dae9d04a459e11138030 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda <74629455+engedaam@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:40:42 -0700 Subject: [PATCH] chore: Add an Instancetype Controller to Asynchronously Hydrate InstanceType Data (#6045) --- cmd/controller/main.go | 1 + pkg/cloudprovider/suite_test.go | 8 + pkg/controllers/controllers.go | 5 +- .../providers/instancetype/controller.go | 65 +++++ .../providers/instancetype/suite_test.go | 137 +++++++++++ .../providers/pricing/controller.go | 2 +- pkg/providers/instance/suite_test.go | 2 + pkg/providers/instancetype/instancetype.go | 224 +++++++++--------- pkg/providers/instancetype/suite_test.go | 59 +++-- pkg/providers/launchtemplate/suite_test.go | 2 + pkg/test/environment.go | 3 +- 11 files changed, 368 insertions(+), 140 deletions(-) create mode 100644 pkg/controllers/providers/instancetype/controller.go create mode 100644 pkg/controllers/providers/instancetype/suite_test.go diff --git a/cmd/controller/main.go b/cmd/controller/main.go index b1871bd91f1e..fc09b917beef 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -67,6 +67,7 @@ func main() { op.PricingProvider, op.AMIProvider, op.LaunchTemplateProvider, + op.InstanceTypesProvider, )...). WithWebhooks(ctx, webhooks.NewWebhooks()...). Start(ctx) diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 556767b6d109..b9d255ed3808 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -138,6 +138,8 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) }) It("should return an ICE error when there are no instance types to launch", func() { // Specify no instance types and expect to receive a capacity error @@ -230,6 +232,8 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(instances, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) @@ -324,6 +328,8 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(instances, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) @@ -425,6 +431,8 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(uniqInstanceTypes, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index e4c8c9388829..6510a522f870 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -22,6 +22,7 @@ import ( nodeclasshash "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/hash" nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status" nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" + controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" @@ -42,6 +43,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" "github.com/aws/karpenter-provider-aws/pkg/providers/sqs" @@ -51,7 +53,7 @@ import ( func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, - pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider) []controller.Controller { + pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), @@ -60,6 +62,7 @@ func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider), nodeclaimtagging.NewController(kubeClient, instanceProvider), controllerspricing.NewController(pricingProvider), + controllersinstancetype.NewController(instanceTypeProvider), } if options.FromContext(ctx).InterruptionQueue != "" { sqsapi := servicesqs.New(sess) diff --git a/pkg/controllers/providers/instancetype/controller.go b/pkg/controllers/providers/instancetype/controller.go new file mode 100644 index 000000000000..2a35101c8e7c --- /dev/null +++ b/pkg/controllers/providers/instancetype/controller.go @@ -0,0 +1,65 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancetype + +import ( + "context" + "fmt" + "time" + + lop "github.com/samber/lo/parallel" + "go.uber.org/multierr" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/controller" + + "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" +) + +type Controller struct { + instancetypeProvider instancetype.Provider +} + +func NewController(instancetypeProvider instancetype.Provider) *Controller { + return &Controller{ + instancetypeProvider: instancetypeProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { + work := []func(ctx context.Context) error{ + c.instancetypeProvider.UpdateInstanceTypes, + c.instancetypeProvider.UpdateInstanceTypeOfferings, + } + errs := make([]error, len(work)) + lop.ForEach(work, func(f func(ctx context.Context) error, i int) { + if err := f(ctx); err != nil { + errs[i] = err + } + }) + if err := multierr.Combine(errs...); err != nil { + return reconcile.Result{}, fmt.Errorf("updating instancetype, %w", err) + } + return reconcile.Result{RequeueAfter: 12 * time.Hour}, nil +} + +func (c *Controller) Name() string { + return "providers.instancetype" +} + +func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Builder { + // Includes a default exponential failure rate limiter of base: time.Millisecond, and max: 1000*time.Second + return controller.NewSingletonManagedBy(m) +} diff --git a/pkg/controllers/providers/instancetype/suite_test.go b/pkg/controllers/providers/instancetype/suite_test.go new file mode 100644 index 000000000000..9f3fb844b9b1 --- /dev/null +++ b/pkg/controllers/providers/instancetype/suite_test.go @@ -0,0 +1,137 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancetype_test + +import ( + "context" + "testing" + + "k8s.io/apimachinery/pkg/types" + corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/operator/scheme" + coretest "sigs.k8s.io/karpenter/pkg/test" + + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + + "github.com/aws/karpenter-provider-aws/pkg/apis" + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" + "github.com/aws/karpenter-provider-aws/pkg/fake" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "knative.dev/pkg/logging/testing" + . "sigs.k8s.io/karpenter/pkg/test/expectations" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var controller *controllersinstancetype.Controller + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "InstanceType") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...)) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) + controller = controllersinstancetype.NewController(awsEnv.InstanceTypesProvider) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + + awsEnv.Reset() +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = Describe("InstanceType", func() { + It("should update instance type date with response from the DescribeInstanceTypes API", func() { + ec2InstanceTypes := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypes) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).To(BeNil()) + for i := range instanceTypes { + Expect(instanceTypes[i].Name).To(Equal(lo.FromPtr(ec2InstanceTypes[i].InstanceType))) + } + }) + It("should update instance type offering date with response from the DescribeInstanceTypesOfferings API", func() { + ec2InstanceTypes := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypes) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).To(BeNil()) + + Expect(len(instanceTypes)).To(BeNumerically("==", len(ec2InstanceTypes))) + for x := range instanceTypes { + offering, found := lo.Find(ec2Offerings, func(off *ec2.InstanceTypeOffering) bool { + return instanceTypes[x].Name == lo.FromPtr(off.InstanceType) + }) + Expect(found).To(BeTrue()) + for y := range instanceTypes[x].Offerings { + Expect(instanceTypes[x].Offerings[y].Zone).To(Equal(lo.FromPtr(offering.Location))) + } + } + }) + It("should not update instance type date with response from the DescribeInstanceTypes API is empty", func() { + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).ToNot(BeNil()) + }) + It("should not update instance type offering date with response from the DescribeInstanceTypesOfferings API", func() { + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).ToNot(BeNil()) + }) +}) diff --git a/pkg/controllers/providers/pricing/controller.go b/pkg/controllers/providers/pricing/controller.go index 065e41130d03..e07f1b2307f0 100644 --- a/pkg/controllers/providers/pricing/controller.go +++ b/pkg/controllers/providers/pricing/controller.go @@ -57,7 +57,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc } func (c *Controller) Name() string { - return "pricing" + return "providers.pricing" } func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Builder { diff --git a/pkg/providers/instance/suite_test.go b/pkg/providers/instance/suite_test.go index 45d3504bafb6..1693e2eadf0d 100644 --- a/pkg/providers/instance/suite_test.go +++ b/pkg/providers/instance/suite_test.go @@ -106,6 +106,8 @@ var _ = Describe("InstanceProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) }) It("should return an ICE error when all attempted instance types return an ICE error", func() { ExpectApplied(ctx, env.Client, nodeClaim, nodePool, nodeClass) diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index af3c82406711..1caaebccdbe1 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -37,24 +37,19 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/logging" + "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/utils/pretty" - - "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" -) - -const ( - InstanceTypesCacheKey = "types" - InstanceTypeOfferingsCacheKey = "offerings" ) type Provider interface { LivenessProbe(*http.Request) error - List(context.Context, *corev1beta1.KubeletConfiguration, *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) + UpdateInstanceTypes(ctx context.Context) error + UpdateInstanceTypeOfferings(ctx context.Context) error } type DefaultProvider struct { @@ -62,14 +57,19 @@ type DefaultProvider struct { ec2api ec2iface.EC2API subnetProvider subnet.Provider pricingProvider pricing.Provider - // Has one cache entry for all the instance types (key: InstanceTypesCacheKey) - // Has one cache entry for all the zones for each subnet selector (key: InstanceTypesZonesCacheKeyPrefix:) - // Values cached *before* considering insufficient capacity errors from the unavailableOfferings cache. + + // Values stored *before* considering insufficient capacity errors from the unavailableOfferings cache. // Fully initialized Instance Types are also cached based on the set of all instance types, zones, unavailableOfferings cache, // EC2NodeClass, and kubelet configuration from the NodePool - mu sync.Mutex - cache *cache.Cache + muInstanceTypeInfo sync.RWMutex + // TODO @engedaam: Look into only storing the needed EC2InstanceTypeInfo + instanceTypesInfo []*ec2.InstanceTypeInfo + + muInstanceTypeOfferings sync.RWMutex + instanceTypeOfferings map[string]sets.Set[string] + + instanceTypesCache *cache.Cache unavailableOfferings *awscache.UnavailableOfferings cm *pretty.ChangeMonitor @@ -79,31 +79,35 @@ type DefaultProvider struct { instanceTypeOfferingsSeqNum uint64 } -func NewDefaultProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, +func NewDefaultProvider(region string, instanceTypesCache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider pricing.Provider) *DefaultProvider { return &DefaultProvider{ - ec2api: ec2api, - region: region, - subnetProvider: subnetProvider, - pricingProvider: pricingProvider, - cache: cache, - unavailableOfferings: unavailableOfferingsCache, - cm: pretty.NewChangeMonitor(), - instanceTypesSeqNum: 0, + ec2api: ec2api, + region: region, + subnetProvider: subnetProvider, + pricingProvider: pricingProvider, + instanceTypesInfo: []*ec2.InstanceTypeInfo{}, + instanceTypeOfferings: map[string]sets.Set[string]{}, + instanceTypesCache: instanceTypesCache, + unavailableOfferings: unavailableOfferingsCache, + cm: pretty.NewChangeMonitor(), + instanceTypesSeqNum: 0, } } func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguration, nodeClass *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) { - // Get InstanceTypes from EC2 - instanceTypes, err := p.GetInstanceTypes(ctx) - if err != nil { - return nil, err + p.muInstanceTypeInfo.RLock() + p.muInstanceTypeOfferings.RLock() + defer p.muInstanceTypeInfo.RUnlock() + defer p.muInstanceTypeOfferings.RUnlock() + + if len(p.instanceTypesInfo) == 0 { + return nil, fmt.Errorf("no instance types found") } - // Get InstanceTypeOfferings from EC2 - instanceTypeOfferings, err := p.getInstanceTypeOfferings(ctx) - if err != nil { - return nil, err + if len(p.instanceTypeOfferings) == 0 { + return nil, fmt.Errorf("no instance types offerings found") } + subnets, err := p.subnetProvider.List(ctx, nodeClass) if err != nil { return nil, err @@ -133,14 +137,14 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi aws.StringValue((*string)(nodeClass.Spec.InstanceStorePolicy)), aws.StringValue(nodeClass.Spec.AMIFamily), ) - if item, ok := p.cache.Get(key); ok { + if item, ok := p.instanceTypesCache.Get(key); ok { return item.([]*cloudprovider.InstanceType), nil } // Get all zones across all offerings // We don't use this in the cache key since this is produced from our instanceTypeOfferings which we do cache allZones := sets.New[string]() - for _, offeringZones := range instanceTypeOfferings { + for _, offeringZones := range p.instanceTypeOfferings { for zone := range offeringZones { allZones.Insert(zone) } @@ -149,7 +153,7 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi logging.FromContext(ctx).With("zones", allZones.UnsortedList()).Debugf("discovered zones") } amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) - result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { + result := lo.Map(p.instanceTypesInfo, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { instanceTypeVCPU.With(prometheus.Labels{ instanceTypeLabel: *i.InstanceType, }).Set(float64(aws.Int64Value(i.VCpuInfo.DefaultVCpus))) @@ -164,9 +168,9 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi return NewInstanceType(ctx, i, p.region, nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, kc.MaxPods, kc.PodsPerCore, kc.KubeReserved, kc.SystemReserved, kc.EvictionHard, kc.EvictionSoft, - amiFamily, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones)) + amiFamily, p.createOfferings(ctx, i, p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones)) }) - p.cache.SetDefault(key, result) + p.instanceTypesCache.SetDefault(key, result) return result, nil } @@ -177,6 +181,77 @@ func (p *DefaultProvider) LivenessProbe(req *http.Request) error { return p.pricingProvider.LivenessProbe(req) } +func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error { + // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- + // We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple + // calls to EC2 when we could have just made one call. + // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache + p.muInstanceTypeInfo.Lock() + defer p.muInstanceTypeInfo.Unlock() + var instanceTypes []*ec2.InstanceTypeInfo + + if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("supported-virtualization-type"), + Values: []*string{aws.String("hvm")}, + }, + { + Name: aws.String("processor-info.supported-architecture"), + Values: aws.StringSlice([]string{"x86_64", "arm64"}), + }, + }, + }, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool { + instanceTypes = append(instanceTypes, page.InstanceTypes...) + return true + }); err != nil { + return fmt.Errorf("describing instance types, %w", err) + } + + if p.cm.HasChanged("instance-types", instanceTypes) { + // Only update instanceTypesSeqNun with the instance types have been changed + // This is to not create new keys with duplicate instance types option + atomic.AddUint64(&p.instanceTypesSeqNum, 1) + logging.FromContext(ctx).With( + "count", len(instanceTypes)).Debugf("discovered instance types") + } + p.instanceTypesInfo = instanceTypes + return nil +} + +func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error { + // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- + // We lock here so that multiple callers to GetInstanceTypes do not result in cache misses and multiple + // calls to EC2 when we could have just made one call. This lock is here because multiple callers to EC2 result + // in A LOT of extra memory generated from the response for simultaneous callers. + // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache + p.muInstanceTypeOfferings.Lock() + defer p.muInstanceTypeOfferings.Unlock() + + // Get offerings from EC2 + instanceTypeOfferings := map[string]sets.Set[string]{} + if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, + func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { + for _, offering := range output.InstanceTypeOfferings { + if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok { + instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]() + } + instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) + } + return true + }); err != nil { + return fmt.Errorf("describing instance type zone offerings, %w", err) + } + if p.cm.HasChanged("instance-type-offering", instanceTypeOfferings) { + // Only update instanceTypesSeqNun with the instance type offerings have been changed + // This is to not create new keys with duplicate instance type offerings option + atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1) + logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types") + } + p.instanceTypeOfferings = instanceTypeOfferings + return nil +} + func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, zones, subnetZones sets.Set[string]) []cloudprovider.Offering { var offerings []cloudprovider.Offering for zone := range zones { @@ -220,79 +295,8 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 return offerings } -func (p *DefaultProvider) getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) { - // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- - // We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple - // calls to EC2 when we could have just made one call. - // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache - p.mu.Lock() - defer p.mu.Unlock() - if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok { - return cached.(map[string]sets.Set[string]), nil - } - - // Get offerings from EC2 - instanceTypeOfferings := map[string]sets.Set[string]{} - if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, - func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { - for _, offering := range output.InstanceTypeOfferings { - if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok { - instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]() - } - instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) - } - return true - }); err != nil { - return nil, fmt.Errorf("describing instance type zone offerings, %w", err) - } - if p.cm.HasChanged("instance-type-offering", instanceTypeOfferings) { - // Only update instanceTypesSeqNun with the instance type offerings have been changed - // This is to not create new keys with duplicate instance type offerings option - atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1) - logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types") - } - p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings) - return instanceTypeOfferings, nil -} - -// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters -func (p *DefaultProvider) GetInstanceTypes(ctx context.Context) ([]*ec2.InstanceTypeInfo, error) { - // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- - // We lock here so that multiple callers to GetInstanceTypes do not result in cache misses and multiple - // calls to EC2 when we could have just made one call. This lock is here because multiple callers to EC2 result - // in A LOT of extra memory generated from the response for simultaneous callers. - // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache - p.mu.Lock() - defer p.mu.Unlock() - - if cached, ok := p.cache.Get(InstanceTypesCacheKey); ok { - return cached.([]*ec2.InstanceTypeInfo), nil - } - var instanceTypes []*ec2.InstanceTypeInfo - if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{ - Filters: []*ec2.Filter{ - { - Name: aws.String("supported-virtualization-type"), - Values: []*string{aws.String("hvm")}, - }, - { - Name: aws.String("processor-info.supported-architecture"), - Values: aws.StringSlice([]string{"x86_64", "arm64"}), - }, - }, - }, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool { - instanceTypes = append(instanceTypes, page.InstanceTypes...) - return true - }); err != nil { - return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err) - } - if p.cm.HasChanged("instance-types", instanceTypes) { - // Only update instanceTypesSeqNun with the instance types have been changed - // This is to not create new keys with duplicate instance types option - atomic.AddUint64(&p.instanceTypesSeqNum, 1) - logging.FromContext(ctx).With( - "count", len(instanceTypes)).Debugf("discovered instance types") - } - p.cache.SetDefault(InstanceTypesCacheKey, instanceTypes) - return instanceTypes, nil +func (p *DefaultProvider) Reset() { + p.instanceTypesInfo = []*ec2.InstanceTypeInfo{} + p.instanceTypeOfferings = map[string]sets.Set[string]{} + p.instanceTypesCache.Flush() } diff --git a/pkg/providers/instancetype/suite_test.go b/pkg/providers/instancetype/suite_test.go index 6b0d284d725b..0e270e24a70b 100644 --- a/pkg/providers/instancetype/suite_test.go +++ b/pkg/providers/instancetype/suite_test.go @@ -155,6 +155,8 @@ var _ = Describe("InstanceTypeProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) }) It("should support individual instance type labels", func() { @@ -333,6 +335,8 @@ var _ = Describe("InstanceTypeProvider", func() { awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ InstanceTypeOfferings: fake.MakeInstanceOfferings(instances), }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) ExpectApplied(ctx, env.Client, nodePool, nodeClass) pod := coretest.UnschedulablePod(coretest.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -376,6 +380,8 @@ var _ = Describe("InstanceTypeProvider", func() { awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ InstanceTypeOfferings: fake.MakeInstanceOfferings(instances), }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) nodePool.Spec.Template.Spec.Requirements = []corev1beta1.NodeSelectorRequirementWithMinValues{ { @@ -392,6 +398,8 @@ var _ = Describe("InstanceTypeProvider", func() { ExpectApplied(ctx, env.Client, nodePool, nodeClass) awsEnv.EC2API.DescribeSpotPriceHistoryOutput.Set(generateSpotPricing(cloudProvider, nodePool)) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) pod := coretest.UnschedulablePod(coretest.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -732,9 +740,9 @@ var _ = Describe("InstanceTypeProvider", func() { Expect(*node.Status.Capacity.StorageEphemeral()).To(Equal(resource.MustParse("7600G"))) }) It("should not set pods to 110 if using ENI-based pod density", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -754,10 +762,10 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should set pods to 110 if AMI Family doesn't support", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(windowsNodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -864,9 +872,9 @@ var _ = Describe("InstanceTypeProvider", func() { })) var ok bool - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - info, ok = lo.Find(instanceInfo, func(i *ec2.InstanceTypeInfo) bool { + info, ok = lo.Find(instanceInfo.InstanceTypes, func(i *ec2.InstanceTypeInfo) bool { return aws.StringValue(i.InstanceType) == "m5.xlarge" }) Expect(ok).To(BeTrue()) @@ -1343,9 +1351,9 @@ var _ = Describe("InstanceTypeProvider", func() { }) }) It("should default max pods based off of network interfaces", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { if *info.InstanceType == "t3.large" { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, @@ -1385,12 +1393,12 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should set max-pods to user-defined value if specified", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ MaxPods: ptr.Int32(10), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -1410,12 +1418,12 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should override max-pods value", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ MaxPods: ptr.Int32(10), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -1439,9 +1447,9 @@ var _ = Describe("InstanceTypeProvider", func() { ReservedENIs: lo.ToPtr(1), })) - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - t3Large, ok := lo.Find(instanceInfo, func(info *ec2.InstanceTypeInfo) bool { + t3Large, ok := lo.Find(instanceInfo.InstanceTypes, func(info *ec2.InstanceTypeInfo) bool { return *info.InstanceType == "t3.large" }) Expect(ok).To(Equal(true)) @@ -1473,9 +1481,9 @@ var _ = Describe("InstanceTypeProvider", func() { ReservedENIs: lo.ToPtr(1_000_000), })) - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) - t3Large, ok := lo.Find(instanceInfo, func(info *ec2.InstanceTypeInfo) bool { + t3Large, ok := lo.Find(instanceInfo.InstanceTypes, func(info *ec2.InstanceTypeInfo) bool { return *info.InstanceType == "t3.large" }) Expect(ok).To(Equal(true)) @@ -1504,12 +1512,12 @@ var _ = Describe("InstanceTypeProvider", func() { Expect(it.Capacity.Pods().Value()).To(BeNumerically("==", maxPods)) }) It("should override pods-per-core value", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ PodsPerCore: ptr.Int32(1), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -1529,13 +1537,13 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should take the minimum of pods-per-core and max-pods", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ PodsPerCore: ptr.Int32(4), MaxPods: ptr.Int32(20), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -1555,13 +1563,13 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should ignore pods-per-core when using Bottlerocket AMI", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodeClass.Spec.AMIFamily = &v1beta1.AMIFamilyBottlerocket nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ PodsPerCore: ptr.Int32(1), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, info, @@ -1582,12 +1590,12 @@ var _ = Describe("InstanceTypeProvider", func() { } }) It("should take limited pod density to be the default pods number when pods-per-core is 0", func() { - instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + instanceInfo, err := awsEnv.EC2API.DescribeInstanceTypesWithContext(ctx, &ec2.DescribeInstanceTypesInput{}) Expect(err).To(BeNil()) nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{ PodsPerCore: ptr.Int32(0), } - for _, info := range instanceInfo { + for _, info := range instanceInfo.InstanceTypes { if *info.InstanceType == "t3.large" { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) it := instancetype.NewInstanceType(ctx, @@ -1917,7 +1925,6 @@ var _ = Describe("InstanceTypeProvider", func() { } } - awsEnv.InstanceTypeCache.Flush() instanceTypes, err := cloudProvider.GetInstanceTypes(ctx, nodePool) Expect(err).To(BeNil()) instanceTypeNames := sets.NewString() @@ -2275,7 +2282,7 @@ func uniqueInstanceTypeList(instanceTypesLists [][]*corecloudprovider.InstanceTy func generateSpotPricing(cp *cloudprovider.CloudProvider, nodePool *corev1beta1.NodePool) *ec2.DescribeSpotPriceHistoryOutput { rsp := &ec2.DescribeSpotPriceHistoryOutput{} instanceTypes, err := cp.GetInstanceTypes(ctx, nodePool) - awsEnv.InstanceTypeCache.Flush() + awsEnv.InstanceTypesProvider.Reset() Expect(err).To(Succeed()) t := fakeClock.Now() diff --git a/pkg/providers/launchtemplate/suite_test.go b/pkg/providers/launchtemplate/suite_test.go index 17449addc88f..6799b9ed3ab8 100644 --- a/pkg/providers/launchtemplate/suite_test.go +++ b/pkg/providers/launchtemplate/suite_test.go @@ -146,6 +146,8 @@ var _ = Describe("LaunchTemplate Provider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed()) }) It("should create unique launch templates for multiple identical nodeClasses", func() { nodeClass2 := test.EC2NodeClass() diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 0a0b1ca56e51..8c29b954ddf9 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -140,7 +140,6 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment EC2Cache: ec2Cache, KubernetesVersionCache: kubernetesVersionCache, - InstanceTypeCache: instanceTypeCache, LaunchTemplateCache: launchTemplateCache, SubnetCache: subnetCache, SecurityGroupCache: securityGroupCache, @@ -167,10 +166,10 @@ func (env *Environment) Reset() { env.IAMAPI.Reset() env.PricingAPI.Reset() env.PricingProvider.Reset() + env.InstanceTypesProvider.Reset() env.EC2Cache.Flush() env.KubernetesVersionCache.Flush() - env.InstanceTypeCache.Flush() env.UnavailableOfferingsCache.Flush() env.LaunchTemplateCache.Flush() env.SubnetCache.Flush()