Skip to content

Commit

Permalink
feedback changes
Browse files Browse the repository at this point in the history
  • Loading branch information
edibble21 committed Nov 6, 2024
1 parent 321b04c commit 0dc890a
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 29 deletions.
5 changes: 5 additions & 0 deletions pkg/aws/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/pricing"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/ssm"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
)

type EC2API interface {
Expand Down Expand Up @@ -66,3 +67,7 @@ type SQSAPI interface {
DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
}

type TimestreamWriteAPI interface {
WriteRecords(ctx context.Context, params *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error)
}
3 changes: 1 addition & 2 deletions pkg/controllers/interruption/interruption_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"

sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/events"
Expand Down Expand Up @@ -160,7 +159,7 @@ func benchmarkNotificationController(b *testing.B, messageCount int) {

type providerSet struct {
kubeClient client.Client
sqsAPI sdk.SQSAPI
sqsAPI sqs.Client
sqsProvider sqs.Provider
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
cfg := prometheusv2.WithPrometheusMetrics(WithUserAgent(lo.Must(config.LoadDefaultConfig(ctx))), crmetrics.Registry)
if cfg.Region == "" {
log.FromContext(ctx).V(1).Info("retrieving region from IMDS")
metaDataClient := imds.NewFromConfig(cfg)
region := lo.Must(metaDataClient.GetRegion(ctx, nil))
region := lo.Must(imds.NewFromConfig(cfg).GetRegion(ctx, nil))
cfg.Region = region.Region
}
ec2api := ec2.NewFromConfig(cfg)
Expand All @@ -100,6 +99,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
log.FromContext(ctx).Error(err, "ec2 api connectivity check failed")
os.Exit(1)
}
log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")
clusterEndpoint, err := ResolveClusterEndpoint(ctx, eksapi)
if err != nil {
log.FromContext(ctx).Error(err, "failed detecting cluster endpoint")
Expand Down
4 changes: 3 additions & 1 deletion pkg/providers/pricing/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,9 @@ func (p *DefaultProvider) UpdateSpotPricing(ctx context.Context) error {

totalOfferings := 0
for it, zoneData := range prices {
p.spotPrices[it] = newZonalPricing(0)
if _, ok := p.spotPrices[it]; !ok {
p.spotPrices[it] = newZonalPricing(0)
}
maps.Copy(p.spotPrices[it].prices, zoneData.prices)
totalOfferings += len(zoneData.prices)
}
Expand Down
14 changes: 8 additions & 6 deletions test/pkg/environment/aws/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"

v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
"github.com/aws/karpenter-provider-aws/pkg/providers/sqs"
"github.com/aws/karpenter-provider-aws/pkg/test"
"github.com/aws/karpenter-provider-aws/test/pkg/environment/common"
Expand All @@ -65,7 +66,7 @@ type Environment struct {
IAMAPI *iam.Client
FISAPI *fis.Client
EKSAPI *eks.Client
TimeStreamAPI *timestreamwrite.Client
TimeStreamAPI sdk.TimestreamWriteAPI

SQSProvider sqs.Provider

Expand Down Expand Up @@ -96,7 +97,7 @@ func NewEnvironment(t *testing.T) *Environment {
IAMAPI: iam.NewFromConfig(cfg),
FISAPI: fis.NewFromConfig(cfg),
EKSAPI: eks.NewFromConfig(cfg),
TimeStreamAPI: GetTimeStreamAPI(env.Context),
TimeStreamAPI: GetTimeStreamAPI(env.Context, cfg),

ClusterName: lo.Must(os.LookupEnv("CLUSTER_NAME")),
ClusterEndpoint: lo.Must(os.LookupEnv("CLUSTER_ENDPOINT")),
Expand Down Expand Up @@ -124,13 +125,14 @@ func NewEnvironment(t *testing.T) *Environment {
return awsEnv
}

func GetTimeStreamAPI(ctx context.Context) *timestreamwrite.Client {
func GetTimeStreamAPI(ctx context.Context, cfg aws.Config) sdk.TimestreamWriteAPI {
if lo.Must(env.GetBool("ENABLE_METRICS", false)) {
By("enabling metrics firing for this suite")
timecfg := lo.Must(config.LoadDefaultConfig(ctx, config.WithRegion(env.GetString("METRICS_REGION", metricsDefaultRegion))))
return timestreamwrite.NewFromConfig(timecfg)
timeCfg := cfg.Copy()
timeCfg.Region = env.GetString("METRICS_REGION", metricsDefaultRegion)
return timestreamwrite.NewFromConfig(timeCfg)
}
return nil
return &NoOpTimeStreamAPI{}
}

func (env *Environment) DefaultEC2NodeClass() *v1.EC2NodeClass {
Expand Down
6 changes: 3 additions & 3 deletions test/pkg/environment/aws/expectations.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,11 @@ func (env *Environment) EventuallyExpectRunInstances(instanceInput *ec2.RunInsta
}
var reservation ec2types.Reservation
Eventually(func(g Gomega) {
runInstancesOutput, err := env.EC2API.RunInstances(env.Context, instanceInput)
out, err := env.EC2API.RunInstances(env.Context, instanceInput)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(runInstancesOutput.Instances).ToNot(BeEmpty())
g.Expect(out.Instances).ToNot(BeEmpty())
reservation = ec2types.Reservation{
Instances: runInstancesOutput.Instances,
Instances: out.Instances,
}
}).WithTimeout(30 * time.Second).WithPolling(5 * time.Second).Should(Succeed())
return reservation
Expand Down
4 changes: 3 additions & 1 deletion test/pkg/environment/aws/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
timestreamwritetypes "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/samber/lo"

sdk "github.com/aws/karpenter-provider-aws/pkg/aws"

"github.com/aws/karpenter-provider-aws/test/pkg/environment/common"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -37,7 +39,7 @@ const (
)

type NoOpTimeStreamAPI struct {
timestreamwrite.Client
sdk.TimestreamWriteAPI
}

func (o NoOpTimeStreamAPI) WriteRecords(_ context.Context, _ *timestreamwrite.WriteRecordsInput, _ ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) {
Expand Down
8 changes: 4 additions & 4 deletions test/suites/integration/aws_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ var _ = Describe("MetadataOptions", func() {
env.ExpectCreatedNodeCount("==", 1)
env.ExpectInstance(pod.Spec.NodeName).To(HaveField("MetadataOptions", HaveValue(Equal(ec2types.InstanceMetadataOptionsResponse{
State: ec2types.InstanceMetadataOptionsStateApplied,
HttpEndpoint: "enabled",
HttpProtocolIpv6: "enabled",
HttpEndpoint: ec2types.InstanceMetadataEndpointStateEnabled,
HttpProtocolIpv6: ec2types.InstanceMetadataProtocolStateEnabled,
HttpPutResponseHopLimit: aws.Int32(1),
HttpTokens: "required",
InstanceMetadataTags: "disabled",
HttpTokens: ec2types.HttpTokensStateRequired,
InstanceMetadataTags: ec2types.InstanceMetadataTagsStateDisabled,
}))))
})
})
10 changes: 2 additions & 8 deletions test/suites/nodeclaim/garbage_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.
package nodeclaim_test

import (
"context"
"encoding/base64"
"fmt"
"os"
Expand Down Expand Up @@ -116,12 +115,7 @@ var _ = Describe("GarbageCollection", func() {
_, err := env.EC2API.TerminateInstances(env.Context, &ec2.TerminateInstancesInput{
InstanceIds: []string{*out.Instances[0].InstanceId},
})
if err != nil {
if awserrors.IsNotFound(err) {
return
}
Expect(err).ToNot(HaveOccurred())
}
Expect(awserrors.IgnoreNotFound(err)).ToNot(HaveOccurred())
})

// Wait for the node to register with the cluster
Expand Down Expand Up @@ -152,7 +146,7 @@ var _ = Describe("GarbageCollection", func() {
env.EventuallyExpectHealthy(pod)
node := env.ExpectCreatedNodeCount("==", 1)[0]

_, err := env.EC2API.TerminateInstances(context.Background(), &ec2.TerminateInstancesInput{
_, err := env.EC2API.TerminateInstances(env.Context, &ec2.TerminateInstancesInput{
InstanceIds: []string{lo.Must(utils.ParseInstanceID(node.Spec.ProviderID))},
})
Expect(err).ToNot(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion test/suites/nodeclaim/nodeclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ var _ = Describe("StandaloneNodeClaim", func() {
env.EventuallyExpectNotFound(nodeClaim, node)

Eventually(func(g Gomega) {
g.Expect(env.GetInstanceByID(instanceID).State.Name).To(BeElementOf(ec2types.InstanceStateName("terminated"), ec2types.InstanceStateName("shutting-down")))
g.Expect(env.GetInstanceByID(instanceID).State.Name).To(BeElementOf(ec2types.InstanceStateNameTerminated, ec2types.InstanceStateNameShuttingDown))
}, time.Second*10).Should(Succeed())
})
It("should delete a NodeClaim from the node termination finalizer", func() {
Expand Down
2 changes: 1 addition & 1 deletion test/suites/termination/termination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var _ = Describe("Termination", func() {
env.ExpectDeleted(nodes[0])
env.EventuallyExpectNotFound(nodes[0])
Eventually(func(g Gomega) {
g.Expect(env.GetInstanceByID(instanceID).State.Name).To(BeElementOf(ec2types.InstanceStateName("terminated"), ec2types.InstanceStateName("shutting-down")))
g.Expect(env.GetInstanceByID(instanceID).State.Name).To(BeElementOf(ec2types.InstanceStateNameTerminated, ec2types.InstanceStateNameShuttingDown))
}, time.Second*10).Should(Succeed())
})
// Pods from Karpenter nodes are expected to drain in the following order:
Expand Down

0 comments on commit 0dc890a

Please sign in to comment.