From d39c920b583f4b1e1f2af79e4b78721186d27f65 Mon Sep 17 00:00:00 2001
From: Geoffrey Israel
Date: Mon, 30 Oct 2023 12:45:30 +0100
Subject: [PATCH] add pause metric in prometheus for scaledobject (#5045)

Signed-off-by: geoffrey1330
Signed-off-by: Geoffrey Israel
Co-authored-by: Jorge Turrado Ferrero
---
 CHANGELOG.md                                |  1 +
 controllers/keda/scaledobject_controller.go |  2 +
 pkg/metricscollector/metricscollectors.go   | 10 +++
 pkg/metricscollector/opentelemetry.go       | 26 +++++++
 pkg/metricscollector/prommetrics.go         | 22 ++++++
 .../opentelemetry_metrics_test.go           | 67 ++++++++++++++++++
 .../prometheus_metrics_test.go              | 69 ++++++++++++++++++-
 7 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e75673ee21..4500926390c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features:
 - **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067))
 - **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997))
 - **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
+- **Prometheus Metrics**: Introduce paused ScaledObjects in Prometheus metrics ([#4430](https://github.com/kedacore/keda/issues/4430))
 - **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069))
 
 ### Fixes
diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go
index fc8a896e498..3c7f930cbee 100755
--- a/controllers/keda/scaledobject_controller.go
+++ b/controllers/keda/scaledobject_controller.go
@@ -229,10 +229,12 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
 				return msg, err
 			}
 			conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
+			metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, true)
 			return msg, nil
 		}
 	} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
 		conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
+		metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, false)
 	}
 
 	// Check scale target Name is specified
diff --git a/pkg/metricscollector/metricscollectors.go b/pkg/metricscollector/metricscollectors.go
index 41077b1a4ed..d367028e3b3 100644
--- a/pkg/metricscollector/metricscollectors.go
+++ b/pkg/metricscollector/metricscollectors.go
@@ -41,6 +41,9 @@ type MetricsCollector interface {
 	// RecordScalerActive create a measurement of the activity of the scaler
 	RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool)
 
+	// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
+	RecordScaledObjectPaused(namespace string, scaledObject string, active bool)
+
 	// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
 	RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error)
 
@@ -96,6 +99,13 @@ func RecordScalerActive(namespace string, scaledObject string, scaler string, sc
 	}
 }
 
+// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
+func RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
+	for _, element := range collectors {
+		element.RecordScaledObjectPaused(namespace, scaledObject, active)
+	}
+}
+
 // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
 func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
 	for _, element := range collectors {
diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go
index 500cc29ba35..3768f528ca5 100644
--- a/pkg/metricscollector/opentelemetry.go
+++ b/pkg/metricscollector/opentelemetry.go
@@ -158,6 +158,32 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
 	}
 }
 
+// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
+func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
+	activeVal := 0
+	if active {
+		activeVal = 1
+	}
+
+	opt := api.WithAttributes(
+		attribute.Key("namespace").String(namespace),
+		attribute.Key("scaledObject").String(scaledObject),
+	)
+
+	cback := func(ctx context.Context, obsrv api.Float64Observer) error {
+		obsrv.Observe(float64(activeVal), opt)
+		return nil
+	}
+	_, err := meter.Float64ObservableGauge(
+		"keda.scaled.object.paused",
+		api.WithDescription("Indicates whether a ScaledObject is paused"),
+		api.WithFloat64Callback(cback),
+	)
+	if err != nil {
+		otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject)
+	}
+}
+
 // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
 func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
 	if err != nil {
diff --git a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go
index 94ec66854bb..5d318d378ae 100644
--- a/pkg/metricscollector/prommetrics.go
+++ b/pkg/metricscollector/prommetrics.go
@@ -75,6 +75,15 @@ var (
 		},
 		metricLabels,
 	)
+	scaledObjectPaused = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: DefaultPromMetricsNamespace,
+			Subsystem: "scaled_object",
+			Name:      "paused",
+			Help:      "Indicates whether a ScaledObject is paused",
+		},
+		[]string{"namespace", "scaledObject"},
+	)
 	scalerErrors = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: DefaultPromMetricsNamespace,
@@ -134,6 +143,7 @@ func NewPromMetrics() *PromMetrics {
 	metrics.Registry.MustRegister(scalerActive)
 	metrics.Registry.MustRegister(scalerErrors)
 	metrics.Registry.MustRegister(scaledObjectErrors)
+	metrics.Registry.MustRegister(scaledObjectPaused)
 	metrics.Registry.MustRegister(triggerTotalsGaugeVec)
 	metrics.Registry.MustRegister(crdTotalsGaugeVec)
 
@@ -177,6 +187,18 @@ func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string,
 	scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal))
 }
 
+// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
+func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
+	labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}
+
+	activeVal := 0
+	if active {
+		activeVal = 1
+	}
+
+	scaledObjectPaused.With(labels).Set(float64(activeVal))
+}
+
 // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
 func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
 	if err != nil {
diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
index 10bc2abcd0f..57a057ec4ed 100644
--- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
+++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
@@ -242,6 +242,28 @@ spec:
       name: {{.TestName}}-secret
       key: key
 ---
+`
+	scaledObjectPausedTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+  annotations:
+    autoscaling.keda.sh/paused-replicas: "2"
+spec:
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  pollingInterval: 5
+  idleReplicaCount: 0
+  minReplicaCount: 1
+  maxReplicaCount: 2
+  cooldownPeriod: 10
+  triggers:
+    - type: kubernetes-workload
+      metadata:
+        podSelector: 'app={{.MonitoredDeploymentName}}'
+        value: '1'
 `
 )
 
@@ -266,6 +288,7 @@ func TestPrometheusMetrics(t *testing.T) {
 	testScalerErrors(t, data)
 	testOperatorMetrics(t, kc, data)
 	testScalableObjectMetrics(t)
+	testScaledObjectPausedMetric(t, data)
 
 	// cleanup
 	DeleteKubernetesResources(t, testNamespace, data, templates)
@@ -499,6 +522,26 @@ func testScalerActiveMetric(t *testing.T) {
 	}
 }
 
+func testScaledObjectPausedMetric(t *testing.T, data templateData) {
+	t.Log("--- testing scaleobject pause metric ---")
+
+	// Pause the ScaledObject
+	KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate)
+
+	time.Sleep(20 * time.Second)
+	// Check that the paused metric is now true
+	families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
+	assertScaledObjectPausedMetric(t, families, scaledObjectName, true)
+
+	// Unpause the ScaledObject
+	KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+
+	time.Sleep(20 * time.Second)
+	// Check that the paused metric is back to false
+	families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
+	assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
+}
+
 func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	t.Log("--- testing operator metrics ---")
 	testOperatorMetricValues(t, kc)
@@ -676,3 +719,27 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil
 		expectedMetricValue, metricValue, crType, namespace)
 	}
 }
+
+func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
+	family, ok := families["keda_scaled_object_paused"]
+	if !ok {
+		t.Errorf("keda_scaled_object_paused metric not available")
+		return
+	}
+
+	metricValue := 0.0
+	metrics := family.GetMetric()
+	for _, metric := range metrics {
+		labels := metric.GetLabel()
+		for _, label := range labels {
+			if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
+				metricValue = *metric.Gauge.Value
+			}
+		}
+	}
+	expectedMetricValue := 0
+	if expected {
+		expectedMetricValue = 1
+	}
+	assert.Equal(t, float64(expectedMetricValue), metricValue)
+}
diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
index c9a91120b05..da6df210f71 100644
--- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
+++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
@@ -244,6 +244,28 @@ spec:
       name: {{.TestName}}-secret
       key: key
 ---
+`
+	scaledObjectPausedTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+  annotations:
+    autoscaling.keda.sh/paused-replicas: "2"
+spec:
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  pollingInterval: 5
+  idleReplicaCount: 0
+  minReplicaCount: 1
+  maxReplicaCount: 2
+  cooldownPeriod: 10
+  triggers:
+    - type: kubernetes-workload
+      metadata:
+        podSelector: 'app={{.MonitoredDeploymentName}}'
+        value: '1'
 `
 )
 
@@ -271,7 +293,7 @@ func TestPrometheusMetrics(t *testing.T) {
 	testMetricServerMetrics(t)
 	testWebhookMetrics(t, data)
 	testScalableObjectMetrics(t)
-
+	testScaledObjectPausedMetric(t, data)
 	// cleanup
 	DeleteKubernetesResources(t, testNamespace, data, templates)
 }
@@ -453,6 +475,31 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
 	return 0
 }
 
+func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
+	family, ok := families["keda_scaled_object_paused"]
+	if !ok {
+		t.Errorf("keda_scaled_object_paused metric not available")
+		return
+	}
+
+	metricValue := 0.0
+	metrics := family.GetMetric()
+	for _, metric := range metrics {
+		labels := metric.GetLabel()
+		for _, label := range labels {
+			if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
+				metricValue = *metric.Gauge.Value
+			}
+		}
+	}
+
+	expectedMetricValue := 0
+	if expected {
+		expectedMetricValue = 1
+	}
+	assert.Equal(t, float64(expectedMetricValue), metricValue)
+}
+
 func testScalerMetricLatency(t *testing.T) {
 	t.Log("--- testing scaler metric latency ---")
 
@@ -536,6 +583,26 @@ func testScalerActiveMetric(t *testing.T) {
 	}
 }
 
+func testScaledObjectPausedMetric(t *testing.T, data templateData) {
+	t.Log("--- testing scaleobject pause metric ---")
+
+	// Pause the ScaledObject
+	KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate)
+	time.Sleep(20 * time.Second)
+
+	// Check that the paused metric is now true
+	families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))
+	assertScaledObjectPausedMetric(t, families, scaledObjectName, true)
+
+	// Unpause the ScaledObject
+	KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
+	time.Sleep(20 * time.Second)
+
+	// Check that the paused metric is back to false
+	families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))
+	assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
+}
+
 func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
 	t.Log("--- testing operator metrics ---")
 	testOperatorMetricValues(t, kc)