Skip to content

Commit

Permalink
rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Oct 19, 2023
1 parent e2e1496 commit c9a12de
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/PuerkitoBio/goquery v1.8.1
github.com/avast/retry-go v3.0.0+incompatible
github.com/aws/aws-sdk-go v1.45.28
github.com/aws/karpenter-core v0.31.1-0.20231018213242-4555b8dfcd10
github.com/aws/karpenter-core v0.31.1-0.20231019061253-604d23cb7140
github.com/aws/karpenter/tools/kompat v0.0.0-20231010173459-62c25a3ea85c
github.com/imdario/mergo v0.3.16
github.com/mitchellh/hashstructure/v2 v2.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHS
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.45.28 h1:p2ATcaK6ffSw4yZ2UAGzgRyRXwKyOJY6ZCiKqj5miJE=
github.com/aws/aws-sdk-go v1.45.28/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/karpenter-core v0.31.1-0.20231018213242-4555b8dfcd10 h1:ZKoz01A7DXGVEi0V4tGMd0LbLHEesIzFzYlFQTMQXSE=
github.com/aws/karpenter-core v0.31.1-0.20231018213242-4555b8dfcd10/go.mod h1:rb3kp/3cj38tACF6udfpmIvKoQMwirSVoHNlrd66LyE=
github.com/aws/karpenter-core v0.31.1-0.20231019061253-604d23cb7140 h1:6996xW3nY1C8Ettn25GNltKEu9RoKv6RWrzibuVGWbk=
github.com/aws/karpenter-core v0.31.1-0.20231019061253-604d23cb7140/go.mod h1:rb3kp/3cj38tACF6udfpmIvKoQMwirSVoHNlrd66LyE=
github.com/aws/karpenter/tools/kompat v0.0.0-20231010173459-62c25a3ea85c h1:oXWwIttmjYLbBKhLazG21aQvpJ3NOOr8IXhCJ/p6e/M=
github.com/aws/karpenter/tools/kompat v0.0.0-20231010173459-62c25a3ea85c/go.mod h1:l/TIBsaCx/IrOr0Xvlj/cHLOf05QzuQKEZ1hx2XWmfU=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down
4 changes: 3 additions & 1 deletion hack/docs/configuration_gen_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func main() {
topDoc := fmt.Sprintf("%s%s\n\n", startDocSections[0], genStart)
bottomDoc := fmt.Sprintf("\n%s%s", genEnd, endDocSections[1])

fs := flag.NewFlagSet("karpenter", flag.ContinueOnError)
fs := &coreoptions.FlagSet {
FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError),
}
(&coreoptions.Options{}).AddFlags(fs)
(&options.Options{}).AddFlags(fs)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewControllers(ctx context.Context, sess *session.Session, clk clock.Clock,
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider, linkController),
nodeclaimtagging.NewController(kubeClient, instanceProvider),
}
if options.FromContext(ctx).InterruptionQueueName != "" {
if options.FromContext(ctx).InterruptionQueue != "" {
controllers = append(controllers, interruption.NewController(kubeClient, clk, recorder, interruption.NewSQSProvider(sqs.New(sess)), unavailableOfferings))
}
if options.FromContext(ctx).IsolatedVPC {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func NewController(kubeClient client.Client, clk clock.Clock, recorder events.Re
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("queue", options.FromContext(ctx).InterruptionQueueName))
if c.cm.HasChanged(options.FromContext(ctx).InterruptionQueueName, nil) {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("queue", options.FromContext(ctx).InterruptionQueue))
if c.cm.HasChanged(options.FromContext(ctx).InterruptionQueue, nil) {
logging.FromContext(ctx).Debugf("watching interruption queue")
}
sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/interruption/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewSQSProvider(client sqsiface.SQSAPI) *SQSProvider {
}
provider.queueURL.Resolve = func(ctx context.Context) (string, error) {
input := &sqs.GetQueueUrlInput{
QueueName: aws.String(options.FromContext(ctx).InterruptionQueueName),
QueueName: aws.String(options.FromContext(ctx).InterruptionQueue),
}
ret, err := provider.client.GetQueueUrlWithContext(ctx, input)
if err != nil {
Expand All @@ -66,12 +66,12 @@ func (s *SQSProvider) QueueExists(ctx context.Context) (bool, error) {
}

func (s *SQSProvider) DiscoverQueueURL(ctx context.Context) (string, error) {
if options.FromContext(ctx).InterruptionQueueName != lo.FromPtr(s.queueName.Load()) {
if options.FromContext(ctx).InterruptionQueue != lo.FromPtr(s.queueName.Load()) {
res, err := s.queueURL.TryGet(ctx, atomic.IgnoreCacheOption)
if err != nil {
return res, err
}
s.queueName.Store(lo.ToPtr(options.FromContext(ctx).InterruptionQueueName))
s.queueName.Store(lo.ToPtr(options.FromContext(ctx).InterruptionQueue))
return res, nil
}
return s.queueURL.TryGet(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var _ = AfterSuite(func() {
var _ = BeforeEach(func() {
ctx = coreoptions.ToContext(ctx, coretest.Options())
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
InterruptionQueueName: lo.ToPtr("test-cluster"),
InterruptionQueue: lo.ToPtr("test-cluster"),
}))
ctx = settings.ToContext(ctx, test.Settings())
unavailableOfferingsCache.Flush()
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type Options struct {
ClusterEndpoint string
IsolatedVPC bool
VMMemoryOverheadPercent float64
InterruptionQueueName string
InterruptionQueue string
ReservedENIs int

setFlags map[string]bool
setFlags map[string]bool
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -58,7 +58,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.StringVar(&o.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with. If not specified, will discover the cluster endpoint using DescribeCluster API.")
fs.BoolVarWithEnv(&o.IsolatedVPC, "isolated-vpc", "ISOLATED_VPC", false, "If true, then assume we can't reach AWS services which don't have a VPC endpoint. This also has the effect of disabling look-ups to the AWS pricing endpoint.")
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
fs.StringVar(&o.InterruptionQueueName, "interruption-queue-name", env.WithDefaultString("INTERRUPTION_QUEUE_NAME", ""), "Interruption queue is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func (o *Options) MergeSettings(ctx context.Context) {
mergeField(&o.ClusterEndpoint, s.ClusterEndpoint, o.setFlags["cluster-endpoint"])
mergeField(&o.IsolatedVPC, s.IsolatedVPC, o.setFlags["isolated-vpc"])
mergeField(&o.VMMemoryOverheadPercent, s.VMMemoryOverheadPercent, o.setFlags["vm-memory-overhead-percent"])
mergeField(&o.InterruptionQueueName, s.InterruptionQueueName, o.setFlags["interruption-queue-name"])
mergeField(&o.InterruptionQueue, s.InterruptionQueueName, o.setFlags["interruption-queue"])
mergeField(&o.ReservedENIs, s.ReservedENIs, o.setFlags["reserved-enis"])
if err := o.validateRequiredFields(); err != nil {
panic(fmt.Errorf("checking required fields, %w", err))
Expand Down
23 changes: 12 additions & 11 deletions pkg/operator/options/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var _ = Describe("Options", func() {
"CLUSTER_NAME",
"CLUSTER_ENDPOINT",
"ISOLATED_VPC",
"VM_MOMORY_OVERHEAD_PERCENT",
"INTERRUPTION_QUEUE_NAME",
"VM_MEMORY_OVERHEAD_PERCENT",
"INTERRUPTION_QUEUE",
"RESERVED_ENIS",
}

Expand Down Expand Up @@ -99,7 +99,7 @@ var _ = Describe("Options", func() {
"--cluster-endpoint", "https://options-cluster",
"--isolated-vpc",
"--vm-memory-overhead-percent", "0.1",
"--interruption-queue-name", "options-cluster",
"--interruption-queue", "options-cluster",
"--reserved-enis", "10",
)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -123,7 +123,7 @@ var _ = Describe("Options", func() {
ClusterEndpoint: lo.ToPtr("https://options-cluster"),
IsolatedVPC: lo.ToPtr(true),
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueueName: lo.ToPtr("options-cluster"),
InterruptionQueue: lo.ToPtr("options-cluster"),
ReservedENIs: lo.ToPtr(10),
}))

Expand Down Expand Up @@ -151,7 +151,7 @@ var _ = Describe("Options", func() {
ClusterEndpoint: lo.ToPtr("https://settings-cluster"),
IsolatedVPC: lo.ToPtr(true),
VMMemoryOverheadPercent: lo.ToPtr[float64](0.05),
InterruptionQueueName: lo.ToPtr("settings-cluster"),
InterruptionQueue: lo.ToPtr("settings-cluster"),
ReservedENIs: lo.ToPtr(8),
}))

Expand All @@ -163,7 +163,7 @@ var _ = Describe("Options", func() {
"--cluster-ca-bundle", "options-bundle",
"--cluster-name", "options-cluster",
"--cluster-endpoint", "https://options-cluster",
"--interruption-queue-name", "options-cluster",
"--interruption-queue", "options-cluster",
)
Expect(err).ToNot(HaveOccurred())
ctx = settings.ToContext(ctx, &settings.Settings{
Expand All @@ -186,7 +186,7 @@ var _ = Describe("Options", func() {
ClusterEndpoint: lo.ToPtr("https://options-cluster"),
IsolatedVPC: lo.ToPtr(true),
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueueName: lo.ToPtr("options-cluster"),
InterruptionQueue: lo.ToPtr("options-cluster"),
ReservedENIs: lo.ToPtr(10),
}))
})
Expand All @@ -199,13 +199,14 @@ var _ = Describe("Options", func() {
os.Setenv("CLUSTER_ENDPOINT", "https://env-cluster")
os.Setenv("ISOLATED_VPC", "true")
os.Setenv("VM_MEMORY_OVERHEAD_PERCENT", "0.1")
os.Setenv("INTERRUPTION_QUEUE_NAME", "env-cluster")
os.Setenv("INTERRUPTION_QUEUE", "env-cluster")
os.Setenv("RESERVED_ENIS", "10")
fs = &coreoptions.FlagSet{
FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError),
}
opts.AddFlags(fs)
opts.Parse(fs)
err := opts.Parse(fs)
Expect(err).ToNot(HaveOccurred())
expectOptionsEqual(opts, test.Options(test.OptionsFields{
AssumeRoleARN: lo.ToPtr("env-role"),
AssumeRoleDuration: lo.ToPtr(20 * time.Minute),
Expand All @@ -214,7 +215,7 @@ var _ = Describe("Options", func() {
ClusterEndpoint: lo.ToPtr("https://env-cluster"),
IsolatedVPC: lo.ToPtr(true),
VMMemoryOverheadPercent: lo.ToPtr[float64](0.1),
InterruptionQueueName: lo.ToPtr("env-cluster"),
InterruptionQueue: lo.ToPtr("env-cluster"),
ReservedENIs: lo.ToPtr(10),
}))
})
Expand Down Expand Up @@ -259,6 +260,6 @@ func expectOptionsEqual(optsA *options.Options, optsB *options.Options) {
Expect(optsA.ClusterEndpoint).To(Equal(optsB.ClusterEndpoint))
Expect(optsA.IsolatedVPC).To(Equal(optsB.IsolatedVPC))
Expect(optsA.VMMemoryOverheadPercent).To(Equal(optsB.VMMemoryOverheadPercent))
Expect(optsA.InterruptionQueueName).To(Equal(optsB.InterruptionQueueName))
Expect(optsA.InterruptionQueue).To(Equal(optsB.InterruptionQueue))
Expect(optsA.ReservedENIs).To(Equal(optsB.ReservedENIs))
}
4 changes: 2 additions & 2 deletions pkg/providers/instance/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ var _ = AfterSuite(func() {
})

var _ = BeforeEach(func() {
ctx = options.ToContext(ctx, opts)
ctx = coresettings.ToContext(ctx, coretest.Settings())
ctx = coreoptions.ToContext(ctx, coretest.Options())
ctx = options.ToContext(ctx, test.Options())
ctx = settings.ToContext(ctx, test.Settings())
awsEnv.Reset()
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type OptionsFields struct {
ClusterEndpoint *string
IsolatedVPC *bool
VMMemoryOverheadPercent *float64
InterruptionQueueName *string
InterruptionQueue *string
ReservedENIs *int
}

Expand All @@ -51,7 +51,7 @@ func Options(overrides ...OptionsFields) *options.Options {
ClusterEndpoint: lo.FromPtrOr(opts.ClusterEndpoint, "https://test-cluster"),
IsolatedVPC: lo.FromPtrOr(opts.IsolatedVPC, false),
VMMemoryOverheadPercent: lo.FromPtrOr(opts.VMMemoryOverheadPercent, 0.075),
InterruptionQueueName: lo.FromPtrOr(opts.InterruptionQueueName, ""),
InterruptionQueue: lo.FromPtrOr(opts.InterruptionQueue, ""),
ReservedENIs: lo.FromPtrOr(opts.ReservedENIs, 0),
}
}
12 changes: 6 additions & 6 deletions test/pkg/environment/aws/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ type Environment struct {

SQSProvider *interruption.SQSProvider

ClusterName string
ClusterEndpoint string
InterruptionQueueName string
ClusterName string
ClusterEndpoint string
InterruptionQueue string
}

func NewEnvironment(t *testing.T) *Environment {
Expand Down Expand Up @@ -86,9 +86,9 @@ func NewEnvironment(t *testing.T) *Environment {
SQSProvider: interruption.NewSQSProvider(sqs.New(session)),
TimeStreamAPI: GetTimeStreamAPI(session),

ClusterName: lo.Must(os.LookupEnv("CLUSTER_NAME")),
ClusterEndpoint: lo.Must(os.LookupEnv("CLUSTER_ENDPOINT")),
InterruptionQueueName: lo.Must(os.LookupEnv("INTERRUPTION_QUEUE_NAME")),
ClusterName: lo.Must(os.LookupEnv("CLUSTER_NAME")),
ClusterEndpoint: lo.Must(os.LookupEnv("CLUSTER_ENDPOINT")),
InterruptionQueue: lo.Must(os.LookupEnv("INTERRUPTION_QUEUE")),
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/suites/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestInterruption(t *testing.T) {

var _ = BeforeEach(func() {
env.Context = options.ToContext(env.Context, awstest.Options(awstest.OptionsFields{
InterruptionQueueName: lo.ToPtr(env.InterruptionQueueName),
InterruptionQueue: lo.ToPtr(env.InterruptionQueue),
}))
env.BeforeEach()
env.ExpectQueueExists()
Expand Down

0 comments on commit c9a12de

Please sign in to comment.