Skip to content

Commit

Permalink
chore: Add a Subnet Controller to Asynchronously Hydrate Subnet Data (
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam authored May 1, 2024
1 parent 7b51ac1 commit 9836cbe
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 167 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
controller-gen.kubebuilder.io/version: v0.15.0
name: ec2nodeclasses.karpenter.k8s.aws
spec:
group: karpenter.k8s.aws
Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const (
InstanceTypesAndZonesTTL = 5 * time.Minute
// InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM
InstanceProfileTTL = 15 * time.Minute
// AvailableIPAddressTTL is the time to drop AvailableIPAddress data if it is not updated within the TTL
AvailableIPAddressTTL = 2 * time.Minute
// AssociatePublicIPAddressTTL is the time to drop AssociatePublicIPAddress data if it is not updated within the TTL
AssociatePublicIPAddressTTL = 2 * time.Minute
)

const (
Expand Down
17 changes: 5 additions & 12 deletions pkg/cloudprovider/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ import (
corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"

"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
Expand Down Expand Up @@ -58,7 +55,7 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *corev
if err != nil {
return "", fmt.Errorf("calculating securitygroup drift, %w", err)
}
subnetDrifted, err := c.isSubnetDrifted(ctx, instance, nodeClass)
subnetDrifted, err := c.isSubnetDrifted(instance, nodeClass)
if err != nil {
return "", fmt.Errorf("calculating subnet drift, %w", err)
}
Expand Down Expand Up @@ -96,18 +93,14 @@ func (c *CloudProvider) isAMIDrifted(ctx context.Context, nodeClaim *corev1beta1

// Checks if the subnets are drifted, by comparing the subnets recorded in the EC2NodeClass status
// to the subnet of the ec2 instance
func (c *CloudProvider) isSubnetDrifted(ctx context.Context, instance *instance.Instance, nodeClass *v1beta1.EC2NodeClass) (cloudprovider.DriftReason, error) {
subnets, err := c.subnetProvider.List(ctx, nodeClass)
if err != nil {
return "", err
}
func (c *CloudProvider) isSubnetDrifted(instance *instance.Instance, nodeClass *v1beta1.EC2NodeClass) (cloudprovider.DriftReason, error) {
// subnets need to be found to check for drift
if len(subnets) == 0 {
if len(nodeClass.Status.Subnets) == 0 {
return "", fmt.Errorf("no subnets are discovered")
}

_, found := lo.Find(subnets, func(subnet *ec2.Subnet) bool {
return aws.StringValue(subnet.SubnetId) == instance.SubnetID
_, found := lo.Find(nodeClass.Status.Subnets, func(subnet v1beta1.Subnet) bool {
return subnet.ID == instance.SubnetID
})

if !found {
Expand Down
86 changes: 70 additions & 16 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/apis"
"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"

"sigs.k8s.io/controller-runtime/pkg/client"
corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
corecloudproivder "sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
Expand Down Expand Up @@ -113,7 +115,41 @@ var _ = Describe("CloudProvider", func() {
var nodePool *corev1beta1.NodePool
var nodeClaim *corev1beta1.NodeClaim
var _ = BeforeEach(func() {
nodeClass = test.EC2NodeClass()
nodeClass = test.EC2NodeClass(
v1beta1.EC2NodeClass{
Status: v1beta1.EC2NodeClassStatus{
InstanceProfile: "test-profile",
SecurityGroups: []v1beta1.SecurityGroup{
{
ID: "sg-test1",
Name: "securityGroup-test1",
},
{
ID: "sg-test2",
Name: "securityGroup-test2",
},
{
ID: "sg-test3",
Name: "securityGroup-test3",
},
},
Subnets: []v1beta1.Subnet{
{
ID: "subnet-test1",
Zone: "test-zone-1a",
},
{
ID: "subnet-test2",
Zone: "test-zone-1b",
},
{
ID: "subnet-test3",
Zone: "test-zone-1c",
},
},
},
},
)
nodePool = coretest.NodePool(corev1beta1.NodePool{
Spec: corev1beta1.NodePoolSpec{
Template: corev1beta1.NodeClaimTemplate{
Expand All @@ -138,20 +174,8 @@ var _ = Describe("CloudProvider", func() {
},
},
})
nodeClass.Status.SecurityGroups = []v1beta1.SecurityGroup{
{
ID: "sg-test1",
Name: "securityGroup-test1",
},
{
ID: "sg-test2",
Name: "securityGroup-test2",
},
{
ID: "sg-test3",
Name: "securityGroup-test3",
},
}
_, err := awsEnv.SubnetProvider.List(ctx, nodeClass) // Hydrate the subnet cache
Expect(err).To(BeNil())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed())
})
Expand Down Expand Up @@ -600,6 +624,16 @@ var _ = Describe("CloudProvider", func() {
},
},
})
nodeClass.Status.Subnets = []v1beta1.Subnet{
{
ID: validSubnet1,
Zone: "zone-1",
},
{
ID: validSubnet2,
Zone: "zone-2",
},
}
nodeClass.Status.SecurityGroups = []v1beta1.SecurityGroup{
{
ID: validSecurityGroup,
Expand Down Expand Up @@ -680,7 +714,8 @@ var _ = Describe("CloudProvider", func() {
})
It("should return an error if subnets are empty", func() {
awsEnv.SubnetCache.Flush()
awsEnv.EC2API.DescribeSubnetsOutput.Set(&ec2.DescribeSubnetsOutput{Subnets: []*ec2.Subnet{}})
nodeClass.Status.Subnets = []v1beta1.Subnet{}
ExpectApplied(ctx, env.Client, nodeClass)
_, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).To(HaveOccurred())
})
Expand Down Expand Up @@ -797,6 +832,16 @@ var _ = Describe("CloudProvider", func() {
},
},
Status: v1beta1.EC2NodeClassStatus{
Subnets: []v1beta1.Subnet{
{
ID: validSubnet1,
Zone: "zone-1",
},
{
ID: validSubnet2,
Zone: "zone-2",
},
},
SecurityGroups: []v1beta1.SecurityGroup{
{
ID: validSecurityGroup,
Expand Down Expand Up @@ -964,28 +1009,34 @@ var _ = Describe("CloudProvider", func() {
Expect(foundNonGPULT).To(BeTrue())
})
It("should launch instances into subnet with the most available IP addresses", func() {
awsEnv.SubnetCache.Flush()
awsEnv.EC2API.DescribeSubnetsOutput.Set(&ec2.DescribeSubnetsOutput{Subnets: []*ec2.Subnet{
{SubnetId: aws.String("test-subnet-1"), AvailabilityZone: aws.String("test-zone-1a"), AvailableIpAddressCount: aws.Int64(10),
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-1")}}},
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailableIpAddressCount: aws.Int64(100),
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeClass))
pod := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectScheduled(ctx, env.Client, pod)
createFleetInput := awsEnv.EC2API.CreateFleetBehavior.CalledWithInput.Pop()
Expect(fake.SubnetsFromFleetRequest(createFleetInput)).To(ConsistOf("test-subnet-2"))
})
It("should launch instances into subnet with the most available IP addresses in-between cache refreshes", func() {
awsEnv.SubnetCache.Flush()
awsEnv.EC2API.DescribeSubnetsOutput.Set(&ec2.DescribeSubnetsOutput{Subnets: []*ec2.Subnet{
{SubnetId: aws.String("test-subnet-1"), AvailabilityZone: aws.String("test-zone-1a"), AvailableIpAddressCount: aws.Int64(10),
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-1")}}},
{SubnetId: aws.String("test-subnet-2"), AvailabilityZone: aws.String("test-zone-1a"), AvailableIpAddressCount: aws.Int64(11),
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{MaxPods: aws.Int32(1)}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeClass))
pod1 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
pod2 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod1, pod2)
Expand Down Expand Up @@ -1020,6 +1071,8 @@ var _ = Describe("CloudProvider", func() {
}})
nodeClass.Spec.SubnetSelectorTerms = []v1beta1.SubnetSelectorTerm{{Tags: map[string]string{"Name": "test-subnet-1"}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
controller := status.NewController(env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeClass))
podSubnet1 := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet1)
ExpectScheduled(ctx, env.Client, podSubnet1)
Expand Down Expand Up @@ -1059,6 +1112,7 @@ var _ = Describe("CloudProvider", func() {
},
})
ExpectApplied(ctx, env.Client, nodePool2, nodeClass2)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(nodeClass2))
podSubnet2 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{corev1beta1.NodePoolLabelKey: nodePool2.Name}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet2)
ExpectScheduled(ctx, env.Client, podSubnet2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclass/status/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ func (s *Subnet) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeClass)
}
})

return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
return reconcile.Result{RequeueAfter: time.Minute}, nil
}
38 changes: 36 additions & 2 deletions pkg/controllers/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,24 @@ var _ = Describe("InstanceType", func() {
})

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{})
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{
Status: v1beta1.EC2NodeClassStatus{
Subnets: []v1beta1.Subnet{
{
ID: "subnet-test1",
Zone: "test-zone-1a",
},
{
ID: "subnet-test2",
Zone: "test-zone-1b",
},
{
ID: "subnet-test3",
Zone: "test-zone-1c",
},
},
},
})
Expect(err).To(BeNil())
for i := range instanceTypes {
Expect(instanceTypes[i].Name).To(Equal(lo.FromPtr(ec2InstanceTypes[i].InstanceType)))
Expand All @@ -106,7 +123,24 @@ var _ = Describe("InstanceType", func() {
})

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{})
instanceTypes, err := awsEnv.InstanceTypesProvider.List(ctx, &corev1beta1.KubeletConfiguration{}, &v1beta1.EC2NodeClass{
Status: v1beta1.EC2NodeClassStatus{
Subnets: []v1beta1.Subnet{
{
ID: "subnet-test1",
Zone: "test-zone-1a",
},
{
ID: "subnet-test2",
Zone: "test-zone-1b",
},
{
ID: "subnet-test3",
Zone: "test-zone-1c",
},
},
},
})
Expect(err).To(BeNil())

Expect(len(instanceTypes)).To(BeNumerically("==", len(ec2InstanceTypes)))
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
}

unavailableOfferingsCache := awscache.NewUnavailableOfferings()
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(*sess.Config.Region, iam.New(sess), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval))
pricingProvider := pricing.NewDefaultProvider(
Expand Down
8 changes: 4 additions & 4 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (p *DefaultProvider) checkODFallback(nodeClaim *corev1beta1.NodeClaim, inst
}

func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim,
instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*ec2.Subnet, capacityType string, tags map[string]string) ([]*ec2.FleetLaunchTemplateConfigRequest, error) {
instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, capacityType string, tags map[string]string) ([]*ec2.FleetLaunchTemplateConfigRequest, error) {
var launchTemplateConfigs []*ec2.FleetLaunchTemplateConfigRequest
launchTemplates, err := p.launchTemplateProvider.EnsureAll(ctx, nodeClass, nodeClaim, instanceTypes, capacityType, tags)
if err != nil {
Expand All @@ -311,7 +311,7 @@ func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClas

// getOverrides creates and returns launch template overrides for the cross product of InstanceTypes and subnets (with subnets being constrained by
// zones and the offerings in InstanceTypes)
func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*ec2.Subnet, zones *scheduling.Requirement, capacityType string, image string) []*ec2.FleetLaunchTemplateOverridesRequest {
func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceType, zonalSubnets map[string]*subnet.Subnet, zones *scheduling.Requirement, capacityType string, image string) []*ec2.FleetLaunchTemplateOverridesRequest {
// Unwrap all the offerings to a flat slice that includes a pointer
// to the parent instance type name
type offeringWithParentName struct {
Expand Down Expand Up @@ -343,11 +343,11 @@ func (p *DefaultProvider) getOverrides(instanceTypes []*cloudprovider.InstanceTy
}
overrides = append(overrides, &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(offering.parentInstanceTypeName),
SubnetId: subnet.SubnetId,
SubnetId: lo.ToPtr(subnet.ID),
ImageId: aws.String(image),
// This is technically redundant, but is useful if we have to parse insufficient capacity errors from
// CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet
AvailabilityZone: subnet.AvailabilityZone,
AvailabilityZone: lo.ToPtr(subnet.Zone),
})
}
return overrides
Expand Down
46 changes: 34 additions & 12 deletions pkg/providers/instance/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,38 @@ var _ = Describe("InstanceProvider", func() {
var nodePool *corev1beta1.NodePool
var nodeClaim *corev1beta1.NodeClaim
BeforeEach(func() {
nodeClass = test.EC2NodeClass()
nodeClass = test.EC2NodeClass(
v1beta1.EC2NodeClass{
Status: v1beta1.EC2NodeClassStatus{
InstanceProfile: "test-profile",
SecurityGroups: []v1beta1.SecurityGroup{
{
ID: "sg-test1",
},
{
ID: "sg-test2",
},
{
ID: "sg-test3",
},
},
Subnets: []v1beta1.Subnet{
{
ID: "subnet-test1",
Zone: "test-zone-1a",
},
{
ID: "subnet-test2",
Zone: "test-zone-1b",
},
{
ID: "subnet-test3",
Zone: "test-zone-1c",
},
},
},
},
)
nodePool = coretest.NodePool(corev1beta1.NodePool{
Spec: corev1beta1.NodePoolSpec{
Template: corev1beta1.NodeClaimTemplate{
Expand All @@ -106,17 +137,8 @@ var _ = Describe("InstanceProvider", func() {
},
},
})
nodeClass.Status.SecurityGroups = []v1beta1.SecurityGroup{
{
ID: "sg-test1",
},
{
ID: "sg-test2",
},
{
ID: "sg-test3",
},
}
_, err := awsEnv.SubnetProvider.List(ctx, nodeClass) // Hydrate the subnet cache
Expect(err).To(BeNil())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypes(ctx)).To(Succeed())
Expect(awsEnv.InstanceTypesProvider.UpdateInstanceTypeOfferings(ctx)).To(Succeed())
})
Expand Down
Loading

0 comments on commit 9836cbe

Please sign in to comment.