Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support vpc.amazonaws.com/efa resource #5068

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hack/code/instancetype_testdata_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func getInstanceTypeInfo(info *ec2.InstanceTypeInfo) string {
fmt.Fprintf(src, "},\n")
}
fmt.Fprintf(src, "NetworkInfo: &ec2.NetworkInfo{\n")
if info.NetworkInfo.EfaInfo != nil {
fmt.Fprintf(src, "EfaInfo: &ec2.EfaInfo{\n")
fmt.Fprintf(src, "MaximumEfaInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.EfaInfo.MaximumEfaInterfaces))
fmt.Fprintf(src, "},\n")
}
fmt.Fprintf(src, "MaximumNetworkInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.MaximumNetworkInterfaces))
fmt.Fprintf(src, "Ipv4AddressesPerInterface: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.Ipv4AddressesPerInterface))
fmt.Fprintf(src, "EncryptionInTransitSupported: aws.Bool(%t),\n", lo.FromPtr(info.NetworkInfo.EncryptionInTransitSupported))
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1beta1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
ResourceHabanaGaudi v1.ResourceName = "habana.ai/gaudi"
ResourceAWSPodENI v1.ResourceName = "vpc.amazonaws.com/pod-eni"
ResourcePrivateIPv4Address v1.ResourceName = "vpc.amazonaws.com/PrivateIPv4Address"
ResourceEFA v1.ResourceName = "vpc.amazonaws.com/efa"

LabelNodeClass = Group + "/ec2nodeclass"

Expand Down
15 changes: 13 additions & 2 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,19 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType *
labels[key] = req.Values()[0]
}
}
nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) })
nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) })
resourceFilter := func(n v1.ResourceName, v resource.Quantity) bool {
jmdeal marked this conversation as resolved.
Show resolved Hide resolved
if resources.IsZero(v) {
return false
}
// The nodeclaim should only advertise an EFA resource if it was requested. EFA network interfaces are only
// added to the launch template if they're requested, otherwise the instance is launched with a normal ENI.
if n == v1beta1.ResourceEFA {
return i.EFAEnabled
}
return true
}
nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, resourceFilter)
nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), resourceFilter)
}
labels[v1.LabelTopologyZone] = i.Zone
labels[corev1beta1.CapacityTypeLabelKey] = i.CapacityType
Expand Down
30 changes: 30 additions & 0 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -600,4 +601,33 @@ var _ = Describe("CloudProvider", func() {
ExpectScheduled(ctx, env.Client, pod)
})
})
Context("EFA", func() {
It("should include vpc.amazonaws.com/efa on a nodeclaim if it requests it", func() {
nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
nodeClaim.Spec.Resources.Requests = v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")}
ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim)
cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim)
Expect(err).To(BeNil())
Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).To(ContainElement(v1beta1.ResourceEFA))
})
It("shouldn't include vpc.amazonaws.com/efa on a nodeclaim if it doesn't request it", func() {
nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim)
cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim)
Expect(err).To(BeNil())
Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).ToNot(ContainElement(v1beta1.ResourceEFA))
})
})
})
9 changes: 9 additions & 0 deletions pkg/fake/zz_generated.describe_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(4000),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(4),
},
MaximumNetworkInterfaces: aws.Int64(60),
Ipv4AddressesPerInterface: aws.Int64(50),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down Expand Up @@ -147,6 +150,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(900),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(1),
},
MaximumNetworkInterfaces: aws.Int64(4),
Ipv4AddressesPerInterface: aws.Int64(15),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down Expand Up @@ -348,6 +354,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(7600),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(2),
},
MaximumNetworkInterfaces: aws.Int64(14),
Ipv4AddressesPerInterface: aws.Int64(50),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down
88 changes: 56 additions & 32 deletions pkg/providers/amifamily/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type LaunchTemplate struct {
AMIID string
InstanceTypes []*cloudprovider.InstanceType `hash:"ignore"`
DetailedMonitoring bool
EFACount int
}

