Skip to content

Commit

Permalink
Convert singleton reconcilers to operatorpkg
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Jun 3, 2024
1 parent 5542dbf commit 1254647
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 62 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ require (
sigs.k8s.io/yaml v1.4.0
)

replace sigs.k8s.io/karpenter => github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1

require (
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1 h1:LaxX8BD9S6arcmKVv2r8HIBSs/s0Zd57qALieXR+2HU=
github.com/jonathan-innis/karpenter v0.0.4-0.20240603073850-5253faaaa7a1/go.mod h1:5XYrIz9Bi7HgQyaUsx7O08ft+TJjrH+htlnPq8Sz9J8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down Expand Up @@ -761,8 +763,6 @@ sigs.k8s.io/controller-runtime v0.18.3 h1:B5Wmmo8WMWK7izei+2LlXLVDGzMwAHBNLX68lw
sigs.k8s.io/controller-runtime v0.18.3/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/karpenter v0.37.0 h1:eUFD9hJ2mpZrw31OUYhpbxLWEDmbXT05wX27dZB2E5o=
sigs.k8s.io/karpenter v0.37.0/go.mod h1:5XYrIz9Bi7HgQyaUsx7O08ft+TJjrH+htlnPq8Sz9J8=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
Expand Down
4 changes: 1 addition & 3 deletions hack/code/prices_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
ec22 "github.com/aws/aws-sdk-go/service/ec2"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

controllerspricing "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/pricing"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
Expand Down Expand Up @@ -110,7 +108,7 @@ func main() {
log.Println("fetching for", region)
pricingProvider := pricing.NewDefaultProvider(ctx, pricing.NewAPI(sess, region), ec2, region)
controller := controllerspricing.NewController(pricingProvider)
_, err := controller.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{}})
_, err := controller.Reconcile(ctx)
if err != nil {
log.Fatalf("failed to initialize pricing provider %s", err)
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ import (
"time"

sqsapi "github.com/aws/aws-sdk-go/service/sqs"
"github.com/awslabs/operatorpkg/singleton"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/controller"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
Expand All @@ -44,7 +48,6 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/utils"

"sigs.k8s.io/karpenter/pkg/events"
corecontroller "sigs.k8s.io/karpenter/pkg/operator/controller"
)

type Action string
Expand Down Expand Up @@ -81,7 +84,8 @@ func NewController(kubeClient client.Client, clk clock.Clock, recorder events.Re
}
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "interruption")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name()))
if c.cm.HasChanged(c.sqsProvider.Name(), nil) {
log.FromContext(ctx).V(1).Info("watching interruption queue")
Expand All @@ -91,7 +95,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", err)
}
if len(sqsMessages) == 0 {
return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
nodeClaimInstanceIDMap, err := c.makeNodeClaimInstanceIDMap(ctx)
if err != nil {
Expand Down Expand Up @@ -119,13 +123,14 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
if err = multierr.Combine(errs...); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return corecontroller.NewSingletonManagedBy(m).
return controllerruntime.NewControllerManagedBy(m).
Named("interruption").
Complete(c)
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}

// parseMessage parses the passed SQS message into an internal Message interface
Expand Down
21 changes: 10 additions & 11 deletions pkg/controllers/interruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
Expand Down Expand Up @@ -119,7 +118,7 @@ var _ = Describe("InterruptionHandling", func() {
ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID))))
ExpectApplied(ctx, env.Client, nodeClaim, node)

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectNotFound(ctx, env.Client, nodeClaim)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1))
Expand All @@ -128,7 +127,7 @@ var _ = Describe("InterruptionHandling", func() {
ExpectMessagesCreated(scheduledChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID))))
ExpectApplied(ctx, env.Client, nodeClaim, node)

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectNotFound(ctx, env.Client, nodeClaim)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1))
Expand All @@ -153,7 +152,7 @@ var _ = Describe("InterruptionHandling", func() {
messages = append(messages, stateChangeMessage(instanceID, state))
}
ExpectMessagesCreated(messages...)
ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(4))
Expand Down Expand Up @@ -183,7 +182,7 @@ var _ = Describe("InterruptionHandling", func() {
messages = append(messages, spotInterruptionMessage(id))
}
ExpectMessagesCreated(messages...)
ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(nc *corev1beta1.NodeClaim, _ int) client.Object { return nc })...)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(100))
Expand All @@ -199,15 +198,15 @@ var _ = Describe("InterruptionHandling", func() {

ExpectMessagesCreated(badMessage)

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1))
})
It("should delete a state change message when the state isn't in accepted states", func() {
ExpectMessagesCreated(stateChangeMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID)), "creating"))
ExpectApplied(ctx, env.Client, nodeClaim, node)

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectExists(ctx, env.Client, nodeClaim)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1))
Expand All @@ -221,7 +220,7 @@ var _ = Describe("InterruptionHandling", func() {
ExpectMessagesCreated(spotInterruptionMessage(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID))))
ExpectApplied(ctx, env.Client, nodeClaim, node)

ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
Expect(sqsapi.ReceiveMessageBehavior.SuccessfulCalls()).To(Equal(1))
ExpectNotFound(ctx, env.Client, nodeClaim)
Expect(sqsapi.DeleteMessageBehavior.SuccessfulCalls()).To(Equal(1))
Expand All @@ -235,15 +234,15 @@ var _ = Describe("InterruptionHandling", func() {
var _ = Describe("Error Handling", func() {
It("should send an error on polling when QueueNotExists", func() {
sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode(servicesqs.ErrCodeQueueDoesNotExist), fake.MaxCalls(0))
ExpectReconcileFailed(ctx, controller, types.NamespacedName{})
_ = ExpectSingletonReconcileFailed(ctx, controller)
})
It("should send an error on polling when AccessDenied", func() {
sqsapi.ReceiveMessageBehavior.Error.Set(awsErrWithCode("AccessDenied"), fake.MaxCalls(0))
ExpectReconcileFailed(ctx, controller, types.NamespacedName{})
_ = ExpectSingletonReconcileFailed(ctx, controller)
})
It("should not return an error when deleting a nodeClaim that is already deleted", func() {
ExpectMessagesCreated(spotInterruptionMessage(fake.InstanceID()))
ExpectReconcileSucceeded(ctx, controller, types.NamespacedName{})
ExpectSingletonReconciled(ctx, controller)
})
})

Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@ import (
"fmt"
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/operator/controller"
)

type Controller struct {
Expand All @@ -49,7 +51,9 @@ func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudPr
}
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodeclaim.garbagecollection")

// We LIST machines on the CloudProvider BEFORE we grab Machines/Nodes on the cluster so that we make sure that, if
// LISTing instances takes a long time, our information is more updated by the time we get to Machine and Node LIST
// This works since our CloudProvider instances are deleted based on whether the Machine exists or not, not vise-versa
Expand Down Expand Up @@ -105,7 +109,8 @@ func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *v1beta1.Node
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controller.NewSingletonManagedBy(m).
return controllerruntime.NewControllerManagedBy(m).
Named("nodeclaim.garbagecollection").
Complete(c)
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}
20 changes: 9 additions & 11 deletions pkg/controllers/nodeclaim/garbagecollection/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/client-go/tools/record"
corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider"
Expand Down Expand Up @@ -136,7 +134,7 @@ var _ = Describe("GarbageCollection", func() {
instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute))
awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance)

ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)
_, err := cloudProvider.Get(ctx, providerID)
Expect(err).To(HaveOccurred())
Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue())
Expand All @@ -151,7 +149,7 @@ var _ = Describe("GarbageCollection", func() {
})
ExpectApplied(ctx, env.Client, node)

ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)
_, err := cloudProvider.Get(ctx, providerID)
Expect(err).To(HaveOccurred())
Expect(corecloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue())
Expand Down Expand Up @@ -199,7 +197,7 @@ var _ = Describe("GarbageCollection", func() {
)
ids = append(ids, instanceID)
}
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)

