Skip to content

Commit

Permalink
Merge branch 'main' into aws-secretsmanager-additions
Browse files Browse the repository at this point in the history
  • Loading branch information
mpechner-akasa authored Dec 6, 2024
2 parents b3272b9 + c2e19c1 commit a6b4067
Show file tree
Hide file tree
Showing 28 changed files with 338 additions and 237 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Here is an overview of all new **experimental** features:
- **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970))
- **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321))
- **General**: ScaledJobs ready status set to true when recoverred problem ([#6329](https://github.com/kedacore/keda/pull/6329))
- **AWS Scalers**: Add AWS region to the AWS Config Cache key ([#6128](https://github.com/kedacore/keda/issues/6128))
- **Selenium Grid Scaler**: Exposes sum of pending and ongoing sessions to KDEA ([#6368](https://github.com/kedacore/keda/pull/6368))

### Deprecations

Expand All @@ -93,7 +95,8 @@ New deprecation(s):
### Other

- **General**: Bump newrelic-client-go deps to 2.51.2 (latest) ([#6325](https://github.com/kedacore/keda/pull/6325))

- **General**: New eventreason KEDAScalersInfo to display important information ([#6328](https://github.com/kedacore/keda/issues/6328))
- **General**: refactor: replace experimental `maps` and `slices` with stdlib ([#6372](https://github.com/kedacore/keda/pull/6372))

## v2.16.0

Expand Down
2 changes: 1 addition & 1 deletion apis/eventing/v1alpha1/cloudeventsource_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package v1alpha1
import (
"encoding/json"
"fmt"
"slices"

"golang.org/x/exp/slices"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/kedacore/keda/v2

go 1.22.1

toolchain go1.23.3
go 1.23.3

require (
cloud.google.com/go/compute/metadata v0.5.2
Expand Down Expand Up @@ -347,7 +345,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.28.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventemitter/eventfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package eventemitter

import (
"golang.org/x/exp/slices"
"slices"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (
// ScaledJobDeleted is for event when ScaledJob is deleted
ScaledJobDeleted = "ScaledJobDeleted"

// KEDAScalersInfo is for event when Scaler has additional info
KEDAScalersInfo = "KEDAScalerInfo"

// KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
KEDAScalersStarted = "KEDAScalersStarted"

Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
case KafkaSASLTypeOAuthbearer:
return nil, errors.New("SASL/OAUTHBEARER is not implemented yet")
case KafkaSASLTypeMskIam:
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSRegion, metadata.AWSAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSAuthorization)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/aws/aws_authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type AuthorizationMetadata struct {
AwsSecretAccessKey string
AwsSessionToken string

AwsRegion string

// Deprecated
PodIdentityOwner bool
// Pod identity owner is confusing and it'll be removed when we get
Expand Down
31 changes: 13 additions & 18 deletions pkg/scalers/aws/aws_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,43 +39,33 @@ import (
// ErrAwsNoAccessKey is returned when awsAccessKeyID is missing.
var ErrAwsNoAccessKey = errors.New("awsAccessKeyID not found")

type awsConfigMetadata struct {
awsRegion string
awsAuthorization AuthorizationMetadata
}

var awsSharedCredentialsCache = newSharedConfigsCache()

// GetAwsConfig returns an *aws.Config for a given AuthorizationMetadata
// If AuthorizationMetadata uses static credentials or `aws` auth,
// we recover the *aws.Config from the shared cache. If not, we generate
// a new entry on each request
func GetAwsConfig(ctx context.Context, awsRegion string, awsAuthorization AuthorizationMetadata) (*aws.Config, error) {
metadata := &awsConfigMetadata{
awsRegion: awsRegion,
awsAuthorization: awsAuthorization,
}

if metadata.awsAuthorization.UsingPodIdentity ||
(metadata.awsAuthorization.AwsAccessKeyID != "" && metadata.awsAuthorization.AwsSecretAccessKey != "") {
return awsSharedCredentialsCache.GetCredentials(ctx, metadata.awsRegion, metadata.awsAuthorization)
func GetAwsConfig(ctx context.Context, awsAuthorization AuthorizationMetadata) (*aws.Config, error) {
if awsAuthorization.UsingPodIdentity ||
(awsAuthorization.AwsAccessKeyID != "" && awsAuthorization.AwsSecretAccessKey != "") {
return awsSharedCredentialsCache.GetCredentials(ctx, awsAuthorization)
}

// TODO, remove when aws-eks are removed
configOptions := make([]func(*config.LoadOptions) error, 0)
configOptions = append(configOptions, config.WithRegion(metadata.awsRegion))
configOptions = append(configOptions, config.WithRegion(awsAuthorization.AwsRegion))
cfg, err := config.LoadDefaultConfig(ctx, configOptions...)
if err != nil {
return nil, err
}

if !metadata.awsAuthorization.PodIdentityOwner {
if !awsAuthorization.PodIdentityOwner {
return &cfg, nil
}

if metadata.awsAuthorization.AwsRoleArn != "" {
if awsAuthorization.AwsRoleArn != "" {
stsSvc := sts.NewFromConfig(cfg)
stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, metadata.awsAuthorization.AwsRoleArn, func(_ *stscreds.AssumeRoleOptions) {})
stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, awsAuthorization.AwsRoleArn, func(_ *stscreds.AssumeRoleOptions) {})
cfg.Credentials = aws.NewCredentialsCache(stsCredentialProvider)
}
return &cfg, err
Expand All @@ -88,13 +78,18 @@ func GetAwsAuthorization(uniqueKey string, podIdentity kedav1alpha1.AuthPodIdent
TriggerUniqueKey: uniqueKey,
}

if val, ok := authParams["awsRegion"]; ok && val != "" {
meta.AwsRegion = val
}

if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAws {
meta.UsingPodIdentity = true
if val, ok := authParams["awsRoleArn"]; ok && val != "" {
meta.AwsRoleArn = val
}
return meta, nil
}

// TODO, remove all the logic below and just keep the logic for
// parsing awsAccessKeyID, awsSecretAccessKey and awsSessionToken
// when aws-eks are removed
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/aws/aws_config_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func newSharedConfigsCache() sharedConfigCache {
// getCacheKey returns a unique key based on given AuthorizationMetadata.
// As it can contain sensitive data, the key is hashed to not expose secrets
func (a *sharedConfigCache) getCacheKey(awsAuthorization AuthorizationMetadata) string {
key := "keda"
key := "keda-" + awsAuthorization.AwsRegion
if awsAuthorization.AwsAccessKeyID != "" {
key = fmt.Sprintf("%s-%s-%s", awsAuthorization.AwsAccessKeyID, awsAuthorization.AwsSecretAccessKey, awsAuthorization.AwsSessionToken)
key = fmt.Sprintf("%s-%s-%s-%s", awsAuthorization.AwsAccessKeyID, awsAuthorization.AwsSecretAccessKey, awsAuthorization.AwsSessionToken, awsAuthorization.AwsRegion)
} else if awsAuthorization.AwsRoleArn != "" {
key = awsAuthorization.AwsRoleArn
key = fmt.Sprintf("%s-%s", awsAuthorization.AwsRoleArn, awsAuthorization.AwsRegion)
}
// to avoid sensitive data as key and to use a constant key size,
// we hash the key with sha3
Expand All @@ -86,7 +86,7 @@ func (a *sharedConfigCache) getCacheKey(awsAuthorization AuthorizationMetadata)
// sharing it between all the requests. To track if the *aws.Config is used by whom,
// every time when an scaler requests *aws.Config we register it inside
// the cached item.
func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsRegion string, awsAuthorization AuthorizationMetadata) (*aws.Config, error) {
func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsAuthorization AuthorizationMetadata) (*aws.Config, error) {
a.Lock()
defer a.Unlock()
key := a.getCacheKey(awsAuthorization)
Expand All @@ -97,7 +97,7 @@ func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsRegion string
}

configOptions := make([]func(*config.LoadOptions) error, 0)
configOptions = append(configOptions, config.WithRegion(awsRegion))
configOptions = append(configOptions, config.WithRegion(awsAuthorization.AwsRegion))
cfg, err := config.LoadDefaultConfig(ctx, configOptions...)
if err != nil {
return nil, err
Expand Down
77 changes: 44 additions & 33 deletions pkg/scalers/aws/aws_config_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,84 +28,95 @@ import (
func TestGetCredentialsReturnNewItemAndStoreItIfNotExist(t *testing.T) {
cache := newSharedConfigsCache()
cache.logger = logr.Discard()
config := awsConfigMetadata{
awsRegion: "test-region",
awsAuthorization: AuthorizationMetadata{
TriggerUniqueKey: "test-key",
},
awsAuthorization := AuthorizationMetadata{
TriggerUniqueKey: "test-key",
AwsRegion: "test-region",
}
cacheKey := cache.getCacheKey(config.awsAuthorization)
_, err := cache.GetCredentials(context.Background(), config.awsRegion, config.awsAuthorization)
cacheKey := cache.getCacheKey(awsAuthorization)
_, err := cache.GetCredentials(context.Background(), awsAuthorization)
assert.NoError(t, err)
assert.Contains(t, cache.items, cacheKey)
assert.Contains(t, cache.items[cacheKey].usages, config.awsAuthorization.TriggerUniqueKey)
assert.Contains(t, cache.items[cacheKey].usages, awsAuthorization.TriggerUniqueKey)
}

func TestGetCredentialsReturnCachedItemIfExist(t *testing.T) {
cache := newSharedConfigsCache()
cache.logger = logr.Discard()
config := awsConfigMetadata{
awsRegion: "test1-region",
awsAuthorization: AuthorizationMetadata{
TriggerUniqueKey: "test1-key",
},
awsAuthorization := AuthorizationMetadata{
TriggerUniqueKey: "test1-key",
AwsRegion: "test1-region",
}
cfg := aws.Config{}
cfg.AppID = "test1-app"
cacheKey := cache.getCacheKey(config.awsAuthorization)
cacheKey := cache.getCacheKey(awsAuthorization)
cache.items[cacheKey] = cacheEntry{
config: &cfg,
usages: map[string]bool{
"other-usage": true,
},
}
configFromCache, err := cache.GetCredentials(context.Background(), config.awsRegion, config.awsAuthorization)
configFromCache, err := cache.GetCredentials(context.Background(), awsAuthorization)
assert.NoError(t, err)
assert.Equal(t, &cfg, configFromCache)
assert.Contains(t, cache.items[cacheKey].usages, config.awsAuthorization.TriggerUniqueKey)
assert.Contains(t, cache.items[cacheKey].usages, awsAuthorization.TriggerUniqueKey)
}

func TestRemoveCachedEntryRemovesCachedItemIfNotUsages(t *testing.T) {
cache := newSharedConfigsCache()
cache.logger = logr.Discard()
config := awsConfigMetadata{
awsRegion: "test2-region",
awsAuthorization: AuthorizationMetadata{
TriggerUniqueKey: "test2-key",
},
awsAuthorization := AuthorizationMetadata{
TriggerUniqueKey: "test2-key",
AwsRegion: "test2-region",
}
cfg := aws.Config{}
cfg.AppID = "test2-app"
cacheKey := cache.getCacheKey(config.awsAuthorization)
cacheKey := cache.getCacheKey(awsAuthorization)
cache.items[cacheKey] = cacheEntry{
config: &cfg,
usages: map[string]bool{
config.awsAuthorization.TriggerUniqueKey: true,
awsAuthorization.TriggerUniqueKey: true,
},
}
cache.RemoveCachedEntry(config.awsAuthorization)
cache.RemoveCachedEntry(awsAuthorization)
assert.NotContains(t, cache.items, cacheKey)
}

func TestRemoveCachedEntryNotRemoveCachedItemIfUsages(t *testing.T) {
cache := newSharedConfigsCache()
cache.logger = logr.Discard()
config := awsConfigMetadata{
awsRegion: "test3-region",
awsAuthorization: AuthorizationMetadata{
TriggerUniqueKey: "test3-key",
},
awsAuthorization := AuthorizationMetadata{
TriggerUniqueKey: "test3-key",
AwsRegion: "test3-region",
}
cfg := aws.Config{}
cfg.AppID = "test3-app"
cacheKey := cache.getCacheKey(config.awsAuthorization)
cacheKey := cache.getCacheKey(awsAuthorization)
cache.items[cacheKey] = cacheEntry{
config: &cfg,
usages: map[string]bool{
config.awsAuthorization.TriggerUniqueKey: true,
"other-usage": true,
awsAuthorization.TriggerUniqueKey: true,
"other-usage": true,
},
}
cache.RemoveCachedEntry(config.awsAuthorization)
cache.RemoveCachedEntry(awsAuthorization)
assert.Contains(t, cache.items, cacheKey)
}

func TestCredentialsShouldBeCachedPerRegion(t *testing.T) {
cache := newSharedConfigsCache()
cache.logger = logr.Discard()
awsAuthorization1 := AuthorizationMetadata{
TriggerUniqueKey: "test4-key",
AwsRegion: "test4-region1",
}
awsAuthorization2 := AuthorizationMetadata{
TriggerUniqueKey: "test4-key",
AwsRegion: "test4-region2",
}
cred1, err1 := cache.GetCredentials(context.Background(), awsAuthorization1)
cred2, err2 := cache.GetCredentials(context.Background(), awsAuthorization2)

assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NotEqual(t, cred1, cred2, "Credentials should be stored per region")
}
18 changes: 5 additions & 13 deletions pkg/scalers/aws/aws_sigv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,12 @@ func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
}

// parseAwsAMPMetadata parses the data to get the AWS sepcific auth info and metadata
func parseAwsAMPMetadata(config *scalersconfig.ScalerConfig) (*awsConfigMetadata, error) {
meta := awsConfigMetadata{}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
}

func parseAwsAMPMetadata(config *scalersconfig.ScalerConfig) (*AuthorizationMetadata, error) {
auth, err := GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth
return &meta, nil
return &auth, nil
}

// NewSigV4RoundTripper returns a new http.RoundTripper that will sign requests
Expand All @@ -100,11 +92,11 @@ func NewSigV4RoundTripper(config *scalersconfig.ScalerConfig) (http.RoundTripper
// which is probably the reason to create a SigV4RoundTripper.
// To prevent failures we check if the metadata is nil
// (missing AWS info) and we hide the error
metadata, _ := parseAwsAMPMetadata(config)
if metadata == nil {
awsAuthorization, _ := parseAwsAMPMetadata(config)
if awsAuthorization == nil {
return nil, nil
}
awsCfg, err := GetAwsConfig(context.Background(), metadata.awsRegion, metadata.awsAuthorization)
awsCfg, err := GetAwsConfig(context.Background(), *awsAuthorization)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewAwsCloudwatchScaler(ctx context.Context, config *scalersconfig.ScalerCon
}

func createCloudwatchClient(ctx context.Context, metadata *awsCloudwatchMetadata) (*cloudwatch.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
}

func createDynamoDBClient(ctx context.Context, metadata *awsDynamoDBMetadata) (*dynamodb.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws_dynamodb_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig) (*awsDy
}

func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func parseAwsKinesisStreamMetadata(config *scalersconfig.ScalerConfig, logger lo
}

func createKinesisClient(ctx context.Context, metadata *awsKinesisStreamMetadata) (*kinesis.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a6b4067

Please sign in to comment.