Skip to content

Commit

Permalink
feat: parameterize unavailable offering cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rschalo committed Nov 16, 2024
1 parent 5f55b1d commit 97d8124
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 7 deletions.
4 changes: 4 additions & 0 deletions charts/karpenter/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ spec:
- name: RESERVED_ENIS
value: "{{ . }}"
{{- end }}
{{- with .Values.settings.unavailableOfferingsTTL }}
- name: UNAVAILABLE_OFFERINGS_TTL
value: "{{ . }}"
{{- end }}
{{- with .Values.controller.env }}
{{- toYaml . | nindent 12 }}
{{- end }}
Expand Down
4 changes: 4 additions & 0 deletions charts/karpenter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ settings:
# -- Reserved ENIs are not included in the calculations for max-pods or kube-reserved
# This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html
reservedENIs: "0"
# -- Duration (default value is '3m') for cache eviction for unavailable offerings.
# Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'.
# Decimals are accepted and units can be used alone or together like '300ms', '1.5h' or '2h45m'"
unavailableOfferingsTTL: "3m"
# -- Feature Gate configuration values. Feature Gates will follow the same graduation process and requirements as feature gates
# in Kubernetes. More information here https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/#feature-gates-for-alpha-or-beta-features
featureGates:
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -34,9 +35,9 @@ type UnavailableOfferings struct {
SeqNum uint64
}

func NewUnavailableOfferings() *UnavailableOfferings {
func NewUnavailableOfferings(ttl time.Duration) *UnavailableOfferings {
uo := &UnavailableOfferings{
cache: cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval),
cache: cache.New(ttl, UnavailableOfferingsCleanupInterval),
SeqNum: 0,
}
uo.cache.OnEvicted(func(_ string, _ interface{}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func benchmarkNotificationController(b *testing.B, messageCount int) {

// Load all the fundamental components before setting up the controllers
recorder := coretest.NewEventRecorder()
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL)

// Set-up the controllers
interruptionController := interruption.NewController(env.Client, fakeClock, recorder, providers.sqsProvider, unavailableOfferingsCache)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...))
fakeClock = &clock.FakeClock{}
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL)
sqsapi = &fake.SQSAPI{}
sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount)))
controller = interruption.NewController(env.Client, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, unavailableOfferingsCache)
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}

unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(options.FromContext(ctx).UnavailableOfferingsTTL)
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)

subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
Expand Down
4 changes: 4 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/utils/env"

awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
)

func init() {
Expand All @@ -42,6 +44,7 @@ type Options struct {
VMMemoryOverheadPercent float64
InterruptionQueue string
ReservedENIs int
UnavailableOfferingsTTL time.Duration
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -54,6 +57,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
fs.DurationVar(&o.UnavailableOfferingsTTL, "unavailable-offerings-ttl", env.WithDefaultDuration("UNAVAILABLE_OFFERINGS_TTL", awscache.UnavailableOfferingsTTL), "Duration (default value is '3m') for cache eviction for unavailable offerings. The flag accepts a value acceptable to time.ParseDuration.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
12 changes: 11 additions & 1 deletion pkg/operator/options/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ var _ = Describe("Options", func() {
"--isolated-vpc",
"--vm-memory-overhead-percent", "0.1",
"--interruption-queue", "env-cluster",
"--reserved-enis", "10")
"--reserved-enis", "10",
"--unavailable-offerings-ttl", "5m",
)
Expect(err).ToNot(HaveOccurred())
expectOptionsEqual(opts, test.Options(test.OptionsFields{
AssumeRoleARN: lo.ToPtr("env-role"),
Expand All @@ -77,6 +79,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute),
}))
})
It("should correctly fallback to env vars when CLI flags aren't set", func() {
Expand All @@ -89,6 +92,7 @@ var _ = Describe("Options", func() {
os.Setenv("VM_MEMORY_OVERHEAD_PERCENT", "0.1")
os.Setenv("INTERRUPTION_QUEUE", "env-cluster")
os.Setenv("RESERVED_ENIS", "10")
os.Setenv("UNAVAILABLE_OFFERINGS_TTL", "5m")

// Add flags after we set the environment variables so that the parsing logic correctly refers
// to the new environment variable values
Expand All @@ -105,6 +109,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute),
}))
})

Expand Down Expand Up @@ -132,6 +137,10 @@ var _ = Describe("Options", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--reserved-enis", "-1")
Expect(err).To(HaveOccurred())
})
It("should fail when invalid durations are parsed", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--unavailable-offerings-ttl", "5abc")
Expect(err).To(HaveOccurred())
})
})
})

Expand All @@ -146,4 +155,5 @@ func expectOptionsEqual(optsA *options.Options, optsB *options.Options) {
Expect(optsA.VMMemoryOverheadPercent).To(Equal(optsB.VMMemoryOverheadPercent))
Expect(optsA.InterruptionQueue).To(Equal(optsB.InterruptionQueue))
Expect(optsA.ReservedENIs).To(Equal(optsB.ReservedENIs))
Expect(optsA.UnavailableOfferingsTTL).To(Equal(optsB.UnavailableOfferingsTTL))
}
2 changes: 1 addition & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
ec2Cache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL)
launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval)
Expand Down
2 changes: 2 additions & 0 deletions pkg/test/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type OptionsFields struct {
VMMemoryOverheadPercent *float64
InterruptionQueue *string
ReservedENIs *int
UnavailableOfferingsTTL *time.Duration
}

func Options(overrides ...OptionsFields) *options.Options {
Expand All @@ -53,5 +54,6 @@ func Options(overrides ...OptionsFields) *options.Options {
VMMemoryOverheadPercent: lo.FromPtrOr(opts.VMMemoryOverheadPercent, 0.075),
InterruptionQueue: lo.FromPtrOr(opts.InterruptionQueue, ""),
ReservedENIs: lo.FromPtrOr(opts.ReservedENIs, 0),
UnavailableOfferingsTTL: lo.FromPtrOr(opts.UnavailableOfferingsTTL, 3*time.Minute),
}
}

0 comments on commit 97d8124

Please sign in to comment.