diff --git a/pkg/scalers/loki_scaler.go b/pkg/scalers/loki_scaler.go index 11a43e5384c..dff08107f02 100644 --- a/pkg/scalers/loki_scaler.go +++ b/pkg/scalers/loki_scaler.go @@ -19,37 +19,27 @@ import ( ) const ( - lokiServerAddress = "serverAddress" - lokiQuery = "query" - lokiThreshold = "threshold" - lokiActivationThreshold = "activationThreshold" - lokiNamespace = "namespace" - tenantName = "tenantName" + defaultIgnoreNullValues = true tenantNameHeaderKey = "X-Scope-OrgID" - lokiIgnoreNullValues = "ignoreNullValues" -) - -var ( - lokiDefaultIgnoreNullValues = true ) type lokiScaler struct { metricType v2.MetricTargetType - metadata *lokiMetadata + metadata lokiMetadata httpClient *http.Client logger logr.Logger } type lokiMetadata struct { - serverAddress string - query string - threshold float64 - activationThreshold float64 - lokiAuth *authentication.AuthMeta - triggerIndex int - tenantName string - ignoreNullValues bool - unsafeSsl bool + ServerAddress string `keda:"name=serverAddress,order=triggerMetadata"` + Query string `keda:"name=query,order=triggerMetadata"` + Threshold float64 `keda:"name=threshold,order=triggerMetadata"` + ActivationThreshold float64 `keda:"name=activationThreshold,order=triggerMetadata,default=0"` + TenantName string `keda:"name=tenantName,order=triggerMetadata,optional"` + IgnoreNullValues bool `keda:"name=ignoreNullValues,order=triggerMetadata,default=true"` + UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"` + TriggerIndex int + Auth *authentication.AuthMeta } type lokiQueryResult struct { @@ -57,113 +47,54 @@ type lokiQueryResult struct { Data struct { ResultType string `json:"resultType"` Result []struct { - Metric struct { - } `json:"metric"` - Value []interface{} `json:"value"` + Metric struct{} `json:"metric"` + Value []interface{} `json:"value"` } `json:"result"` } `json:"data"` } -// NewLokiScaler returns a new lokiScaler func NewLokiScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting scaler metric type: %w", err) } - logger := InitializeLogger(config, "loki_scaler") - meta, err := parseLokiMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing loki metadata: %w", err) } - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl) + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl) return &lokiScaler{ metricType: metricType, metadata: meta, httpClient: httpClient, - logger: logger, + logger: InitializeLogger(config, "loki_scaler"), }, nil } -func parseLokiMetadata(config *scalersconfig.ScalerConfig) (meta *lokiMetadata, err error) { - meta = &lokiMetadata{} - - if val, ok := config.TriggerMetadata[lokiServerAddress]; ok && val != "" { - meta.serverAddress = val - } else { - return nil, fmt.Errorf("no %s given", lokiServerAddress) - } - - if val, ok := config.TriggerMetadata[lokiQuery]; ok && val != "" { - meta.query = val - } else { - return nil, fmt.Errorf("no %s given", lokiQuery) - } - - if val, ok := config.TriggerMetadata[lokiThreshold]; ok && val != "" { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", lokiThreshold, err) - } - - meta.threshold = t - } else { - if config.AsMetricSource { - meta.threshold = 0 - } else { - return nil, fmt.Errorf("no %s given", lokiThreshold) - } - } - - meta.activationThreshold = 0 - if val, ok := config.TriggerMetadata[lokiActivationThreshold]; ok { - t, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThreshold parsing error %w", err) - } - - meta.activationThreshold = t - } - - if val, ok := config.TriggerMetadata[tenantName]; ok && val != "" { - meta.tenantName = val +func parseLokiMetadata(config *scalersconfig.ScalerConfig) (lokiMetadata, error) { + meta := lokiMetadata{} + err := config.TypedConfig(&meta) + if err != nil { + return meta, fmt.Errorf("error parsing loki metadata: %w", err) } - meta.ignoreNullValues = lokiDefaultIgnoreNullValues - if val, ok := config.TriggerMetadata[lokiIgnoreNullValues]; ok && val != "" { - ignoreNullValues, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s please use true or false", val) - } - meta.ignoreNullValues = ignoreNullValues + if config.AsMetricSource { + meta.Threshold = 0 } - meta.unsafeSsl = false - if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" { - unsafeSslValue, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err) - } - - meta.unsafeSsl = unsafeSslValue - } - - meta.triggerIndex = config.TriggerIndex - - // parse auth configs from ScalerConfig auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) if err != nil { - return nil, err + return meta, err } - meta.lokiAuth = auth + meta.Auth = auth + meta.TriggerIndex = config.TriggerIndex return meta, nil } -// Close returns a nil error func (s *lokiScaler) Close(context.Context) error { if s.httpClient != nil { s.httpClient.CloseIdleConnections() @@ -171,100 +102,101 @@ func (s *lokiScaler) Close(context.Context) error { return nil } -// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "loki"), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, "loki"), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.threshold), - } - metricSpec := v2.MetricSpec{ - External: externalMetric, Type: externalMetricType, + Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), } + metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} } -// ExecuteLokiQuery returns the result of the LogQL query execution func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) { - u, err := url.ParseRequestURI(s.metadata.serverAddress) + u, err := url.ParseRequestURI(s.metadata.ServerAddress) if err != nil { return -1, err } u.Path = "/loki/api/v1/query" - - u.RawQuery = url.Values{ - "query": []string{s.metadata.query}, - }.Encode() + u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode() req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil { return -1, err } - if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth { - req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth)) - } else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth { - req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password) + if s.metadata.Auth != nil { + if s.metadata.Auth.EnableBearerAuth { + req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.Auth)) + } else if s.metadata.Auth.EnableBasicAuth { + req.SetBasicAuth(s.metadata.Auth.Username, s.metadata.Auth.Password) + } } - if s.metadata.tenantName != "" { - req.Header.Add(tenantNameHeaderKey, s.metadata.tenantName) + if s.metadata.TenantName != "" { + req.Header.Add(tenantNameHeaderKey, s.metadata.TenantName) } - r, err := s.httpClient.Do(req) + resp, err := s.httpClient.Do(req) if err != nil { return -1, err } - defer r.Body.Close() + defer resp.Body.Close() - b, err := io.ReadAll(r.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return -1, err } - if !(r.StatusCode >= 200 && r.StatusCode <= 299) { - err := fmt.Errorf("loki query api returned error. status: %d response: %s", r.StatusCode, string(b)) - s.logger.Error(err, "loki query api returned error") - return -1, err + if resp.StatusCode < 200 || resp.StatusCode > 299 { + return -1, fmt.Errorf("loki query api returned error. status: %d response: %s", resp.StatusCode, string(body)) } var result lokiQueryResult - err = json.Unmarshal(b, &result) - if err != nil { + if err := json.Unmarshal(body, &result); err != nil { return -1, err } - var v float64 = -1 + return s.parseQueryResult(result) +} - // allow for zero element or single element result sets +func (s *lokiScaler) parseQueryResult(result lokiQueryResult) (float64, error) { if len(result.Data.Result) == 0 { - if s.metadata.ignoreNullValues { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("loki metrics may be lost, the result is empty") - } else if len(result.Data.Result) > 1 { - return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.query) } - valueLen := len(result.Data.Result[0].Value) - if valueLen == 0 { - if s.metadata.ignoreNullValues { + if len(result.Data.Result) > 1 { + return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.Query) + } + + values := result.Data.Result[0].Value + if len(values) == 0 { + if s.metadata.IgnoreNullValues { return 0, nil } return -1, fmt.Errorf("loki metrics may be lost, the value list is empty") - } else if valueLen < 2 { - return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.query) } - val := result.Data.Result[0].Value[1] - if val != nil { - str := val.(string) - v, err = strconv.ParseFloat(str, 64) - if err != nil { - s.logger.Error(err, "Error converting loki value", "loki_value", str) - return -1, err - } + if len(values) < 2 { + return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.Query) + } + + if values[1] == nil { + return 0, nil + } + + str, ok := values[1].(string) + if !ok { + return -1, fmt.Errorf("failed to parse loki value as string") + } + + v, err := strconv.ParseFloat(str, 64) + if err != nil { + return -1, fmt.Errorf("error converting loki value %s: %w", str, err) } return v, nil @@ -279,6 +211,5 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } metric := GenerateMetricInMili(metricName, val) - - return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil } diff --git a/pkg/scalers/loki_scaler_test.go b/pkg/scalers/loki_scaler_test.go index 06f95f46419..e5f8082269d 100644 --- a/pkg/scalers/loki_scaler_test.go +++ b/pkg/scalers/loki_scaler_test.go @@ -38,7 +38,7 @@ var testLokiMetadata = []parseLokiMetadataTestData{ {map[string]string{"serverAddress": "http://localhost:3100", "threshold": "1", "query": ""}, true}, // ignoreNullValues with wrong value {map[string]string{"serverAddress": "http://localhost:3100", "threshold": "1", "query": "sum(rate({filename=\"/var/log/syslog\"}[1m])) by (level)", "ignoreNullValues": "xxxx"}, true}, - + // with unsafeSsl {map[string]string{"serverAddress": "https://localhost:3100", "threshold": "1", "query": "sum(rate({filename=\"/var/log/syslog\"}[1m])) by (level)", "unsafeSsl": "true"}, false}, } @@ -83,14 +83,14 @@ func TestLokiScalerAuthParams(t *testing.T) { } if err == nil { - if meta.lokiAuth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic") { + if meta.Auth.EnableBasicAuth && !strings.Contains(testData.metadata["authModes"], "basic") { t.Error("wrong auth mode detected") } } } } -type lokiQromQueryResultTestData struct { +type lokiQueryResultTestData struct { name string bodyStr string responseStatus int @@ -100,7 +100,7 @@ type lokiQromQueryResultTestData struct { unsafeSsl bool } -var testLokiQueryResult = []lokiQromQueryResultTestData{ +var testLokiQueryResult = []lokiQueryResultTestData{ { name: "no results", bodyStr: `{}`, @@ -189,17 +189,16 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { t.Run(testData.name, func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(testData.responseStatus) - if _, err := writer.Write([]byte(testData.bodyStr)); err != nil { t.Fatal(err) } })) scaler := lokiScaler{ - metadata: &lokiMetadata{ - serverAddress: server.URL, - ignoreNullValues: testData.ignoreNullValues, - unsafeSsl: testData.unsafeSsl, + metadata: lokiMetadata{ + ServerAddress: server.URL, + IgnoreNullValues: testData.ignoreNullValues, + UnsafeSsl: testData.unsafeSsl, }, httpClient: http.DefaultClient, logger: logr.Discard(), @@ -208,7 +207,6 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { value, err := scaler.ExecuteLokiQuery(context.TODO()) assert.Equal(t, testData.expectedValue, value) - if testData.isError { assert.Error(t, err) } else { @@ -219,7 +217,7 @@ func TestLokiScalerExecuteLogQLQuery(t *testing.T) { } func TestLokiScalerTenantHeader(t *testing.T) { - testData := lokiQromQueryResultTestData{ + testData := lokiQueryResultTestData{ name: "no values", bodyStr: `{"data":{"result":[]}}`, responseStatus: http.StatusOK, @@ -238,15 +236,14 @@ func TestLokiScalerTenantHeader(t *testing.T) { })) scaler := lokiScaler{ - metadata: &lokiMetadata{ - serverAddress: server.URL, - tenantName: tenantName, - ignoreNullValues: testData.ignoreNullValues, + metadata: lokiMetadata{ + ServerAddress: server.URL, + TenantName: tenantName, + IgnoreNullValues: testData.ignoreNullValues, }, httpClient: http.DefaultClient, } _, err := scaler.ExecuteLokiQuery(context.TODO()) - assert.NoError(t, err) } diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 5a0516f42a0..521d5693442 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -25,18 +25,6 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) -const ( - promServerAddress = "serverAddress" - promQuery = "query" - promQueryParameters = "queryParameters" - promThreshold = "threshold" - promActivationThreshold = "activationThreshold" - promNamespace = "namespace" - promCustomHeaders = "customHeaders" - ignoreNullValues = "ignoreNullValues" - unsafeSsl = "unsafeSsl" -) - type prometheusScaler struct { metricType v2.MetricTargetType metadata *prometheusMetadata