diff --git a/hack/code/instancetype_testdata_gen/main.go b/hack/code/instancetype_testdata_gen/main.go index 881faa700372..35b684036226 100644 --- a/hack/code/instancetype_testdata_gen/main.go +++ b/hack/code/instancetype_testdata_gen/main.go @@ -154,6 +154,11 @@ func getInstanceTypeInfo(info *ec2.InstanceTypeInfo) string { fmt.Fprintf(src, "},\n") } fmt.Fprintf(src, "NetworkInfo: &ec2.NetworkInfo{\n") + if info.NetworkInfo.EfaInfo != nil { + fmt.Fprintf(src, "EfaInfo: &ec2.EfaInfo{\n") + fmt.Fprintf(src, "MaximumEfaInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.EfaInfo.MaximumEfaInterfaces)) + fmt.Fprintf(src, "},\n") + } fmt.Fprintf(src, "MaximumNetworkInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.MaximumNetworkInterfaces)) fmt.Fprintf(src, "Ipv4AddressesPerInterface: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.Ipv4AddressesPerInterface)) fmt.Fprintf(src, "EncryptionInTransitSupported: aws.Bool(%t),\n", lo.FromPtr(info.NetworkInfo.EncryptionInTransitSupported)) diff --git a/pkg/apis/v1beta1/labels.go b/pkg/apis/v1beta1/labels.go index db637ea393bd..1b86eb70726f 100644 --- a/pkg/apis/v1beta1/labels.go +++ b/pkg/apis/v1beta1/labels.go @@ -88,6 +88,7 @@ var ( ResourceHabanaGaudi v1.ResourceName = "habana.ai/gaudi" ResourceAWSPodENI v1.ResourceName = "vpc.amazonaws.com/pod-eni" ResourcePrivateIPv4Address v1.ResourceName = "vpc.amazonaws.com/PrivateIPv4Address" + ResourceEFA v1.ResourceName = "vpc.amazonaws.com/efa" LabelNodeClass = Group + "/ec2nodeclass" diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index cfc3265ba35a..634078ad03b7 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -311,8 +311,14 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType * labels[key] = req.Values()[0] } } - nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }) - nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }) + resourceFilter := func(n v1.ResourceName, v resource.Quantity) bool { + if !i.EFAEnabled && n == v1beta1.ResourceEFA { + return false + } + return !resources.IsZero(v) + } + nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, resourceFilter) + nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), resourceFilter) } labels[v1.LabelTopologyZone] = i.Zone labels[corev1beta1.CapacityTypeLabelKey] = i.CapacityType diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 4a845bce9d83..872819708a7a 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -22,6 +22,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -600,4 +601,33 @@ var _ = Describe("CloudProvider", func() { ExpectScheduled(ctx, env.Client, pod) }) }) + Context("EFA", func() { + It("should include vpc.amazonaws.com/efa on a nodeclaim if it requests it", func() { + nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{ + { + Key: v1.LabelInstanceTypeStable, + Operator: v1.NodeSelectorOpIn, + Values: []string{"dl1.24xlarge"}, + }, + } + nodeClaim.Spec.Resources.Requests = v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")} + ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim) + cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim) + Expect(err).To(BeNil()) + Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).To(ContainElement(v1beta1.ResourceEFA)) + }) + It("shouldn't include vpc.amazonaws.com/efa on a nodeclaim if it doesn't request it", func() { + nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{ + { + Key: v1.LabelInstanceTypeStable, + Operator: v1.NodeSelectorOpIn, + Values: []string{"dl1.24xlarge"}, + }, + } + ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim) + cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim) + Expect(err).To(BeNil()) + Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).ToNot(ContainElement(v1beta1.ResourceEFA)) + }) + }) }) diff --git a/pkg/fake/zz_generated.describe_instance_types.go b/pkg/fake/zz_generated.describe_instance_types.go index 0458a976133b..0d9aa45d555b 100644 --- a/pkg/fake/zz_generated.describe_instance_types.go +++ b/pkg/fake/zz_generated.describe_instance_types.go @@ -90,6 +90,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{ TotalSizeInGB: aws.Int64(4000), }, NetworkInfo: &ec2.NetworkInfo{ + EfaInfo: &ec2.EfaInfo{ + MaximumEfaInterfaces: aws.Int64(4), + }, MaximumNetworkInterfaces: aws.Int64(60), Ipv4AddressesPerInterface: aws.Int64(50), EncryptionInTransitSupported: aws.Bool(true), @@ -147,6 +150,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{ TotalSizeInGB: aws.Int64(900), }, NetworkInfo: &ec2.NetworkInfo{ + EfaInfo: &ec2.EfaInfo{ + MaximumEfaInterfaces: aws.Int64(1), + }, MaximumNetworkInterfaces: aws.Int64(4), Ipv4AddressesPerInterface: aws.Int64(15), EncryptionInTransitSupported: aws.Bool(true), @@ -348,6 +354,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{ TotalSizeInGB: aws.Int64(7600), }, NetworkInfo: &ec2.NetworkInfo{ + EfaInfo: &ec2.EfaInfo{ + MaximumEfaInterfaces: aws.Int64(2), + }, MaximumNetworkInterfaces: aws.Int64(14), Ipv4AddressesPerInterface: aws.Int64(50), EncryptionInTransitSupported: aws.Bool(true), diff --git a/pkg/providers/amifamily/resolver.go b/pkg/providers/amifamily/resolver.go index ab5754dbdf5a..221fe103f745 100644 --- a/pkg/providers/amifamily/resolver.go +++ b/pkg/providers/amifamily/resolver.go @@ -69,6 +69,7 @@ type LaunchTemplate struct { AMIID string InstanceTypes []*cloudprovider.InstanceType `hash:"ignore"` DetailedMonitoring bool + EFACount int } // AMIFamily can be implemented to override the default logic for generating dynamic launch template parameters @@ -130,13 +131,24 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, } var resolvedTemplates []*LaunchTemplate for amiID, instanceTypes := range mappedAMIs { - maxPodsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) int { - return int(instanceType.Capacity.Pods().Value()) + type launchTemplateParams struct { + efaCount int + maxPods int + } + paramsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) launchTemplateParams { + return launchTemplateParams{ + efaCount: lo.Ternary( + lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA), + int(lo.ToPtr(instanceType.Capacity[v1beta1.ResourceEFA]).Value()), + 0, + ), + maxPods: int(instanceType.Capacity.Pods().Value()), + } }) // In order to support reserved ENIs for CNI custom networking setups, // we need to pass down the max-pods calculation to the kubelet. // This requires that we resolve a unique launch template per max-pods value. - for maxPods, instanceTypes := range maxPodsToInstanceTypes { + for params, instanceTypes := range paramsToInstanceTypes { kubeletConfig := &corev1beta1.KubeletConfiguration{} if nodeClaim.Spec.Kubelet != nil { if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil { @@ -144,7 +156,7 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, } } if kubeletConfig.MaxPods == nil { - kubeletConfig.MaxPods = lo.ToPtr(int32(maxPods)) + kubeletConfig.MaxPods = lo.ToPtr(int32(params.maxPods)) } resolved := &LaunchTemplate{ Options: options, @@ -161,6 +173,7 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring), AMIID: amiID, InstanceTypes: instanceTypes, + EFACount: params.efaCount, } if len(resolved.BlockDeviceMappings) == 0 { resolved.BlockDeviceMappings = amiFamily.DefaultBlockDeviceMappings() diff --git a/pkg/providers/instance/instance.go b/pkg/providers/instance/instance.go index cfd102ef9838..47d4b6fba0b0 100644 --- a/pkg/providers/instance/instance.go +++ b/pkg/providers/instance/instance.go @@ -100,7 +100,8 @@ func (p *Provider) Create(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, if err != nil { return nil, err } - return NewInstanceFromFleet(fleetInstance, tags), nil + efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA) + return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil } func (p *Provider) Link(ctx context.Context, id, provisionerName string) error { diff --git a/pkg/providers/instance/types.go b/pkg/providers/instance/types.go index e44deeb408dd..b12a7f9a96a6 100644 --- a/pkg/providers/instance/types.go +++ b/pkg/providers/instance/types.go @@ -37,6 +37,7 @@ type Instance struct { SecurityGroupIDs []string SubnetID string Tags map[string]string + EFAEnabled bool } func NewInstance(out *ec2.Instance) *Instance { @@ -53,11 +54,14 @@ func NewInstance(out *ec2.Instance) *Instance { }), SubnetID: aws.StringValue(out.SubnetId), Tags: lo.SliceToMap(out.Tags, func(t *ec2.Tag) (string, string) { return aws.StringValue(t.Key), aws.StringValue(t.Value) }), + EFAEnabled: lo.ContainsBy(out.NetworkInterfaces, func(ni *ec2.InstanceNetworkInterface) bool { + return ni != nil && lo.FromPtr(ni.InterfaceType) == "efa" + }), } } -func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string) *Instance { +func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance { return &Instance{ LaunchTime: time.Now(), // estimate the launch time since we just launched State: ec2.StatePending, @@ -68,5 +72,6 @@ func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string) CapacityType: aws.StringValue(out.Lifecycle), SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId), Tags: tags, + EFAEnabled: efaEnabled, } } diff --git a/pkg/providers/instancetype/suite_test.go b/pkg/providers/instancetype/suite_test.go index 12b199b111e3..7a83ccc53ee4 100644 --- a/pkg/providers/instancetype/suite_test.go +++ b/pkg/providers/instancetype/suite_test.go @@ -623,6 +623,38 @@ var _ = Describe("InstanceTypes", func() { } Expect(nodeNames.Len()).To(Equal(1)) }) + It("should launch instances for vpc.amazonaws.com/efa resource requests", func() { + nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirement{ + { + Key: v1.LabelInstanceTypeStable, + Operator: v1.NodeSelectorOpIn, + Values: []string{"dl1.24xlarge"}, + }, + } + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + pods := []*v1.Pod{ + coretest.UnschedulablePod(coretest.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")}, + Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")}, + }, + }), + coretest.UnschedulablePod(coretest.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")}, + Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")}, + }, + }), + } + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) + nodes := sets.NewString() + for _, pod := range pods { + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "dl1.24xlarge")) + nodes.Insert(node.Name) + } + Expect(nodes.Len()).To(Equal(1)) + }) It("should not set pods to 110 if using ENI-based pod density", func() { instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx) Expect(err).To(BeNil()) diff --git a/pkg/providers/instancetype/types.go b/pkg/providers/instancetype/types.go index 00458f30942c..36af3cd1900f 100644 --- a/pkg/providers/instancetype/types.go +++ b/pkg/providers/instancetype/types.go @@ -181,6 +181,7 @@ func computeCapacity(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily v1beta1.ResourceAMDGPU: *amdGPUs(info), v1beta1.ResourceAWSNeuron: *awsNeurons(info), v1beta1.ResourceHabanaGaudi: *habanaGaudis(info), + v1beta1.ResourceEFA: *efas(info), } return resourceList } @@ -296,6 +297,14 @@ func habanaGaudis(info *ec2.InstanceTypeInfo) *resource.Quantity { return resources.Quantity(fmt.Sprint(count)) } +func efas(info *ec2.InstanceTypeInfo) *resource.Quantity { + count := int64(0) + if info.NetworkInfo != nil && info.NetworkInfo.EfaInfo != nil { + count = lo.FromPtr(info.NetworkInfo.EfaInfo.MaximumEfaInterfaces) + } + return resources.Quantity(fmt.Sprint(count)) +} + func ENILimitedPods(ctx context.Context, info *ec2.InstanceTypeInfo) *resource.Quantity { // The number of pods per node is calculated using the formula: // max number of ENIs * (IPv4 Addresses per ENI -1) + 2 diff --git a/pkg/providers/launchtemplate/launchtemplate.go b/pkg/providers/launchtemplate/launchtemplate.go index 0a3af6cff0d2..953e3006ecb3 100644 --- a/pkg/providers/launchtemplate/launchtemplate.go +++ b/pkg/providers/launchtemplate/launchtemplate.go @@ -243,6 +243,7 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string launchTemplateDataTags = append(launchTemplateDataTags, &ec2.LaunchTemplateTagSpecificationRequest{ResourceType: aws.String(ec2.ResourceTypeSpotInstancesRequest), Tags: utils.MergeTags(options.Tags)}) } networkInterface := p.generateNetworkInterface(options) + logging.FromContext(ctx).Debugf("Network Interfaces: %d", len(networkInterface)) output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{ LaunchTemplateName: aws.String(launchTemplateName(options)), LaunchTemplateData: &ec2.RequestLaunchTemplateData{ @@ -286,16 +287,23 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string // This is done to help comply with AWS account policies that require explicitly setting that field to 'false'. // https://github.com/aws/karpenter/issues/3815 func (p *Provider) generateNetworkInterface(options *amifamily.LaunchTemplate) []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest { - if options.AssociatePublicIPAddress != nil { - return []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{ - { - AssociatePublicIpAddress: options.AssociatePublicIPAddress, - DeviceIndex: aws.Int64(0), - Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }), - }, + interfaces := lo.Times(options.EFACount, func(i int) *ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest { + return &ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{ + AssociatePublicIpAddress: options.AssociatePublicIPAddress, + NetworkCardIndex: lo.ToPtr(int64(i)), + DeviceIndex: lo.ToPtr(lo.Ternary[int64](i == 0, 0, 1)), + InterfaceType: lo.ToPtr("efa"), + Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }), } + }) + if len(interfaces) == 0 && options.AssociatePublicIPAddress != nil { + interfaces = append(interfaces, &ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{ + AssociatePublicIpAddress: options.AssociatePublicIPAddress, + DeviceIndex: aws.Int64(0), + Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }), + }) } - return nil + return interfaces } func (p *Provider) blockDeviceMappings(blockDeviceMappings []*v1beta1.BlockDeviceMapping) []*ec2.LaunchTemplateBlockDeviceMappingRequest { diff --git a/test/suites/integration/extended_resources_test.go b/test/suites/integration/extended_resources_test.go index c5b883e326bb..07eba151ac49 100644 --- a/test/suites/integration/extended_resources_test.go +++ b/test/suites/integration/extended_resources_test.go @@ -209,6 +209,48 @@ var _ = Describe("Extended Resources", func() { env.ExpectCreatedNodeCount("==", 1) env.EventuallyExpectInitializedNodeCount("==", 1) }) + + It("should provision nodes for a deployment that requests vpc.amazonaws.com/efa", func() { + ExpectEFADevicePluginCreated() + + nodePool.Spec.Template.Labels = map[string]string{ + "aws.amazon.com/efa": "true", + } + nodePool.Spec.Template.Spec.Taints = []v1.Taint{ + { + Key: "aws.amazon.com/efa", + Effect: v1.TaintEffectNoSchedule, + }, + } + numPods := 1 + dep := test.Deployment(test.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "efa-app"}, + }, + Tolerations: []v1.Toleration{ + { + Key: "aws.amazon.com/efa", + Operator: v1.TolerationOpExists, + }, + }, + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "vpc.amazonaws.com/efa": resource.MustParse("1"), + }, + Limits: v1.ResourceList{ + "vpc.amazonaws.com/efa": resource.MustParse("1"), + }, + }, + }, + }) + selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + env.ExpectCreated(nodeClass, nodePool, dep) + env.EventuallyExpectHealthyPodCount(selector, numPods) + env.ExpectCreatedNodeCount("==", 1) + env.EventuallyExpectInitializedNodeCount("==", 1) + }) }) func ExpectNvidiaDevicePluginCreated() { @@ -425,3 +467,80 @@ func ExpectHabanaDevicePluginCreated() { }, }) } + +func ExpectEFADevicePluginCreated() { + GinkgoHelper() + env.ExpectCreated(&appsv1.DaemonSet{ + ObjectMeta: test.ObjectMeta(metav1.ObjectMeta{ + Name: "aws-efa-k8s-device-plugin-daemonset", + Namespace: "kube-system", + }), + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "aws-efa-k8s-device-plugin", + }, + }, + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: test.ObjectMeta(metav1.ObjectMeta{ + Annotations: map[string]string{ + "scheduler.alpha.kubernetes.io/critical-pod": "", + }, + Labels: map[string]string{ + "name": "aws-efa-k8s-device-plugin", + }, + }), + Spec: v1.PodSpec{ + NodeSelector: map[string]string{ + "aws.amazon.com/efa": "true", + }, + Tolerations: []v1.Toleration{ + { + Key: "CriticalAddonsOnly", + Operator: v1.TolerationOpExists, + }, + { + Key: "aws.amazon.com/efa", + Operator: v1.TolerationOpExists, + Effect: v1.TaintEffectNoSchedule, + }, + }, + PriorityClassName: "system-node-critical", + HostNetwork: true, + Containers: []v1.Container{ + { + Name: "aws-efea-k8s-device-plugin", + Image: "602401143452.dkr.ecr.us-west-2.amazonaws.com/eks/aws-efa-k8s-device-plugin:v0.3.3", + SecurityContext: &v1.SecurityContext{ + AllowPrivilegeEscalation: lo.ToPtr(false), + Capabilities: &v1.Capabilities{ + Drop: []v1.Capability{"ALL"}, + }, + RunAsNonRoot: lo.ToPtr(false), + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "device-plugin", + MountPath: "/var/lib/kubelet/device-plugins", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "device-plugin", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/lib/kubelet/device-plugins", + }, + }, + }, + }, + }, + }, + }, + }) +}