Skip to content

Commit

Permalink
support EFA resource requests
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Nov 30, 2023
1 parent 504c71e commit 96960f7
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 16 deletions.
5 changes: 5 additions & 0 deletions hack/code/instancetype_testdata_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func getInstanceTypeInfo(info *ec2.InstanceTypeInfo) string {
fmt.Fprintf(src, "},\n")
}
fmt.Fprintf(src, "NetworkInfo: &ec2.NetworkInfo{\n")
if info.NetworkInfo.EfaInfo != nil {
fmt.Fprintf(src, "EfaInfo: &ec2.EfaInfo{\n")
fmt.Fprintf(src, "MaximumEfaInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.EfaInfo.MaximumEfaInterfaces))
fmt.Fprintf(src, "},\n")
}
fmt.Fprintf(src, "MaximumNetworkInterfaces: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.MaximumNetworkInterfaces))
fmt.Fprintf(src, "Ipv4AddressesPerInterface: aws.Int64(%d),\n", lo.FromPtr(info.NetworkInfo.Ipv4AddressesPerInterface))
fmt.Fprintf(src, "EncryptionInTransitSupported: aws.Bool(%t),\n", lo.FromPtr(info.NetworkInfo.EncryptionInTransitSupported))
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1beta1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
ResourceHabanaGaudi v1.ResourceName = "habana.ai/gaudi"
ResourceAWSPodENI v1.ResourceName = "vpc.amazonaws.com/pod-eni"
ResourcePrivateIPv4Address v1.ResourceName = "vpc.amazonaws.com/PrivateIPv4Address"
ResourceEFA v1.ResourceName = "vpc.amazonaws.com/efa"

LabelNodeClass = Group + "/ec2nodeclass"

Expand Down
10 changes: 8 additions & 2 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,14 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType *
labels[key] = req.Values()[0]
}
}
nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) })
nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) })
resourceFilter := func(n v1.ResourceName, v resource.Quantity) bool {
if !i.EFAEnabled && n == v1beta1.ResourceEFA {
return false
}
return !resources.IsZero(v)
}
nodeClaim.Status.Capacity = functional.FilterMap(instanceType.Capacity, resourceFilter)
nodeClaim.Status.Allocatable = functional.FilterMap(instanceType.Allocatable(), resourceFilter)
}
labels[v1.LabelTopologyZone] = i.Zone
labels[corev1beta1.CapacityTypeLabelKey] = i.CapacityType
Expand Down
30 changes: 30 additions & 0 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -600,4 +601,33 @@ var _ = Describe("CloudProvider", func() {
ExpectScheduled(ctx, env.Client, pod)
})
})
// EFA specs: the vpc.amazonaws.com/efa resource is expected in the allocatable
// resources of the NodeClaim returned by cloudProvider.Create only when the
// submitted NodeClaim requested that resource.
Context("EFA", func() {
It("should include vpc.amazonaws.com/efa on a nodeclaim if it requests it", func() {
// Pin the claim to a single instance type so the outcome does not depend on
// instance type selection (presumably dl1.24xlarge advertises EFA in the fake
// instance type data — confirm).
nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
// Explicitly request one EFA on the NodeClaim.
nodeClaim.Spec.Resources.Requests = v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")}
ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim)
cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim)
Expect(err).To(BeNil())
// The EFA resource must be listed among the allocatable resource names.
Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).To(ContainElement(v1beta1.ResourceEFA))
})
It("shouldn't include vpc.amazonaws.com/efa on a nodeclaim if it doesn't request it", func() {
// Same instance type as the spec above, but no EFA request is set on the claim here.
nodeClaim.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass, nodeClaim)
cloudProviderNodeClaim, err := cloudProvider.Create(ctx, nodeClaim)
Expect(err).To(BeNil())
// The EFA resource must be absent from the allocatable resource names.
Expect(lo.Keys(cloudProviderNodeClaim.Status.Allocatable)).ToNot(ContainElement(v1beta1.ResourceEFA))
})
})
})
9 changes: 9 additions & 0 deletions pkg/fake/zz_generated.describe_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(4000),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(4),
},
MaximumNetworkInterfaces: aws.Int64(60),
Ipv4AddressesPerInterface: aws.Int64(50),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down Expand Up @@ -147,6 +150,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(900),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(1),
},
MaximumNetworkInterfaces: aws.Int64(4),
Ipv4AddressesPerInterface: aws.Int64(15),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down Expand Up @@ -348,6 +354,9 @@ var defaultDescribeInstanceTypesOutput = &ec2.DescribeInstanceTypesOutput{
TotalSizeInGB: aws.Int64(7600),
},
NetworkInfo: &ec2.NetworkInfo{
EfaInfo: &ec2.EfaInfo{
MaximumEfaInterfaces: aws.Int64(2),
},
MaximumNetworkInterfaces: aws.Int64(14),
Ipv4AddressesPerInterface: aws.Int64(50),
EncryptionInTransitSupported: aws.Bool(true),
Expand Down
21 changes: 17 additions & 4 deletions pkg/providers/amifamily/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type LaunchTemplate struct {
AMIID string
InstanceTypes []*cloudprovider.InstanceType `hash:"ignore"`
DetailedMonitoring bool
EFACount int
}

// AMIFamily can be implemented to override the default logic for generating dynamic launch template parameters
Expand Down Expand Up @@ -130,21 +131,32 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass,
}
var resolvedTemplates []*LaunchTemplate
for amiID, instanceTypes := range mappedAMIs {
maxPodsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) int {
return int(instanceType.Capacity.Pods().Value())
type launchTemplateParams struct {
efaCount int
maxPods int
}
paramsToInstanceTypes := lo.GroupBy(instanceTypes, func(instanceType *cloudprovider.InstanceType) launchTemplateParams {
return launchTemplateParams{
efaCount: lo.Ternary(
lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA),
int(lo.ToPtr(instanceType.Capacity[v1beta1.ResourceEFA]).Value()),
0,
),
maxPods: int(instanceType.Capacity.Pods().Value()),
}
})
// In order to support reserved ENIs for CNI custom networking setups,
// we need to pass down the max-pods calculation to the kubelet.
// The EFA count is likewise carried on the resolved launch template (EFACount).
// This requires that we resolve a unique launch template per (EFA count, max-pods) combination.
for maxPods, instanceTypes := range maxPodsToInstanceTypes {
for params, instanceTypes := range paramsToInstanceTypes {
kubeletConfig := &corev1beta1.KubeletConfiguration{}
if nodeClaim.Spec.Kubelet != nil {
if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil {
return nil, err
}
}
if kubeletConfig.MaxPods == nil {
kubeletConfig.MaxPods = lo.ToPtr(int32(maxPods))
kubeletConfig.MaxPods = lo.ToPtr(int32(params.maxPods))
}
resolved := &LaunchTemplate{
Options: options,
Expand All @@ -161,6 +173,7 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass,
DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring),
AMIID: amiID,
InstanceTypes: instanceTypes,
EFACount: params.efaCount,
}
if len(resolved.BlockDeviceMappings) == 0 {
resolved.BlockDeviceMappings = amiFamily.DefaultBlockDeviceMappings()
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (p *Provider) Create(ctx context.Context, nodeClass *v1beta1.EC2NodeClass,
if err != nil {
return nil, err
}
return NewInstanceFromFleet(fleetInstance, tags), nil
efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA)
return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil
}

func (p *Provider) Link(ctx context.Context, id, provisionerName string) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/providers/instance/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Instance struct {
SecurityGroupIDs []string
SubnetID string
Tags map[string]string
EFAEnabled bool
}

func NewInstance(out *ec2.Instance) *Instance {
Expand All @@ -53,11 +54,14 @@ func NewInstance(out *ec2.Instance) *Instance {
}),
SubnetID: aws.StringValue(out.SubnetId),
Tags: lo.SliceToMap(out.Tags, func(t *ec2.Tag) (string, string) { return aws.StringValue(t.Key), aws.StringValue(t.Value) }),
EFAEnabled: lo.ContainsBy(out.NetworkInterfaces, func(ni *ec2.InstanceNetworkInterface) bool {
return ni != nil && lo.FromPtr(ni.InterfaceType) == "efa"
}),
}

}

