Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: parameterize unavailable offering cache #7388

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/karpenter/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ spec:
- name: RESERVED_ENIS
value: "{{ . }}"
{{- end }}
{{- with .Values.settings.unavailableOfferingsTTL }}
- name: UNAVAILABLE_OFFERINGS_TTL
value: "{{ . }}"
{{- end }}
{{- with .Values.controller.env }}
{{- toYaml . | nindent 12 }}
{{- end }}
Expand Down
4 changes: 4 additions & 0 deletions charts/karpenter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ settings:
# -- Reserved ENIs are not included in the calculations for max-pods or kube-reserved
# This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html
reservedENIs: "0"
# -- Duration (default value is '3m') for cache eviction for unavailable offerings.
# Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'.
# Decimals are accepted and units can be used alone or together like '300ms', '1.5h' or '2h45m'"
unavailableOfferingsTTL: "3m"
# -- Feature Gate configuration values. Feature Gates will follow the same graduation process and requirements as feature gates
# in Kubernetes. More information here https://kubernetes.io/docs/reference/command-line-tools-reference/feature-gates/#feature-gates-for-alpha-or-beta-features
featureGates:
Expand Down
2 changes: 1 addition & 1 deletion hack/docs/instancetypes_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ below are the resources available with some assumptions and after the instance o
ec2api,
cfg.Region,
),
awscache.NewUnavailableOfferings(),
awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL),
),
)
if err = instanceTypeProvider.UpdateInstanceTypes(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion hack/tools/launchtemplate_counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
ec2api,
cfg.Region,
),
awscache.NewUnavailableOfferings(),
awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL),
),
)
if err := instanceTypeProvider.UpdateInstanceTypes(ctx); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
Expand All @@ -36,9 +37,9 @@ type UnavailableOfferings struct {
SeqNum uint64
}

func NewUnavailableOfferings() *UnavailableOfferings {
func NewUnavailableOfferings(ttl time.Duration) *UnavailableOfferings {
uo := &UnavailableOfferings{
cache: cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval),
cache: cache.New(ttl, UnavailableOfferingsCleanupInterval),
SeqNum: 0,
}
uo.cache.OnEvicted(func(_ string, _ interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
env = coretest.NewEnvironment(coretest.WithCRDs(apis.CRDs...), coretest.WithCRDs(v1alpha1.CRDs...))
fakeClock = &clock.FakeClock{}
unavailableOfferingsCache = awscache.NewUnavailableOfferings()
unavailableOfferingsCache = awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL)
sqsapi = &fake.SQSAPI{}
sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount)))
controller = interruption.NewController(env.Client, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, unavailableOfferingsCache)
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
} else {
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(options.FromContext(ctx).UnavailableOfferingsTTL)
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)

subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
Expand Down
4 changes: 4 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"flag"
"fmt"
"os"
"time"

coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/utils/env"

awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/utils"
)

Expand All @@ -41,6 +43,7 @@ type Options struct {
VMMemoryOverheadPercent float64
InterruptionQueue string
ReservedENIs int
UnavailableOfferingsTTL time.Duration
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -51,6 +54,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types when cached information is unavailable.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
fs.DurationVar(&o.UnavailableOfferingsTTL, "unavailable-offerings-ttl", env.WithDefaultDuration("UNAVAILABLE_OFFERINGS_TTL", awscache.UnavailableOfferingsTTL), "Duration (default value is '3m') for cache eviction for unavailable offerings. The flag accepts a value acceptable to time.ParseDuration.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
13 changes: 12 additions & 1 deletion pkg/operator/options/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"os"
"testing"
"time"

"github.com/samber/lo"
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
Expand Down Expand Up @@ -62,7 +63,9 @@ var _ = Describe("Options", func() {
"--isolated-vpc",
"--vm-memory-overhead-percent", "0.1",
"--interruption-queue", "env-cluster",
"--reserved-enis", "10")
"--reserved-enis", "10",
"--unavailable-offerings-ttl", "5m",
)
Expect(err).ToNot(HaveOccurred())
expectOptionsEqual(opts, test.Options(test.OptionsFields{
ClusterCABundle: lo.ToPtr("env-bundle"),
Expand All @@ -72,6 +75,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute),
}))
})
It("should correctly fallback to env vars when CLI flags aren't set", func() {
Expand All @@ -82,6 +86,7 @@ var _ = Describe("Options", func() {
os.Setenv("VM_MEMORY_OVERHEAD_PERCENT", "0.1")
os.Setenv("INTERRUPTION_QUEUE", "env-cluster")
os.Setenv("RESERVED_ENIS", "10")
os.Setenv("UNAVAILABLE_OFFERINGS_TTL", "5m")

// Add flags after we set the environment variables so that the parsing logic correctly refers
// to the new environment variable values
Expand All @@ -96,6 +101,7 @@ var _ = Describe("Options", func() {
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
UnavailableOfferingsTTL: lo.ToPtr(5 * time.Minute),
}))
})

Expand All @@ -119,6 +125,10 @@ var _ = Describe("Options", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--reserved-enis", "-1")
Expect(err).To(HaveOccurred())
})
It("should fail when invalid durations are parsed", func() {
err := opts.Parse(fs, "--cluster-name", "test-cluster", "--unavailable-offerings-ttl", "5abc")
Expect(err).To(HaveOccurred())
})
})
})

Expand All @@ -131,4 +141,5 @@ func expectOptionsEqual(optsA *options.Options, optsB *options.Options) {
Expect(optsA.VMMemoryOverheadPercent).To(Equal(optsB.VMMemoryOverheadPercent))
Expect(optsA.InterruptionQueue).To(Equal(optsB.InterruptionQueue))
Expect(optsA.ReservedENIs).To(Equal(optsB.ReservedENIs))
Expect(optsA.UnavailableOfferingsTTL).To(Equal(optsB.UnavailableOfferingsTTL))
}
2 changes: 1 addition & 1 deletion pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment
kubernetesVersionCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
instanceTypeCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
discoveredCapacityCache := cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval)
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
unavailableOfferingsCache := awscache.NewUnavailableOfferings(awscache.UnavailableOfferingsTTL)
launchTemplateCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
subnetCache := cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval)
availableIPAdressCache := cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval)
Expand Down
3 changes: 3 additions & 0 deletions pkg/test/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test

import (
"fmt"
"time"

"github.com/imdario/mergo"
"github.com/samber/lo"
Expand All @@ -31,6 +32,7 @@ type OptionsFields struct {
VMMemoryOverheadPercent *float64
InterruptionQueue *string
ReservedENIs *int
UnavailableOfferingsTTL *time.Duration
}

func Options(overrides ...OptionsFields) *options.Options {
Expand All @@ -48,5 +50,6 @@ func Options(overrides ...OptionsFields) *options.Options {
VMMemoryOverheadPercent: lo.FromPtrOr(opts.VMMemoryOverheadPercent, 0.075),
InterruptionQueue: lo.FromPtrOr(opts.InterruptionQueue, ""),
ReservedENIs: lo.FromPtrOr(opts.ReservedENIs, 0),
UnavailableOfferingsTTL: lo.FromPtrOr(opts.UnavailableOfferingsTTL, 3*time.Minute),
}
}
Loading