Skip to content

Commit

Permalink
Create Subnet controller
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Apr 18, 2024
1 parent 30baa2a commit bfa4173
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
InstanceTypesAndZonesTTL = 5 * time.Minute
// InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM
InstanceProfileTTL = 15 * time.Minute
// InstanceTypesAndZonesTTL is the time before we remove subnets that have been refreshed
SubnetTTL = 10 * time.Minute
)

const (
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/nodeclass/status/subnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update EC2NodeClass status for Subnets", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -81,6 +82,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
{SubnetId: aws.String("subnet-test3"), AvailabilityZone: aws.String("test-zone-1c"), AvailableIpAddressCount: aws.Int64(50)},
}})
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -108,6 +110,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -128,6 +131,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -139,6 +143,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update Subnet status when the Subnet selector gets updated by tags", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -173,6 +178,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -188,6 +194,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
})
It("Should update Subnet status when the Subnet selector gets updated by ids", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -215,6 +222,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand All @@ -231,12 +239,14 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileFailed(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(BeNil())
})
It("Should not resolve a invalid selectors for an updated subnet selector", func() {
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileSucceeded(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(Equal([]v1beta1.Subnet{
Expand Down Expand Up @@ -264,6 +274,7 @@ var _ = Describe("NodeClass Subnet Status Controller", func() {
},
}
ExpectApplied(ctx, env.Client, nodeClass)
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
ExpectReconcileFailed(ctx, statusController, client.ObjectKeyFromObject(nodeClass))
nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(nodeClass.Status.Subnets).To(BeNil())
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/nodeclass/status/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var _ = BeforeEach(func() {
ctx = coreoptions.ToContext(ctx, coretest.Options())
nodeClass = test.EC2NodeClass()
awsEnv.Reset()
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
})

var _ = AfterEach(func() {
Expand Down
59 changes: 59 additions & 0 deletions pkg/controllers/providers/subnet/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subnet

import (
"context"
"time"

controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
corecontroller "sigs.k8s.io/karpenter/pkg/operator/controller"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/aws/karpenter-provider-aws/pkg/providers/subnet"
)

var _ corecontroller.TypedController[*v1beta1.EC2NodeClass] = (*Controller)(nil)

type Controller struct {
subnetProvider subnet.Provider
}

func NewController(subnetProvider subnet.Provider) *Controller {
return &Controller{
subnetProvider: subnetProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) (reconcile.Result, error) {
return reconcile.Result{RequeueAfter: 5 * time.Minute}, c.subnetProvider.UpdateSubnets(ctx, nodeClass)
}

func (c *Controller) Name() string {
return "subnet"
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
return corecontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
For(&v1beta1.EC2NodeClass{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 10,
}))
}
59 changes: 59 additions & 0 deletions pkg/controllers/providers/subnet/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subnet_test

import (
"context"
"testing"

coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/operator/scheme"
coretest "sigs.k8s.io/karpenter/pkg/test"

"github.com/aws/karpenter-provider-aws/pkg/apis"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "knative.dev/pkg/logging/testing"
)

var ctx context.Context
var stop context.CancelFunc
var env *coretest.Environment
var awsEnv *test.Environment
var controller *controllerspricing.Controller

func TestAWS(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
RunSpecs(t, "Subnet")
}

var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...))
ctx = coreoptions.ToContext(ctx, coretest.Options())
ctx = options.ToContext(ctx, test.Options())
ctx, stop = context.WithCancel(ctx)
awsEnv = test.NewEnvironment(ctx, env)
controller = controllerspricing.NewController(awsEnv.PricingProvider)
})

var _ = AfterSuite(func() {
stop()
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
}

unavailableOfferingsCache := awscache.NewUnavailableOfferings()
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.SubnetTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(*sess.Config.Region, iam.New(sess), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval))
pricingProvider := pricing.NewDefaultProvider(
Expand Down
1 change: 1 addition & 0 deletions pkg/providers/instance/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ var _ = Describe("InstanceProvider", func() {
{CapacityType: corev1beta1.CapacityTypeSpot, InstanceType: "m5.xlarge", Zone: "test-zone-1a"},
{CapacityType: corev1beta1.CapacityTypeSpot, InstanceType: "m5.xlarge", Zone: "test-zone-1b"},
})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
instanceTypes, err := cloudProvider.GetInstanceTypes(ctx, nodePool)
Expect(err).ToNot(HaveOccurred())

Expand Down
2 changes: 2 additions & 0 deletions pkg/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ var _ = Describe("InstanceTypeProvider", func() {
},
},
})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, windowsNodeClass)).To(BeNil())
})

