Skip to content

Commit

Permalink
Refactor azure queue scaler (#6269)
Browse files Browse the repository at this point in the history
* Refactor azure queue scaler

Signed-off-by: rickbrouwer <[email protected]>

* update after feedback

Signed-off-by: Rick Brouwer <[email protected]>

---------

Signed-off-by: rickbrouwer <[email protected]>
Signed-off-by: Rick Brouwer <[email protected]>
Co-authored-by: Jorge Turrado Ferrero <[email protected]>
  • Loading branch information
rickbrouwer and JorTurFer authored Nov 4, 2024
1 parent c1a1b3d commit 54c315d
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 149 deletions.
1 change: 1 addition & 0 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const (
defaultTargetQueueLength = 5
targetQueueLengthDefault = 5
activationTargetQueueLengthDefault = 0
defaultScaleOnInFlight = true
Expand Down
124 changes: 34 additions & 90 deletions pkg/scalers/azure_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
Expand All @@ -34,37 +33,30 @@ import (
)

const (
queueLengthMetricName = "queueLength"
activationQueueLengthMetricName = "activationQueueLength"
defaultTargetQueueLength = 5
externalMetricType = "External"
QueueLengthStrategyAll string = "all"
QueueLengthStrategyVisibleOnly string = "visibleonly"
externalMetricType = "External"
queueLengthStrategyVisibleOnly = "visibleonly"
)

var (
maxPeekMessages int32 = 32
)
var maxPeekMessages int32 = 32

type azureQueueScaler struct {
metricType v2.MetricTargetType
metadata *azureQueueMetadata
metadata azureQueueMetadata
queueClient *azqueue.QueueClient
logger logr.Logger
}

type azureQueueMetadata struct {
targetQueueLength int64
activationTargetQueueLength int64
queueName string
connection string
accountName string
endpointSuffix string
queueLengthStrategy string
triggerIndex int
ActivationQueueLength int64 `keda:"name=activationQueueLength, order=triggerMetadata, default=0"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueLength int64 `keda:"name=queueLength, order=triggerMetadata, default=5"`
Connection string `keda:"name=connection, order=authParams;triggerMetadata;resolvedEnv, optional"`
AccountName string `keda:"name=accountName, order=triggerMetadata, optional"`
EndpointSuffix string `keda:"name=endpointSuffix, order=triggerMetadata, optional"`
QueueLengthStrategy string `keda:"name=queueLengthStrategy, order=triggerMetadata, enum=all;visibleonly, default=all"`
TriggerIndex int
}

// NewAzureQueueScaler creates a new scaler for queue
func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -73,14 +65,14 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

logger := InitializeLogger(config, "azure_queue_scaler")

meta, podIdentity, err := parseAzureQueueMetadata(config, logger)
meta, podIdentity, err := parseAzureQueueMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.connection, meta.accountName, meta.endpointSuffix, meta.queueName, config.GlobalHTTPTimeout)
queueClient, err := azure.GetStorageQueueClient(logger, podIdentity, meta.Connection, meta.AccountName, meta.EndpointSuffix, meta.QueueName, config.GlobalHTTPTimeout)
if err != nil {
return nil, fmt.Errorf("error creating azure blob client: %w", err)
return nil, fmt.Errorf("error creating azure queue client: %w", err)
}

return &azureQueueScaler{
Expand All @@ -91,105 +83,58 @@ func NewAzureQueueScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig) (azureQueueMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := azureQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := config.TriggerMetadata[queueLengthMetricName]; ok {
queueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", "queueLengthMetricName", queueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", queueLengthMetricName, err)
}

meta.targetQueueLength = queueLength
}

meta.activationTargetQueueLength = 0
if val, ok := config.TriggerMetadata[activationQueueLengthMetricName]; ok {
activationQueueLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
logger.Error(err, "Error parsing azure queue metadata", activationQueueLengthMetricName, activationQueueLengthMetricName)
return nil, kedav1alpha1.AuthPodIdentity{},
fmt.Errorf("error parsing azure queue metadata %s: %w", activationQueueLengthMetricName, err)
}

meta.activationTargetQueueLength = activationQueueLength
err := config.TypedConfig(&meta)
if err != nil {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("error parsing azure queue metadata: %w", err)
}

endpointSuffix, err := azure.ParseAzureStorageEndpointSuffix(config.TriggerMetadata, azure.QueueEndpoint)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{}, err
}

meta.endpointSuffix = endpointSuffix

if val, ok := config.TriggerMetadata["queueName"]; ok && val != "" {
meta.queueName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given")
}

if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" {
strategy := strings.ToLower(val)
if strategy == QueueLengthStrategyAll || strategy == QueueLengthStrategyVisibleOnly {
meta.queueLengthStrategy = strategy
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val)
}
} else {
meta.queueLengthStrategy = QueueLengthStrategyAll
return meta, kedav1alpha1.AuthPodIdentity{}, err
}
meta.EndpointSuffix = endpointSuffix

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
// Azure Queue Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
if config.AuthParams["connection"] != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.connection) == 0 {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
if meta.Connection == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no connection setting given")
}
case kedav1alpha1.PodIdentityProviderAzureWorkload:
// If the Use AAD Pod Identity is present then check account name
if val, ok := config.TriggerMetadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
if meta.AccountName == "" {
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no accountName given")
}
default:
return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
return meta, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("pod identity %s not supported for azure storage queues", config.PodIdentity.Provider)
}

meta.triggerIndex = config.TriggerIndex

return &meta, config.PodIdentity, nil
meta.TriggerIndex = config.TriggerIndex
return meta, config.PodIdentity, nil
}

func (s *azureQueueScaler) Close(context.Context) error {
return nil
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-queue-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueLength),
Target: GetMetricTarget(s.metricType, s.metadata.QueueLength),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queuelen, err := s.getMessageCount(ctx)
if err != nil {
Expand All @@ -198,12 +143,11 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName
}

metric := GenerateMetricInMili(metricName, float64(queuelen))
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.activationTargetQueueLength, nil
return []external_metrics.ExternalMetricValue{metric}, queuelen > s.metadata.ActivationQueueLength, nil
}

func (s *azureQueueScaler) getMessageCount(ctx context.Context) (int64, error) {
strategy := strings.ToLower(s.metadata.queueLengthStrategy)
if strategy == QueueLengthStrategyVisibleOnly {
if strings.ToLower(s.metadata.QueueLengthStrategy) == queueLengthStrategyVisibleOnly {
queue, err := s.queueClient.PeekMessages(ctx, &azqueue.PeekMessagesOptions{NumberOfMessages: &maxPeekMessages})
if err != nil {
return 0, err
Expand Down
Loading

0 comments on commit 54c315d

Please sign in to comment.