// AMIFamily can be implemented to override the default logic for generating dynamic launch template parameters
Expand Down Expand Up @@ -130,43 +131,29 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass,
}
var resolvedTemplates []*LaunchTemplate
for amiID, instanceTypes := range mappedAMIs {
jmdeal marked this conversation as resolved.
Show resolved Hide resolved
maxPodsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) int {
return int(instanceType.Capacity.Pods().Value())
})
// In order to support reserved ENIs for CNI custom networking setups,
// we need to pass down the max-pods calculation to the kubelet.
// This requires that we resolve a unique launch template per max-pods value.
for maxPods, instanceTypes := range maxPodsToInstanceTypes {
kubeletConfig := &corev1beta1.KubeletConfiguration{}
if nodeClaim.Spec.Kubelet != nil {
if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil {
return nil, err
}
}
if kubeletConfig.MaxPods == nil {
kubeletConfig.MaxPods = lo.ToPtr(int32(maxPods))
}
resolved := &LaunchTemplate{
Options: options,
UserData: amiFamily.UserData(
r.defaultClusterDNS(options, kubeletConfig),
append(nodeClaim.Spec.Taints, nodeClaim.Spec.StartupTaints...),
options.Labels,
options.CABundle,
instanceTypes,
nodeClass.Spec.UserData,
// Similarly, instance types configured with EfAs require unique launch templates depending on the number of
// EFAs they support.
type launchTemplateParams struct {
efaCount int
maxPods int
}
paramsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) launchTemplateParams {
jmdeal marked this conversation as resolved.
Show resolved Hide resolved
return launchTemplateParams{
efaCount: lo.Ternary(
lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA),
int(lo.ToPtr(instanceType.Capacity[v1beta1.ResourceEFA]).Value()),
0,
),
BlockDeviceMappings: nodeClass.Spec.BlockDeviceMappings,
MetadataOptions: nodeClass.Spec.MetadataOptions,
DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring),
AMIID: amiID,
InstanceTypes: instanceTypes,
maxPods: int(instanceType.Capacity.Pods().Value()),
}
if len(resolved.BlockDeviceMappings) == 0 {
resolved.BlockDeviceMappings = amiFamily.DefaultBlockDeviceMappings()
}
if resolved.MetadataOptions == nil {
resolved.MetadataOptions = amiFamily.DefaultMetadataOptions()
})
for params, instanceTypes := range paramsToInstanceTypes {
resolved, err := r.resolveLaunchTemplate(nodeClass, nodeClaim, instanceTypes, amiFamily, amiID, params.maxPods, params.efaCount, options)
if err != nil {
return nil, err
}
resolvedTemplates = append(resolvedTemplates, resolved)
}
Expand Down Expand Up @@ -216,3 +203,40 @@ func (r Resolver) defaultClusterDNS(opts *Options, kubeletConfig *corev1beta1.Ku
newKubeletConfig.ClusterDNS = []string{opts.KubeDNSIP.String()}
return newKubeletConfig
}

func (r Resolver) resolveLaunchTemplate(nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType,
amiFamily AMIFamily, amiID string, maxPods int, efaCount int, options *Options) (*LaunchTemplate, error) {
kubeletConfig := &corev1beta1.KubeletConfiguration{}
if nodeClaim.Spec.Kubelet != nil {
if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil {
return nil, err
}
}
if kubeletConfig.MaxPods == nil {
kubeletConfig.MaxPods = lo.ToPtr(int32(maxPods))
}
resolved := &LaunchTemplate{
Options: options,
UserData: amiFamily.UserData(
r.defaultClusterDNS(options, kubeletConfig),
append(nodeClaim.Spec.Taints, nodeClaim.Spec.StartupTaints...),
options.Labels,
options.CABundle,
instanceTypes,
nodeClass.Spec.UserData,
),
BlockDeviceMappings: nodeClass.Spec.BlockDeviceMappings,
MetadataOptions: nodeClass.Spec.MetadataOptions,
DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring),
AMIID: amiID,
InstanceTypes: instanceTypes,
EFACount: efaCount,
}
if len(resolved.BlockDeviceMappings) == 0 {
resolved.BlockDeviceMappings = amiFamily.DefaultBlockDeviceMappings()
}
if resolved.MetadataOptions == nil {
resolved.MetadataOptions = amiFamily.DefaultMetadataOptions()
}
return resolved, nil
}
3 changes: 2 additions & 1 deletion pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (p *Provider) Create(ctx context.Context, nodeClass *v1beta1.EC2NodeClass,
if err != nil {
return nil, err
}
return NewInstanceFromFleet(fleetInstance, tags), nil
efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA)
return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil
}

func (p *Provider) Link(ctx context.Context, id, provisionerName string) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/providers/instance/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Instance struct {
SecurityGroupIDs []string
SubnetID string
Tags map[string]string
EFAEnabled bool
}

func NewInstance(out *ec2.Instance) *Instance {
Expand All @@ -53,11 +54,14 @@ func NewInstance(out *ec2.Instance) *Instance {
}),
SubnetID: aws.StringValue(out.SubnetId),
Tags: lo.SliceToMap(out.Tags, func(t *ec2.Tag) (string, string) { return aws.StringValue(t.Key), aws.StringValue(t.Value) }),
EFAEnabled: lo.ContainsBy(out.NetworkInterfaces, func(ni *ec2.InstanceNetworkInterface) bool {
return ni != nil && lo.FromPtr(ni.InterfaceType) == ec2.NetworkInterfaceTypeEfa
}),
}

}