func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string) *Instance {
func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance {
return &Instance{
LaunchTime: time.Now(), // estimate the launch time since we just launched
State: ec2.StatePending,
Expand All @@ -68,5 +72,6 @@ func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string)
CapacityType: aws.StringValue(out.Lifecycle),
SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId),
Tags: tags,
EFAEnabled: efaEnabled,
}
}
32 changes: 32 additions & 0 deletions pkg/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,38 @@ var _ = Describe("InstanceTypes", func() {
}
Expect(nodeNames.Len()).To(Equal(1))
})
// Provisions two pods that request the vpc.amazonaws.com/efa resource and expects
// both of them to be scheduled onto one and the same dl1.24xlarge node.
It("should launch instances for vpc.amazonaws.com/efa resource requests", func() {
// Limit the NodePool to a single instance type.
nodePool.Spec.Template.Spec.Requirements = []v1.NodeSelectorRequirement{
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{"dl1.24xlarge"},
},
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
// One pod asks for 1 EFA and the other for 2; requests and limits match for each pod.
pods := []*v1.Pod{
coretest.UnschedulablePod(coretest.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")},
Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("1")},
},
}),
coretest.UnschedulablePod(coretest.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")},
Limits: v1.ResourceList{v1beta1.ResourceEFA: resource.MustParse("2")},
},
}),
}
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)
// Collect the distinct node names the pods were scheduled to.
nodes := sets.NewString()
for _, pod := range pods {
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "dl1.24xlarge"))
nodes.Insert(node.Name)
}
// Both pods must share a single node.
Expect(nodes.Len()).To(Equal(1))
})
It("should not set pods to 110 if using ENI-based pod density", func() {
instanceInfo, err := awsEnv.InstanceTypesProvider.GetInstanceTypes(ctx)
Expect(err).To(BeNil())
Expand Down
9 changes: 9 additions & 0 deletions pkg/providers/instancetype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func computeCapacity(ctx context.Context, info *ec2.InstanceTypeInfo, amiFamily
v1beta1.ResourceAMDGPU: *amdGPUs(info),
v1beta1.ResourceAWSNeuron: *awsNeurons(info),
v1beta1.ResourceHabanaGaudi: *habanaGaudis(info),
v1beta1.ResourceEFA: *efas(info),
}
return resourceList
}
Expand Down Expand Up @@ -296,6 +297,14 @@ func habanaGaudis(info *ec2.InstanceTypeInfo) *resource.Quantity {
return resources.Quantity(fmt.Sprint(count))
}

// efas returns the instance type's EFA capacity as a resource quantity.
// The count is the MaximumEfaInterfaces value found under NetworkInfo.EfaInfo;
// when NetworkInfo or EfaInfo is nil the resulting quantity is zero.
func efas(info *ec2.InstanceTypeInfo) *resource.Quantity {
	var efaCount int64
	if networkInfo := info.NetworkInfo; networkInfo != nil {
		if efaInfo := networkInfo.EfaInfo; efaInfo != nil {
			efaCount = lo.FromPtr(efaInfo.MaximumEfaInterfaces)
		}
	}
	return resources.Quantity(fmt.Sprint(efaCount))
}

func ENILimitedPods(ctx context.Context, info *ec2.InstanceTypeInfo) *resource.Quantity {
// The number of pods per node is calculated using the formula:
// max number of ENIs * (IPv4 Addresses per ENI -1) + 2
Expand Down
24 changes: 16 additions & 8 deletions pkg/providers/launchtemplate/launchtemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
launchTemplateDataTags = append(launchTemplateDataTags, &ec2.LaunchTemplateTagSpecificationRequest{ResourceType: aws.String(ec2.ResourceTypeSpotInstancesRequest), Tags: utils.MergeTags(options.Tags)})
}
networkInterface := p.generateNetworkInterface(options)
logging.FromContext(ctx).Debugf("Network Interfaces: %d", len(networkInterface))
output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{
LaunchTemplateName: aws.String(launchTemplateName(options)),
LaunchTemplateData: &ec2.RequestLaunchTemplateData{
Expand Down Expand Up @@ -286,16 +287,23 @@ func (p *Provider) createLaunchTemplate(ctx context.Context, capacityType string
// This is done to help comply with AWS account policies that require explicitly setting that field to 'false'.
// https://github.com/aws/karpenter/issues/3815
// NOTE(review): this function is shown as rendered by a diff view without +/- markers, so
// lines of the previous and of the updated implementation appear interleaved. Presumably the
// `if options.AssociatePublicIPAddress != nil { return ... }` literal directly below and the
// trailing `return nil` belong to the previous version — confirm against the repository.
func (p *Provider) generateNetworkInterface(options *amifamily.LaunchTemplate) []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest {
if options.AssociatePublicIPAddress != nil {
return []*ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{
{
AssociatePublicIpAddress: options.AssociatePublicIPAddress,
DeviceIndex: aws.Int64(0),
Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }),
},
// Build one "efa" interface per requested EFA (options.EFACount); the i-th interface is
// placed on network card index i.
interfaces := lo.Times(options.EFACount, func(i int) *ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest {
return &ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{
AssociatePublicIpAddress: options.AssociatePublicIPAddress,
NetworkCardIndex: lo.ToPtr(int64(i)),
// Only the first interface gets device index 0; every later one gets device index 1.
DeviceIndex: lo.ToPtr(lo.Ternary[int64](i == 0, 0, 1)),
InterfaceType: lo.ToPtr("efa"),
Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }),
}
})
// With no EFA interfaces, fall back to a single interface on device index 0 that carries
// the explicitly configured public IP setting and the security groups.
if len(interfaces) == 0 && options.AssociatePublicIPAddress != nil {
interfaces = append(interfaces, &ec2.LaunchTemplateInstanceNetworkInterfaceSpecificationRequest{
AssociatePublicIpAddress: options.AssociatePublicIPAddress,
DeviceIndex: aws.Int64(0),
Groups: lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) }),
})
}
return nil
// NOTE(review): when EFACount is 0 and AssociatePublicIPAddress is nil, `interfaces` is
// presumably an empty but non-nil slice (verify lo.Times for a count of 0), whereas the
// `return nil` variant yields nil — confirm no caller distinguishes nil from empty here.
return interfaces
}

func (p *Provider) blockDeviceMappings(blockDeviceMappings []*v1beta1.BlockDeviceMapping) []*ec2.LaunchTemplateBlockDeviceMappingRequest {
Expand Down
Loading

0 comments on commit 96960f7

Please sign in to comment.