Skip to content

Commit

Permalink
refactor aws cloudwatch scaler (kedacore#5852)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <[email protected]>
Signed-off-by: uucloud <[email protected]>
  • Loading branch information
dttung2905 authored and uucloud committed Jul 11, 2024
1 parent 913e605 commit a3a39ae
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 243 deletions.
263 changes: 81 additions & 182 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package scalers

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -18,13 +17,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

const (
defaultMetricCollectionTime = 300
defaultMetricStat = "Average"
defaultMetricStatPeriod = 300
defaultMetricEndTimeOffset = 0
)

type awsCloudwatchScaler struct {
metricType v2.MetricTargetType
metadata *awsCloudwatchMetadata
Expand All @@ -33,28 +25,68 @@ type awsCloudwatchScaler struct {
}

type awsCloudwatchMetadata struct {
namespace string
metricsName string
dimensionName []string
dimensionValue []string
expression string
awsAuthorization awsutils.AuthorizationMetadata

targetMetricValue float64
activationTargetMetricValue float64
minMetricValue float64
triggerIndex int
Namespace string `keda:"name=namespace, order=triggerMetadata, optional"`
MetricsName string `keda:"name=metricName, order=triggerMetadata, optional"`
DimensionName []string `keda:"name=dimensionName, order=triggerMetadata, optional"`
DimensionValue []string `keda:"name=dimensionValue, order=triggerMetadata, optional"`
Expression string `keda:"name=expression, order=triggerMetadata, optional"`

TargetMetricValue float64 `keda:"name=targetMetricValue, order=triggerMetadata"`
ActivationTargetMetricValue float64 `keda:"name=activationTargetMetricValue, order=triggerMetadata, optional"`
MinMetricValue float64 `keda:"name=minMetricValue, order=triggerMetadata"`

MetricCollectionTime int64 `keda:"name=metricCollectionTime, order=triggerMetadata, optional, default=300"`
MetricStat string `keda:"name=metricStat, order=triggerMetadata, optional, default=Average"`
MetricUnit string `keda:"name=metricUnit, order=triggerMetadata, optional"` // Need to check the metric unit
MetricStatPeriod int64 `keda:"name=metricStatPeriod, order=triggerMetadata, optional, default=300"`
MetricEndTimeOffset int64 `keda:"name=metricEndTimeOffset, order=triggerMetadata, optional, default=0"`

AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
}

metricCollectionTime int64
metricStat string
metricUnit string
metricStatPeriod int64
metricEndTimeOffset int64
func (a *awsCloudwatchMetadata) Validate() error {
var err error
if a.Expression == "" {
if a.Namespace == "" {
return errors.New("namespace not given")
}

awsRegion string
awsEndpoint string
if a.MetricsName == "" {
return errors.New("metric name not given")
}

awsAuthorization awsutils.AuthorizationMetadata
if a.DimensionName == nil {
return errors.New("dimension name not given")
}

if a.DimensionValue == nil {
return errors.New("dimension value not given")
}

triggerIndex int
if len(a.DimensionName) != len(a.DimensionValue) {
return errors.New("dimensionName and dimensionValue are not matching in size")
}

if err = checkMetricUnit(a.MetricUnit); err != nil {
return err
}
}

if err = checkMetricStat(a.MetricStat); err != nil {
return err
}
if err = checkMetricStatPeriod(a.MetricStatPeriod); err != nil {
return err
}
if a.MetricCollectionTime < 0 || a.MetricCollectionTime%a.MetricStatPeriod != 0 {
return fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", a.MetricStatPeriod, a.MetricCollectionTime)
}

return nil
}

// NewAwsCloudwatchScaler creates a new awsCloudwatchScaler
Expand All @@ -81,156 +113,23 @@ func NewAwsCloudwatchScaler(ctx context.Context, config *scalersconfig.ScalerCon
}, nil
}

func getIntMetadataValue(metadata map[string]string, key string, required bool, defaultValue int64) (int64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.Atoi(val)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %w", key, err)
}
return int64(value), nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %w", key, err)
}
return value, nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func createCloudwatchClient(ctx context.Context, metadata *awsCloudwatchMetadata) (*cloudwatch.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)

if err != nil {
return nil, err
}
return cloudwatch.NewFromConfig(*cfg, func(options *cloudwatch.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
}), nil
}

func parseAwsCloudwatchMetadata(config *scalersconfig.ScalerConfig) (*awsCloudwatchMetadata, error) {
var err error
meta := awsCloudwatchMetadata{}

if config.TriggerMetadata["expression"] != "" {
if val, ok := config.TriggerMetadata["expression"]; ok && val != "" {
meta.expression = val
} else {
return nil, fmt.Errorf("expression not given")
}
} else {
if val, ok := config.TriggerMetadata["namespace"]; ok && val != "" {
meta.namespace = val
} else {
return nil, fmt.Errorf("namespace not given")
}

if val, ok := config.TriggerMetadata["metricName"]; ok && val != "" {
meta.metricsName = val
} else {
return nil, fmt.Errorf("metric name not given")
}

if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" {
meta.dimensionName = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension name not given")
}

if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" {
meta.dimensionValue = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension value not given")
}

if len(meta.dimensionName) != len(meta.dimensionValue) {
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
}

meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}
}

targetMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}
meta.targetMetricValue = targetMetricValue

activationTargetMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "activationTargetMetricValue", false, 0)
if err != nil {
return nil, err
}
meta.activationTargetMetricValue = activationTargetMetricValue

minMetricValue, err := getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}
meta.minMetricValue = minMetricValue

meta.metricStat = defaultMetricStat
if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" {
meta.metricStat = val
}
if err = checkMetricStat(meta.metricStat); err != nil {
return nil, err
}

metricStatPeriod, err := getIntMetadataValue(config.TriggerMetadata, "metricStatPeriod", false, defaultMetricStatPeriod)
if err != nil {
return nil, err
}
meta.metricStatPeriod = metricStatPeriod

if err = checkMetricStatPeriod(meta.metricStatPeriod); err != nil {
return nil, err
}

metricCollectionTime, err := getIntMetadataValue(config.TriggerMetadata, "metricCollectionTime", false, defaultMetricCollectionTime)
if err != nil {
return nil, err
}
meta.metricCollectionTime = metricCollectionTime

if meta.metricCollectionTime < 0 || meta.metricCollectionTime%meta.metricStatPeriod != 0 {
return nil, fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", meta.metricStatPeriod, meta.metricCollectionTime)
}

metricEndTimeOffset, err := getIntMetadataValue(config.TriggerMetadata, "metricEndTimeOffset", false, defaultMetricEndTimeOffset)
if err != nil {
return nil, err
}
meta.metricEndTimeOffset = metricEndTimeOffset

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
meta := &awsCloudwatchMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

awsAuthorization, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
Expand All @@ -241,7 +140,7 @@ func parseAwsCloudwatchMetadata(config *scalersconfig.ScalerConfig) (*awsCloudwa

meta.triggerIndex = config.TriggerIndex

return &meta, nil
return meta, nil
}

func checkMetricStat(stat string) error {
Expand Down Expand Up @@ -300,15 +199,15 @@ func (s *awsCloudwatchScaler) GetMetricsAndActivity(ctx context.Context, metricN

metric := GenerateMetricInMili(metricName, metricValue)

return []external_metrics.ExternalMetricValue{metric}, metricValue > s.metadata.activationTargetMetricValue, nil
return []external_metrics.ExternalMetricValue{metric}, metricValue > s.metadata.ActivationTargetMetricValue, nil
}

func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "aws-cloudwatch"),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.TargetMetricValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -322,33 +221,33 @@ func (s *awsCloudwatchScaler) Close(context.Context) error {
func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), s.metadata.metricStatPeriod, s.metadata.metricEndTimeOffset, s.metadata.metricCollectionTime)
startTime, endTime := computeQueryWindow(time.Now(), s.metadata.MetricStatPeriod, s.metadata.MetricEndTimeOffset, s.metadata.MetricCollectionTime)

if s.metadata.expression != "" {
if s.metadata.Expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: types.ScanByTimestampDescending,
MetricDataQueries: []types.MetricDataQuery{
{
Expression: aws.String(s.metadata.expression),
Expression: aws.String(s.metadata.Expression),
Id: aws.String("q1"),
Period: aws.Int32(int32(s.metadata.metricStatPeriod)),
Period: aws.Int32(int32(s.metadata.MetricStatPeriod)),
},
},
}
} else {
var dimensions []types.Dimension
for i := range s.metadata.dimensionName {
for i := range s.metadata.DimensionName {
dimensions = append(dimensions, types.Dimension{
Name: &s.metadata.dimensionName[i],
Value: &s.metadata.dimensionValue[i],
Name: &s.metadata.DimensionName[i],
Value: &s.metadata.DimensionValue[i],
})
}

var metricUnit string
if s.metadata.metricUnit != "" {
metricUnit = s.metadata.metricUnit
if s.metadata.MetricUnit != "" {
metricUnit = s.metadata.MetricUnit
}

input = cloudwatch.GetMetricDataInput{
Expand All @@ -360,12 +259,12 @@ func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64
Id: aws.String("c1"),
MetricStat: &types.MetricStat{
Metric: &types.Metric{
Namespace: aws.String(s.metadata.namespace),
Namespace: aws.String(s.metadata.Namespace),
Dimensions: dimensions,
MetricName: aws.String(s.metadata.metricsName),
MetricName: aws.String(s.metadata.MetricsName),
},
Period: aws.Int32(int32(s.metadata.metricStatPeriod)),
Stat: aws.String(s.metadata.metricStat),
Period: aws.Int32(int32(s.metadata.MetricStatPeriod)),
Stat: aws.String(s.metadata.MetricStat),
Unit: types.StandardUnit(metricUnit),
},
ReturnData: aws.Bool(true),
Expand All @@ -387,7 +286,7 @@ func (s *awsCloudwatchScaler) GetCloudwatchMetrics(ctx context.Context) (float64
metricValue = output.MetricDataResults[0].Values[0]
} else {
s.logger.Info("empty metric data received, returning minMetricValue")
metricValue = s.metadata.minMetricValue
metricValue = s.metadata.MinMetricValue
}

return metricValue, nil
Expand Down
Loading

0 comments on commit a3a39ae

Please sign in to comment.