func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string) *Instance {
func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance {
return &Instance{
LaunchTime: time.Now(), // estimate the launch time since we just launched
State: ec2.StatePending,
Expand All @@ -68,5 +72,6 @@ func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string)
CapacityType: aws.StringValue(out.Lifecycle),
SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId),
Tags: tags,
EFAEnabled: efaEnabled,
}
}
32 changes: 32 additions & 0 deletions pkg/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,38 @@ var _ = Describe("InstanceTypes", func() {
}
Expect(nodeNames.Len()).To(Equal(1))
})
It("should launch instances for vpc.amazonaws.com/efa resource requests", func() {
nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
pods := []*v1.Pod{
coretest.UnschedulablePod(coretest.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")},
Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")},
},
}),
coretest.UnschedulablePod(coretest.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")},
Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")},
},
}),
}
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
nodes := sets.NewString()
for _, pod := range pods {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "dl1.24xlarge"))
nodes.Insert(node.Name)
}
Expect(nodes.Len()).To(Equal(1))
})
It("should not set pods to 110 if using ENI-based pod density", func() {
instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx)
Expect(err).To(BeNil())
Expand Down
9 changes: 9 additions & 0 deletions pkg/providers/instancetype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func computeCapacity(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily
v1beta1.ResourceAMDGPU: *amdGPUs(info),
v1beta1.ResourceAWSNeuron: *awsNeurons(info),
v1beta1.ResourceHabanaGaudi: *habanaGaudis(info),
v1beta1.ResourceEFA: *efas(info),
}
return resourceList
}
Expand Down Expand Up @@ -296,6 +297,14 @@ func habanaGaudis(info *ec2.InstanceTypeInfo) *resource.Quantity {
return resources.Quantity(fmt.Sprint(count))
}

func efas(info *ec2.InstanceTypeInfo) *resource.Quantity {
count := int64(0)
if info.NetworkInfo != nil && info.NetworkInfo.EfaInfo != nil {
count = lo.FromPtr(info.NetworkInfo.EfaInfo.MaximumEfaInterfaces)
}
return resources.Quantity(fmt.Sprint(count))
}

func ENILimitedPods(ctx context.Context, info *ec2.InstanceTypeInfo) *resource.Quantity {
// The number of pods per node is calculated using the formula:
// max number of ENIs * (IPv4 Addresses per ENI -1) + 2
Expand Down
31 changes: 22 additions & 9 deletions pkg/providers/launchtemplate/launchtemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
if capacityType == corev1beta1.CapacityTypeSpot {
launchTemplateDataTags = append(launchTemplateDataTags, &ec2.LaunchTemplateTagSpecificationRequest{ResourceType: aws.String(ec2.ResourceTypeSpotInstancesRequest), Tags: utils.MergeTags(options.Tags)})
}
networkInterface := p.generateNetworkInterface(options)
networkInterfaces := p.generateNetworkInterfaces(options)
output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{
LaunchTemplateName: aws.String(launchTemplateName(options)),
LaunchTemplateData: &ec2.RequestLaunchTemplateData{
Expand All @@ -254,7 +254,7 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
Enabled: aws.Bool(options.DetailedMonitoring),
},
// If the network interface is defined, the security groups are defined within it
SecurityGroupIds: lo.Ternary(networkInterface != nil, nil, lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) })),
SecurityGroupIds: lo.Ternary(networkInterfaces != nil, nil, lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) })),
UserData: aws.String(userData),
ImageId: aws.String(options.AMIID),
MetadataOptions: &ec2.LaunchTemplateInstanceMetadataOptionsRequest{
Expand All @@ -263,7 +263,7 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
HttpPutResponseHopLimit: options.MetadataOptions.HTTPPutResponseHopLimit,
HttpTokens: options.MetadataOptions.HTTPTokens,
},
NetworkInterfaces: networkInterface,
NetworkInterfaces: networkInterfaces,
TagSpecifications: launchTemplateDataTags,
},
TagSpecifications: []*ec2.TagSpecification{
Expand All @@ -280,12 +280,25 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
return output.LaunchTemplate, nil
}

// generateNetworkInterface generates a network interface for the launch template.
// If all referenced subnets do not assign public IPv4 addresses to EC2 instances therein, we explicitly set
// AssociatePublicIpAddress to 'false' in the Launch Template, generated based on this configuration struct.
// This is done to help comply with AWS account policies that require explicitly setting that field to 'false'.
// https://github.com/aws/karpenter/issues/3815
func (p *Provider) generateNetworkInterface(options *amifamily.LaunchTemplate) []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest {
// generateNetworkInterfaces generates network interfaces for the launch template.
func (p *Provider) generateNetworkInterfaces(options *amifamily.LaunchTemplate) []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest {
if options.EFACount != 0 {
return lo.Times(options.EFACount, func(i int) *ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest {
return &ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{
NetworkCardIndex: lo.ToPtr(int64(i)),
// Some networking magic to ensure that one network card has higher priority than all the others (important if an instance needs a public IP w/o adding an EIP to every network card)
DeviceIndex: lo.ToPtr(lo.Ternary[int64](i == 0, 0, 1)),
InterfaceType: lo.ToPtr(ec2.NetworkInterfaceTypeEfa),
Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }),
}
})
}

// If all referenced subnets do not assign public IPv4 addresses to EC2 instances therein, we explicitly set
// AssociatePublicIpAddress to 'false' in the Launch Template, generated based on this configuration struct.
// This is done to help comply with AWS account policies that require explicitly setting that field to 'false'.
// This is ignored for EFA instances since it can't be specified if you launch with multiple network interfaces.
// https://github.com/aws/karpenter/issues/3815
if options.AssociatePublicIPAddress != nil {
return []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{
{
Expand Down
Loading