Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create offerings regardless of subnets #4857

Merged
merged 9 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,9 @@ func (e *EC2API) DescribeAvailabilityZonesWithContext(context.Context, *ec2.Desc
return e.DescribeAvailabilityZonesOutput.Clone(), nil
}
return &ec2.DescribeAvailabilityZonesOutput{AvailabilityZones: []*ec2.AvailabilityZone{
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c")},
{ZoneName: aws.String("test-zone-1a"), ZoneId: aws.String("testzone1a"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1b"), ZoneId: aws.String("testzone1b"), ZoneType: aws.String("availability-zone")},
{ZoneName: aws.String("test-zone-1c"), ZoneId: aws.String("testzone1c"), ZoneType: aws.String("availability-zone")},
}}, nil
}

Expand Down
113 changes: 69 additions & 44 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand All @@ -44,8 +45,9 @@ import (
)

const (
InstanceTypesCacheKey = "types"
InstanceTypeZonesCacheKeyPrefix = "zones:"
InstanceTypesCacheKey = "types"
InstanceTypeOfferingsCacheKey = "offerings"
AvailabilityZonesCacheKey = "zones"
)

type Provider struct {
Expand All @@ -66,6 +68,8 @@ type Provider struct {
cm *pretty.ChangeMonitor
// instanceTypesSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
instanceTypesSeqNum uint64
// instanceTypeOfferingsSeqNum is a monotonically increasing change counter used to avoid the expensive hashing operation on instance types
instanceTypeOfferingsSeqNum uint64
}

func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subnetProvider *subnet.Provider,
Expand All @@ -88,25 +92,35 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
if err != nil {
return nil, err
}
// Get Viable EC2 Purchase offerings
instanceTypeZones, err := p.getInstanceTypeZones(ctx, nodeClass)
// Get InstanceTypeOfferings from EC2
instanceTypeOfferings, err := p.getInstanceTypeOfferings(ctx)
if err != nil {
return nil, err
}
// Get AvailabilityZones from EC2
availabilityZones, err := p.getAvailabilityZones(ctx)
if err != nil {
return nil, err
}
// Constrain AZs from subnets
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
}
subnetZones := sets.New[string](lo.Map(subnets, func(s *ec2.Subnet, _ int) string {
return aws.StringValue(s.AvailabilityZone)
})...)

// Compute fully initialized instance types hash key
instanceTypeZonesHash, _ := hashstructure.Hash(instanceTypeZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
subnetHash, _ := hashstructure.Hash(subnets, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
key := fmt.Sprintf("%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, instanceTypeZonesHash, kcHash)
key := fmt.Sprintf("%d-%d-%d-%s-%016x-%016x", p.instanceTypesSeqNum, p.instanceTypeOfferingsSeqNum, p.unavailableOfferings.SeqNum, nodeClass.UID, subnetHash, kcHash)

if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
}
// Reject any instance types that don't have any offerings due to zone
result := lo.Reject(lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeZones[aws.StringValue(i.InstanceType)]))
}), func(i *cloudprovider.InstanceType, _ int) bool {
return len(i.Offerings) == 0
result := lo.Map(instanceTypes, func(i *ec2.InstanceTypeInfo, _ int) *cloudprovider.InstanceType {
return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], availabilityZones, subnetZones))
})
for _, instanceType := range instanceTypes {
InstanceTypeVCPU.With(prometheus.Labels{
Expand All @@ -127,27 +141,27 @@ func (p *Provider) LivenessProbe(req *http.Request) error {
return p.pricingProvider.LivenessProbe(req)
}

func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, zones sets.Set[string]) []cloudprovider.Offering {
func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, availabilityZones, subnetZones sets.Set[string]) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for zone := range zones {
for az := range availabilityZones {
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType)
isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, az, capacityType)
var price float64
var ok bool
switch capacityType {
case ec2.UsageClassTypeSpot:
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone)
price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, az)
case ec2.UsageClassTypeOnDemand:
price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType)
default:
logging.FromContext(ctx).Errorf("Received unknown capacity type %s for instance type %s", capacityType, *instanceType.InstanceType)
continue
}
available := !isUnavailable && ok
available := !isUnavailable && ok && instanceTypeZones.Has(az) && subnetZones.Has(az)
offerings = append(offerings, cloudprovider.Offering{
Zone: zone,
Zone: az,
CapacityType: capacityType,
Price: price,
Available: available,
Expand All @@ -157,56 +171,67 @@ func (p *Provider) createOfferings(ctx context.Context, instanceType *ec2.Instan
return offerings
}

func (p *Provider) getInstanceTypeZones(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (map[string]sets.Set[string], error) {
func (p *Provider) getAvailabilityZones(ctx context.Context) (sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeZones do not result in cache misses and multiple
// We lock here so that multiple callers to getAvailabilityZones do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(AvailabilityZonesCacheKey); ok {
return cached.(sets.Set[string]), nil
}

subnetSelectorHash, err := hashstructure.Hash(nodeClass.Spec.SubnetSelectorTerms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
// Get zones from EC2
instanceTypeZones := sets.Set[string]{}
output, err := p.ec2api.DescribeAvailabilityZonesWithContext(ctx, &ec2.DescribeAvailabilityZonesInput{})
if err != nil {
return nil, fmt.Errorf("failed to hash the subnet selector: %w", err)
return nil, fmt.Errorf("describing availability zones, %w", err)
}
cacheKey := fmt.Sprintf("%s%016x", InstanceTypeZonesCacheKeyPrefix, subnetSelectorHash)
if cached, ok := p.cache.Get(cacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
for i := range output.AvailabilityZones {
zone := output.AvailabilityZones[i]
if aws.StringValue(zone.ZoneType) == "availability-zone" {
instanceTypeZones.Insert(aws.StringValue(zone.ZoneName))
}
}

// Constrain AZs from subnets
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
if p.cm.HasChanged("zones", instanceTypeZones) {
logging.FromContext(ctx).With("zones", instanceTypeZones.UnsortedList()).Debugf("discovered availability zones")
}
if len(subnets) == 0 {
return nil, nil
p.cache.Set(AvailabilityZonesCacheKey, instanceTypeZones, 24*time.Hour)
return instanceTypeZones, nil
}

func (p *Provider) getInstanceTypeOfferings(ctx context.Context) (map[string]sets.Set[string], error) {
// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
// We lock here so that multiple callers to getInstanceTypeOfferings do not result in cache misses and multiple
// calls to EC2 when we could have just made one call.
// TODO @joinnis: This can be made more efficient by holding a Read lock and only obtaining the Write if not in cache
p.mu.Lock()
defer p.mu.Unlock()
if cached, ok := p.cache.Get(InstanceTypeOfferingsCacheKey); ok {
return cached.(map[string]sets.Set[string]), nil
}
zones := sets.NewString(lo.Map(subnets, func(subnet *ec2.Subnet, _ int) string {
return aws.StringValue(subnet.AvailabilityZone)
})...)

// Get offerings from EC2
instanceTypeZones := map[string]sets.Set[string]{}
instanceTypeOfferings := map[string]sets.Set[string]{}
if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")},
func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool {
for _, offering := range output.InstanceTypeOfferings {
if zones.Has(aws.StringValue(offering.Location)) {
if _, ok := instanceTypeZones[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeZones[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeZones[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
if _, ok := instanceTypeOfferings[aws.StringValue(offering.InstanceType)]; !ok {
instanceTypeOfferings[aws.StringValue(offering.InstanceType)] = sets.New[string]()
}
instanceTypeOfferings[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location))
}
return true
}); err != nil {
return nil, fmt.Errorf("describing instance type zone offerings, %w", err)
}
if p.cm.HasChanged("zonal-offerings", nodeClass.Spec.SubnetSelectorTerms) {
logging.FromContext(ctx).With("zones", zones.List(), "instance-type-count", len(instanceTypeZones), "node-template", nodeClass.Name).Debugf("discovered offerings for instance types")
if p.cm.HasChanged("instance-type-count", len(instanceTypeOfferings)) {
logging.FromContext(ctx).With("instance-type-count", len(instanceTypeOfferings)).Debugf("discovered offerings for instance types")
}
p.cache.SetDefault(cacheKey, instanceTypeZones)
return instanceTypeZones, nil
atomic.AddUint64(&p.instanceTypeOfferingsSeqNum, 1)
p.cache.SetDefault(InstanceTypeOfferingsCacheKey, instanceTypeOfferings)
return instanceTypeOfferings, nil
}

// GetInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters
Expand Down