wg := sync.WaitGroup{}
for _, id := range ids {
Expand Down Expand Up @@ -257,7 +255,7 @@ var _ = Describe("GarbageCollection", func() {
nodeClaims = append(nodeClaims, nodeClaim)
ids = append(ids, instanceID)
}
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)

wg := sync.WaitGroup{}
for _, id := range ids {
Expand All @@ -281,7 +279,7 @@ var _ = Describe("GarbageCollection", func() {
instance.LaunchTime = aws.Time(time.Now())
awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance)

ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)
_, err := cloudProvider.Get(ctx, providerID)
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -295,7 +293,7 @@ var _ = Describe("GarbageCollection", func() {
instance.LaunchTime = aws.Time(time.Now().Add(-time.Minute))
awsEnv.EC2API.Instances.Store(aws.StringValue(instance.InstanceId), instance)

ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)
_, err := cloudProvider.Get(ctx, providerID)
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -319,7 +317,7 @@ var _ = Describe("GarbageCollection", func() {
})
ExpectApplied(ctx, env.Client, nodeClaim, node)

ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)
_, err := cloudProvider.Get(ctx, providerID)
Expect(err).ToNot(HaveOccurred())
ExpectExists(ctx, env.Client, node)
Expand Down Expand Up @@ -377,7 +375,7 @@ var _ = Describe("GarbageCollection", func() {
ids = append(ids, instanceID)
nodes = append(nodes, node)
}
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectSingletonReconciled(ctx, garbageCollectionController)

wg := sync.WaitGroup{}
for i := range ids {
Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/providers/instancetype/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"fmt"
"time"

"github.com/awslabs/operatorpkg/singleton"
lop "github.com/samber/lo/parallel"
"go.uber.org/multierr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/karpenter/pkg/operator/controller"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype"
)
Expand All @@ -38,7 +40,9 @@ func NewController(instancetypeProvider instancetype.Provider) *Controller {
}
}

func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "providers.instancetype")

work := []func(ctx context.Context) error{
c.instancetypeProvider.UpdateInstanceTypes,
c.instancetypeProvider.UpdateInstanceTypeOfferings,
Expand All @@ -57,7 +61,8 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
// Includes a default exponential failure rate limiter of base: time.Millisecond, and max: 1000*time.Second
return controller.NewSingletonManagedBy(m).
return controllerruntime.NewControllerManagedBy(m).
Named("providers.instancetype").
Complete(c)
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}
Loading

0 comments on commit 1254647

Please sign in to comment.