Skip to content

Commit

Permalink
fix: create offerings regardless of subnets
Browse files Browse the repository at this point in the history
  • Loading branch information
njtran authored and jonathan-innis committed Oct 19, 2023
1 parent 315ff23 commit c2849cf
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 76 deletions.
6 changes: 3 additions & 3 deletions pkg/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,9 @@ func (e *EC2API) DescribeAvailabilityZonesWithContext(context.Context, *ec2.Desc
return e.DescribeAvailabilityZonesOutput.Clone(), nil
}
return &ec2.DescribeAvailabilityZonesOutput{AvailabilityZones: []*ec2.AvailabilityZone{
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c")},
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c"), ZoneType: aws.String("availability-zone")},
}}, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
*sess.Config.Region,
cache.New(awscache.InstanceTypesAndZonesTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
unavailableOfferingsCache,
pricingProvider,
)
Expand Down
108 changes: 56 additions & 52 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand All @@ -37,21 +38,20 @@ import (
"knative.dev/pkg/logging"

"github.com/aws/karpenter/pkg/providers/pricing"
"github.com/aws/karpenter/pkg/providers/subnet"

"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/utils/pretty"
)

const (
InstanceTypesCacheKey = "types"
InstanceTypeZonesCacheKeyPrefix = "zones:"
InstanceTypesCacheKey = "types"
InstanceTypeOfferingsCacheKey = "offerings"
AvailabilityZonesCacheKey = "zones"
)