It("should support individual instance type labels", func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/providers/launchtemplate/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var _ = Describe("LaunchTemplate Provider", func() {
},
},
})
Expect(awsEnv.SubnetProvider.UpdateSubnets(ctx, nodeClass)).To(BeNil())
})
It("should create unique launch templates for multiple identical nodeClasses", func() {
nodeClass2 := test.EC2NodeClass()
Expand Down
39 changes: 33 additions & 6 deletions pkg/providers/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Provider interface {
CheckAnyPublicIPAssociations(context.Context, *v1beta1.EC2NodeClass) (bool, error)
ZonalSubnetsForLaunch(context.Context, *v1beta1.EC2NodeClass, []*cloudprovider.InstanceType, string) (map[string]*ec2.Subnet, error)
UpdateInflightIPs(*ec2.CreateFleetInput, *ec2.CreateFleetOutput, []*cloudprovider.InstanceType, []*ec2.Subnet, string)
UpdateSubnets(context.Context, *v1beta1.EC2NodeClass) error
}

type DefaultProvider struct {
Expand All @@ -64,41 +65,59 @@ func NewDefaultProvider(ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProv
}

func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) ([]*ec2.Subnet, error) {
p.Lock()
defer p.Unlock()
p.RLock()
defer p.RUnlock()

filterSets := getFilterSets(nodeClass.Spec.SubnetSelectorTerms)
if len(filterSets) == 0 {
return []*ec2.Subnet{}, nil
}
hash, err := hashstructure.Hash(filterSets, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
hash, err := subnetHash(filterSets)
if err != nil {
return nil, err
return []*ec2.Subnet{}, err
}
if subnets, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return subnets.([]*ec2.Subnet), nil
}
return []*ec2.Subnet{}, nil
}

func (p *DefaultProvider) UpdateSubnets(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) error {
p.Lock()
defer p.Unlock()

filterSets := getFilterSets(nodeClass.Spec.SubnetSelectorTerms)
if len(filterSets) == 0 {
return nil
}
hash, err := subnetHash(filterSets)
if err != nil {
return err
}

// Ensure that all the subnets that are returned here are unique
subnets := map[string]*ec2.Subnet{}
for _, filters := range filterSets {
output, err := p.ec2api.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{Filters: filters})
if err != nil {
return nil, fmt.Errorf("describing subnets %s, %w", pretty.Concise(filters), err)
return fmt.Errorf("describing subnets %s, %w", pretty.Concise(filters), err)
}
for i := range output.Subnets {
subnets[lo.FromPtr(output.Subnets[i].SubnetId)] = output.Subnets[i]
delete(p.inflightIPs, lo.FromPtr(output.Subnets[i].SubnetId)) // remove any previously tracked IP addresses since we just refreshed from EC2
}
}
p.cache.SetDefault(fmt.Sprint(hash), lo.Values(subnets))
p.cache.DeleteExpired() // delete expired after we have successfully updated the cache
if p.cm.HasChanged(fmt.Sprintf("subnets/%s", nodeClass.Name), subnets) {
logging.FromContext(ctx).
With("subnets", lo.Map(lo.Values(subnets), func(s *ec2.Subnet, _ int) string {
return fmt.Sprintf("%s (%s)", aws.StringValue(s.SubnetId), aws.StringValue(s.AvailabilityZone))
})).
Debugf("discovered subnets")
}
return lo.Values(subnets), nil

return nil
}

// CheckAnyPublicIPAssociations returns a bool indicating whether all referenced subnets assign public IPv4 addresses to EC2 instances created therein
Expand Down Expand Up @@ -266,3 +285,11 @@ func getFilterSets(terms []v1beta1.SubnetSelectorTerm) (res [][]*ec2.Filter) {
}
return res
}

func subnetHash(filterSets [][]*ec2.Filter) (string, error) {
hash, err := hashstructure.Hash(filterSets, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
return "", err
}
return fmt.Sprintf("%d", hash), nil
}
Loading

0 comments on commit bfa4173

Please sign in to comment.