Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor elasticsearch scaler config #6101

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 58 additions & 196 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
Expand All @@ -22,28 +21,45 @@ import (

type elasticsearchScaler struct {
metricType v2.MetricTargetType
metadata *elasticsearchMetadata
metadata elasticsearchMetadata
esClient *elasticsearch.Client
logger logr.Logger
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
cloudID string
apiKey string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue float64
activationTargetValue float64
metricName string
Addresses []string `keda:"name=addresses, order=authParams;triggerMetadata, optional"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
Username string `keda:"name=username, order=authParams;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata, optional"`
CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`
Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"`
Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"`
ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`
MetricName string `keda:"name=metricName, order=triggerMetadata, optional"`

TriggerIndex int
}

func (m *elasticsearchMetadata) Validate() error {
if (m.CloudID != "" || m.APIKey != "") && (len(m.Addresses) > 0 || m.Username != "" || m.Password != "") {
return fmt.Errorf("can't provide both cloud config and endpoint addresses")
}
if (m.CloudID == "" && m.APIKey == "") && (len(m.Addresses) == 0 && m.Username == "" && m.Password == "") {
return fmt.Errorf("must provide either cloud config or endpoint addresses")
}
if (m.CloudID != "" && m.APIKey == "") || (m.CloudID == "" && m.APIKey != "") {
return fmt.Errorf("both cloudID and apiKey must be provided when cloudID or apiKey is used")
}
if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") {
return fmt.Errorf("both username and password must be provided when addresses is used")
}
return nil
}

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -69,184 +85,37 @@ func NewElasticsearchScaler(config *scalersconfig.ScalerConfig) (Scaler, error)
}, nil
}

const defaultUnsafeSsl = false

func hasCloudConfig(meta *elasticsearchMetadata) bool {
if meta.cloudID != "" {
return true
}
if meta.apiKey != "" {
return true
}
return false
}

func hasEndpointsConfig(meta *elasticsearchMetadata) bool {
if len(meta.addresses) > 0 {
return true
}
if meta.username != "" {
return true
}
if meta.password != "" {
return true
}
return false
}

func extractEndpointsConfig(config *scalersconfig.ScalerConfig, meta *elasticsearchMetadata) error {
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return err
}

meta.addresses = splitAndTrimBySep(addresses, ",")
if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

return nil
}

func extractCloudConfig(config *scalersconfig.ScalerConfig, meta *elasticsearchMetadata) error {
cloudID, err := GetFromAuthOrMeta(config, "cloudID")
if err != nil {
return err
}
meta.cloudID = cloudID

apiKey, err := GetFromAuthOrMeta(config, "apiKey")
if err != nil {
return err
}
meta.apiKey = apiKey
return nil
}

var (
// ErrElasticsearchMissingAddressesOrCloudConfig is returned when endpoint addresses or cloud config is missing.
ErrElasticsearchMissingAddressesOrCloudConfig = errors.New("must provide either endpoint addresses or cloud config")

// ErrElasticsearchConfigConflict is returned when both endpoint addresses and cloud config are provided.
ErrElasticsearchConfigConflict = errors.New("can't provide endpoint addresses and cloud config at the same time")
)

func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (*elasticsearchMetadata, error) {
func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}
err := config.TypedConfig(&meta)

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
cloudID, errCloudConfig := GetFromAuthOrMeta(config, "cloudID")
if err != nil && errCloudConfig != nil {
return nil, ErrElasticsearchMissingAddressesOrCloudConfig
}

if err == nil && addresses != "" {
err = extractEndpointsConfig(config, &meta)
if err != nil {
return nil, err
}
}
if errCloudConfig == nil && cloudID != "" {
err = extractCloudConfig(config, &meta)
if err != nil {
return nil, err
}
}

if hasEndpointsConfig(&meta) && hasCloudConfig(&meta) {
return nil, ErrElasticsearchConfigConflict
}

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
unsafeSsl, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %w", err)
}
meta.unsafeSsl = unsafeSsl
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

searchTemplateName, err := GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}
meta.searchTemplateName = searchTemplateName

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

valueLocation, err := GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}
meta.valueLocation = valueLocation

targetValueString, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
if config.AsMetricSource {
targetValueString = "0"
} else {
return nil, err
}
}
targetValue, err := strconv.ParseFloat(targetValueString, 64)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error: %w", err)
return meta, err
}
meta.targetValue = targetValue

meta.activationTargetValue = 0
if val, ok := config.TriggerMetadata["activationTargetValue"]; ok {
activationTargetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetValue parsing error: %w", err)
}
meta.activationTargetValue = activationTargetValue
}
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName)))
meta.TriggerIndex = config.TriggerIndex

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
return meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) {
func newElasticsearchClient(meta elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) {
var config elasticsearch.Config

if hasCloudConfig(meta) {
if meta.CloudID != "" {
config = elasticsearch.Config{
CloudID: meta.cloudID,
APIKey: meta.apiKey,
CloudID: meta.CloudID,
APIKey: meta.APIKey,
}
} else {
config = elasticsearch.Config{
Addresses: meta.addresses,
}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
Addresses: meta.Addresses,
Username: meta.Username,
Password: meta.Password,
}
}

config.Transport = util.CreateHTTPTransport(meta.unsafeSsl)
config.Transport = util.CreateHTTPTransport(meta.UnsafeSsl)
esClient, err := elasticsearch.NewClient(config)
if err != nil {
logger.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
Expand All @@ -269,14 +138,14 @@ func (s *elasticsearchScaler) Close(_ context.Context) error {
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil {
s.logger.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
s.esClient.SearchTemplate.WithIndex(s.metadata.Index...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
Expand All @@ -289,7 +158,7 @@ func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, erro
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
v, err := getValueFromSearch(b, s.metadata.ValueLocation)
if err != nil {
return 0, err
}
Expand All @@ -298,14 +167,16 @@ func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, erro

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
for _, p := range metadata.Parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
kv := strings.Split(p, ":")
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
parameters[key] = value
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
"id": metadata.SearchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
Expand Down Expand Up @@ -333,9 +204,9 @@ func getValueFromSearch(body []byte, valueLocation string) (float64, error) {
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
Name: s.metadata.MetricName,
},
Target: GetMetricTargetMili(s.metricType, s.metadata.targetValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand All @@ -352,14 +223,5 @@ func (s *elasticsearchScaler) GetMetricsAndActivity(ctx context.Context, metricN

metric := GenerateMetricInMili(metricName, num)

return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.activationTargetValue, nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}
Loading
Loading