type Provider struct {
region string
ec2api ec2iface.EC2API
subnetProvider *subnet.Provider
pricingProvider *pricing.Provider
// Has one cache entry for all the instance types (key: InstanceTypesCacheKey)
// Has one cache entry for all the zones for each subnet selector (key: InstanceTypesZonesCacheKeyPrefix:<hash_of_selector>)
Expand All @@ -68,12 +68,11 @@ type Provider struct {
instanceTypesSeqNum uint64
}

func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider *subnet.Provider,
func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API,
unavailableOfferingsCache *awscache.UnavailableOfferings, pricingProvider *pricing.Provider) *Provider {
return &Provider{
ec2api: ec2api,
region: region,
subnetProvider: subnetProvider,
pricingProvider: pricingProvider,
cache: cache,
unavailableOfferings: unavailableOfferingsCache,
Expand All @@ -88,25 +87,27 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
if err != nil {
return nil, err
}
// Get Viable EC2 Purchase offerings
instanceTypeZones, err := p.getInstanceTypeZones(ctx, nodeClass)
// Get InstanceTypeOfferings from EC2
instanceTypeOfferings, err := p.getInstanceTypeOfferings(ctx)
if err != nil {
return nil, err
}
// Get AvailabilityZones from EC2
availabilityZones, err := p.listAvailabilityZones(ctx)
if err != nil {
return nil, err
}

// Compute fully initialized instance types hash key
instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeOfferings, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
key := fmt.Sprintf("%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, instanceTypeZonesHash, kcHash)
key := fmt.Sprintf("%d-%d-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, instanceTypeZonesHash, kcHash)

if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
}
// Reject any instance types that don't have any offerings due to zone
result := lo.Reject(lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeZones[aws.StringValue(i.InstanceType)]))
}), func(i *cloudprovider.InstanceType, _ int) bool {
return len(i.Offerings) == 0
result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], availabilityZones))
})
for _, instanceType := range instanceTypes {
InstanceTypeVCPU.With(prometheus.Labels{
Expand All @@ -121,33 +122,30 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
}

func (p *Provider) LivenessProbe(req *http.Request) error {
if err := p.subnetProvider.LivenessProbe(req); err != nil {
return err
}
return p.pricingProvider.LivenessProbe(req)
}

func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones sets.Set[string]) []cloudprovider.Offering {
func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, availabilityZones sets.Set[string]) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for zone := range zones {
for az := range availabilityZones {
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType)
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, az, capacityType)
var price float64
var ok bool
switch capacityType {
case ec2.UsageClassTypeSpot:
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone)
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, az)
case ec2.UsageClassTypeOnDemand:
price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType)
default:
logging.FromContext(ctx).Errorf("Received unknown capacity type %s for instance type %s", capacityType, *instanceType.InstanceType)
continue
}
available := !isUnavailable && ok
available := !isUnavailable && ok && instanceTypeZones.Has(az)
offerings = append(offerings, cloudprovider.Offering{
Zone: zone,
Zone: az,
CapacityType: capacityType,
Price: price,
Available: available,
Expand All @@ -157,56 +155,62 @@ func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.Instan
return offerings
}

func (p *Provider) getInstanceTypeZones(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (map[string]sets.Set[string], error) {
func (p *Provider) listAvailabilityZones(ctx context.Context) (sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeZones do not result in cache misses and multiple
// We lock here so that multiple callers to listAvailabilityZones do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(AvailabilityZonesCacheKey); ok {
return cached.(sets.Set[string]), nil
}

subnetSelectorHash, err := hashstructure.Hash(nodeClass.Spec.SubnetSelectorTerms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
// Get zones from EC2
instanceTypeZones := sets.Set[string]{}
output, err := p.ec2api.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return nil, fmt.Errorf("failed to hash the subnet selector: %w", err)
return nil, fmt.Errorf("describing availability zones, %w", err)
}
cacheKey := fmt.Sprintf("%s%016x", InstanceTypeZonesCacheKeyPrefix, subnetSelectorHash)
if cached, ok := p.cache.Get(cacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
for i := range output.AvailabilityZones {
zone := output.AvailabilityZones[i]
if aws.StringValue(zone.ZoneType) == "availability-zone" {
instanceTypeZones.Insert(aws.StringValue(zone.ZoneName))
}
}
logging.FromContext(ctx).With("zones", instanceTypeZones.UnsortedList()).Debugf("discovered availability zones")
p.cache.Set(AvailabilityZonesCacheKey, instanceTypeZones, 24*time.Hour)
return instanceTypeZones, nil
}

// Constrain AZs from subnets
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
}
if len(subnets) == 0 {
return nil, nil
func (p *Provider) getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeZones do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
}
zones := sets.NewString(lo.Map(subnets, func(subnet *ec2.Subnet, _ int) string {
return aws.StringValue(subnet.AvailabilityZone)
})...)

// Get offerings from EC2
instanceTypeZones := map[string]sets.Set[string]{}
instanceTypeOfferings := map[string]sets.Set[string]{}
if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")},
func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool {
for _, offering := range output.InstanceTypeOfferings {
if zones.Has(aws.StringValue(offering.Location)) {
if _, ok := instanceTypeZones[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeZones[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeZones[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
}
return true
}); err != nil {
return nil, fmt.Errorf("describing instance type zone offerings, %w", err)
}
if p.cm.HasChanged("zonal-offerings", nodeClass.Spec.SubnetSelectorTerms) {
logging.FromContext(ctx).With("zones", zones.List(), "instance-type-count", len(instanceTypeZones), "node-template", nodeClass.Name).Debugf("discovered offerings for instance types")
}
p.cache.SetDefault(cacheKey, instanceTypeZones)
return instanceTypeZones, nil
logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types")
p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings)
return instanceTypeOfferings, nil
}

// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters
Expand Down
28 changes: 15 additions & 13 deletions pkg/providers/instancetype/nodeclass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
Expand Down Expand Up @@ -98,14 +99,14 @@ var _ = Describe("NodeClass/InstanceTypes", func() {

nodeSelector := map[string]string{
// Well known
corev1beta1.NodePoolLabelKey: nodePool.Name,
v1.LabelTopologyRegion: fake.DefaultRegion,
v1.LabelTopologyZone: "test-zone-1a",
v1.LabelInstanceTypeStable: "g4dn.8xlarge",
v1.LabelOSStable: "linux",
corev1beta1.NodePoolLabelKey: nodePool.Name,
v1.LabelTopologyRegion: fake.DefaultRegion,
v1.LabelTopologyZone: "test-zone-1a",
v1.LabelInstanceTypeStable: "g4dn.8xlarge",
v1.LabelOSStable: "linux",
v1.LabelArchStable: "amd64",
corev1beta1.CapacityTypeLabelKey: "on-demand",
// Well Known to AWS
//Well Known to AWS
v1beta1.LabelInstanceHypervisor: "nitro",
v1beta1.LabelInstanceEncryptionInTransitSupported: "true",
v1beta1.LabelInstanceCategory: "g",
Expand All @@ -128,10 +129,10 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
v1.LabelFailureDomainBetaRegion: fake.DefaultRegion,
v1.LabelFailureDomainBetaZone: "test-zone-1a",
"beta.kubernetes.io/arch": "amd64",
"beta.kubernetes.io/os": "linux",
"beta.kubernetes.io/os": "linux",
v1.LabelInstanceType: "g4dn.8xlarge",
"topology.ebs.csi.aws.com/zone": "test-zone-1a",
v1.LabelWindowsBuild: v1beta1.Windows2022Build,
v1.LabelWindowsBuild: v1beta1.Windows2022Build,
}

// Ensure that we're exercising all well known labels
Expand All @@ -142,7 +143,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
pods = append(pods, coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{key: value}}))
}
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
for _, pod := range pods {
for i, pod := range pods {
logging.FromContext(ctx).Infof("DEBUGGING: THIS IS THE (%d)th iteration with requirements %s", i, pod.Spec.NodeSelector)
ExpectScheduled(ctx, env.Client, pod)
}
})
Expand Down Expand Up @@ -1175,8 +1177,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
},
}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expand Down Expand Up @@ -1253,8 +1255,8 @@ var _ = Describe("NodeClass/InstanceTypes", func() {
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
},
}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expand Down
12 changes: 7 additions & 5 deletions pkg/providers/instancetype/nodetemplate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
Expand Down Expand Up @@ -149,7 +150,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() {
pods = append(pods, coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{key: value}}))
}
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
for _, pod := range pods {
for i, pod := range pods {
logging.FromContext(ctx).Infof("DEBUGGING: THIS IS THE (%d)th iteration with requirements %s", i, pod.Spec.NodeSelector)
ExpectScheduled(ctx, env.Client, pod)
}
})
Expand Down Expand Up @@ -1206,8 +1208,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() {
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
},
}}}
ExpectApplied(ctx, env.Client, provisioner, nodeTemplate)
Expand Down Expand Up @@ -1284,8 +1286,8 @@ var _ = Describe("NodeTemplate/InstanceTypes", func() {
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}},
}},
},
}}}
ExpectApplied(ctx, env.Client, provisioner, nodeTemplate)
Expand Down
6 changes: 5 additions & 1 deletion pkg/providers/instancetype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
Expand All @@ -53,6 +54,9 @@ func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, kc *corev1
region string, nodeClass *v1beta1.EC2NodeClass, offerings cloudprovider.Offerings) *cloudprovider.InstanceType {

amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{})
if aws.StringValue(info.InstanceType) == "trn1.2xlarge" {
logging.FromContext(ctx).Infof("DEBUGGING: this is the AMI Family for the trn1.2xlarge: %s", amiFamily.DefaultAMIs("v1.27", false)[0].Query)
}
return &cloudprovider.InstanceType{
Name: aws.StringValue(info.InstanceType),
Requirements: computeRequirements(ctx, info, offerings, region, amiFamily, kc, nodeClass),
Expand Down Expand Up @@ -150,7 +154,7 @@ func computeRequirements(ctx context.Context, info *ec2.InstanceTypeInfo, offeri
}

func getOS(info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily) []string {
if _, ok := amiFamily.(*amifamily.Windows); ok {
if _, ok := amiFamily.(amifamily.Windows); ok {
if getArchitecture(info) == corev1beta1.ArchitectureAmd64 {
return []string{string(v1.Windows)}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
instanceProfileProvider := instanceprofile.NewProvider(fake.DefaultRegion, iamapi, instanceProfileCache)
amiProvider := amifamily.NewProvider(versionProvider, ssmapi, ec2api, ec2Cache)
amiResolver := amifamily.New(amiProvider)
instanceTypesProvider := instancetype.NewProvider(fake.DefaultRegion, instanceTypeCache, ec2api, subnetProvider, unavailableOfferingsCache, pricingProvider)
instanceTypesProvider := instancetype.NewProvider(fake.DefaultRegion, instanceTypeCache, ec2api, unavailableOfferingsCache, pricingProvider)
launchTemplateProvider :=
launchtemplate.NewProvider(
ctx,
Expand Down

0 comments on commit c2849cf

Please sign in to comment.