From e0c9557d837272febc618ca2f0beacb2c8d8ca2b Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Mon, 6 Nov 2023 18:47:41 +0900 Subject: [PATCH] Support e2e for scaledJob prometheus Signed-off-by: Yoon Park --- pkg/metricscollector/opentelemetry.go | 51 +++++-- .../opentelemetry_metrics_test.go | 132 +++++++++++++++++- .../prometheus_metrics_test.go | 129 ++++++++++++++++- 3 files changed, 289 insertions(+), 23 deletions(-) diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go index 3f4b6ab16b0..6a6cb0eef0b 100644 --- a/pkg/metricscollector/opentelemetry.go +++ b/pkg/metricscollector/opentelemetry.go @@ -26,6 +26,7 @@ var ( meter api.Meter otScalerErrorsCounter api.Int64Counter otScaledObjectErrorsCounter api.Int64Counter + otScaledJobErrorsCounter api.Int64Counter otTriggerTotalsCounter api.Int64UpDownCounter otCrdTotalsCounter api.Int64UpDownCounter ) @@ -69,6 +70,11 @@ func initCounter() { otLog.Error(err, msg) } + otScaledJobErrorsCounter, err = meter.Int64Counter("keda.scaledjob.errors", api.WithDescription("Number of scaled job errors")) + if err != nil { + otLog.Error(err, msg) + } + otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers")) if err != nil { otLog.Error(err, msg) @@ -80,9 +86,9 @@ func initCounter() { } } -func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { +func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { cback := func(ctx context.Context, obsrv api.Float64Observer) error { - obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject)) + obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject)) return nil } _, err := meter.Float64ObservableGauge( @@ -91,14 +97,19 @@ func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, api.WithFloat64Callback(cback), ) if err != nil { - otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + if isScaledObject { + otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } else { + otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } + } } // RecordScalerLatency create a measurement of the latency to external metric -func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { +func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { cback := func(ctx context.Context, obsrv api.Float64Observer) error { - obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject)) + obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject)) return nil } _, err := meter.Float64ObservableGauge( @@ -107,7 +118,12 @@ func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, api.WithFloat64Callback(cback), ) if err != nil { - otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + if isScaledObject { + otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } else { + otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } + } } @@ -138,14 +154,14 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string, } // RecordScalerActive create a measurement of the activity of the scaler -func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) { +func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) { activeVal := -1 if active { activeVal = 1 } cback := func(ctx context.Context, obsrv api.Float64Observer) error { - obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject)) + obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject)) return nil } _, err := meter.Float64ObservableGauge( @@ -154,7 +170,12 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, api.WithFloat64Callback(cback), ) if err != nil { - otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + if isScaledObject { + otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } else { + otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric) + } + } } @@ -185,10 +206,14 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st } // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA -func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) { +func (o *OtelMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) { if err != nil { - otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject)) - o.RecordScaledObjectError(namespace, scaledObject, err) + otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject)) + if isScaledObject { + o.RecordScaledObjectError(namespace, scaledResource, err) + } else { + o.RecordScaledJobError(namespace, scaledResource, err) + } return } } @@ -210,7 +235,7 @@ func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, e attribute.Key("namespace").String(namespace), attribute.Key("scaledJob").String(scaledJob)) if err != nil { - otScaledObjectErrorsCounter.Add(context.Background(), 1, opt) + otScaledJobErrorsCounter.Add(context.Background(), 1, opt) return } } diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 57a057ec4ed..b0d268e1048 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -20,12 +20,12 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kedacore/keda/v2/pkg/metricscollector" - . "github.com/kedacore/keda/v2/tests/helper" ) const ( testName = "opentelemetry-metrics-test" labelScaledObject = "scaledObject" + labelScaledJob = "scaledJob" labelType = "type" ) @@ -34,7 +34,9 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) - wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName) + wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName) wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName) cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName) clientName = fmt.Sprintf("%s-client", testName) @@ -47,7 +49,9 @@ type templateData struct { TestNamespace string DeploymentName string ScaledObjectName string + ScaledJobName string WrongScaledObjectName string + WrongScaledJobName string WrongScalerName string CronScaledJobName string MonitoredDeploymentName string @@ -146,6 +150,69 @@ spec: query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}' ` + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' +` + + wrongScaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 2 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: prometheus + name: {{.WrongScalerName}} + metadata: + serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080 + metricName: keda_scaler_errors_total + threshold: '1' + query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}' +` + cronScaledJobTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledJob @@ -285,6 +352,7 @@ func TestPrometheusMetrics(t *testing.T) { testScalerMetricLatency(t) testScalerActiveMetric(t) testScaledObjectErrors(t, data) + testScaledJobErrors(t, data) testScalerErrors(t, data) testOperatorMetrics(t, kc, data) testScalableObjectMetrics(t) @@ -301,6 +369,7 @@ func getTemplateData() (templateData, []Template) { DeploymentName: deploymentName, ScaledObjectName: scaledObjectName, WrongScaledObjectName: wrongScaledObjectName, + ScaledJobName: scaledJobName, WrongScalerName: wrongScalerName, MonitoredDeploymentName: monitoredDeploymentName, ClientName: clientName, @@ -309,6 +378,7 @@ func getTemplateData() (templateData, []Template) { {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, {Name: "clientTemplate", Config: clientTemplate}, {Name: "authenticatioNTemplate", Config: authenticationTemplate}, } @@ -337,7 +407,8 @@ func testScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } @@ -382,12 +453,48 @@ func testScaledObjectErrors(t *testing.T, data templateData) { time.Sleep(10 * time.Second) } +func testScaledJobErrors(t *testing.T, data templateData) { + t.Log("--- testing scaled job errors ---") + + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + + time.Sleep(20 * time.Second) + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + if val, ok := family["keda_scaledjob_errors_total"]; ok { + errCounterVal1 := getErrorMetricsValue(val) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(5 * time.Second) + + family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + if val, ok := family["keda_scaledjob_errors_total"]; ok { + errCounterVal2 := getErrorMetricsValue(val) + assert.NotEqual(t, errCounterVal2, float64(0)) + assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1) + } else { + t.Errorf("metric not available") + } + } else { + t.Errorf("metric not available") + } + + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + // wait for 10 seconds to correctly fetch metrics. + time.Sleep(10 * time.Second) +} + func testScalerErrors(t *testing.T, data templateData) { t.Log("--- testing scaler errors ---") KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(15 * time.Second) family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) @@ -409,6 +516,9 @@ func testScalerErrors(t *testing.T, data templateData) { t.Errorf("metric not available") } + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) } @@ -425,6 +535,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 { } } } + case "keda_scaledjob_errors_total": + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName { + return *metric.Counter.Value + } + } + } case "keda_scaler_errors_total": metrics := val.GetMetric() for _, metric := range metrics { @@ -450,7 +570,8 @@ func testScalerMetricLatency(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(0), *metric.Gauge.Value) found = true } @@ -510,7 +631,8 @@ func testScalerActiveMetric(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(1), *metric.Gauge.Value) found = true } diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index da6df210f71..0255b1593aa 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -20,12 +20,12 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kedacore/keda/v2/pkg/metricscollector" - . "github.com/kedacore/keda/v2/tests/helper" ) const ( testName = "prometheus-metrics-test" labelScaledObject = "scaledObject" + labelScaledJob = "scaledJob" labelType = "type" ) @@ -34,7 +34,9 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) - wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName) + wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName) wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName) cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName) clientName = fmt.Sprintf("%s-client", testName) @@ -49,7 +51,9 @@ type templateData struct { TestNamespace string DeploymentName string ScaledObjectName string + ScaledJobName string WrongScaledObjectName string + WrongScaledJobName string WrongScalerName string CronScaledJobName string MonitoredDeploymentName string @@ -148,6 +152,68 @@ spec: query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}' ` + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' +` + + wrongScaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 2 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: prometheus + name: {{.WrongScalerName}} + metadata: + serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080 + metricName: keda_scaler_errors_total + threshold: '1' + query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}' +` cronScaledJobTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledJob @@ -287,6 +353,7 @@ func TestPrometheusMetrics(t *testing.T) { testScalerMetricLatency(t) testScalerActiveMetric(t) testScaledObjectErrors(t, data) + testScaledJobErrors(t, data) testScalerErrors(t, data) testScalerErrorsTotal(t, data) testOperatorMetrics(t, kc, data) @@ -305,6 +372,7 @@ func getTemplateData() (templateData, []Template) { DeploymentName: deploymentName, ScaledObjectName: scaledObjectName, WrongScaledObjectName: wrongScaledObjectName, + ScaledJobName: scaledJobName, WrongScalerName: wrongScalerName, MonitoredDeploymentName: monitoredDeploymentName, ClientName: clientName, @@ -313,6 +381,7 @@ func getTemplateData() (templateData, []Template) { {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, {Name: "clientTemplate", Config: clientTemplate}, {Name: "authenticatioNTemplate", Config: authenticationTemplate}, } @@ -342,7 +411,8 @@ func testScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } @@ -386,12 +456,47 @@ func testScaledObjectErrors(t *testing.T, data templateData) { KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) } +func testScaledJobErrors(t *testing.T, data templateData) { + t.Log("--- testing scaled job errors ---") + + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(20 * time.Second) + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + if val, ok := family["keda_scaled_job_errors"]; ok { + errCounterVal1 := getErrorMetricsValue(val) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(2 * time.Second) + + family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + if val, ok := family["keda_scaled_job_errors"]; ok { + errCounterVal2 := getErrorMetricsValue(val) + assert.NotEqual(t, errCounterVal2, float64(0)) + assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1) + } else { + t.Errorf("metric not available") + } + } else { + t.Errorf("metric not available") + } + + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) +} + func testScalerErrors(t *testing.T, data templateData) { t.Log("--- testing scaler errors ---") KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) if val, ok := family["keda_scaler_errors"]; ok { errCounterVal1 := getErrorMetricsValue(val) @@ -410,6 +515,8 @@ func testScalerErrors(t *testing.T, data templateData) { } else { t.Errorf("metric not available") } + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) @@ -461,6 +568,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 { } } } + case "keda_scaled_job_errors": + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName { + return *metric.Counter.Value + } + } + } case "keda_scaler_errors": metrics := val.GetMetric() for _, metric := range metrics { @@ -511,7 +628,8 @@ func testScalerMetricLatency(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(0), *metric.Gauge.Value) found = true } @@ -571,7 +689,8 @@ func testScalerActiveMetric(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(1), *metric.Gauge.Value) found = true }