diff --git a/cmd/controller/main.go b/cmd/controller/main.go index b1871bd91f1e..fc09b917beef 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -67,6 +67,7 @@ func main() { op.PricingProvider, op.AMIProvider, op.LaunchTemplateProvider, + op.InstanceTypesProvider, )...). WithWebhooks(ctx, webhooks.NewWebhooks()...). Start(ctx) diff --git a/hack/code/prices_gen/main.go b/hack/code/prices_gen/main.go index 7b73fdfc8564..1bf74fbcd76f 100644 --- a/hack/code/prices_gen/main.go +++ b/hack/code/prices_gen/main.go @@ -34,7 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" - controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/pricing" + controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/test" diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 556767b6d109..a97a24631c35 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -138,6 +138,7 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) }) It("should return an ICE error when there are no instance types to launch", func() { // Specify no instance types and expect to receive a capacity error @@ -230,6 +231,7 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(instances, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) @@ -324,6 +326,7 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(instances, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) @@ -425,6 +428,7 @@ var _ = Describe("CloudProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) instanceNames := lo.Map(uniqInstanceTypes, func(info *ec2.InstanceTypeInfo, _ int) string { return *info.InstanceType }) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index e4c8c9388829..6510a522f870 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -22,6 +22,7 @@ import ( nodeclasshash "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/hash" nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status" nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" + controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" @@ -42,6 +43,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" "github.com/aws/karpenter-provider-aws/pkg/providers/sqs" @@ -51,7 +53,7 @@ import ( func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, - pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider) []controller.Controller { + pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), @@ -60,6 +62,7 @@ func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider), nodeclaimtagging.NewController(kubeClient, instanceProvider), controllerspricing.NewController(pricingProvider), + controllersinstancetype.NewController(instanceTypeProvider), } if options.FromContext(ctx).InterruptionQueue != "" { sqsapi := servicesqs.New(sess) diff --git a/pkg/controllers/providers/instancetype/controller.go b/pkg/controllers/providers/instancetype/controller.go new file mode 100644 index 000000000000..077aa6aba516 --- /dev/null +++ b/pkg/controllers/providers/instancetype/controller.go @@ -0,0 +1,48 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancetype + +import ( + "context" + "time" + + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/controller" + + "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" +) + +type Controller struct { + instancetypeProvider instancetype.Provider +} + +func NewController(instancetypeProvider instancetype.Provider) *Controller { + return &Controller{ + instancetypeProvider: instancetypeProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { + return reconcile.Result{RequeueAfter: 6 * time.Hour}, c.instancetypeProvider.Update(ctx) +} + +func (c *Controller) Name() string { + return "instancetype" +} + +func (c *Controller) Builder(_ context.Context, m manager.Manager) controller.Builder { + return controller.NewSingletonManagedBy(m) +} diff --git a/pkg/controllers/providers/instancetype/suite_test.go b/pkg/controllers/providers/instancetype/suite_test.go new file mode 100644 index 000000000000..36bd138fc9ff --- /dev/null +++ b/pkg/controllers/providers/instancetype/suite_test.go @@ -0,0 +1,160 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancetype_test + +import ( + "context" + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/types" + corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/operator/scheme" + coretest "sigs.k8s.io/karpenter/pkg/test" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/karpenter-provider-aws/pkg/apis" + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" + "github.com/aws/karpenter-provider-aws/pkg/fake" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/test" + "github.com/samber/lo" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "knative.dev/pkg/logging/testing" + . "sigs.k8s.io/karpenter/pkg/test/expectations" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var controller *controllersinstancetype.Controller + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "Pricing") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...)) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) + controller = controllersinstancetype.NewController(awsEnv.InstanceTypesProvider) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + + awsEnv.Reset() +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = Describe("InstanceType", func() { + It("should update instance type date with response from the DescribeInstanceTypes API ", func() { + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + Expect(err).To(BeNil()) + Expect(reflect.DeepEqual(instanceTypes, []*ec2.InstanceTypeInfo{})).To(BeTrue()) + + ec2InstanceTypes := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypes) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err = awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + Expect(err).To(BeNil()) + Expect(reflect.DeepEqual(instanceTypes, ec2InstanceTypes)).To(BeTrue()) + }) + It("should update instance type offering date with response from the DescribeInstanceTypesOfferings API ", func() { + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{}) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{}) + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + _, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).ToNot(BeNil()) + + ec2InstanceTypes := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypes) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{}) + Expect(err).To(BeNil()) + + Expect(len(instanceTypes)).To(BeNumerically("==", len(ec2InstanceTypes))) + for x := range instanceTypes { + offering, found := lo.Find(ec2Offerings, func(off *ec2.InstanceTypeOffering) bool { + return instanceTypes[x].Name == lo.FromPtr(off.InstanceType) + }) + Expect(found).To(BeTrue()) + for y := range instanceTypes[x].Offerings { + Expect(instanceTypes[x].Offerings[y].Zone).To(Equal(lo.FromPtr(offering.Location))) + } + } + }) + It("should not update instance type date with response from the DescribeInstanceTypes API on failure", func() { + ec2InstanceTypes := fake.MakeInstances() + ec2Offerings := fake.MakeInstanceOfferings(ec2InstanceTypes) + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes, + }) + awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ + InstanceTypeOfferings: ec2Offerings, + }) + + ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{}) + instanceTypes, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + Expect(err).To(BeNil()) + Expect(reflect.DeepEqual(instanceTypes, ec2InstanceTypes)).To(BeFalse()) + + awsEnv.EC2API.DescribeInstanceTypesOutput.Set(&ec2.DescribeInstanceTypesOutput{ + InstanceTypes: ec2InstanceTypes[:1], + }) + awsEnv.EC2API.NextError.Set(awserr.New("InternalServerError", "", nil), fake.MaxCalls(1)) + + ExpectReconcileFailed(ctx, controller, types.NamespacedName{}) + instanceTypes, err = awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) + Expect(err).To(BeNil()) + Expect(reflect.DeepEqual(instanceTypes, ec2InstanceTypes[:1])).To(BeFalse()) + }) +}) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 39be55788322..907c9ac308ed 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -159,7 +159,6 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont ) instanceTypeProvider := instancetype.NewDefaultProvider( *sess.Config.Region, - cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval), ec2api, subnetProvider, unavailableOfferingsCache, diff --git a/pkg/providers/instance/suite_test.go b/pkg/providers/instance/suite_test.go index 45d3504bafb6..d4eca6a3dade 100644 --- a/pkg/providers/instance/suite_test.go +++ b/pkg/providers/instance/suite_test.go @@ -106,6 +106,7 @@ var _ = Describe("InstanceProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) }) It("should return an ICE error when all attempted instance types return an ICE error", func() { ExpectApplied(ctx, env.Client, nodeClaim, nodePool, nodeClass) diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index af3c82406711..19a1fb43cb1f 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -22,7 +22,6 @@ import ( "sync/atomic" "github.com/mitchellh/hashstructure/v2" - "github.com/patrickmn/go-cache" "github.com/prometheus/client_golang/prometheus" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" @@ -37,24 +36,18 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/logging" + "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" "github.com/aws/karpenter-provider-aws/pkg/providers/pricing" "github.com/aws/karpenter-provider-aws/pkg/providers/subnet" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/utils/pretty" - - "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" -) - -const ( - InstanceTypesCacheKey = "types" - InstanceTypeOfferingsCacheKey = "offerings" ) type Provider interface { LivenessProbe(*http.Request) error - List(context.Context, *corev1beta1.KubeletConfiguration, *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) + Update(ctx context.Context) error } type DefaultProvider struct { @@ -62,14 +55,16 @@ type DefaultProvider struct { ec2api ec2iface.EC2API subnetProvider subnet.Provider pricingProvider pricing.Provider - // Has one cache entry for all the instance types (key: InstanceTypesCacheKey) - // Has one cache entry for all the zones for each subnet selector (key: InstanceTypesZonesCacheKeyPrefix:) - // Values cached *before* considering insufficient capacity errors from the unavailableOfferings cache. + + // Values stored *before* considering insufficient capacity errors from the unavailableOfferings cache. // Fully initialized Instance Types are also cached based on the set of all instance types, zones, unavailableOfferings cache, // EC2NodeClass, and kubelet configuration from the NodePool - mu sync.Mutex - cache *cache.Cache + muInstanceType sync.RWMutex + muInstanceTypeOfferings sync.RWMutex + instanceTypesInfo []*ec2.InstanceTypeInfo + instanceTypeOfferings map[string]sets.Set[string] + instanceTypes map[string][]*cloudprovider.InstanceType unavailableOfferings *awscache.UnavailableOfferings cm *pretty.ChangeMonitor @@ -79,31 +74,35 @@ type DefaultProvider struct { instanceTypeOfferingsSeqNum uint64 } -func NewDefaultProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, +func NewDefaultProvider(region string, ec2api ec2iface.EC2API, subnetProvider subnet.Provider, unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider pricing.Provider) *DefaultProvider { return &DefaultProvider{ - ec2api: ec2api, - region: region, - subnetProvider: subnetProvider, - pricingProvider: pricingProvider, - cache: cache, - unavailableOfferings: unavailableOfferingsCache, - cm: pretty.NewChangeMonitor(), - instanceTypesSeqNum: 0, + ec2api: ec2api, + region: region, + subnetProvider: subnetProvider, + pricingProvider: pricingProvider, + instanceTypesInfo: []*ec2.InstanceTypeInfo{}, + instanceTypeOfferings: map[string]sets.Set[string]{}, + instanceTypes: map[string][]*cloudprovider.InstanceType{}, + unavailableOfferings: unavailableOfferingsCache, + cm: pretty.NewChangeMonitor(), + instanceTypesSeqNum: 0, } } func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguration, nodeClass *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) { - // Get InstanceTypes from EC2 - instanceTypes, err := p.GetInstanceTypes(ctx) - if err != nil { - return nil, err + p.muInstanceType.RLock() + p.muInstanceTypeOfferings.RLock() + defer p.muInstanceType.RUnlock() + defer p.muInstanceTypeOfferings.RUnlock() + + if len(p.instanceTypesInfo) == 0 { + return nil, fmt.Errorf("no instance types found") } - // Get InstanceTypeOfferings from EC2 - instanceTypeOfferings, err := p.getInstanceTypeOfferings(ctx) - if err != nil { - return nil, err + if len(p.instanceTypeOfferings) == 0 { + return nil, fmt.Errorf("no instance types offerings found") } + subnets, err := p.subnetProvider.List(ctx, nodeClass) if err != nil { return nil, err @@ -133,14 +132,14 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi aws.StringValue((*string)(nodeClass.Spec.InstanceStorePolicy)), aws.StringValue(nodeClass.Spec.AMIFamily), ) - if item, ok := p.cache.Get(key); ok { - return item.([]*cloudprovider.InstanceType), nil + if item, ok := p.instanceTypes[key]; ok { + return item, nil } // Get all zones across all offerings // We don't use this in the cache key since this is produced from our instanceTypeOfferings which we do cache allZones := sets.New[string]() - for _, offeringZones := range instanceTypeOfferings { + for _, offeringZones := range p.instanceTypeOfferings { for zone := range offeringZones { allZones.Insert(zone) } @@ -149,7 +148,7 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi logging.FromContext(ctx).With("zones", allZones.UnsortedList()).Debugf("discovered zones") } amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) - result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { + result := lo.Map(p.instanceTypesInfo, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { instanceTypeVCPU.With(prometheus.Labels{ instanceTypeLabel: *i.InstanceType, }).Set(float64(aws.Int64Value(i.VCpuInfo.DefaultVCpus))) @@ -164,9 +163,9 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi return NewInstanceType(ctx, i, p.region, nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, kc.MaxPods, kc.PodsPerCore, kc.KubeReserved, kc.SystemReserved, kc.EvictionHard, kc.EvictionSoft, - amiFamily, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones)) + amiFamily, p.createOfferings(ctx, i, p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones)) }) - p.cache.SetDefault(key, result) + p.instanceTypes[key] = result return result, nil } @@ -177,6 +176,100 @@ func (p *DefaultProvider) LivenessProbe(req *http.Request) error { return p.pricingProvider.LivenessProbe(req) } +func (p *DefaultProvider) Update(ctx context.Context) error { + // Hydrate InstanceTypes from EC2 + if err := p.updateInstanceTypes(ctx); err != nil { + return err + } + // Hydrate InstanceTypeOfferings from EC2 + if err := p.updateInstanceTypeOfferings(ctx); err != nil { + return err + } + return nil +} + +// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters +func (p *DefaultProvider) GetInstanceTypes(ctx context.Context) ([]*ec2.InstanceTypeInfo, error) { + p.muInstanceType.RLock() + defer p.muInstanceType.RUnlock() + + if len(p.instanceTypesInfo) == 0 { + return nil, fmt.Errorf("no instance types found") + } + return p.instanceTypesInfo, nil +} + +func (p *DefaultProvider) updateInstanceTypes(ctx context.Context) error { + // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- + // We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple + // calls to EC2 when we could have just made one call. + // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache + p.muInstanceType.Lock() + defer p.muInstanceType.Unlock() + var instanceTypes []*ec2.InstanceTypeInfo + + if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("supported-virtualization-type"), + Values: []*string{aws.String("hvm")}, + }, + { + Name: aws.String("processor-info.supported-architecture"), + Values: aws.StringSlice([]string{"x86_64", "arm64"}), + }, + }, + }, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool { + instanceTypes = append(instanceTypes, page.InstanceTypes...) + return true + }); err != nil { + return fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err) + } + + if p.cm.HasChanged("instance-types", instanceTypes) { + // Only update instanceTypesSeqNun with the instance types have been changed + // This is to not create new keys with duplicate instance types option + atomic.AddUint64(&p.instanceTypesSeqNum, 1) + logging.FromContext(ctx).With( + "count", len(instanceTypes)).Debugf("discovered instance types") + } + p.instanceTypesInfo = instanceTypes + return nil +} + +func (p *DefaultProvider) updateInstanceTypeOfferings(ctx context.Context) error { + // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- + // We lock here so that multiple callers to GetInstanceTypes do not result in cache misses and multiple + // calls to EC2 when we could have just made one call. This lock is here because multiple callers to EC2 result + // in A LOT of extra memory generated from the response for simultaneous callers. + // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache + p.muInstanceTypeOfferings.Lock() + defer p.muInstanceTypeOfferings.Unlock() + + // Get offerings from EC2 + instanceTypeOfferings := map[string]sets.Set[string]{} + if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, + func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { + for _, offering := range output.InstanceTypeOfferings { + if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok { + instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]() + } + instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) + } + return true + }); err != nil { + return fmt.Errorf("describing instance type zone offerings, %w", err) + } + if p.cm.HasChanged("instance-type-offering", instanceTypeOfferings) { + // Only update instanceTypesSeqNun with the instance type offerings have been changed + // This is to not create new keys with duplicate instance type offerings option + atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1) + logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types") + } + p.instanceTypeOfferings = instanceTypeOfferings + return nil +} + func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, zones, subnetZones sets.Set[string]) []cloudprovider.Offering { var offerings []cloudprovider.Offering for zone := range zones { @@ -220,79 +313,8 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 return offerings } -func (p *DefaultProvider) getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) { - // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- - // We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple - // calls to EC2 when we could have just made one call. - // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache - p.mu.Lock() - defer p.mu.Unlock() - if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok { - return cached.(map[string]sets.Set[string]), nil - } - - // Get offerings from EC2 - instanceTypeOfferings := map[string]sets.Set[string]{} - if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, - func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { - for _, offering := range output.InstanceTypeOfferings { - if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok { - instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]() - } - instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) - } - return true - }); err != nil { - return nil, fmt.Errorf("describing instance type zone offerings, %w", err) - } - if p.cm.HasChanged("instance-type-offering", instanceTypeOfferings) { - // Only update instanceTypesSeqNun with the instance type offerings have been changed - // This is to not create new keys with duplicate instance type offerings option - atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1) - logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types") - } - p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings) - return instanceTypeOfferings, nil -} - -// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters -func (p *DefaultProvider) GetInstanceTypes(ctx context.Context) ([]*ec2.InstanceTypeInfo, error) { - // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- - // We lock here so that multiple callers to GetInstanceTypes do not result in cache misses and multiple - // calls to EC2 when we could have just made one call. This lock is here because multiple callers to EC2 result - // in A LOT of extra memory generated from the response for simultaneous callers. - // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache - p.mu.Lock() - defer p.mu.Unlock() - - if cached, ok := p.cache.Get(InstanceTypesCacheKey); ok { - return cached.([]*ec2.InstanceTypeInfo), nil - } - var instanceTypes []*ec2.InstanceTypeInfo - if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{ - Filters: []*ec2.Filter{ - { - Name: aws.String("supported-virtualization-type"), - Values: []*string{aws.String("hvm")}, - }, - { - Name: aws.String("processor-info.supported-architecture"), - Values: aws.StringSlice([]string{"x86_64", "arm64"}), - }, - }, - }, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool { - instanceTypes = append(instanceTypes, page.InstanceTypes...) - return true - }); err != nil { - return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err) - } - if p.cm.HasChanged("instance-types", instanceTypes) { - // Only update instanceTypesSeqNun with the instance types have been changed - // This is to not create new keys with duplicate instance types option - atomic.AddUint64(&p.instanceTypesSeqNum, 1) - logging.FromContext(ctx).With( - "count", len(instanceTypes)).Debugf("discovered instance types") - } - p.cache.SetDefault(InstanceTypesCacheKey, instanceTypes) - return instanceTypes, nil +func (p *DefaultProvider) Reset() { + p.instanceTypesInfo = []*ec2.InstanceTypeInfo{} + p.instanceTypeOfferings = map[string]sets.Set[string]{} + p.instanceTypes = map[string][]*cloudprovider.InstanceType{} } diff --git a/pkg/providers/instancetype/suite_test.go b/pkg/providers/instancetype/suite_test.go index 6b0d284d725b..066ed8fe9cdd 100644 --- a/pkg/providers/instancetype/suite_test.go +++ b/pkg/providers/instancetype/suite_test.go @@ -155,6 +155,7 @@ var _ = Describe("InstanceTypeProvider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) }) It("should support individual instance type labels", func() { @@ -333,6 +334,7 @@ var _ = Describe("InstanceTypeProvider", func() { awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ InstanceTypeOfferings: fake.MakeInstanceOfferings(instances), }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) ExpectApplied(ctx, env.Client, nodePool, nodeClass) pod := coretest.UnschedulablePod(coretest.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -376,6 +378,7 @@ var _ = Describe("InstanceTypeProvider", func() { awsEnv.EC2API.DescribeInstanceTypeOfferingsOutput.Set(&ec2.DescribeInstanceTypeOfferingsOutput{ InstanceTypeOfferings: fake.MakeInstanceOfferings(instances), }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) nodePool.Spec.Template.Spec.Requirements = []corev1beta1.NodeSelectorRequirementWithMinValues{ { @@ -392,6 +395,7 @@ var _ = Describe("InstanceTypeProvider", func() { ExpectApplied(ctx, env.Client, nodePool, nodeClass) awsEnv.EC2API.DescribeSpotPriceHistoryOutput.Set(generateSpotPricing(cloudProvider, nodePool)) Expect(awsEnv.PricingProvider.UpdateSpotPricing(ctx)).To(Succeed()) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) pod := coretest.UnschedulablePod(coretest.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -1917,7 +1921,6 @@ var _ = Describe("InstanceTypeProvider", func() { } } - awsEnv.InstanceTypeCache.Flush() instanceTypes, err := cloudProvider.GetInstanceTypes(ctx, nodePool) Expect(err).To(BeNil()) instanceTypeNames := sets.NewString() @@ -2275,7 +2278,7 @@ func uniqueInstanceTypeList(instanceTypesLists [][]*corecloudprovider.InstanceTy func generateSpotPricing(cp *cloudprovider.CloudProvider, nodePool *corev1beta1.NodePool) *ec2.DescribeSpotPriceHistoryOutput { rsp := &ec2.DescribeSpotPriceHistoryOutput{} instanceTypes, err := cp.GetInstanceTypes(ctx, nodePool) - awsEnv.InstanceTypeCache.Flush() + awsEnv.InstanceTypesProvider.Reset() Expect(err).To(Succeed()) t := fakeClock.Now() diff --git a/pkg/providers/launchtemplate/suite_test.go b/pkg/providers/launchtemplate/suite_test.go index 17449addc88f..6303915df5ab 100644 --- a/pkg/providers/launchtemplate/suite_test.go +++ b/pkg/providers/launchtemplate/suite_test.go @@ -146,6 +146,7 @@ var _ = Describe("LaunchTemplate Provider", func() { }, }, }) + Expect(awsEnv.InstanceTypesProvider.Update(ctx)).To(Succeed()) }) It("should create unique launch templates for multiple identical nodeClasses", func() { nodeClass2 := test.EC2NodeClass() diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 0a0b1ca56e51..593ea915d518 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -90,7 +90,6 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment // cache ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) - instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) unavailableOfferingsCache := awscache.NewUnavailableOfferings() launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) @@ -106,7 +105,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment instanceProfileProvider := instanceprofile.NewDefaultProvider(fake.DefaultRegion, iamapi, instanceProfileCache) amiProvider := amifamily.NewDefaultProvider(versionProvider, ssmapi, ec2api, ec2Cache) amiResolver := amifamily.NewResolver(amiProvider) - instanceTypesProvider := instancetype.NewDefaultProvider(fake.DefaultRegion, instanceTypeCache, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider) + instanceTypesProvider := instancetype.NewDefaultProvider(fake.DefaultRegion, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider) launchTemplateProvider := launchtemplate.NewDefaultProvider( ctx, @@ -140,7 +139,6 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment EC2Cache: ec2Cache, KubernetesVersionCache: kubernetesVersionCache, - InstanceTypeCache: instanceTypeCache, LaunchTemplateCache: launchTemplateCache, SubnetCache: subnetCache, SecurityGroupCache: securityGroupCache, @@ -167,10 +165,10 @@ func (env *Environment) Reset() { env.IAMAPI.Reset() env.PricingAPI.Reset() env.PricingProvider.Reset() + env.InstanceTypesProvider.Reset() env.EC2Cache.Flush() env.KubernetesVersionCache.Flush() - env.InstanceTypeCache.Flush() env.UnavailableOfferingsCache.Flush() env.LaunchTemplateCache.Flush() env.SubnetCache.Flush()