Commit
Explicitly Call each signature for instance types
engedaam committed Mar 13, 2024
1 parent 8b9d231 commit fdef8bb
Showing 3 changed files with 95 additions and 57 deletions.
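At a glance, this commit flattens the instance-type provider's inputs: instead of handing the provider whole API objects (KubeletConfiguration, EC2NodeClass), callers now pass each field explicitly and list subnets themselves. A before/after sketch of the List signature, reconstructed from the diff below (parameter grouping follows the comments in the new code):

// Before: the provider received the full API objects and resolved subnets itself.
func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguration, nodeClass *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error)

// After: kubelet and node-class inputs are passed field by field, and the caller
// supplies the subnets it has already listed for the node class.
func (p *Provider) List(ctx context.Context,
	kubeReserved, systemReserved v1.ResourceList,
	evictionHard, evictionSoft map[string]string,
	maxPods, podsPerCore *int32,
	blockDeviceMappings []*v1beta1.BlockDeviceMapping,
	instanceStorePolicy *v1beta1.InstanceStorePolicy,
	nodeClassAmiFamiliy *string,
	subnets []*ec2.Subnet,
) ([]*cloudprovider.InstanceType, error)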
48 changes: 45 additions & 3 deletions pkg/cloudprovider/cloudprovider.go
@@ -148,7 +148,15 @@ func (c *CloudProvider) LivenessProbe(req *http.Request) error {
// GetInstanceTypes returns all available InstanceTypes
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *corev1beta1.NodePool) ([]*cloudprovider.InstanceType, error) {
if nodePool == nil {
return c.instanceTypeProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{})
subnets, err := c.subnetProvider.List(ctx, &v1beta1.EC2NodeClass{})
if err != nil {
return nil, err
}
return c.instanceTypeProvider.List(ctx,
// Kubelet Configuration Inputs
v1.ResourceList{}, v1.ResourceList{}, map[string]string{}, map[string]string{}, nil, nil,
// NodeClass Inputs
[]*v1beta1.BlockDeviceMapping{}, nil, nil, subnets)
}
nodeClass, err := c.resolveNodeClassFromNodePool(ctx, nodePool)
if err != nil {
@@ -160,8 +168,25 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *corev1be
// as the cause.
return nil, fmt.Errorf("resolving node class, %w", err)
}
subnets, err := c.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
}
// TODO, break this coupling
instanceTypes, err := c.instanceTypeProvider.List(ctx, nodePool.Spec.Template.Spec.Kubelet, nodeClass)
instanceTypes, err := c.instanceTypeProvider.List(ctx,
// Kubelet Configuration Inputs
nodePool.Spec.Template.Spec.Kubelet.KubeReserved,
nodePool.Spec.Template.Spec.Kubelet.SystemReserved,
nodePool.Spec.Template.Spec.Kubelet.EvictionHard,
nodePool.Spec.Template.Spec.Kubelet.EvictionSoft,
nodePool.Spec.Template.Spec.Kubelet.MaxPods,
nodePool.Spec.Template.Spec.Kubelet.PodsPerCore,
// NodeClass Inputs
nodeClass.Spec.BlockDeviceMappings,
nodeClass.Spec.InstanceStorePolicy,
nodeClass.Spec.AMIFamily,
subnets,
)
if err != nil {
return nil, err
}
@@ -239,7 +264,24 @@ func (c *CloudProvider) resolveNodeClassFromNodePool(ctx context.Context, nodePo
}

func (c *CloudProvider) resolveInstanceTypes(ctx context.Context, nodeClaim *corev1beta1.NodeClaim, nodeClass *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) {
instanceTypes, err := c.instanceTypeProvider.List(ctx, nodeClaim.Spec.Kubelet, nodeClass)
subnets, err := c.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, fmt.Errorf("getting instance types, %w", err)
}
instanceTypes, err := c.instanceTypeProvider.List(ctx,
// Kubelet Configuration Inputs
nodeClaim.Spec.Kubelet.KubeReserved,
nodeClaim.Spec.Kubelet.SystemReserved,
nodeClaim.Spec.Kubelet.EvictionHard,
nodeClaim.Spec.Kubelet.EvictionSoft,
nodeClaim.Spec.Kubelet.MaxPods,
nodeClaim.Spec.Kubelet.PodsPerCore,
// NodeClass Inputs
nodeClass.Spec.BlockDeviceMappings,
nodeClass.Spec.InstanceStorePolicy,
nodeClass.Spec.AMIFamily,
subnets,
)
if err != nil {
return nil, fmt.Errorf("getting instance types, %w", err)
}
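One thing the hunks above leave open: GetInstanceTypes and resolveInstanceTypes now dereference nodePool.Spec.Template.Spec.Kubelet and nodeClaim.Spec.Kubelet field by field, whereas the old code passed the pointer through and let the provider nil-check it. Whether those Kubelet blocks are guaranteed non-nil is not visible in this diff; if they are not, a small guard along these lines (a sketch only, not part of the commit) would preserve the old nil-tolerant behavior:

	// Sketch only: assumes Kubelet may be nil on the NodePool template spec.
	kubelet := nodePool.Spec.Template.Spec.Kubelet
	if kubelet == nil {
		kubelet = &corev1beta1.KubeletConfiguration{}
	}
	instanceTypes, err := c.instanceTypeProvider.List(ctx,
		// Kubelet Configuration Inputs
		kubelet.KubeReserved, kubelet.SystemReserved, kubelet.EvictionHard, kubelet.EvictionSoft, kubelet.MaxPods, kubelet.PodsPerCore,
		// NodeClass Inputs
		nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, nodeClass.Spec.AMIFamily, subnets,
	)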
35 changes: 16 additions & 19 deletions pkg/providers/instancetype/instancetype.go
@@ -25,21 +25,21 @@ import (
"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"

corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"

"github.com/aws/karpenter-provider-aws/pkg/providers/pricing"
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"

"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
"sigs.k8s.io/karpenter/pkg/utils/resources"
Expand Down Expand Up @@ -87,7 +87,9 @@ func NewProvider(region string, cache *cache.Cache, ec2api ec2iface.EC2API, subn
}
}

func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguration, nodeClass *v1beta1.EC2NodeClass) ([]*cloudprovider.InstanceType, error) {
func (p *Provider) List(ctx context.Context,
kubeReserved v1.ResourceList, systemReserved v1.ResourceList, evictionHard map[string]string, evictionSoft map[string]string, maxPods *int32, podsPerCore *int32,
blockDeviceMappings []*v1beta1.BlockDeviceMapping, instanceStorePolicy *v1beta1.InstanceStorePolicy, nodeClassAmiFamiliy *string, subnets []*ec2.Subnet) ([]*cloudprovider.InstanceType, error) {
// Get InstanceTypes from EC2
instanceTypes, err := p.GetInstanceTypes(ctx)
if err != nil {
@@ -98,46 +100,39 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
if err != nil {
return nil, err
}
subnets, err := p.subnetProvider.List(ctx, nodeClass)
if err != nil {
return nil, err
}

subnetZones := sets.New[string](lo.Map(subnets, func(s *ec2.Subnet, _ int) string {
return aws.StringValue(s.AvailabilityZone)
})...)

// Compute fully initialized instance types hash key
subnetZonesHash, _ := hashstructure.Hash(subnetZones, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kcHash, _ := hashstructure.Hash(kc, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
// TODO: remove kubeReservedHash and systemReservedHash once v1.ResourceList objects are hashed as strings in KubeletConfiguration
// For more information on the v1.ResourceList hash issue: https://github.com/kubernetes-sigs/karpenter/issues/1080
kubeReservedHash, systemReservedHash := uint64(0), uint64(0)
if kc != nil {
kubeReservedHash, _ = hashstructure.Hash(resources.StringMap(kc.KubeReserved), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
systemReservedHash, _ = hashstructure.Hash(resources.StringMap(kc.SystemReserved), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
}
blockDeviceMappingsHash, _ := hashstructure.Hash(nodeClass.Spec.BlockDeviceMappings, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
kubeReservedHash, _ := hashstructure.Hash(resources.StringMap(kubeReserved), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
systemReservedHash, _ := hashstructure.Hash(resources.StringMap(systemReserved), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
blockDeviceMappingsHash, _ := hashstructure.Hash(blockDeviceMappings, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
// TODO: remove volumeSizeHash once resource.Quantity objects get hashed as a string in BlockDeviceMappings
// For more information on the resource.Quantity hash issue: https://github.com/aws/karpenter-provider-aws/issues/5447
volumeSizeHash, _ := hashstructure.Hash(lo.Reduce(nodeClass.Spec.BlockDeviceMappings, func(agg string, block *v1beta1.BlockDeviceMapping, _ int) string {
volumeSizeHash, _ := hashstructure.Hash(lo.Reduce(blockDeviceMappings, func(agg string, block *v1beta1.BlockDeviceMapping, _ int) string {
return fmt.Sprintf("%s/%s", agg, block.EBS.VolumeSize)
}, ""), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
key := fmt.Sprintf("%d-%d-%d-%016x-%016x-%016x-%s-%s-%016x-%016x-%016x",

Check failure on line 120 in pkg/providers/instancetype/instancetype.go (GitHub Actions / ci-test, 1.23.x through 1.29.x): fmt.Sprintf format %s has arg volumeSizeHash of wrong type uint64
p.instanceTypesSeqNum,
p.instanceTypeOfferingsSeqNum,
p.unavailableOfferings.SeqNum,
subnetZonesHash,
kcHash,
blockDeviceMappingsHash,
aws.StringValue((*string)(nodeClass.Spec.InstanceStorePolicy)),
aws.StringValue(nodeClass.Spec.AMIFamily),
aws.StringValue((*string)(instanceStorePolicy)),
aws.StringValue(nodeClassAmiFamiliy),
volumeSizeHash,
kubeReservedHash,
systemReservedHash,
)
if item, ok := p.cache.Get(key); ok {
return item.([]*cloudprovider.InstanceType), nil
}
amiFamily := amifamily.GetAMIFamily(nodeClassAmiFamiliy, &amifamily.Options{})

// Get all zones across all offerings
// We don't use this in the cache key since this is produced from our instanceTypeOfferings which we do cache
@@ -158,7 +153,9 @@ func (p *Provider) List(ctx context.Context, kc *corev1beta1.KubeletConfiguratio
instanceTypeLabel: *i.InstanceType,
}).Set(float64(aws.Int64Value(i.MemoryInfo.SizeInMiB) * 1024 * 1024))

return NewInstanceType(ctx, i, kc, p.region, nodeClass, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones))
return NewInstanceType(ctx, i,
p.region, blockDeviceMappings, instanceStorePolicy, maxPods, podsPerCore, kubeReserved, systemReserved,
evictionHard, evictionSoft, amiFamily, p.createOfferings(ctx, i, instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones))
})
p.cache.SetDefault(key, result)
return result, nil
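The ci-test failures flagged above come from the cache-key Sprintf: once kcHash is dropped from the argument list, the format verbs no longer line up with the arguments, so a %s verb ends up paired with volumeSizeHash (a uint64), and the final %016x appears to have no argument at all. A minimal sketch of one way to realign them, using the argument list visible in the hunk above (an assumption about the follow-up fix, not part of this commit):

	key := fmt.Sprintf("%d-%d-%d-%016x-%016x-%s-%s-%016x-%016x-%016x",
		p.instanceTypesSeqNum,
		p.instanceTypeOfferingsSeqNum,
		p.unavailableOfferings.SeqNum,
		subnetZonesHash,
		blockDeviceMappingsHash,
		aws.StringValue((*string)(instanceStorePolicy)),
		aws.StringValue(nodeClassAmiFamiliy),
		volumeSizeHash,
		kubeReservedHash,
		systemReservedHash,
	)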
69 changes: 34 additions & 35 deletions pkg/providers/instancetype/types.go
@@ -49,19 +49,20 @@
instanceTypeScheme = regexp.MustCompile(`(^[a-z]+)(\-[0-9]+tb)?([0-9]+).*\.`)
)

func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, kc *corev1beta1.KubeletConfiguration,
region string, nodeClass *v1beta1.EC2NodeClass, offerings cloudprovider.Offerings) *cloudprovider.InstanceType {
func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, region string,
blockDeviceMapping []*v1beta1.BlockDeviceMapping, instanceStorePolicy *v1beta1.InstanceStorePolicy, maxPods *int32, podsPerCore *int32,
kubeReserved v1.ResourceList, systemReserved v1.ResourceList, evictionHard map[string]string, evictionSoft map[string]string,
amiFamily amifamily.AMIFamily, offerings cloudprovider.Offerings) *cloudprovider.InstanceType {

amiFamily := amifamily.GetAMIFamily(nodeClass.Spec.AMIFamily, &amifamily.Options{})
it := &cloudprovider.InstanceType{
Name: aws.StringValue(info.InstanceType),
Requirements: computeRequirements(info, offerings, region, amiFamily),
Offerings: offerings,
Capacity: computeCapacity(ctx, info, amiFamily, nodeClass, kc),
Capacity: computeCapacity(ctx, info, amiFamily, blockDeviceMapping, instanceStorePolicy, maxPods, podsPerCore),
Overhead: &cloudprovider.InstanceTypeOverhead{
KubeReserved: kubeReservedResources(cpu(info), pods(ctx, info, amiFamily, kc), ENILimitedPods(ctx, info), amiFamily, kc),
SystemReserved: systemReservedResources(kc),
EvictionThreshold: evictionThreshold(memory(ctx, info), ephemeralStorage(info, amiFamily, nodeClass), amiFamily, kc),
KubeReserved: kubeReservedResources(cpu(info), pods(ctx, info, amiFamily, maxPods, podsPerCore), ENILimitedPods(ctx, info), amiFamily, kubeReserved),
SystemReserved: systemReservedResources(systemReserved),
EvictionThreshold: evictionThreshold(memory(ctx, info), ephemeralStorage(info, amiFamily, blockDeviceMapping, instanceStorePolicy), amiFamily, evictionHard, evictionSoft),
},
}
if it.Requirements.Compatible(scheduling.NewRequirements(scheduling.NewRequirement(v1.LabelOSStable, v1.NodeSelectorOpIn, string(v1.Windows)))) == nil {
Expand Down Expand Up @@ -174,13 +175,14 @@ func getArchitecture(info *ec2.InstanceTypeInfo) string {
}

func computeCapacity(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily,
nodeClass *v1beta1.EC2NodeClass, kc *corev1beta1.KubeletConfiguration) v1.ResourceList {
blockDeviceMapping []*v1beta1.BlockDeviceMapping, instanceStorePolicy *v1beta1.InstanceStorePolicy,
maxPods *int32, podsPerCore *int32) v1.ResourceList {

resourceList := v1.ResourceList{
v1.ResourceCPU: *cpu(info),
v1.ResourceMemory: *memory(ctx, info),
v1.ResourceEphemeralStorage: *ephemeralStorage(info, amiFamily, nodeClass),
v1.ResourcePods: *pods(ctx, info, amiFamily, kc),
v1.ResourceEphemeralStorage: *ephemeralStorage(info, amiFamily, blockDeviceMapping, instanceStorePolicy),
v1.ResourcePods: *pods(ctx, info, amiFamily, maxPods, podsPerCore),
v1beta1.ResourceAWSPodENI: *awsPodENI(aws.StringValue(info.InstanceType)),
v1beta1.ResourceNVIDIAGPU: *nvidiaGPUs(info),
v1beta1.ResourceAMDGPU: *amdGPUs(info),
Expand Down Expand Up @@ -208,28 +210,28 @@ func memory(ctx context.Context, info *ec2.InstanceTypeInfo) *resource.Quantity
}

// Setting ephemeral-storage to be either the default value, what is defined in blockDeviceMappings, or the combined size of local store volumes.
func ephemeralStorage(info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily, nodeClass *v1beta1.EC2NodeClass) *resource.Quantity {
func ephemeralStorage(info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily, blockDeviceMappings []*v1beta1.BlockDeviceMapping, instanceStorePolicy *v1beta1.InstanceStorePolicy) *resource.Quantity {
// If local store disks have been configured for node ephemeral-storage, use the total size of the disks.
if lo.FromPtr(nodeClass.Spec.InstanceStorePolicy) == v1beta1.InstanceStorePolicyRAID0 {
if lo.FromPtr(instanceStorePolicy) == v1beta1.InstanceStorePolicyRAID0 {
if info.InstanceStorageInfo != nil && info.InstanceStorageInfo.TotalSizeInGB != nil {
return resources.Quantity(fmt.Sprintf("%dG", *info.InstanceStorageInfo.TotalSizeInGB))
}
}
if len(nodeClass.Spec.BlockDeviceMappings) != 0 {
if len(blockDeviceMappings) != 0 {
// First check if there's a root volume configured in blockDeviceMappings.
if blockDeviceMapping, ok := lo.Find(nodeClass.Spec.BlockDeviceMappings, func(bdm *v1beta1.BlockDeviceMapping) bool {
if blockDeviceMapping, ok := lo.Find(blockDeviceMappings, func(bdm *v1beta1.BlockDeviceMapping) bool {
return bdm.RootVolume
}); ok && blockDeviceMapping.EBS.VolumeSize != nil {
return blockDeviceMapping.EBS.VolumeSize
}
switch amiFamily.(type) {
case *amifamily.Custom:
// We can't know if a custom AMI is going to have a volume size.
volumeSize := nodeClass.Spec.BlockDeviceMappings[len(nodeClass.Spec.BlockDeviceMappings)-1].EBS.VolumeSize
volumeSize := blockDeviceMappings[len(blockDeviceMappings)-1].EBS.VolumeSize
return lo.Ternary(volumeSize != nil, volumeSize, amifamily.DefaultEBS.VolumeSize)
default:
// If a block device mapping exists in the provider for the root volume, use the volume size specified in the provider. If not, use the default
if blockDeviceMapping, ok := lo.Find(nodeClass.Spec.BlockDeviceMappings, func(bdm *v1beta1.BlockDeviceMapping) bool {
if blockDeviceMapping, ok := lo.Find(blockDeviceMappings, func(bdm *v1beta1.BlockDeviceMapping) bool {
return *bdm.DeviceName == *amiFamily.EphemeralBlockDevice()
}); ok && blockDeviceMapping.EBS.VolumeSize != nil {
return blockDeviceMapping.EBS.VolumeSize
Expand Down Expand Up @@ -338,14 +340,14 @@ func privateIPv4Address(info *ec2.InstanceTypeInfo) *resource.Quantity {
return resources.Quantity(fmt.Sprint(capacity))
}

func systemReservedResources(kc *corev1beta1.KubeletConfiguration) v1.ResourceList {
if kc != nil && kc.SystemReserved != nil {
return kc.SystemReserved
func systemReservedResources(systemReserved v1.ResourceList) v1.ResourceList {
if systemReserved != nil {
return systemReserved
}
return v1.ResourceList{}
}

func kubeReservedResources(cpus, pods, eniLimitedPods *resource.Quantity, amiFamily amifamily.AMIFamily, kc *corev1beta1.KubeletConfiguration) v1.ResourceList {
func kubeReservedResources(cpus, pods, eniLimitedPods *resource.Quantity, amiFamily amifamily.AMIFamily, kubeReserved v1.ResourceList) v1.ResourceList {
if amiFamily.FeatureFlags().UsesENILimitedMemoryOverhead {
pods = eniLimitedPods
}
@@ -375,28 +377,25 @@ func kubeReservedResources(cpus, pods, eniLimitedPods *resource.Quantity, amiFam
resources[v1.ResourceCPU] = *cpuOverhead
}
}
if kc != nil && kc.KubeReserved != nil {
return lo.Assign(resources, kc.KubeReserved)
if kubeReserved != nil {
return lo.Assign(resources, kubeReserved)
}
return resources
}

func evictionThreshold(memory *resource.Quantity, storage *resource.Quantity, amiFamily amifamily.AMIFamily, kc *corev1beta1.KubeletConfiguration) v1.ResourceList {
func evictionThreshold(memory *resource.Quantity, storage *resource.Quantity, amiFamily amifamily.AMIFamily, evictionHard map[string]string, evictionSoft map[string]string) v1.ResourceList {
overhead := v1.ResourceList{
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse(fmt.Sprint(math.Ceil(float64(storage.Value()) / 100 * 10))),
}
if kc == nil {
return overhead
}

override := v1.ResourceList{}
var evictionSignals []map[string]string
if kc.EvictionHard != nil {
evictionSignals = append(evictionSignals, kc.EvictionHard)
if evictionHard != nil {
evictionSignals = append(evictionSignals, evictionHard)
}
if kc.EvictionSoft != nil && amiFamily.FeatureFlags().EvictionSoftEnabled {
evictionSignals = append(evictionSignals, kc.EvictionSoft)
if evictionSoft != nil && amiFamily.FeatureFlags().EvictionSoftEnabled {
evictionSignals = append(evictionSignals, evictionSoft)
}

for _, m := range evictionSignals {
@@ -413,19 +412,19 @@ func evictionThreshold(memory *resource.Quantity, storage *resource.Quantity, am
return lo.Assign(overhead, override)
}

func pods(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily, kc *corev1beta1.KubeletConfiguration) *resource.Quantity {
func pods(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily amifamily.AMIFamily, maxPods *int32, podsPerCore *int32) *resource.Quantity {
var count int64
switch {
case kc != nil && kc.MaxPods != nil:
count = int64(ptr.Int32Value(kc.MaxPods))
case maxPods != nil:
count = int64(ptr.Int32Value(maxPods))
case amiFamily.FeatureFlags().SupportsENILimitedPodDensity:
count = ENILimitedPods(ctx, info).Value()
default:
count = 110

}
if kc != nil && ptr.Int32Value(kc.PodsPerCore) > 0 && amiFamily.FeatureFlags().PodsPerCoreEnabled {
count = lo.Min([]int64{int64(ptr.Int32Value(kc.PodsPerCore)) * ptr.Int64Value(info.VCpuInfo.DefaultVCpus), count})
if ptr.Int32Value(podsPerCore) > 0 && amiFamily.FeatureFlags().PodsPerCoreEnabled {
count = lo.Min([]int64{int64(ptr.Int32Value(podsPerCore)) * ptr.Int64Value(info.VCpuInfo.DefaultVCpus), count})
}
return resources.Quantity(fmt.Sprint(count))
}
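With the API objects out of the signature, NewInstanceType and the helpers above depend only on plain values, which makes them straightforward to exercise in isolation. A hypothetical, illustrative call (argument values are placeholders, not taken from the repository's tests; amifamily.GetAMIFamily usage mirrors the List code above):

	it := instancetype.NewInstanceType(ctx, info, "us-west-2",
		nil, nil,                             // blockDeviceMapping, instanceStorePolicy
		aws.Int32(110), nil,                  // maxPods, podsPerCore
		v1.ResourceList{}, v1.ResourceList{}, // kubeReserved, systemReserved
		nil, nil,                             // evictionHard, evictionSoft
		amifamily.GetAMIFamily(aws.String("AL2"), &amifamily.Options{}),
		offerings,
	)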
