Merge branch 'main' into rfc-deprecated-ami-observability
shabbskagalwala authored Nov 4, 2024
2 parents 5bf52f3 + 22f507c commit ae2a098
Showing 76 changed files with 1,490 additions and 244 deletions.
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.16.3
controller-gen.kubebuilder.io/version: v0.16.5
name: ec2nodeclasses.karpenter.k8s.aws
spec:
group: karpenter.k8s.aws
1 change: 1 addition & 0 deletions cmd/controller/main.go
@@ -53,6 +53,7 @@ func main() {
op.GetClient(),
op.EventRecorder,
op.UnavailableOfferingsCache,
op.SSMCache,
cloudProvider,
op.SubnetProvider,
op.SecurityGroupProvider,
5 changes: 3 additions & 2 deletions designs/limits.md
@@ -12,14 +12,12 @@ The next large problem is the inability to define a hard ceiling on cluster cost

We need to provide similar functionality in Karpenter as well, where a customer can configure a hard limit.


## Current State

To address the runaway-scaling problem, the current fix is to detect whether the kubelet for a worker node has ever reported its status to the K8s control plane. If it hasn't after 15 minutes, Karpenter assumes a hard failure mode means the worker node will never become healthy, and terminates it. If the condition map of the node object in the API server says `NodeStatusNeverUpdated`, we use that as an indicator that the node never came up.

This fix ensures that if a worker node has become unhealthy for other reasons, such as a network partition or a power outage in an availability zone, we don't terminate it. It's important that we don't make the static stability of a cluster worse during such an event. On the other hand, if there is an edge case where worker nodes come online and soon go offline, it will lead to runaway scaling again. This edge case should be unlikely in the near term, so this document focuses on the ability to limit costs within Karpenter, so that even if runaway scaling does occur there's a way to bound it. A longer-term solution to the runaway problem will be discussed separately.


## Proposed Solution for Limits

There are two broad forms of limiting we could apply. The first is a limit on the number of in-flight worker nodes being provisioned at any point in time; a worker node in the `NotReady` state could be considered in-flight. The second is an absolute limit on the amount of resources Karpenter can provision.
@@ -37,6 +35,7 @@ In the above example - `20%` indicates that if at any point in time, more than 2
The good bit about this approach is that we don't constrain how many total worker nodes can be spun up by Karpenter, while also making sure that if we keep launching worker nodes that aren't healthy, we stop the scaling and save costs.

The two main problems with this approach, though, are:

1. While this limit is meant to constrain only the number of unhealthy worker nodes in a cluster, it will also inhibit the rate at which Karpenter can respond to pods that aren't schedulable. This somewhat goes against the goal of minimizing worker launch times.
2. While this helps ensure that costs don't increase due to runaway scaling, it won't help those who want a stricter cap on the amount of resources being provisioned even when nodes are otherwise healthy.

@@ -62,11 +61,13 @@ As a cost control mechanism, this requires a little more work from our users if
[CPU limits](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units), memory limits and GPU limits will be defined similarly to resource requests and will not be required by default. Karpenter also will not default to any limits itself.

The supported resource types are:

- `cpu`
- `memory`
- `nvidia.com/gpu`
- `amd.com/gpu`
- `aws.amazon.com/neuron`
- `aws.amazon.com/neuroncore`
- `habana.ai/gaudi`

Limits will be defined at the per-provisioner level. We'll rely on the `karpenter.sh/provisioner-name` node label when calculating resource usage by a specific provisioner. This is useful when multiple teams share a single cluster and use separate provisioners since each team's resource consumption will be limited separately.
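One possible shape for such a per-provisioner limit is sketched below. The field layout mirrors the Provisioner API, but the exact schema at this design stage is illustrative, not final:

```yaml
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: team-a
spec:
  limits:
    resources:
      cpu: "1000"          # cap total vCPUs provisioned by this provisioner
      memory: 1000Gi       # cap total memory
      nvidia.com/gpu: "16" # accelerator caps use the extended resource names above
```

Because limits are evaluated per provisioner (via the `karpenter.sh/provisioner-name` label), each team's cap is enforced independently even on a shared cluster.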
2 changes: 1 addition & 1 deletion examples/workloads/neuron.yaml
@@ -26,4 +26,4 @@ spec:
cpu: "1"
memory: 256M
securityContext:
allowPrivilegeEscalation: false
allowPrivilegeEscalation: false
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/aws/karpenter-provider-aws

go 1.23.0
go 1.23.2

require (
github.com/Pallinder/go-randomdata v1.2.0
23 changes: 15 additions & 8 deletions hack/code/instancetype_testdata_gen/main.go
@@ -147,11 +147,11 @@ func getInstanceTypeInfo(info *ec2.InstanceTypeInfo) string {
fmt.Fprintf(src, "NvmeSupport: aws.String(\"%s\"),\n", lo.FromPtr(info.EbsInfo.NvmeSupport))
fmt.Fprintf(src, "},\n")
}
if info.InferenceAcceleratorInfo != nil {
fmt.Fprintf(src, "InferenceAcceleratorInfo: &ec2.InferenceAcceleratorInfo{\n")
fmt.Fprintf(src, "Accelerators: []*ec2.InferenceDeviceInfo{\n")
for _, elem := range info.InferenceAcceleratorInfo.Accelerators {
fmt.Fprintf(src, getInferenceAcceleratorDeviceInfo(elem))
if info.NeuronInfo != nil {
fmt.Fprintf(src, "NeuronInfo: &ec2.NeuronInfo{\n")
fmt.Fprintf(src, "NeuronDevices: []*ec2.NeuronDeviceInfo{\n")
for _, elem := range info.NeuronInfo.NeuronDevices {
fmt.Fprintf(src, getNeuronDeviceInfo(elem))
}
fmt.Fprintf(src, "},\n")
fmt.Fprintf(src, "},\n")
@@ -199,12 +199,19 @@ func getNetworkCardInfo(info *ec2.NetworkCardInfo) string {
return src.String()
}

func getInferenceAcceleratorDeviceInfo(info *ec2.InferenceDeviceInfo) string {
func getNeuronDeviceInfo(info *ec2.NeuronDeviceInfo) string {

src := &bytes.Buffer{}
fmt.Fprintf(src, "{\n")
fmt.Fprintf(src, "Name: aws.String(\"%s\"),\n", lo.FromPtr(info.Name))
fmt.Fprintf(src, "Manufacturer: aws.String(\"%s\"),\n", lo.FromPtr(info.Manufacturer))
fmt.Fprintf(src, "Count: aws.Int64(%d),\n", lo.FromPtr(info.Count))
fmt.Fprintf(src, "Name: aws.String(\"%s\"),\n", lo.FromPtr(info.Name))
fmt.Fprintf(src, "CoreInfo: &ec2.NeuronDeviceCoreInfo{\n")
fmt.Fprintf(src, "Count: aws.Int64(%d),\n", lo.FromPtr(info.CoreInfo.Count))
fmt.Fprintf(src, "Version: aws.Int64(%d),\n", lo.FromPtr(info.CoreInfo.Version))
fmt.Fprintf(src, "},\n")
fmt.Fprintf(src, "MemoryInfo: &ec2.NeuronDeviceMemoryInfo{\n")
fmt.Fprintf(src, "SizeInMiB: aws.Int64(%d),\n", lo.FromPtr(info.MemoryInfo.SizeInMiB))
fmt.Fprintf(src, "},\n")
fmt.Fprintf(src, "},\n")
return src.String()
}
2 changes: 1 addition & 1 deletion hack/codegen.sh
@@ -46,7 +46,7 @@ instanceTypeTestData() {
GENERATED_FILE="pkg/fake/zz_generated.describe_instance_types.go"

go run hack/code/instancetype_testdata_gen/main.go --out-file ${GENERATED_FILE} \
--instance-types t3.large,m5.large,m5.xlarge,p3.8xlarge,g4dn.8xlarge,c6g.large,inf1.2xlarge,inf1.6xlarge,trn1.2xlarge,m5.metal,dl1.24xlarge,m6idn.32xlarge,t4g.small,t4g.xlarge,t4g.medium,g4ad.16xlarge
--instance-types t3.large,m5.large,m5.xlarge,p3.8xlarge,g4dn.8xlarge,c6g.large,inf2.xlarge,inf2.24xlarge,trn1.2xlarge,m5.metal,dl1.24xlarge,m6idn.32xlarge,t4g.small,t4g.xlarge,t4g.medium,g4ad.16xlarge

checkForUpdates "${GENERATED_FILE}"
}
2 changes: 1 addition & 1 deletion hack/docs/instancetypes_gen/main.go
@@ -124,7 +124,7 @@ below are the resources available with some assumptions and after the instance o
resourceNameMap := sets.New[string]()

// Iterate through regions and take the union of instance types we discover across both
for _, region := range []string{"us-east-1", "us-west-2"} {
for _, region := range []string{"us-east-1", "us-east-2", "us-west-2"} {
sess := session.Must(session.NewSession(&aws.Config{Region: lo.ToPtr(region)}))
ec2api := ec2.New(sess)
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.k8s.aws_ec2nodeclasses.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.16.3
controller-gen.kubebuilder.io/version: v0.16.5
name: ec2nodeclasses.karpenter.k8s.aws
spec:
group: karpenter.k8s.aws
36 changes: 30 additions & 6 deletions pkg/apis/v1/ec2nodeclass.go
@@ -484,16 +484,40 @@ func (in *EC2NodeClass) AMIFamily() string {
if in.Spec.AMIFamily != nil {
return *in.Spec.AMIFamily
}
if term, ok := lo.Find(in.Spec.AMISelectorTerms, func(t AMISelectorTerm) bool {
return t.Alias != ""
}); ok {
return AMIFamilyFromAlias(term.Alias)
if alias := in.Alias(); alias != nil {
return alias.Family
}
// Unreachable: validation enforces that one of the above conditions must be met
return AMIFamilyCustom
}

func AMIFamilyFromAlias(alias string) string {
type Alias struct {
Family string
Version string
}

const (
AliasVersionLatest = "latest"
)

func (a *Alias) String() string {
return fmt.Sprintf("%s@%s", a.Family, a.Version)
}

func (in *EC2NodeClass) Alias() *Alias {
term, ok := lo.Find(in.Spec.AMISelectorTerms, func(term AMISelectorTerm) bool {
return term.Alias != ""
})
if !ok {
return nil
}
return &Alias{
Family: amiFamilyFromAlias(term.Alias),
Version: amiVersionFromAlias(term.Alias),
}
}

func amiFamilyFromAlias(alias string) string {
components := strings.Split(alias, "@")
if len(components) != 2 {
log.Fatalf("failed to parse AMI alias %q, invalid format", alias)
@@ -513,7 +537,7 @@ func AMIFamilyFromAlias(alias string) string {
return family
}

func AMIVersionFromAlias(alias string) string {
func amiVersionFromAlias(alias string) string {
components := strings.Split(alias, "@")
if len(components) != 2 {
log.Fatalf("failed to parse AMI alias %q, invalid format", alias)
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
@@ -90,6 +90,7 @@ var (
ResourceNVIDIAGPU corev1.ResourceName = "nvidia.com/gpu"
ResourceAMDGPU corev1.ResourceName = "amd.com/gpu"
ResourceAWSNeuron corev1.ResourceName = "aws.amazon.com/neuron"
ResourceAWSNeuronCore corev1.ResourceName = "aws.amazon.com/neuroncore"
ResourceHabanaGaudi corev1.ResourceName = "habana.ai/gaudi"
ResourceAWSPodENI corev1.ResourceName = "vpc.amazonaws.com/pod-eni"
ResourcePrivateIPv4Address corev1.ResourceName = "vpc.amazonaws.com/PrivateIPv4Address"
15 changes: 15 additions & 0 deletions pkg/apis/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cache/cache.go
@@ -37,7 +37,7 @@ const (
AssociatePublicIPAddressTTL = 5 * time.Minute
// SSMGetParametersByPathTTL is the time to drop SSM Parameters by path data. This only queries EKS Optimized AMI
// releases, so we should expect this to be updated relatively infrequently.
SSMGetParametersByPathTTL = 24 * time.Hour
SSMCacheTTL = 24 * time.Hour
// DiscoveredCapacityCacheTTL is the time to drop discovered resource capacity data per-instance type
// if it is not updated by a node creation event or refreshed during controller reconciliation
DiscoveredCapacityCacheTTL = 60 * 24 * time.Hour
29 changes: 23 additions & 6 deletions pkg/controllers/controllers.go
@@ -19,6 +19,7 @@ import (

"github.com/awslabs/operatorpkg/controller"
"github.com/awslabs/operatorpkg/status"
"github.com/patrickmn/go-cache"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/karpenter/pkg/cloudprovider"

@@ -29,6 +30,7 @@
controllersinstancetype "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype"
controllersinstancetypecapacity "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/instancetype/capacity"
controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"

servicesqs "github.com/aws/aws-sdk-go-v2/service/sqs"
@@ -39,7 +41,7 @@

"sigs.k8s.io/karpenter/pkg/events"

"github.com/aws/karpenter-provider-aws/pkg/cache"
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
nodeclaimgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/garbagecollection"
nodeclaimtagging "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclaim/tagging"
@@ -56,11 +58,25 @@
config "github.com/aws/aws-sdk-go-v2/config"
)

func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Session, clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
unavailableOfferings *cache.UnavailableOfferings, cloudProvider cloudprovider.CloudProvider, subnetProvider subnet.Provider,
securityGroupProvider securitygroup.Provider, instanceProfileProvider instanceprofile.Provider, instanceProvider instance.Provider,
pricingProvider pricing.Provider, amiProvider amifamily.Provider, launchTemplateProvider launchtemplate.Provider, instanceTypeProvider *instancetype.DefaultProvider) []controller.Controller {

func NewControllers(
ctx context.Context,
mgr manager.Manager,
sess *session.Session,
clk clock.Clock,
kubeClient client.Client,
recorder events.Recorder,
unavailableOfferings *awscache.UnavailableOfferings,
ssmCache *cache.Cache,
cloudProvider cloudprovider.CloudProvider,
subnetProvider subnet.Provider,
securityGroupProvider securitygroup.Provider,
instanceProfileProvider instanceprofile.Provider,
instanceProvider instance.Provider,
pricingProvider pricing.Provider,
amiProvider amifamily.Provider,
launchTemplateProvider launchtemplate.Provider,
instanceTypeProvider *instancetype.DefaultProvider,
) []controller.Controller {
controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
nodeclassstatus.NewController(kubeClient, subnetProvider, securityGroupProvider, amiProvider, instanceProfileProvider, launchTemplateProvider),
@@ -70,6 +86,7 @@ func NewControllers(ctx context.Context, mgr manager.Manager, sess *session.Sess
controllerspricing.NewController(pricingProvider),
controllersinstancetype.NewController(instanceTypeProvider),
controllersinstancetypecapacity.NewController(kubeClient, instanceTypeProvider),
ssminvalidation.NewController(ssmCache, amiProvider),
status.NewController[*v1.EC2NodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}
if options.FromContext(ctx).InterruptionQueue != "" {
33 changes: 1 addition & 32 deletions pkg/controllers/nodeclass/status/ami_test.go
@@ -304,7 +304,7 @@ var _ = Describe("NodeClass AMI Status Controller", func() {
ExpectObjectReconciled(ctx, env.Client, statusController, nodeClass)
nodeClass = ExpectExists(ctx, env.Client, nodeClass)

Expect(len(nodeClass.Status.AMIs)).To(Equal(6))
Expect(len(nodeClass.Status.AMIs)).To(Equal(4))
Expect(nodeClass.Status.AMIs).To(ContainElements([]v1.AMI{
{
Name: "amd64-standard",
@@ -374,37 +374,6 @@ },
},
},
},
// Note: Bottlerocket uses the same AMI for nvidia and neuron, we use the nvidia AMI here
{
Name: "amd64-nvidia",
ID: "ami-amd64-nvidia",
Requirements: []corev1.NodeSelectorRequirement{
{
Key: corev1.LabelArchStable,
Operator: corev1.NodeSelectorOpIn,
Values: []string{karpv1.ArchitectureAmd64},
},
{
Key: v1.LabelInstanceAcceleratorCount,
Operator: corev1.NodeSelectorOpExists,
},
},
},
{
Name: "arm64-nvidia",
ID: "ami-arm64-nvidia",
Requirements: []corev1.NodeSelectorRequirement{
{
Key: corev1.LabelArchStable,
Operator: corev1.NodeSelectorOpIn,
Values: []string{karpv1.ArchitectureArm64},
},
{
Key: v1.LabelInstanceAcceleratorCount,
Operator: corev1.NodeSelectorOpExists,
},
},
},
}))
Expect(nodeClass.StatusConditions().IsTrue(v1.ConditionTypeAMIsReady)).To(BeTrue())
})