From 0f3ab47b962efd11f09cacd8820f80fcc36c2916 Mon Sep 17 00:00:00 2001 From: Thorben von Hacht Date: Mon, 6 May 2024 00:42:24 -0700 Subject: [PATCH] Add initial implementation to add capacityReservationSelectorTerms and update status when found --- cmd/controller/main.go | 1 + .../karpenter.k8s.aws_ec2nodeclasses.yaml | 106 ++++++ pkg/apis/v1beta1/ec2nodeclass.go | 27 ++ pkg/apis/v1beta1/ec2nodeclass_status.go | 51 +++ pkg/apis/v1beta1/zz_generated.deepcopy.go | 56 +++ pkg/cloudprovider/cloudprovider.go | 3 + pkg/controllers/controllers.go | 5 +- .../nodeclass/status/controller.go | 26 +- .../nodeclass/status/suite_test.go | 1 + pkg/errors/errors.go | 1 + pkg/operator/operator.go | 58 +-- pkg/providers/amifamily/resolver.go | 77 +++- .../capacityreservation.go | 206 +++++++++++ .../capacityreservation/suite_test.go | 345 ++++++++++++++++++ pkg/providers/instance/instance.go | 50 ++- pkg/providers/instance/types.go | 5 +- pkg/providers/instancetype/instancetype.go | 73 +++- .../launchtemplate/launchtemplate.go | 47 ++- pkg/test/environment.go | 47 ++- .../scripts/step04-controller-iam.sh | 11 + 20 files changed, 1092 insertions(+), 104 deletions(-) create mode 100644 pkg/providers/capacityreservation/capacityreservation.go create mode 100644 pkg/providers/capacityreservation/suite_test.go diff --git a/cmd/controller/main.go b/cmd/controller/main.go index fc09b917beef..af2cd28e5d7b 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -62,6 +62,7 @@ func main() { cloudProvider, op.SubnetProvider, op.SecurityGroupProvider, + op.CapacityReservationProvider, op.InstanceProfileProvider, op.InstanceProvider, op.PricingProvider, diff --git a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml index 66ec659e7747..1f3ec11201f4 100644 --- a/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml +++ b/pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml @@ -216,6 +216,43 @@ spec: - message: must have only one blockDeviceMappings with rootVolume rule: self.filter(x, has(x.rootVolume)?x.rootVolume==true:false).size() <= 1 + capacityReservationSelectorTerms: + description: CapacityReservationSelectorTerms is a list of or Capacity + Reservation selector terms. The terms are ORed. + items: + description: |- + CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. + If multiple fields are used for selection, the requirements are ANDed. + properties: + availabilityZone: + description: The Availability Zone of the Capacity Reservation + type: string + id: + description: The platform of operating system for which the + Capacity Reservation reserves capacity + type: string + instanceType: + description: The type of operating system for which the Capacity + Reservation reserves capacity + type: string + ownerId: + description: The ID of the Amazon Web Services account that + owns the Capacity Reservation + type: string + tags: + additionalProperties: + type: string + description: |- + Tags is a map of key/value tags used to select subnets + Specifying '*' for a value selects all values for a given tag key. + maxProperties: 20 + type: object + x-kubernetes-validations: + - message: empty tag keys or values aren't supported + rule: self.all(k, k != '' && self[k] != '') + type: object + maxItems: 30 + type: array context: description: |- Context is a Reserved field in EC2 APIs @@ -516,6 +553,75 @@ spec: - requirements type: object type: array + capacityReservations: + description: |- + CapacityReservations contains the current Capacity Reservations values that are available to the + cluster under the CapacityReservations selectors. + items: + description: CapacityReservation contains resolved Capacity Reservation + selector values utilized for node launch + properties: + availabilityZone: + description: AvailabilityZone of the Capacity Reservation + type: string + availableInstanceCount: + description: Available Instance Count of the Capacity Reservation + type: integer + endDate: + description: |- + The date and time at which the Capacity Reservation expires. When a Capacity + Reservation expires, the reserved capacity is released and you can no longer + launch instances into it. The Capacity Reservation's state changes to expired + when it reaches its end date and time. + type: string + endDateType: + description: |- + Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + can have one of the following end types: + + + * unlimited - The Capacity Reservation remains active until you explicitly + cancel it. + + + * limited - The Capacity Reservation expires automatically at a specified + date and time. + type: string + id: + description: ID of the Capacity Reservation + type: string + instanceMatchCriteria: + description: Instance Match Criteria of the Capacity Reservation + type: string + instancePlatform: + description: Instance Platform of the Capacity Reservation + type: string + instanceType: + description: Instance Type of the Capacity Reservation + type: string + ownerId: + description: Owner Id of the Capacity Reservation + type: string + startDate: + description: The date and time at which the Capacity Reservation + was started. + type: string + totalInstanceCount: + description: Total Instance Count of the Capacity Reservation + type: integer + required: + - availabilityZone + - availableInstanceCount + - endDateType + - id + - instanceMatchCriteria + - instancePlatform + - instanceType + - ownerId + - startDate + - totalInstanceCount + type: object + type: array instanceProfile: description: InstanceProfile contains the resolved instance profile for the role diff --git a/pkg/apis/v1beta1/ec2nodeclass.go b/pkg/apis/v1beta1/ec2nodeclass.go index 66a2926b2ced..b72607144844 100644 --- a/pkg/apis/v1beta1/ec2nodeclass.go +++ b/pkg/apis/v1beta1/ec2nodeclass.go @@ -55,6 +55,10 @@ type EC2NodeClassSpec struct { // +kubebuilder:validation:Enum:={AL2,AL2023,Bottlerocket,Ubuntu,Custom,Windows2019,Windows2022} // +required AMIFamily *string `json:"amiFamily"` + // CapacityReservationSelectorTerms is a list of or Capacity Reservation selector terms. The terms are ORed. + // +kubebuilder:validation:MaxItems:=30 + // +required + CapacityReservationSelectorTerms []CapacityReservationSelectorTerm `json:"capacityReservationSelectorTerms,omitempty" hash:"ignore"` // UserData to be applied to the provisioned nodes. // It must be in the appropriate format based on the AMIFamily in use. Karpenter will merge certain fields into // this UserData to ensure nodes are being provisioned with the correct configuration. @@ -175,6 +179,29 @@ type AMISelectorTerm struct { Owner string `json:"owner,omitempty"` } +// CapacityReservationSelectorTerm defines selection logic for a Capacity Reservation used by Karpenter to launch nodes. +// If multiple fields are used for selection, the requirements are ANDed. +type CapacityReservationSelectorTerm struct { + // The Availability Zone of the Capacity Reservation + // +optional + AvailabilityZone string `json:"availabilityZone,omitempty"` + // The platform of operating system for which the Capacity Reservation reserves capacity + // +optional + ID string `json:"id,omitempty"` + // Tags is a map of key/value tags used to select subnets + // Specifying '*' for a value selects all values for a given tag key. + // +kubebuilder:validation:XValidation:message="empty tag keys or values aren't supported",rule="self.all(k, k != '' && self[k] != '')" + // +kubebuilder:validation:MaxProperties:=20 + // +optional + Tags map[string]string `json:"tags,omitempty"` + // The type of operating system for which the Capacity Reservation reserves capacity + // +optional + InstanceType string `json:"instanceType,omitempty"` + // The ID of the Amazon Web Services account that owns the Capacity Reservation + // +optional + OwnerID string `json:"ownerId,omitempty"` +} + // MetadataOptions contains parameters for specifying the exposure of the // Instance Metadata Service to provisioned EC2 nodes. type MetadataOptions struct { diff --git a/pkg/apis/v1beta1/ec2nodeclass_status.go b/pkg/apis/v1beta1/ec2nodeclass_status.go index 611e94d62117..983246b6a629 100644 --- a/pkg/apis/v1beta1/ec2nodeclass_status.go +++ b/pkg/apis/v1beta1/ec2nodeclass_status.go @@ -38,6 +38,53 @@ type SecurityGroup struct { Name string `json:"name,omitempty"` } +// CapacityReservation contains resolved Capacity Reservation selector values utilized for node launch +type CapacityReservation struct { + // ID of the Capacity Reservation + // +required + ID string `json:"id"` + // AvailabilityZone of the Capacity Reservation + // +required + AvailabilityZone string `json:"availabilityZone"` + // Available Instance Count of the Capacity Reservation + // +required + AvailableInstanceCount int `json:"availableInstanceCount"` + // The date and time at which the Capacity Reservation expires. When a Capacity + // Reservation expires, the reserved capacity is released and you can no longer + // launch instances into it. The Capacity Reservation's state changes to expired + // when it reaches its end date and time. + // +optional + EndDate *string `json:"endDate,omitempty"` + // Indicates the way in which the Capacity Reservation ends. A Capacity Reservation + // can have one of the following end types: + // + // * unlimited - The Capacity Reservation remains active until you explicitly + // cancel it. + // + // * limited - The Capacity Reservation expires automatically at a specified + // date and time. + // +required + EndDateType string `json:"endDateType"` + // Instance Match Criteria of the Capacity Reservation + // +required + InstanceMatchCriteria string `json:"instanceMatchCriteria"` + // Instance Platform of the Capacity Reservation + // +required + InstancePlatform string `json:"instancePlatform"` + // Instance Type of the Capacity Reservation + // +required + InstanceType string `json:"instanceType"` + // Owner Id of the Capacity Reservation + // +required + OwnerID string `json:"ownerId"` + // The date and time at which the Capacity Reservation was started. + // +required + StartDate string `json:"startDate"` + // Total Instance Count of the Capacity Reservation + // +required + TotalInstanceCount int `json:"totalInstanceCount"` +} + // AMI contains resolved AMI selector values utilized for node launch type AMI struct { // ID of the AMI @@ -53,6 +100,10 @@ type AMI struct { // EC2NodeClassStatus contains the resolved state of the EC2NodeClass type EC2NodeClassStatus struct { + // CapacityReservations contains the current Capacity Reservations values that are available to the + // cluster under the CapacityReservations selectors. + // +optional + CapacityReservations []CapacityReservation `json:"capacityReservations,omitempty"` // Subnets contains the current Subnet values that are available to the // cluster under the subnet selectors. // +optional diff --git a/pkg/apis/v1beta1/zz_generated.deepcopy.go b/pkg/apis/v1beta1/zz_generated.deepcopy.go index 781d88c876c8..a0aae4b73ead 100644 --- a/pkg/apis/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/v1beta1/zz_generated.deepcopy.go @@ -147,6 +147,48 @@ func (in *BlockDeviceMapping) DeepCopy() *BlockDeviceMapping { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservation) DeepCopyInto(out *CapacityReservation) { + *out = *in + if in.EndDate != nil { + in, out := &in.EndDate, &out.EndDate + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservation. +func (in *CapacityReservation) DeepCopy() *CapacityReservation { + if in == nil { + return nil + } + out := new(CapacityReservation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CapacityReservationSelectorTerm) DeepCopyInto(out *CapacityReservationSelectorTerm) { + *out = *in + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CapacityReservationSelectorTerm. +func (in *CapacityReservationSelectorTerm) DeepCopy() *CapacityReservationSelectorTerm { + if in == nil { + return nil + } + out := new(CapacityReservationSelectorTerm) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClass) DeepCopyInto(out *EC2NodeClass) { *out = *in @@ -240,6 +282,13 @@ func (in *EC2NodeClassSpec) DeepCopyInto(out *EC2NodeClassSpec) { *out = new(string) **out = **in } + if in.CapacityReservationSelectorTerms != nil { + in, out := &in.CapacityReservationSelectorTerms, &out.CapacityReservationSelectorTerms + *out = make([]CapacityReservationSelectorTerm, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.UserData != nil { in, out := &in.UserData, &out.UserData *out = new(string) @@ -303,6 +352,13 @@ func (in *EC2NodeClassSpec) DeepCopy() *EC2NodeClassSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EC2NodeClassStatus) DeepCopyInto(out *EC2NodeClassStatus) { *out = *in + if in.CapacityReservations != nil { + in, out := &in.CapacityReservations, &out.CapacityReservations + *out = make([]CapacityReservation, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.Subnets != nil { in, out := &in.Subnets, &out.Subnets *out = make([]Subnet, len(*in)) diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index acea3187e4d1..35923589d55c 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -96,6 +96,9 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *corev1beta1.NodeC } instance, err := c.instanceProvider.Create(ctx, nodeClass, nodeClaim, instanceTypes) if err != nil { + if cloudprovider.IsInsufficientCapacityError(err) { + return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("creating instance, %w", err)) + } return nil, fmt.Errorf("creating instance, %w", err) } instanceType, _ := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool { diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 6510a522f870..ab23945200a3 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -24,6 +24,7 @@ import ( nodeclasstermination "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/termination" controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype" controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/aws-sdk-go/aws/session" @@ -52,12 +53,12 @@ import ( func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder, unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider, - securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, + securityGroupProvider securitygroup.Provider, capacityReservationProvider capacityreservation.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider, pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider instancetype.Provider) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), - nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider), + nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, capacityReservationProvider, amiProvider, instanceProfileProvider, launchTemplateProvider), nodeclasstermination.NewController(kubeClient, recorder, instanceProfileProvider, launchTemplateProvider), nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider), nodeclaimtagging.NewController(kubeClient, instanceProvider), diff --git a/pkg/controllers/nodeclass/status/controller.go b/pkg/controllers/nodeclass/status/controller.go index da4aeca04dc0..37b7598322ae 100644 --- a/pkg/controllers/nodeclass/status/controller.go +++ b/pkg/controllers/nodeclass/status/controller.go @@ -34,6 +34,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/securitygroup" @@ -49,23 +50,25 @@ type nodeClassStatusReconciler interface { type Controller struct { kubeClient client.Client - ami *AMI - instanceprofile *InstanceProfile - subnet *Subnet - securitygroup *SecurityGroup - launchtemplate *LaunchTemplate + ami *AMI + instanceprofile *InstanceProfile + subnet *Subnet + securitygroup *SecurityGroup + capacityreservation *CapacityReservation + launchtemplate *LaunchTemplate } -func NewController(kubeClient client.Client, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider, +func NewController(kubeClient client.Client, subnetProvider subnet.Provider, securityGroupProvider securitygroup.Provider, capacityReservationProvider capacityreservation.Provider, amiProvider amifamily.Provider, instanceProfileProvider instanceprofile.Provider, launchTemplateProvider launchtemplate.Provider) corecontroller.Controller { return corecontroller.Typed[*v1beta1.EC2NodeClass](kubeClient, &Controller{ kubeClient: kubeClient, - ami: &AMI{amiProvider: amiProvider}, - subnet: &Subnet{subnetProvider: subnetProvider}, - securitygroup: &SecurityGroup{securityGroupProvider: securityGroupProvider}, - instanceprofile: &InstanceProfile{instanceProfileProvider: instanceProfileProvider}, - launchtemplate: &LaunchTemplate{launchTemplateProvider: launchTemplateProvider}, + ami: &AMI{amiProvider: amiProvider}, + subnet: &Subnet{subnetProvider: subnetProvider}, + securitygroup: &SecurityGroup{securityGroupProvider: securityGroupProvider}, + capacityreservation: &CapacityReservation{capacityReservationProvider: capacityReservationProvider}, + instanceprofile: &InstanceProfile{instanceProfileProvider: instanceProfileProvider}, + launchtemplate: &LaunchTemplate{launchTemplateProvider: launchTemplateProvider}, }) } @@ -87,6 +90,7 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClass *v1beta1.EC2NodeCl c.securitygroup, c.instanceprofile, c.launchtemplate, + c.capacityreservation, } { res, err := reconciler.Reconcile(ctx, nodeClass) errs = multierr.Append(errs, err) diff --git a/pkg/controllers/nodeclass/status/suite_test.go b/pkg/controllers/nodeclass/status/suite_test.go index 545c9ba6cfd5..ac6e75848639 100644 --- a/pkg/controllers/nodeclass/status/suite_test.go +++ b/pkg/controllers/nodeclass/status/suite_test.go @@ -59,6 +59,7 @@ var _ = BeforeSuite(func() { env.Client, awsEnv.SubnetProvider, awsEnv.SecurityGroupProvider, + awsEnv.CapacityReservationProvider, awsEnv.AMIProvider, awsEnv.InstanceProfileProvider, awsEnv.LaunchTemplateProvider, diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5267aad4672d..26de0dbb4837 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -48,6 +48,7 @@ var ( "UnfulfillableCapacity", "Unsupported", "InsufficientFreeAddressesInSubnet", + "ReservationCapacityExceeded", ) ) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 113b863f9732..12ed5c5f06db 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -54,6 +54,7 @@ import ( awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" @@ -73,19 +74,20 @@ func init() { type Operator struct { *operator.Operator - Session *session.Session - UnavailableOfferingsCache *awscache.UnavailableOfferings - EC2API ec2iface.EC2API - SubnetProvider subnet.Provider - SecurityGroupProvider securitygroup.Provider - InstanceProfileProvider instanceprofile.Provider - AMIProvider amifamily.Provider - AMIResolver *amifamily.Resolver - LaunchTemplateProvider launchtemplate.Provider - PricingProvider pricing.Provider - VersionProvider version.Provider - InstanceTypesProvider instancetype.Provider - InstanceProvider instance.Provider + Session *session.Session + UnavailableOfferingsCache *awscache.UnavailableOfferings + EC2API ec2iface.EC2API + SubnetProvider subnet.Provider + SecurityGroupProvider securitygroup.Provider + InstanceProfileProvider instanceprofile.Provider + AMIProvider amifamily.Provider + AMIResolver *amifamily.Resolver + LaunchTemplateProvider launchtemplate.Provider + PricingProvider pricing.Provider + VersionProvider version.Provider + InstanceTypesProvider instancetype.Provider + InstanceProvider instance.Provider + CapacityReservationProvider capacityreservation.Provider } func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) { @@ -174,22 +176,24 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont subnetProvider, launchTemplateProvider, ) + capacityReservationProvider := capacityreservation.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)) return ctx, &Operator{ - Operator: operator, - Session: sess, - UnavailableOfferingsCache: unavailableOfferingsCache, - EC2API: ec2api, - SubnetProvider: subnetProvider, - SecurityGroupProvider: securityGroupProvider, - InstanceProfileProvider: instanceProfileProvider, - AMIProvider: amiProvider, - AMIResolver: amiResolver, - VersionProvider: versionProvider, - LaunchTemplateProvider: launchTemplateProvider, - PricingProvider: pricingProvider, - InstanceTypesProvider: instanceTypeProvider, - InstanceProvider: instanceProvider, + Operator: operator, + Session: sess, + UnavailableOfferingsCache: unavailableOfferingsCache, + EC2API: ec2api, + SubnetProvider: subnetProvider, + SecurityGroupProvider: securityGroupProvider, + InstanceProfileProvider: instanceProfileProvider, + AMIProvider: amiProvider, + AMIResolver: amiResolver, + VersionProvider: versionProvider, + LaunchTemplateProvider: launchTemplateProvider, + PricingProvider: pricingProvider, + InstanceTypesProvider: instanceTypeProvider, + InstanceProvider: instanceProvider, + CapacityReservationProvider: capacityReservationProvider, } } diff --git a/pkg/providers/amifamily/resolver.go b/pkg/providers/amifamily/resolver.go index 65adec3a9abb..65e1ed9a7f20 100644 --- a/pkg/providers/amifamily/resolver.go +++ b/pkg/providers/amifamily/resolver.go @@ -24,6 +24,7 @@ import ( "github.com/imdario/mergo" "github.com/samber/lo" core "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1" @@ -56,6 +57,7 @@ type Options struct { InstanceStorePolicy *v1beta1.InstanceStorePolicy // Level-triggered fields that may change out of sync. SecurityGroups []v1beta1.SecurityGroup + CapacityReservations []v1beta1.CapacityReservation Tags map[string]string Labels map[string]string `hash:"ignore"` KubeDNSIP net.IP @@ -67,6 +69,7 @@ type Options struct { type LaunchTemplate struct { *Options UserData bootstrap.Bootstrapper + CapacityReservation *v1beta1.CapacityReservation BlockDeviceMappings []*v1beta1.BlockDeviceMapping MetadataOptions *v1beta1.MetadataOptions AMIID string @@ -154,12 +157,63 @@ func (r Resolver) Resolve(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, maxPods: int(instanceType.Capacity.Pods().Value()), } }) + + zones := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...).Get(v1.LabelTopologyZone) + capacityReservations := []v1beta1.CapacityReservation{} + if capacityType == "capacity-reservation" { + for _, capacityReservation := range nodeClass.Status.CapacityReservations { + if capacityReservation.AvailableInstanceCount == 0 { + continue + } + if !zones.Has(capacityReservation.AvailabilityZone) { + continue + } + capacityReservations = append(capacityReservations, capacityReservation) + } + if len(capacityReservations) == 0 { + return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("trying to resolve capacity-reservation but no available capacity reservations available")) + } + } + for params, instanceTypes := range paramsToInstanceTypes { - resolved, err := r.resolveLaunchTemplate(nodeClass, nodeClaim, instanceTypes, capacityType, amiFamily, amiID, params.maxPods, params.efaCount, options) - if err != nil { - return nil, err + + if len(capacityReservations) > 0 { + for _, capacityReservation := range capacityReservations { + resolved, err := r.resolveLaunchTemplate( + nodeClass, + nodeClaim, + &capacityReservation, + instanceTypes, + "on-demand", // capacity-type + amiFamily, + amiID, + params.maxPods, + params.efaCount, + options, + ) + if err != nil { + return nil, err + } + resolvedTemplates = append(resolvedTemplates, resolved) + } + } else { + resolved, err := r.resolveLaunchTemplate( + nodeClass, + nodeClaim, + nil, + instanceTypes, + capacityType, + amiFamily, + amiID, + params.maxPods, + params.efaCount, + options, + ) + if err != nil { + return nil, err + } + resolvedTemplates = append(resolvedTemplates, resolved) } - resolvedTemplates = append(resolvedTemplates, resolved) } } return resolvedTemplates, nil @@ -210,8 +264,18 @@ func (r Resolver) defaultClusterDNS(opts *Options, kubeletConfig *corev1beta1.Ku return newKubeletConfig } -func (r Resolver) resolveLaunchTemplate(nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, capacityType string, - amiFamily AMIFamily, amiID string, maxPods int, efaCount int, options *Options) (*LaunchTemplate, error) { +func (r Resolver) resolveLaunchTemplate( + nodeClass *v1beta1.EC2NodeClass, + nodeClaim *corev1beta1.NodeClaim, + capacityReservation *v1beta1.CapacityReservation, + instanceTypes []*cloudprovider.InstanceType, + capacityType string, + amiFamily AMIFamily, + amiID string, + maxPods int, + efaCount int, + options *Options, +) (*LaunchTemplate, error) { kubeletConfig := &corev1beta1.KubeletConfiguration{} if nodeClaim.Spec.Kubelet != nil { if err := mergo.Merge(kubeletConfig, nodeClaim.Spec.Kubelet); err != nil { @@ -232,6 +296,7 @@ func (r Resolver) resolveLaunchTemplate(nodeClass *v1beta1.EC2NodeClass, nodeCla nodeClass.Spec.UserData, options.InstanceStorePolicy, ), + CapacityReservation: capacityReservation, BlockDeviceMappings: nodeClass.Spec.BlockDeviceMappings, MetadataOptions: nodeClass.Spec.MetadataOptions, DetailedMonitoring: aws.BoolValue(nodeClass.Spec.DetailedMonitoring), diff --git a/pkg/providers/capacityreservation/capacityreservation.go b/pkg/providers/capacityreservation/capacityreservation.go new file mode 100644 index 000000000000..8be694cfc22d --- /dev/null +++ b/pkg/providers/capacityreservation/capacityreservation.go @@ -0,0 +1,206 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacityreservation + +import ( + "context" + "fmt" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/mitchellh/hashstructure/v2" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" + "knative.dev/pkg/logging" + + "sigs.k8s.io/karpenter/pkg/utils/pretty" + + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" +) + +type Provider interface { + List(context.Context, *v1beta1.EC2NodeClass) ([]*ec2.CapacityReservation, error) +} + +type DefaultProvider struct { + sync.Mutex + ec2api ec2iface.EC2API + cache *cache.Cache + cm *pretty.ChangeMonitor +} + +func NewDefaultProvider(ec2api ec2iface.EC2API, cache *cache.Cache) *DefaultProvider { + return &DefaultProvider{ + ec2api: ec2api, + cm: pretty.NewChangeMonitor(), + // TODO: Remove cache cache when we utilize the security groups from the EC2NodeClass.status + cache: cache, + } +} + +func (p *DefaultProvider) List(ctx context.Context, nodeClass *v1beta1.EC2NodeClass) ([]*ec2.CapacityReservation, error) { + p.Lock() + defer p.Unlock() + + capacityReservations, err := p.getCapacityReservations(ctx, nodeClass.Spec.CapacityReservationSelectorTerms) + if err != nil { + return nil, fmt.Errorf("get capacity reservations, %w", err) + } + if p.cm.HasChanged(fmt.Sprintf("capacity-reservations/%s", nodeClass.Name), capacityReservations) { + logging.FromContext(ctx). + With("capacity-reservations", lo.Map(capacityReservations, func(s *ec2.CapacityReservation, _ int) string { + return aws.StringValue(s.CapacityReservationId) + })). + Debugf("discovered capacity reservations") + } + return capacityReservations, nil +} + +func (p *DefaultProvider) getCapacityReservations(ctx context.Context, terms []v1beta1.CapacityReservationSelectorTerm) ([]*ec2.CapacityReservation, error) { + hash, err := hashstructure.Hash(terms, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) + if err != nil { + return nil, err + } + + if cr, ok := p.cache.Get(fmt.Sprint(hash)); ok { + return cr.([]*ec2.CapacityReservation), nil + } + + capacityReservationsUnfiltered, err := p.describeCapacityReservations(ctx) + if err != nil { + return nil, err + } + + capacityReservationsMap := map[string]*ec2.CapacityReservation{} + for _, term := range terms { + capacityReservations := getCapacityReservations(capacityReservationsUnfiltered, term) + for _, capacityReservation := range capacityReservations { + capacityReservationsMap[lo.FromPtr(capacityReservation.CapacityReservationId)] = capacityReservation + } + } + p.cache.SetDefault(fmt.Sprint(hash), lo.Values(capacityReservationsMap)) + return lo.Values(capacityReservationsMap), nil +} + +func (p *DefaultProvider) describeCapacityReservations(ctx context.Context) ([]*ec2.CapacityReservation, error) { + describeCapacityReservations := []*ec2.CapacityReservation{} + + err := p.ec2api.DescribeCapacityReservationsPagesWithContext( + ctx, + &ec2.DescribeCapacityReservationsInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("state"), + Values: aws.StringSlice([]string{ec2.CapacityReservationFleetStateActive}), + }, + // We currently only support targeted, there are a few challenges when supporting "open": + // - Unnecessary disruption when on-demand is linked indirectly to an open ODCR causes node + // but will get unnecessary rotated + { + Name: aws.String("instance-match-criteria"), + Values: aws.StringSlice([]string{ec2.InstanceMatchCriteriaTargeted}), + }, + }, + }, + func(describeCapacityReservationsOutput *ec2.DescribeCapacityReservationsOutput, lastPage bool) bool { + describeCapacityReservations = append( + describeCapacityReservations, + describeCapacityReservationsOutput.CapacityReservations..., + ) + return !lastPage + }, + ) + if err != nil { + return nil, err + } + + return describeCapacityReservations, nil +} + +func getCapacityReservations( + capacityReservationsUnfiltered []*ec2.CapacityReservation, + term v1beta1.CapacityReservationSelectorTerm, +) []*ec2.CapacityReservation { + capacityReservations := []*ec2.CapacityReservation{} + + for _, capacityReservation := range capacityReservationsUnfiltered { + if matches(capacityReservation, term) { + capacityReservations = append( + capacityReservations, + capacityReservation, + ) + } + } + + return capacityReservations +} + +func matches( + capacityReservation *ec2.CapacityReservation, + term v1beta1.CapacityReservationSelectorTerm, +) bool { + if term.ID != "" { + return term.ID == aws.StringValue(capacityReservation.CapacityReservationId) + } + + if term.AvailabilityZone != "" { + if term.AvailabilityZone != aws.StringValue(capacityReservation.AvailabilityZone) { + return false + } + } + + if term.InstanceType != "" { + if term.InstanceType != aws.StringValue(capacityReservation.InstanceType) { + return false + } + } + + if term.OwnerID != "" { + if term.OwnerID != aws.StringValue(capacityReservation.OwnerId) { + return false + } + } + + tags := getCapacityReservationTags(capacityReservation) + for key, value := range term.Tags { + if _, ok := tags[key]; !ok { + return false + } + + if value == "*" { + continue + } + + if tags[key] != value { + return false + } + + continue + } + + return true +} + +func getCapacityReservationTags(capacityReservation *ec2.CapacityReservation) map[string]string { + tags := map[string]string{} + + for _, tag := range capacityReservation.Tags { + tags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) + } + + return tags +} diff --git a/pkg/providers/capacityreservation/suite_test.go b/pkg/providers/capacityreservation/suite_test.go new file mode 100644 index 000000000000..502b44fbd2d8 --- /dev/null +++ b/pkg/providers/capacityreservation/suite_test.go @@ -0,0 +1,345 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacityreservation_test + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/samber/lo" + + "github.com/aws/karpenter-provider-aws/pkg/apis" + "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/test" + + coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" + "sigs.k8s.io/karpenter/pkg/operator/scheme" + coretest "sigs.k8s.io/karpenter/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "knative.dev/pkg/logging/testing" + . "sigs.k8s.io/karpenter/pkg/test/expectations" +) + +var ctx context.Context +var stop context.CancelFunc +var env *coretest.Environment +var awsEnv *test.Environment +var nodeClass *v1beta1.EC2NodeClass + +func TestAWS(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "SecurityGroupProvider") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...)) + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + ctx, stop = context.WithCancel(ctx) + awsEnv = test.NewEnvironment(ctx, env) +}) + +var _ = AfterSuite(func() { + stop() + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = BeforeEach(func() { + ctx = coreoptions.ToContext(ctx, coretest.Options()) + ctx = options.ToContext(ctx, test.Options()) + nodeClass = test.EC2NodeClass(v1beta1.EC2NodeClass{ + Spec: v1beta1.EC2NodeClassSpec{ + AMIFamily: aws.String(v1beta1.AMIFamilyAL2), + SubnetSelectorTerms: []v1beta1.SubnetSelectorTerm{ + { + Tags: map[string]string{ + "*": "*", + }, + }, + }, + SecurityGroupSelectorTerms: []v1beta1.SecurityGroupSelectorTerm{ + { + Tags: map[string]string{ + "*": "*", + }, + }, + }, + }, + }) + awsEnv.Reset() +}) + +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = Describe("SecurityGroupProvider", func() { + It("should default to the clusters security groups", func() { + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test1"), + GroupName: aws.String("securityGroup-test1"), + }, + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + { + GroupId: aws.String("sg-test3"), + GroupName: aws.String("securityGroup-test3"), + }, + }, securityGroups) + }) + It("should discover security groups by tag", func() { + awsEnv.EC2API.DescribeSecurityGroupsOutput.Set(&ec2.DescribeSecurityGroupsOutput{SecurityGroups: []*ec2.SecurityGroup{ + {GroupName: aws.String("test-sgName-1"), GroupId: aws.String("test-sg-1"), Tags: []*ec2.Tag{{Key: aws.String("kubernetes.io/cluster/test-cluster"), Value: aws.String("test-sg-1")}}}, + {GroupName: aws.String("test-sgName-2"), GroupId: aws.String("test-sg-2"), Tags: []*ec2.Tag{{Key: aws.String("kubernetes.io/cluster/test-cluster"), Value: aws.String("test-sg-2")}}}, + }}) + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("test-sg-1"), + GroupName: aws.String("test-sgName-1"), + }, + { + GroupId: aws.String("test-sg-2"), + GroupName: aws.String("test-sgName-2"), + }, + }, securityGroups) + }) + It("should discover security groups by multiple tag values", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + Tags: map[string]string{"Name": "test-security-group-1"}, + }, + { + Tags: map[string]string{"Name": "test-security-group-2"}, + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test1"), + GroupName: aws.String("securityGroup-test1"), + }, + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + }, securityGroups) + }) + It("should discover security groups by ID", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + ID: "sg-test1", + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test1"), + GroupName: aws.String("securityGroup-test1"), + }, + }, securityGroups) + }) + It("should discover security groups by IDs", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + ID: "sg-test1", + }, + { + ID: "sg-test2", + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test1"), + GroupName: aws.String("securityGroup-test1"), + }, + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + }, securityGroups) + }) + It("should discover security groups by IDs and tags", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + ID: "sg-test1", + Tags: map[string]string{"foo": "bar"}, + }, + { + ID: "sg-test2", + Tags: map[string]string{"foo": "bar"}, + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test1"), + GroupName: aws.String("securityGroup-test1"), + }, + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + }, securityGroups) + }) + It("should discover security groups by IDs intersected with tags", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + ID: "sg-test2", + Tags: map[string]string{"foo": "bar"}, + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + }, securityGroups) + }) + It("should discover security groups by names", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + Name: "securityGroup-test2", + }, + { + Name: "securityGroup-test3", + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test2"), + GroupName: aws.String("securityGroup-test2"), + }, + { + GroupId: aws.String("sg-test3"), + GroupName: aws.String("securityGroup-test3"), + }, + }, securityGroups) + }) + It("should discover security groups by names intersected with tags", func() { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + Name: "securityGroup-test3", + Tags: map[string]string{"TestTag": "*"}, + }, + } + securityGroups, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + ExpectConsistsOfSecurityGroups([]*ec2.SecurityGroup{ + { + GroupId: aws.String("sg-test3"), + GroupName: aws.String("securityGroup-test3"), + }, + }, securityGroups) + }) + Context("Provider Cache", func() { + It("should resolve security groups from cache that are filtered by id", func() { + expectedSecurityGroups := awsEnv.EC2API.DescribeSecurityGroupsOutput.Clone().SecurityGroups + for _, sg := range expectedSecurityGroups { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + ID: *sg.GroupId, + }, + } + // Call list to request from aws and store in the cache + _, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + } + + for _, cachedObject := range awsEnv.SecurityGroupCache.Items() { + cachedSecurityGroup := cachedObject.Object.([]*ec2.SecurityGroup) + Expect(cachedSecurityGroup).To(HaveLen(1)) + lo.Contains(expectedSecurityGroups, cachedSecurityGroup[0]) + } + }) + It("should resolve security groups from cache that are filtered by Name", func() { + expectedSecurityGroups := awsEnv.EC2API.DescribeSecurityGroupsOutput.Clone().SecurityGroups + for _, sg := range expectedSecurityGroups { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + Name: *sg.GroupName, + }, + } + // Call list to request from aws and store in the cache + _, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + } + + for _, cachedObject := range awsEnv.SecurityGroupCache.Items() { + cachedSecurityGroup := cachedObject.Object.([]*ec2.SecurityGroup) + Expect(cachedSecurityGroup).To(HaveLen(1)) + lo.Contains(expectedSecurityGroups, cachedSecurityGroup[0]) + } + }) + It("should resolve security groups from cache that are filtered by tags", func() { + expectedSecurityGroups := awsEnv.EC2API.DescribeSecurityGroupsOutput.Clone().SecurityGroups + tagSet := lo.Map(expectedSecurityGroups, func(sg *ec2.SecurityGroup, _ int) map[string]string { + tag, _ := lo.Find(sg.Tags, func(tag *ec2.Tag) bool { + return lo.FromPtr(tag.Key) == "Name" + }) + return map[string]string{"Name": lo.FromPtr(tag.Value)} + }) + for _, tag := range tagSet { + nodeClass.Spec.SecurityGroupSelectorTerms = []v1beta1.SecurityGroupSelectorTerm{ + { + Tags: tag, + }, + } + // Call list to request from aws and store in the cache + _, err := awsEnv.SecurityGroupProvider.List(ctx, nodeClass) + Expect(err).To(BeNil()) + } + + for _, cachedObject := range awsEnv.SubnetCache.Items() { + cachedSecurityGroup := cachedObject.Object.([]*ec2.SecurityGroup) + Expect(cachedSecurityGroup).To(HaveLen(1)) + lo.Contains(expectedSecurityGroups, cachedSecurityGroup[0]) + } + }) + }) +}) + +func ExpectConsistsOfSecurityGroups(expected, actual []*ec2.SecurityGroup) { + GinkgoHelper() + Expect(actual).To(HaveLen(len(expected))) + for _, elem := range expected { + _, ok := lo.Find(actual, func(s *ec2.SecurityGroup) bool { + return lo.FromPtr(s.GroupId) == lo.FromPtr(elem.GroupId) && + lo.FromPtr(s.GroupName) == lo.FromPtr(elem.GroupName) + }) + Expect(ok).To(BeTrue(), `Expected security group with {"GroupId": %q, "GroupName": %q} to exist`, lo.FromPtr(elem.GroupId), lo.FromPtr(elem.GroupName)) + } +} diff --git a/pkg/providers/instance/instance.go b/pkg/providers/instance/instance.go index f228815563a7..7aaab3952704 100644 --- a/pkg/providers/instance/instance.go +++ b/pkg/providers/instance/instance.go @@ -96,17 +96,17 @@ func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1beta1.EC2Node instanceTypes = p.filterInstanceTypes(nodeClaim, instanceTypes) } tags := getTags(ctx, nodeClass, nodeClaim) - fleetInstance, err := p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) + fleetInstance, capacityType, err := p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) if awserrors.IsLaunchTemplateNotFound(err) { // retry once if launch template is not found. This allows karpenter to generate a new LT if the // cache was out-of-sync on the first try - fleetInstance, err = p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) + fleetInstance, capacityType, err = p.launchInstance(ctx, nodeClass, nodeClaim, instanceTypes, tags) } if err != nil { return nil, err } efaEnabled := lo.Contains(lo.Keys(nodeClaim.Spec.Resources.Requests), v1beta1.ResourceEFA) - return NewInstanceFromFleet(fleetInstance, tags, efaEnabled), nil + return NewInstanceFromFleet(fleetInstance, tags, efaEnabled, capacityType), nil } func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) { @@ -193,17 +193,22 @@ func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[st return nil } -func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, tags map[string]string) (*ec2.CreateFleetInstance, error) { +// workaround until capacity-reservation natively supported by EC2 API to return capacity type +func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, tags map[string]string) (*ec2.CreateFleetInstance, string, error) { capacityType := p.getCapacityType(nodeClaim, instanceTypes) + zonalSubnets, err := p.subnetProvider.ZonalSubnetsForLaunch(ctx, nodeClass, instanceTypes, capacityType) if err != nil { - return nil, fmt.Errorf("getting subnets, %w", err) + return nil, "", fmt.Errorf("getting subnets, %w", err) } // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, nodeClass, nodeClaim, instanceTypes, zonalSubnets, capacityType, tags) if err != nil { - return nil, fmt.Errorf("getting launch template configs, %w", err) + if cloudprovider.IsInsufficientCapacityError(err) { + return nil, "", cloudprovider.NewInsufficientCapacityError(fmt.Errorf("getting launch template configs, %w", err)) + } + return nil, "", fmt.Errorf("getting launch template configs, %w", err) } if err := p.checkODFallback(nodeClaim, instanceTypes, launchTemplateConfigs); err != nil { logging.FromContext(ctx).Warn(err.Error()) @@ -214,7 +219,8 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1 Context: nodeClass.Spec.Context, LaunchTemplateConfigs: launchTemplateConfigs, TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ - DefaultTargetCapacityType: aws.String(capacityType), + // workaround until capacity-reservation natively supported by EC2 API + DefaultTargetCapacityType: lo.Ternary(capacityType == "capacity-reservation", aws.String("on-demand"), aws.String(capacityType)), TotalTargetCapacity: aws.Int64(1), }, TagSpecifications: []*ec2.TagSpecification{ @@ -226,6 +232,7 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1 if capacityType == corev1beta1.CapacityTypeSpot { createFleetInput.SpotOptions = &ec2.SpotOptionsRequest{AllocationStrategy: aws.String(ec2.SpotAllocationStrategyPriceCapacityOptimized)} } else { + // will handle capacity-reservation and on-demand createFleetInput.OnDemandOptions = &ec2.OnDemandOptionsRequest{AllocationStrategy: aws.String(ec2.FleetOnDemandAllocationStrategyLowestPrice)} } @@ -236,19 +243,19 @@ func (p *DefaultProvider) launchInstance(ctx context.Context, nodeClass *v1beta1 for _, lt := range launchTemplateConfigs { p.launchTemplateProvider.InvalidateCache(ctx, aws.StringValue(lt.LaunchTemplateSpecification.LaunchTemplateName), aws.StringValue(lt.LaunchTemplateSpecification.LaunchTemplateId)) } - return nil, fmt.Errorf("creating fleet %w", err) + return nil, "", fmt.Errorf("creating fleet %w", err) } var reqFailure awserr.RequestFailure if errors.As(err, &reqFailure) { - return nil, fmt.Errorf("creating fleet %w (%s)", err, reqFailure.RequestID()) + return nil, "", fmt.Errorf("creating fleet %w (%s)", err, reqFailure.RequestID()) } - return nil, fmt.Errorf("creating fleet %w", err) + return nil, "", fmt.Errorf("creating fleet %w", err) } p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType) if len(createFleetOutput.Instances) == 0 || len(createFleetOutput.Instances[0].InstanceIds) == 0 { - return nil, combineFleetErrors(createFleetOutput.Errors) + return nil, "", combineFleetErrors(createFleetOutput.Errors) } - return createFleetOutput.Instances[0], nil + return createFleetOutput.Instances[0], capacityType, nil } func getTags(ctx context.Context, nodeClass *v1beta1.EC2NodeClass, nodeClaim *corev1beta1.NodeClaim) map[string]string { @@ -289,6 +296,9 @@ func (p *DefaultProvider) getLaunchTemplateConfigs(ctx context.Context, nodeClas var launchTemplateConfigs []*ec2.FleetLaunchTemplateConfigRequest launchTemplates, err := p.launchTemplateProvider.EnsureAll(ctx, nodeClass, nodeClaim, instanceTypes, capacityType, tags) if err != nil { + if cloudprovider.IsInsufficientCapacityError(err) { + return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("getting launch templates, %w", err)) + } return nil, fmt.Errorf("getting launch templates, %w", err) } for _, launchTemplate := range launchTemplates { @@ -361,12 +371,20 @@ func (p *DefaultProvider) updateUnavailableOfferingsCache(ctx context.Context, e } } -// getCapacityType selects spot if both constraints are flexible and there is an -// available offering. The AWS Cloud Provider defaults to [ on-demand ], so spot +// getCapacityType selects capacity-reservation or spot if both constraints are flexible and there is an +// available offering. The AWS Cloud Provider defaults to [ on-demand ], so capacity-reservation or spot // must be explicitly included in capacity type requirements. func (p *DefaultProvider) getCapacityType(nodeClaim *corev1beta1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) string { - requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim. - Spec.Requirements...) + requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...) + if requirements.Get(corev1beta1.CapacityTypeLabelKey).Has("capacity-reservation") { + for _, instanceType := range instanceTypes { + for _, offering := range instanceType.Offerings.Available() { + if requirements.Get(v1.LabelTopologyZone).Has(offering.Zone) && offering.CapacityType == "capacity-reservation" { + return "capacity-reservation" + } + } + } + } if requirements.Get(corev1beta1.CapacityTypeLabelKey).Has(corev1beta1.CapacityTypeSpot) { for _, instanceType := range instanceTypes { for _, offering := range instanceType.Offerings.Available() { diff --git a/pkg/providers/instance/types.go b/pkg/providers/instance/types.go index 5f3804f2d004..6cb86273e477 100644 --- a/pkg/providers/instance/types.go +++ b/pkg/providers/instance/types.go @@ -61,7 +61,8 @@ func NewInstance(out *ec2.Instance) *Instance { } -func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool) *Instance { +// workaround until capacity-reservation natively supported by EC2 API to return capacity type in out.Lifecycle +func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, efaEnabled bool, capacityType string) *Instance { return &Instance{ LaunchTime: time.Now(), // estimate the launch time since we just launched State: ec2.StatePending, @@ -69,7 +70,7 @@ func NewInstanceFromFleet(out *ec2.CreateFleetInstance, tags map[string]string, ImageID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.ImageId), Type: aws.StringValue(out.InstanceType), Zone: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), - CapacityType: aws.StringValue(out.Lifecycle), + CapacityType: capacityType, SubnetID: aws.StringValue(out.LaunchTemplateAndOverrides.Overrides.SubnetId), Tags: tags, EFAEnabled: efaEnabled, diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index 0d86dd941e8a..178859853101 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -164,10 +164,28 @@ func (p *DefaultProvider) List(ctx context.Context, kc *corev1beta1.KubeletConfi // Any changes to the values passed into the NewInstanceType method will require making updates to the cache key // so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types // !!! Important !!! - return NewInstanceType(ctx, i, p.region, - nodeClass.Spec.BlockDeviceMappings, nodeClass.Spec.InstanceStorePolicy, - kc.MaxPods, kc.PodsPerCore, kc.KubeReserved, kc.SystemReserved, kc.EvictionHard, kc.EvictionSoft, - amiFamily, p.createOfferings(ctx, i, p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], allZones, subnetZones)) + return NewInstanceType( + ctx, + i, + p.region, + nodeClass.Spec.BlockDeviceMappings, + nodeClass.Spec.InstanceStorePolicy, + kc.MaxPods, + kc.PodsPerCore, + kc.KubeReserved, + kc.SystemReserved, + kc.EvictionHard, + kc.EvictionSoft, + amiFamily, + p.createOfferings( + ctx, + i, + p.instanceTypeOfferings[aws.StringValue(i.InstanceType)], + allZones, + subnetZones, + nodeClass.Status.CapacityReservations, + ), + ) }) p.instanceTypesCache.SetDefault(key, result) return result, nil @@ -251,11 +269,22 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error return nil } -func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2.InstanceTypeInfo, instanceTypeZones, zones, subnetZones sets.Set[string]) []cloudprovider.Offering { +func (p *DefaultProvider) createOfferings( + ctx context.Context, + instanceType *ec2.InstanceTypeInfo, + instanceTypeZones, + zones, + subnetZones sets.Set[string], + capacityReservations []v1beta1.CapacityReservation, +) []cloudprovider.Offering { var offerings []cloudprovider.Offering + + // workaround until ec2 supports "capacity-reservation" as a supported class natively + supportedUsageClasses := sets.NewString(append(aws.StringValueSlice(instanceType.SupportedUsageClasses), "capacity-reservation")...) + for zone := range zones { // while usage classes should be a distinct set, there's no guarantee of that - for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { + for capacityType := range supportedUsageClasses { // exclude any offerings that have recently seen an insufficient capacity error from EC2 isUnavailable := p.unavailableOfferings.IsUnavailable(*instanceType.InstanceType, zone, capacityType) var price float64 @@ -265,6 +294,14 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 price, ok = p.pricingProvider.SpotPrice(*instanceType.InstanceType, zone) case ec2.UsageClassTypeOnDemand: price, ok = p.pricingProvider.OnDemandPrice(*instanceType.InstanceType) + case "capacity-reservation": + // logging.FromContext(ctx).Debugf("Creating offering for capacity reservation instance type %s and zone %s", *instanceType.InstanceType, zone) + price = 0.0 + ok = hasCapacityReservation( + capacityReservations, + instanceType, + zone, + ) case "capacity-block": // ignore since karpenter doesn't support it yet, but do not log an unknown capacity type error continue @@ -279,6 +316,7 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 Price: price, Available: available, }) + // TODO: Do we want to set this for capacityType capacity-reservation? instanceTypeOfferingAvailable.With(prometheus.Labels{ instanceTypeLabel: *instanceType.InstanceType, capacityTypeLabel: capacityType, @@ -294,6 +332,29 @@ func (p *DefaultProvider) createOfferings(ctx context.Context, instanceType *ec2 return offerings } +func hasCapacityReservation( + capacityReservations []v1beta1.CapacityReservation, + instanceType *ec2.InstanceTypeInfo, + zone string, +) bool { + for _, capacityReservation := range capacityReservations { + if capacityReservation.AvailableInstanceCount == 0 { + continue + } + + if capacityReservation.AvailabilityZone != zone { + continue + } + + if capacityReservation.InstanceType != aws.StringValue(instanceType.InstanceType) { + continue + } + + return true + } + return false +} + func (p *DefaultProvider) Reset() { p.instanceTypesInfo = []*ec2.InstanceTypeInfo{} p.instanceTypeOfferings = map[string]sets.Set[string]{} diff --git a/pkg/providers/launchtemplate/launchtemplate.go b/pkg/providers/launchtemplate/launchtemplate.go index b90949530f6f..1f2dc7a55ec9 100644 --- a/pkg/providers/launchtemplate/launchtemplate.go +++ b/pkg/providers/launchtemplate/launchtemplate.go @@ -174,17 +174,18 @@ func (p *DefaultProvider) createAMIOptions(ctx context.Context, nodeClass *v1bet return nil, fmt.Errorf("no security groups are present in the status") } options := &amifamily.Options{ - ClusterName: options.FromContext(ctx).ClusterName, - ClusterEndpoint: p.ClusterEndpoint, - ClusterCIDR: p.ClusterCIDR.Load(), - InstanceProfile: instanceProfile, - InstanceStorePolicy: nodeClass.Spec.InstanceStorePolicy, - SecurityGroups: nodeClass.Status.SecurityGroups, - Tags: tags, - Labels: labels, - CABundle: p.CABundle, - KubeDNSIP: p.KubeDNSIP, - NodeClassName: nodeClass.Name, + ClusterName: options.FromContext(ctx).ClusterName, + ClusterEndpoint: p.ClusterEndpoint, + ClusterCIDR: p.ClusterCIDR.Load(), + InstanceProfile: instanceProfile, + InstanceStorePolicy: nodeClass.Spec.InstanceStorePolicy, + SecurityGroups: nodeClass.Status.SecurityGroups, + CapacityReservations: nodeClass.Status.CapacityReservations, + Tags: tags, + Labels: labels, + CABundle: p.CABundle, + KubeDNSIP: p.KubeDNSIP, + NodeClassName: nodeClass.Name, } if nodeClass.Spec.AssociatePublicIPAddress != nil { options.AssociatePublicIPAddress = nodeClass.Spec.AssociatePublicIPAddress @@ -247,6 +248,7 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami launchTemplateDataTags = append(launchTemplateDataTags, &ec2.LaunchTemplateTagSpecificationRequest{ResourceType: aws.String(ec2.ResourceTypeSpotInstancesRequest), Tags: utils.MergeTags(options.Tags)}) } networkInterfaces := p.generateNetworkInterfaces(options) + capacityReservationSpecification := p.generateCapacityReservationSpecification(options) output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{ LaunchTemplateName: aws.String(LaunchTemplateName(options)), LaunchTemplateData: &ec2.RequestLaunchTemplateData{ @@ -258,9 +260,10 @@ func (p *DefaultProvider) createLaunchTemplate(ctx context.Context, options *ami Enabled: aws.Bool(options.DetailedMonitoring), }, // If the network interface is defined, the security groups are defined within it - SecurityGroupIds: lo.Ternary(networkInterfaces != nil, nil, lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) })), - UserData: aws.String(userData), - ImageId: aws.String(options.AMIID), + SecurityGroupIds: lo.Ternary(networkInterfaces != nil, nil, lo.Map(options.SecurityGroups, func(s v1beta1.SecurityGroup, _ int) *string { return aws.String(s.ID) })), + CapacityReservationSpecification: capacityReservationSpecification, + UserData: aws.String(userData), + ImageId: aws.String(options.AMIID), MetadataOptions: &ec2.LaunchTemplateInstanceMetadataOptionsRequest{ HttpEndpoint: options.MetadataOptions.HTTPEndpoint, HttpProtocolIpv6: options.MetadataOptions.HTTPProtocolIPv6, @@ -313,6 +316,22 @@ func (p *DefaultProvider) generateNetworkInterfaces(options *amifamily.LaunchTem return nil } +func (p *DefaultProvider) generateCapacityReservationSpecification(options *amifamily.LaunchTemplate) *ec2.LaunchTemplateCapacityReservationSpecificationRequest { + if options == nil { + return nil + } + + if options.CapacityReservation == nil { + return nil + } + + return &ec2.LaunchTemplateCapacityReservationSpecificationRequest{ + CapacityReservationTarget: &ec2.CapacityReservationTarget{ + CapacityReservationId: &options.CapacityReservation.ID, + }, + } +} + func (p *DefaultProvider) blockDeviceMappings(blockDeviceMappings []*v1beta1.BlockDeviceMapping) []*ec2.LaunchTemplateBlockDeviceMappingRequest { if len(blockDeviceMappings) == 0 { // The EC2 API fails with empty slices and expects nil. diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 81aa70575470..cfa2a4af74c1 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -30,6 +30,7 @@ import ( awscache "github.com/aws/karpenter-provider-aws/pkg/cache" "github.com/aws/karpenter-provider-aws/pkg/fake" "github.com/aws/karpenter-provider-aws/pkg/providers/amifamily" + "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" @@ -67,19 +68,21 @@ type Environment struct { AvailableIPAdressCache *cache.Cache AssociatePublicIPAddressCache *cache.Cache SecurityGroupCache *cache.Cache + CapacityReservationCache *cache.Cache InstanceProfileCache *cache.Cache // Providers - InstanceTypesProvider *instancetype.DefaultProvider - InstanceProvider *instance.DefaultProvider - SubnetProvider *subnet.DefaultProvider - SecurityGroupProvider *securitygroup.DefaultProvider - InstanceProfileProvider *instanceprofile.DefaultProvider - PricingProvider *pricing.DefaultProvider - AMIProvider *amifamily.DefaultProvider - AMIResolver *amifamily.Resolver - VersionProvider *version.DefaultProvider - LaunchTemplateProvider *launchtemplate.DefaultProvider + InstanceTypesProvider *instancetype.DefaultProvider + InstanceProvider *instance.DefaultProvider + SubnetProvider *subnet.DefaultProvider + SecurityGroupProvider *securitygroup.DefaultProvider + CapacityReservationProvider *capacityreservation.DefaultProvider + InstanceProfileProvider *instanceprofile.DefaultProvider + PricingProvider *pricing.DefaultProvider + AMIProvider *amifamily.DefaultProvider + AMIResolver *amifamily.Resolver + VersionProvider *version.DefaultProvider + LaunchTemplateProvider *launchtemplate.DefaultProvider } func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment { @@ -99,6 +102,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval) associatePublicIPAddressCache := cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval) securityGroupCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) + capacityReservationCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) instanceProfileCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) fakePricingAPI := &fake.PricingAPI{} @@ -106,6 +110,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment pricingProvider := pricing.NewDefaultProvider(ctx, fakePricingAPI, ec2api, fake.DefaultRegion) subnetProvider := subnet.NewDefaultProvider(ec2api, subnetCache, availableIPAdressCache, associatePublicIPAddressCache) securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, securityGroupCache) + capacityReservationProvider := capacityreservation.NewDefaultProvider(ec2api, capacityReservationCache) versionProvider := version.NewDefaultProvider(env.KubernetesInterface, kubernetesVersionCache) instanceProfileProvider := instanceprofile.NewDefaultProvider(fake.DefaultRegion, iamapi, instanceProfileCache) amiProvider := amifamily.NewDefaultProvider(versionProvider, ssmapi, ec2api, ec2Cache) @@ -149,19 +154,21 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment AvailableIPAdressCache: availableIPAdressCache, AssociatePublicIPAddressCache: associatePublicIPAddressCache, SecurityGroupCache: securityGroupCache, + CapacityReservationCache: capacityReservationCache, InstanceProfileCache: instanceProfileCache, UnavailableOfferingsCache: unavailableOfferingsCache, - InstanceTypesProvider: instanceTypesProvider, - InstanceProvider: instanceProvider, - SubnetProvider: subnetProvider, - SecurityGroupProvider: securityGroupProvider, - LaunchTemplateProvider: launchTemplateProvider, - InstanceProfileProvider: instanceProfileProvider, - PricingProvider: pricingProvider, - AMIProvider: amiProvider, - AMIResolver: amiResolver, - VersionProvider: versionProvider, + InstanceTypesProvider: instanceTypesProvider, + InstanceProvider: instanceProvider, + SubnetProvider: subnetProvider, + SecurityGroupProvider: securityGroupProvider, + CapacityReservationProvider: capacityReservationProvider, + LaunchTemplateProvider: launchTemplateProvider, + InstanceProfileProvider: instanceProfileProvider, + PricingProvider: pricingProvider, + AMIProvider: amiProvider, + AMIResolver: amiResolver, + VersionProvider: versionProvider, } } diff --git a/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh b/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh index cc3d7f929986..31af18ff9feb 100644 --- a/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh +++ b/website/content/en/preview/getting-started/migrating-from-cas/scripts/step04-controller-iam.sh @@ -71,6 +71,17 @@ cat << EOF > controller-policy.json "Resource": "arn:${AWS_PARTITION}:eks:${AWS_REGION}:${AWS_ACCOUNT_ID}:cluster/${CLUSTER_NAME}", "Sid": "EKSClusterEndpointLookup" }, + { + "Effect": "Allow", + "Action": "ec2:DescribeCapacityReservations", + "Resource": "*", + "Condition": { + "StringEquals": { + "ec2:Region": "${AWS_REGION}" + } + } + "Sid": "AllowReadCapacityReservations" + }, { "Sid": "AllowScopedInstanceProfileCreationActions", "Effect": "Allow",