From c2849cf62487aed69384a3770770afe853a2d4c2 Mon Sep 17 00:00:00 2001 From: njtran Date: Thu, 19 Oct 2023 09:17:39 -0700 Subject: [PATCH] fix: create offerings regardless of subnets --- pkg/fake/ec2api.go | 6 +- pkg/operator/operator.go | 1 - pkg/providers/instancetype/instancetype.go | 108 +++++++++--------- pkg/providers/instancetype/nodeclass_test.go | 28 ++--- .../instancetype/nodetemplate_test.go | 12 +- pkg/providers/instancetype/types.go | 6 +- pkg/test/environment.go | 2 +- 7 files changed, 87 insertions(+), 76 deletions(-) diff --git a/pkg/fake/ec2api.go b/pkg/fake/ec2api.go index 68a4faba4c23..0d6249f3b0a4 100644 --- a/pkg/fake/ec2api.go +++ b/pkg/fake/ec2api.go @@ -483,9 +483,9 @@ func (e *EC2API) DescribeAvailabilityZonesWithContext(context.Context, *ec2.Desc return e.DescribeAvailabilityZonesOutput.Clone(), nil } return &ec2.DescribeAvailabilityZonesOutput{AvailabilityZones: []*ec2.AvailabilityZone{ - {ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a")}, - {ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b")}, - {ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c")}, + {ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a"), ZoneType: aws.String("availability-zone")}, + {ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b"), ZoneType: aws.String("availability-zone")}, + {ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c"), ZoneType: aws.String("availability-zone")}, }}, nil } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 3e430aafa32c..75d95ced52b3 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -155,7 +155,6 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont *sess.Config.Region, cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval), ec2api, - subnetProvider, unavailableOfferingsCache, pricingProvider, ) diff --git 
a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index 4e780e41fbf4..0b7227520a0a 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -20,6 +20,7 @@ import ( "net/http" "sync" "sync/atomic" + "time" "github.com/prometheus/client_golang/prometheus" @@ -37,21 +38,20 @@ import ( "knative.dev/pkg/logging" "github.com/aws/karpenter/pkg/providers/pricing" - "github.com/aws/karpenter/pkg/providers/subnet" "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/utils/pretty" ) const ( - InstanceTypesCacheKey = "types" - InstanceTypeZonesCacheKeyPrefix = "zones:" + InstanceTypesCacheKey = "types" + InstanceTypeOfferingsCacheKey = "offerings" + AvailabilityZonesCacheKey = "zones" ) type Provider struct { region string ec2api ec2iface.EC2API - subnetProvider *subnet.Provider pricingProvider *pricing.Provider // Has one cache entry for all the instance types (key: InstanceTypesCacheKey) // Has one cache entry for all the zones for each subnet selector (key: InstanceTypesZonesCacheKeyPrefix:) @@ -68,12 +68,11 @@ type Provider struct { instanceTypesSeqNum uint64 } -func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider *subnet.Provider, +func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider *pricing.Provider) *Provider { return &Provider{ ec2api: ec2api, region: region, - subnetProvider: subnetProvider, pricingProvider: pricingProvider, cache: cache, unavailableOfferings: unavailableOfferingsCache, @@ -88,25 +87,27 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio if err != nil { return nil, err } - // Get Viable EC2 Purchase offerings - instanceTypeZones, err := p.getInstanceTypeZones(ctx, nodeClass) + // Get InstanceTypeOfferings from EC2 + instanceTypeOfferings, err := 
p.getInstanceTypeOfferings(ctx) + if err != nil { + return nil, err + } + // Get AvailabilityZones from EC2 + availabilityZones, err := p.listAvailabilityZones(ctx) if err != nil { return nil, err } // Compute fully initialized instance types hash key - instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeOfferings, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) - key := fmt.Sprintf("%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, instanceTypeZonesHash, kcHash) + key := fmt.Sprintf("%d-%d-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, instanceTypeZonesHash, kcHash) if item, ok := p.cache.Get(key); ok { return item.([]*cloudprovider.InstanceType), nil } - // Reject any instance types that don't have any offerings due to zone - result := lo.Reject(lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { - return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeZones[aws.StringValue(i.InstanceType)])) - }), func(i *cloudprovider.InstanceType, _ int) bool { - return len(i.Offerings) == 0 + result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType { + return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], availabilityZones)) }) for _, instanceType := range instanceTypes { InstanceTypeVCPU.With(prometheus.Labels{ @@ -121,33 +122,30 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio } func (p *Provider) LivenessProbe(req *http.Request) error { - if err := p.subnetProvider.LivenessProbe(req); err != nil { - return 
err - } return p.pricingProvider.LivenessProbe(req) } -func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones sets.Set[string]) []cloudprovider.Offering { +func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, availabilityZones sets.Set[string]) []cloudprovider.Offering { var offerings []cloudprovider.Offering - for zone := range zones { + for az := range availabilityZones { // while usage classes should be a distinct set, there's no guarantee of that for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { // exclude any offerings that have recently seen an insufficient capacity error from EC2 - isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType) + isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, az, capacityType) var price float64 var ok bool switch capacityType { case ec2.UsageClassTypeSpot: - price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone) + price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, az) case ec2.UsageClassTypeOnDemand: price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType) default: logging.FromContext(ctx).Errorf("Received unknown capacity type %s for instance type %s", capacityType, *instanceType.InstanceType) continue } - available := !isUnavailable && ok + available := !isUnavailable && ok && instanceTypeZones.Has(az) offerings = append(offerings, cloudprovider.Offering{ - Zone: zone, + Zone: az, CapacityType: capacityType, Price: price, Available: available, @@ -157,56 +155,62 @@ func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.Instan return offerings } -func (p *Provider) getInstanceTypeZones(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (map[string]sets.Set[string], error) { +func (p *Provider) listAvailabilityZones(ctx 
context.Context) (sets.Set[string], error) { // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- - // We lock here so that multiple callers to getInstanceTypeZones do not result in cache misses and multiple + // We lock here so that multiple callers to listAvailabilityZones do not result in cache misses and multiple // calls to EC2 when we could have just made one call. // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache p.mu.Lock() defer p.mu.Unlock() + if cached, ok := p.cache.Get(AvailabilityZonesCacheKey); ok { + return cached.(sets.Set[string]), nil + } - subnetSelectorHash, err := hashstructure.Hash(nodeClass.Spec.SubnetSelectorTerms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + // Get zones from EC2 + instanceTypeZones := sets.Set[string]{} + output, err := p.ec2api.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{}) if err != nil { - return nil, fmt.Errorf("failed to hash the subnet selector: %w", err) + return nil, fmt.Errorf("describing availability zones, %w", err) } - cacheKey := fmt.Sprintf("%s%016x", InstanceTypeZonesCacheKeyPrefix, subnetSelectorHash) - if cached, ok := p.cache.Get(cacheKey); ok { - return cached.(map[string]sets.Set[string]), nil + for i := range output.AvailabilityZones { + zone := output.AvailabilityZones[i] + if aws.StringValue(zone.ZoneType) == "availability-zone" { + instanceTypeZones.Insert(aws.StringValue(zone.ZoneName)) + } } + logging.FromContext(ctx).With("zones", instanceTypeZones.UnsortedList()).Debugf("discovered availability zones") + p.cache.Set(AvailabilityZonesCacheKey, instanceTypeZones, 24*time.Hour) + return instanceTypeZones, nil +} - // Constrain AZs from subnets - subnets, err := p.subnetProvider.List(ctx, nodeClass) - if err != nil { - return nil, err - } - if len(subnets) == 0 { - return nil, nil +func (p *Provider) 
getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) { + // DO NOT REMOVE THIS LOCK ---------------------------------------------------------------------------- + // We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple + // calls to EC2 when we could have just made one call. + // TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache + p.mu.Lock() + defer p.mu.Unlock() + if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok { + return cached.(map[string]sets.Set[string]), nil } - zones := sets.NewString(lo.Map(subnets, func(subnet *ec2.Subnet, _ int) string { - return aws.StringValue(subnet.AvailabilityZone) - })...) // Get offerings from EC2 - instanceTypeZones := map[string]sets.Set[string]{} + instanceTypeOfferings := map[string]sets.Set[string]{} if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { for _, offering := range output.InstanceTypeOfferings { - if zones.Has(aws.StringValue(offering.Location)) { - if _, ok := instanceTypeZones[aws.StringValue(offering.InstanceType)]; !ok { - instanceTypeZones[aws.StringValue(offering.InstanceType)] = sets.New[string]() - } - instanceTypeZones[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) + if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok { + instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]() } + instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) } return true }); err != nil { return nil, fmt.Errorf("describing instance type zone offerings, %w", err) } - if p.cm.HasChanged("zonal-offerings", nodeClass.Spec.SubnetSelectorTerms) { - 
logging.FromContext(ctx).With("zones", zones.List(), "instance-type-count", len(instanceTypeZones), "node-template", nodeClass.Name).Debugf("discovered offerings for instance types") - } - p.cache.SetDefault(cacheKey, instanceTypeZones) - return instanceTypeZones, nil + logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types") + p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings) + return instanceTypeOfferings, nil } // GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters diff --git a/pkg/providers/instancetype/nodeclass_test.go b/pkg/providers/instancetype/nodeclass_test.go index d2b4cdae9a00..631979135638 100644 --- a/pkg/providers/instancetype/nodeclass_test.go +++ b/pkg/providers/instancetype/nodeclass_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/logging" "knative.dev/pkg/ptr" corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1" @@ -98,14 +99,14 @@ var _ = Describe("NodeClass/InstanceTypes", func() { nodeSelector := map[string]string{ // Well known - corev1beta1.NodePoolLabelKey: nodePool.Name, - v1.LabelTopologyRegion: fake.DefaultRegion, - v1.LabelTopologyZone: "test-zone-1a", - v1.LabelInstanceTypeStable: "g4dn.8xlarge", - v1.LabelOSStable: "linux", + corev1beta1.NodePoolLabelKey: nodePool.Name, + v1.LabelTopologyRegion: fake.DefaultRegion, + v1.LabelTopologyZone: "test-zone-1a", + v1.LabelInstanceTypeStable: "g4dn.8xlarge", + v1.LabelOSStable: "linux", v1.LabelArchStable: "amd64", corev1beta1.CapacityTypeLabelKey: "on-demand", - // Well Known to AWS + //Well Known to AWS v1beta1.LabelInstanceHypervisor: "nitro", v1beta1.LabelInstanceEncryptionInTransitSupported: "true", v1beta1.LabelInstanceCategory: "g", @@ -128,10 +129,10 @@ var _ = Describe("NodeClass/InstanceTypes", func() { 
v1.LabelFailureDomainBetaRegion: fake.DefaultRegion, v1.LabelFailureDomainBetaZone: "test-zone-1a", "beta.kubernetes.io/arch": "amd64", - "beta.kubernetes.io/os": "linux", + "beta.kubernetes.io/os": "linux", v1.LabelInstanceType: "g4dn.8xlarge", "topology.ebs.csi.aws.com/zone": "test-zone-1a", - v1.LabelWindowsBuild: v1beta1.Windows2022Build, + v1.LabelWindowsBuild: v1beta1.Windows2022Build, } // Ensure that we're exercising all well known labels @@ -142,7 +143,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() { pods = append(pods, coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{key: value}})) } ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) - for _, pod := range pods { + for i, pod := range pods { + logging.FromContext(ctx).Infof("DEBUGGING: THIS IS THE (%d)th iteration with requirements %s", i, pod.Spec.NodeSelector) ExpectScheduled(ctx, env.Client, pod) } }) @@ -1175,8 +1177,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() { pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ { Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, - }}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, + }}, }, }}} ExpectApplied(ctx, env.Client, nodePool, nodeClass) @@ -1253,8 +1255,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() { pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ { Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, - }}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: 
[]string{"test-zone-1a"}}, + }}, }, }}} ExpectApplied(ctx, env.Client, nodePool, nodeClass) diff --git a/pkg/providers/instancetype/nodetemplate_test.go b/pkg/providers/instancetype/nodetemplate_test.go index ba650c3b4ca7..5cd961da4386 100644 --- a/pkg/providers/instancetype/nodetemplate_test.go +++ b/pkg/providers/instancetype/nodetemplate_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/logging" "knative.dev/pkg/ptr" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" @@ -149,7 +150,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() { pods = append(pods, coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{key: value}})) } ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) - for _, pod := range pods { + for i, pod := range pods { + logging.FromContext(ctx).Infof("DEBUGGING: THIS IS THE (%d)th iteration with requirements %s", i, pod.Spec.NodeSelector) ExpectScheduled(ctx, env.Client, pod) } }) @@ -1206,8 +1208,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() { pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ { Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ - {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, - }}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, + }}, }, }}} ExpectApplied(ctx, env.Client, provisioner, nodeTemplate) @@ -1284,8 +1286,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() { pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ { Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ - {Key: 
v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, - }}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, + }}, }, }}} ExpectApplied(ctx, env.Client, provisioner, nodeTemplate) diff --git a/pkg/providers/instancetype/types.go b/pkg/providers/instancetype/types.go index a126ac84615d..9626601c15d6 100644 --- a/pkg/providers/instancetype/types.go +++ b/pkg/providers/instancetype/types.go @@ -27,6 +27,7 @@ import ( "github.com/samber/lo" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "knative.dev/pkg/logging" "knative.dev/pkg/ptr" corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1" @@ -53,6 +54,9 @@ func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, kc *corev1 region string, nodeClass *v1beta1.EC2NodeClass, offerings cloudprovider.Offerings) *cloudprovider.InstanceType { amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{}) + if aws.StringValue(info.InstanceType) == "trn1.2xlarge" { + logging.FromContext(ctx).Infof("DEBUGGING: this is the AMI Family for the trn1.2xlarge: %s", amiFamily.DefaultAMIs("v1.27", false)[0].Query) + } return &cloudprovider.InstanceType{ Name: aws.StringValue(info.InstanceType), Requirements: computeRequirements(ctx, info, offerings, region, amiFamily, kc, nodeClass), @@ -150,7 +154,7 @@ func computeRequirements(ctx context.Context, info *ec2.InstanceTypeInfo, offeri } func getOS(info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily) []string { - if _, ok := amiFamily.(*amifamily.Windows); ok { + if _, ok := amiFamily.(amifamily.Windows); ok { if getArchitecture(info) == corev1beta1.ArchitectureAmd64 { return []string{string(v1.Windows)} } diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 250cf5125338..e2302ed37acb 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -94,7 +94,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) 
*Environment instanceProfileProvider := instanceprofile.NewProvider(fake.DefaultRegion, iamapi, instanceProfileCache) amiProvider := amifamily.NewProvider(versionProvider, ssmapi, ec2api, ec2Cache) amiResolver := amifamily.New(amiProvider) - instanceTypesProvider := instancetype.NewProvider(fake.DefaultRegion, instanceTypeCache, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider) + instanceTypesProvider := instancetype.NewProvider(fake.DefaultRegion, instanceTypeCache, ec2api, unavailableOfferingsCache, pricingProvider) launchTemplateProvider := launchtemplate.NewProvider( ctx,