From 97d81244bf9203293e91559350b723de8d2ecd66 Mon Sep 17 00:00:00 2001 From: Reed Schalo Date: Fri, 15 Nov 2024 17:08:19 -0800 Subject: [PATCH] feat: parameterize unavailable offering cache --- charts/karpenter/templates/deployment.yaml | 4 ++++ charts/karpenter/values.yaml | 4 ++++ pkg/cache/unavailableofferings.go | 5 +++-- .../interruption/interruption_benchmark_test.go | 2 +- pkg/controllers/interruption/suite_test.go | 2 +- pkg/operator/operator.go | 2 +- pkg/operator/options/options.go | 4 ++++ pkg/operator/options/suite_test.go | 12 +++++++++++- pkg/test/environment.go | 2 +- pkg/test/options.go | 2 ++ 10 files changed, 32 insertions(+), 7 deletions(-) diff --git a/charts/karpenter/templates/deployment.yaml b/charts/karpenter/templates/deployment.yaml index 398523b18335..c6a2270d4b14 100644 --- a/charts/karpenter/templates/deployment.yaml +++ b/charts/karpenter/templates/deployment.yaml @@ -148,6 +148,10 @@ spec: - name: RESERVED_ENIS value: "{{ . }}" {{- end }} + {{- with .Values.settings.unavailableOfferingsTTL }} + - name: UNAVAILABLE_OFFERINGS_TTL + value: "{{ . }}" + {{- end }} {{- with .Values.controller.env }} {{- toYaml . | nindent 12 }} {{- end }} diff --git a/charts/karpenter/values.yaml b/charts/karpenter/values.yaml index 8b06fc470af3..17e56bc85008 100644 --- a/charts/karpenter/values.yaml +++ b/charts/karpenter/values.yaml @@ -198,6 +198,10 @@ settings: # -- Reserved ENIs are not included in the calculations for max-pods or kube-reserved # This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html reservedENIs: "0" + # -- Duration (default value is '3m') for cache eviction for unavailable offerings. + # Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. + # Decimals are accepted and units can be used alone or together like '300ms', '1.5h' or '2h45m'. + unavailableOfferingsTTL: "3m" # -- Feature Gate configuration values. 
Feature Gates will follow the same graduation process and requirements as feature gates # in Kubernetes. More information here https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/#feature-gates-for-alpha-or-beta-features featureGates: diff --git a/pkg/cache/unavailableofferings.go b/pkg/cache/unavailableofferings.go index e909d4fce161..f268afa00b62 100644 --- a/pkg/cache/unavailableofferings.go +++ b/pkg/cache/unavailableofferings.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sync/atomic" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -34,9 +35,9 @@ type UnavailableOfferings struct { SeqNum uint64 } -func NewUnavailableOfferings() *UnavailableOfferings { +func NewUnavailableOfferings(ttl time.Duration) *UnavailableOfferings { uo := &UnavailableOfferings{ - cache: cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval), + cache: cache.New(ttl, UnavailableOfferingsCleanupInterval), SeqNum: 0, } uo.cache.OnEvicted(func(_ string, _ interface{}) { diff --git a/pkg/controllers/interruption/interruption_benchmark_test.go b/pkg/controllers/interruption/interruption_benchmark_test.go index 4c369e11900d..0b49f8a57253 100644 --- a/pkg/controllers/interruption/interruption_benchmark_test.go +++ b/pkg/controllers/interruption/interruption_benchmark_test.go @@ -113,7 +113,7 @@ func benchmarkNotificationController(b *testing.B, messageCount int) { // Load all the fundamental components before setting up the controllers recorder := coretest.NewEventRecorder() - unavailableOfferingsCache = awscache.NewUnavailableOfferings() + unavailableOfferingsCache = awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL) // Set-up the controllers interruptionController := interruption.NewController(env.Client, fakeClock, recorder, providers.sqsProvider, unavailableOfferingsCache) diff --git a/pkg/controllers/interruption/suite_test.go b/pkg/controllers/interruption/suite_test.go index 
20d9ac2e30a1..aabf7bf5c8b6 100644 --- a/pkg/controllers/interruption/suite_test.go +++ b/pkg/controllers/interruption/suite_test.go @@ -79,7 +79,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...)) fakeClock = &clock.FakeClock{} - unavailableOfferingsCache = awscache.NewUnavailableOfferings() + unavailableOfferingsCache = awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL) sqsapi = &fake.SQSAPI{} sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount))) controller = interruption.NewController(env.Client, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, unavailableOfferingsCache) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 4789c8d376e3..5ccec99d5a14 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -136,7 +136,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns") } - unavailableOfferingsCache := awscache.NewUnavailableOfferings() + unavailableOfferingsCache := awscache.NewUnavailableOfferings(options.FromContext(ctx).UnavailableOfferingsTTL) ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval) subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval)) diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index e9684de8fa76..6e39643fdecf 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -24,6 +24,8 @@ import ( coreoptions "sigs.k8s.io/karpenter/pkg/operator/options" 
"sigs.k8s.io/karpenter/pkg/utils/env" + + awscache "github.com/aws/karpenter-provider-aws/pkg/cache" ) func init() { @@ -42,6 +44,7 @@ type Options struct { VMMemoryOverheadPercent float64 InterruptionQueue string ReservedENIs int + UnavailableOfferingsTTL time.Duration } func (o *Options) AddFlags(fs *coreoptions.FlagSet) { @@ -54,6 +57,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) { fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.") fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.") fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.") + fs.DurationVar(&o.UnavailableOfferingsTTL, "unavailable-offerings-ttl", env.WithDefaultDuration("UNAVAILABLE_OFFERINGS_TTL", awscache.UnavailableOfferingsTTL), "Duration (default value is '3m') for cache eviction for unavailable offerings. 
The flag accepts a value acceptable to time.ParseDuration.") } func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error { diff --git a/pkg/operator/options/suite_test.go b/pkg/operator/options/suite_test.go index 04281d8dacdf..48db2fafe569 100644 --- a/pkg/operator/options/suite_test.go +++ b/pkg/operator/options/suite_test.go @@ -65,7 +65,9 @@ var _ = Describe("Options", func() { "--isolated-vpc", "--vm-memory-overhead-percent", "0.1", "--interruption-queue", "env-cluster", - "--reserved-enis", "10") + "--reserved-enis", "10", + "--unavailable-offerings-ttl", "5m", + ) Expect(err).ToNot(HaveOccurred()) expectOptionsEqual(opts, test.Options(test.OptionsFields{ AssumeRoleARN: lo.ToPtr("env-role"), @@ -77,6 +79,7 @@ var _ = Describe("Options", func() { VMMemoryOverheadPercent: lo.ToPtr[float64](0.1), InterruptionQueue: lo.ToPtr("env-cluster"), ReservedENIs: lo.ToPtr(10), + UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute), })) }) It("should correctly fallback to env vars when CLI flags aren't set", func() { @@ -89,6 +92,7 @@ var _ = Describe("Options", func() { os.Setenv("VM_MEMORY_OVERHEAD_PERCENT", "0.1") os.Setenv("INTERRUPTION_QUEUE", "env-cluster") os.Setenv("RESERVED_ENIS", "10") + os.Setenv("UNAVAILABLE_OFFERINGS_TTL", "5m") // Add flags after we set the environment variables so that the parsing logic correctly refers // to the new environment variable values @@ -105,6 +109,7 @@ var _ = Describe("Options", func() { VMMemoryOverheadPercent: lo.ToPtr[float64](0.1), InterruptionQueue: lo.ToPtr("env-cluster"), ReservedENIs: lo.ToPtr(10), + UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute), })) }) @@ -132,6 +137,10 @@ var _ = Describe("Options", func() { err := opts.Parse(fs, "--cluster-name", "test-cluster", "--reserved-enis", "-1") Expect(err).To(HaveOccurred()) }) + It("should fail when invalid durations are parsed", func() { + err := opts.Parse(fs, "--cluster-name", "test-cluster", "--unavailable-offerings-ttl", "5abc") + 
Expect(err).To(HaveOccurred()) + }) }) }) @@ -146,4 +155,5 @@ func expectOptionsEqual(optsA *options.Options, optsB *options.Options) { Expect(optsA.VMMemoryOverheadPercent).To(Equal(optsB.VMMemoryOverheadPercent)) Expect(optsA.InterruptionQueue).To(Equal(optsB.InterruptionQueue)) Expect(optsA.ReservedENIs).To(Equal(optsB.ReservedENIs)) + Expect(optsA.UnavailableOfferingsTTL).To(Equal(optsB.UnavailableOfferingsTTL)) } diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 732e4b4dfa07..11b73e6be496 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -100,7 +100,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) - unavailableOfferingsCache := awscache.NewUnavailableOfferings() + unavailableOfferingsCache := awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL) launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval) availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval) diff --git a/pkg/test/options.go b/pkg/test/options.go index 4657f28e9c53..22bfed2627db 100644 --- a/pkg/test/options.go +++ b/pkg/test/options.go @@ -34,6 +34,7 @@ type OptionsFields struct { VMMemoryOverheadPercent *float64 InterruptionQueue *string ReservedENIs *int + UnavailableOfferingsTTL *time.Duration } func Options(overrides ...OptionsFields) *options.Options { @@ -53,5 +54,6 @@ func Options(overrides ...OptionsFields) *options.Options { VMMemoryOverheadPercent: lo.FromPtrOr(opts.VMMemoryOverheadPercent, 0.075), InterruptionQueue: lo.FromPtrOr(opts.InterruptionQueue, ""), ReservedENIs: 
lo.FromPtrOr(opts.ReservedENIs, 0), + UnavailableOfferingsTTL: lo.FromPtrOr(opts.UnavailableOfferingsTTL, 3*time.Minute), } }