Skip to content

Commit

Permalink
Create Subnet controller
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Apr 19, 2024
1 parent 881ad7c commit 4ba17dd
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
InstanceTypesAndZonesTTL = 5 * time.Minute
// InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM
InstanceProfileTTL = 15 * time.Minute
// InstanceTypesAndZonesTTL is the time before we remove subnets that have been refreshed
SubnetTTL = 10 * time.Minute
)

const (
Expand Down
18 changes: 18 additions & 0 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ var _ = Describe("CloudProvider", func() {
},
},
})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
})
It("should return an ICE error when there are no instance types to launch", func() {
// Specify no instance types and expect to receive a capacity error
Expand Down Expand Up @@ -658,6 +659,7 @@ var _ = Describe("CloudProvider", func() {
Expect(err).To(HaveOccurred())
})
It("should not return drifted if the NodeClaim is valid", func() {
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).ToNot(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand Down Expand Up @@ -696,11 +698,13 @@ var _ = Describe("CloudProvider", func() {
},
},
})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).ToNot(HaveOccurred())
Expect(isDrifted).To(Equal(cloudprovider.SecurityGroupDrift))
})
It("should not return drifted if the security groups match", func() {
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).ToNot(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand Down Expand Up @@ -776,6 +780,7 @@ var _ = Describe("CloudProvider", func() {
DescribeTable("should return drifted if a statically drifted EC2NodeClass.Spec field is updated",
func(changes v1beta1.EC2NodeClass) {
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand Down Expand Up @@ -816,13 +821,15 @@ var _ = Describe("CloudProvider", func() {
nodeClass.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{v1beta1.AnnotationEC2NodeClassHash: nodeClass.Hash()})

ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(Equal(cloudprovider.NodeClassDrift))
})
DescribeTable("should not return drifted if dynamic fields are updated",
func(changes v1beta1.EC2NodeClass) {
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand All @@ -831,6 +838,7 @@ var _ = Describe("CloudProvider", func() {
nodeClass.Annotations = lo.Assign(nodeClass.Annotations, map[string]string{v1beta1.AnnotationEC2NodeClassHash: nodeClass.Hash()})

ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err = cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand All @@ -847,6 +855,7 @@ var _ = Describe("CloudProvider", func() {
"Test Key": "Test Value",
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand All @@ -861,6 +870,7 @@ var _ = Describe("CloudProvider", func() {
v1beta1.AnnotationEC2NodeClassHashVersion: "test-hash-version-2",
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand All @@ -878,6 +888,7 @@ var _ = Describe("CloudProvider", func() {
"Test Key": "Test Value",
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand All @@ -895,6 +906,7 @@ var _ = Describe("CloudProvider", func() {
"Test Key": "Test Value",
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
isDrifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).NotTo(HaveOccurred())
Expect(isDrifted).To(BeEmpty())
Expand Down Expand Up @@ -937,6 +949,7 @@ var _ = Describe("CloudProvider", func() {
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-2")}}},
}})
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
pod := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectScheduled(ctx, env.Client, pod)
Expand All @@ -952,6 +965,7 @@ var _ = Describe("CloudProvider", func() {
}})
nodePool.Spec.Template.Spec.Kubelet = &corev1beta1.KubeletConfiguration{MaxPods: aws.Int32(1)}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
pod1 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
pod2 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod1, pod2)
Expand All @@ -971,6 +985,7 @@ var _ = Describe("CloudProvider", func() {
{SubnetId: aws.String("test-subnet-1"), AvailabilityZone: aws.String("test-zone-1a"), AvailableIpAddressCount: aws.Int64(10),
Tags: []*ec2.Tag{{Key: aws.String("Name"), Value: aws.String("test-subnet-1")}}},
}})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
pod1 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}})
ExpectApplied(ctx, env.Client, nodePool, nodeClass, pod1)
awsEnv.EC2API.CreateFleetBehavior.Error.Set(fmt.Errorf("CreateFleet synthetic error"))
Expand All @@ -986,6 +1001,7 @@ var _ = Describe("CloudProvider", func() {
}})
nodeClass.Spec.SubnetSelectorTerms = []v1beta1.SubnetSelectorTerm{{Tags: map[string]string{"Name": "test-subnet-1"}}}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
podSubnet1 := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet1)
ExpectScheduled(ctx, env.Client, podSubnet1)
Expand Down Expand Up @@ -1018,6 +1034,7 @@ var _ = Describe("CloudProvider", func() {
},
})
ExpectApplied(ctx, env.Client, nodePool2, nodeClass2)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass2)).To(BeNil())
podSubnet2 := coretest.UnschedulablePod(coretest.PodOptions{NodeSelector: map[string]string{corev1beta1.NodePoolLabelKey: nodePool2.Name}})
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, podSubnet2)
ExpectScheduled(ctx, env.Client, podSubnet2)
Expand Down Expand Up @@ -1059,6 +1076,7 @@ var _ = Describe("CloudProvider", func() {
},
})
ExpectApplied(ctx, env.Client, nodePool, nodePool2, nodeClass, misconfiguredNodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectScheduled(ctx, env.Client, pod)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
nodeclassstatus "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/status"
nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
controllerssubnets "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/subnet"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"

"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -60,6 +61,7 @@ func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock,
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
nodeclaimtagging.NewController(kubeClient, instanceProvider),
controllerspricing.NewController(pricingProvider),
controllerssubnets.NewController(kubeClient, subnetProvider),
}
if options.FromContext(ctx).InterruptionQueue != "" {
sqsapi := servicesqs.New(sess)
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/nodeclass/status/subnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update EC2NodeClass status for Subnets", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -81,6 +82,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
{SubnetId: aws.String("subnet-test3"), AvailabilityZone: aws.String("test-zone-1c"), AvailableIpAddressCount: aws.Int64(50)},
}})
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -108,6 +110,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -128,6 +131,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -139,6 +143,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update Subnet status when the Subnet selector gets updated by tags", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -173,6 +178,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -188,6 +194,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update Subnet status when the Subnet selector gets updated by ids", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -215,6 +222,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -231,12 +239,14 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileFailed(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(BeNil())
})
It("Should not resolve a invalid selectors for an updated subnet selector", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -264,6 +274,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileFailed(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(BeNil())
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/nodeclass/status/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var _ = BeforeEach(func() {
ctx = coreoptions.ToContext(ctx, coretest.Options())
nodeClass = test.EC2NodeClass()
awsEnv.Reset()
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
})

var _ = AfterEach(func() {
Expand Down
60 changes: 60 additions & 0 deletions pkg/controllers/providers/subnet/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subnet

import (
"context"
"time"

controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
corecontroller "sigs.k8s.io/karpenter/pkg/operator/controller"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
)

var _ corecontroller.TypedController[*v1beta1.EC2NodeClass] = (*Controller)(nil)

type Controller struct {
subnetProvider subnet.Provider
}

func NewController(kubeClient client.Client, subnetProvider subnet.Provider) corecontroller.Controller {
return corecontroller.Typed[*v1beta1.EC2NodeClass](kubeClient, &Controller{
subnetProvider: subnetProvider,
})
}

func (c *Controller) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (reconcile.Result, error) {
return reconcile.Result{RequeueAfter: 5 * time.Minute}, c.subnetProvider.UpdateSubnets(ctx, nodeClass)
}

func (c *Controller) Name() string {
return "subnet"
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
return corecontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
For(&v1beta1.EC2NodeClass{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 10,
}))
}
Loading

0 comments on commit 4ba17dd

Please sign in to comment.