From d39c920b583f4b1e1f2af79e4b78721186d27f65 Mon Sep 17 00:00:00 2001 From: Geoffrey Israel Date: Mon, 30 Oct 2023 12:45:30 +0100 Subject: [PATCH 1/2] add pause metric in prometheus for scaledobject (#5045) Signed-off-by: geoffrey1330 Signed-off-by: Geoffrey Israel Co-authored-by: Jorge Turrado Ferrero --- CHANGELOG.md | 1 + controllers/keda/scaledobject_controller.go | 2 + pkg/metricscollector/metricscollectors.go | 10 +++ pkg/metricscollector/opentelemetry.go | 26 +++++++ pkg/metricscollector/prommetrics.go | 22 ++++++ .../opentelemetry_metrics_test.go | 67 ++++++++++++++++++ .../prometheus_metrics_test.go | 69 ++++++++++++++++++- 7 files changed, 196 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e75673ee21..4500926390c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features: - **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067)) - **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997)) - **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) +- **Prometheus Metrics**: Introduce paused ScaledObjects in Prometheus metrics ([#4430](https://github.com/kedacore/keda/issues/4430)) - **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069)) ### Fixes diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index fc8a896e498..3c7f930cbee 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -229,10 +229,12 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg return msg, err } conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg) + metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, true) return msg, nil } } else if conditions.GetPausedCondition().Status == metav1.ConditionTrue { conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject") + metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, false) } // Check scale target Name is specified diff --git a/pkg/metricscollector/metricscollectors.go b/pkg/metricscollector/metricscollectors.go index 41077b1a4ed..d367028e3b3 100644 --- a/pkg/metricscollector/metricscollectors.go +++ b/pkg/metricscollector/metricscollectors.go @@ -41,6 +41,9 @@ type MetricsCollector interface { // RecordScalerActive create a measurement of the activity of the scaler RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) + // RecordScaledObjectPaused marks whether the current ScaledObject is paused. + RecordScaledObjectPaused(namespace string, scaledObject string, active bool) + // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) @@ -96,6 +99,13 @@ func RecordScalerActive(namespace string, scaledObject string, scaler string, sc } } +// RecordScaledObjectPaused marks whether the current ScaledObject is paused. +func RecordScaledObjectPaused(namespace string, scaledObject string, active bool) { + for _, element := range collectors { + element.RecordScaledObjectPaused(namespace, scaledObject, active) + } +} + // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { for _, element := range collectors { diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go index 500cc29ba35..3768f528ca5 100644 --- a/pkg/metricscollector/opentelemetry.go +++ b/pkg/metricscollector/opentelemetry.go @@ -158,6 +158,32 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, } } +// RecordScaledObjectPaused marks whether the current ScaledObject is paused. +func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) { + activeVal := 0 + if active { + activeVal = 1 + } + + opt := api.WithAttributes( + attribute.Key("namespace").String(namespace), + attribute.Key("scaledObject").String(scaledObject), + ) + + cback := func(ctx context.Context, obsrv api.Float64Observer) error { + obsrv.Observe(float64(activeVal), opt) + return nil + } + _, err := meter.Float64ObservableGauge( + "keda.scaled.object.paused", + api.WithDescription("Indicates whether a ScaledObject is paused"), + api.WithFloat64Callback(cback), + ) + if err != nil { + otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject) + } +} + // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { if err != nil { diff --git a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go index 94ec66854bb..5d318d378ae 100644 --- a/pkg/metricscollector/prommetrics.go +++ b/pkg/metricscollector/prommetrics.go @@ -75,6 +75,15 @@ var ( }, metricLabels, ) + scaledObjectPaused = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_object", + Name: "paused", + Help: "Indicates whether a ScaledObject is paused", + }, + []string{"namespace", "scaledObject"}, + ) scalerErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: DefaultPromMetricsNamespace, @@ -134,6 +143,7 @@ func NewPromMetrics() *PromMetrics { metrics.Registry.MustRegister(scalerActive) metrics.Registry.MustRegister(scalerErrors) metrics.Registry.MustRegister(scaledObjectErrors) + metrics.Registry.MustRegister(scaledObjectPaused) metrics.Registry.MustRegister(triggerTotalsGaugeVec) metrics.Registry.MustRegister(crdTotalsGaugeVec) @@ -177,6 +187,18 @@ func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string, scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal)) } +// RecordScaledObjectPaused marks whether the current ScaledObject is paused. +func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) { + labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject} + + activeVal := 0 + if active { + activeVal = 1 + } + + scaledObjectPaused.With(labels).Set(float64(activeVal)) +} + // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { if err != nil { diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 10bc2abcd0f..57a057ec4ed 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -242,6 +242,28 @@ spec: name: {{.TestName}}-secret key: key --- +` + scaledObjectPausedTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + annotations: + autoscaling.keda.sh/paused-replicas: "2" +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + idleReplicaCount: 0 + minReplicaCount: 1 + maxReplicaCount: 2 + cooldownPeriod: 10 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' ` ) @@ -266,6 +288,7 @@ func TestPrometheusMetrics(t *testing.T) { testScalerErrors(t, data) testOperatorMetrics(t, kc, data) testScalableObjectMetrics(t) + testScaledObjectPausedMetric(t, data) // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) @@ -499,6 +522,26 @@ func testScalerActiveMetric(t *testing.T) { } } +func testScaledObjectPausedMetric(t *testing.T, data templateData) { + t.Log("--- testing scaleobject pause metric ---") + + // Pause the ScaledObject + KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate) + + time.Sleep(20 * time.Second) + // Check that the paused metric is now true + families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + assertScaledObjectPausedMetric(t, families, scaledObjectName, true) + + // Unpause the ScaledObject + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + + time.Sleep(20 * time.Second) + // Check that the paused metric is back to false + families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + assertScaledObjectPausedMetric(t, families, scaledObjectName, false) +} + func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing operator metrics ---") testOperatorMetricValues(t, kc) @@ -676,3 +719,27 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil expectedMetricValue, metricValue, crType, namespace) } } + +func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) { + family, ok := families["keda_scaled_object_paused"] + if !ok { + t.Errorf("keda_scaled_object_paused metric not available") + return + } + + metricValue := 0.0 + metrics := family.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + metricValue = *metric.Gauge.Value + } + } + } + expectedMetricValue := 0 + if expected { + expectedMetricValue = 1 + } + assert.Equal(t, float64(expectedMetricValue), metricValue) +} diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index c9a91120b05..da6df210f71 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -244,6 +244,28 @@ spec: name: {{.TestName}}-secret key: key --- +` + scaledObjectPausedTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + annotations: + autoscaling.keda.sh/paused-replicas: "2" +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + idleReplicaCount: 0 + minReplicaCount: 1 + maxReplicaCount: 2 + cooldownPeriod: 10 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' ` ) @@ -271,7 +293,7 @@ func TestPrometheusMetrics(t *testing.T) { testMetricServerMetrics(t) testWebhookMetrics(t, data) testScalableObjectMetrics(t) - + testScaledObjectPausedMetric(t, data) // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) } @@ -453,6 +475,31 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 { return 0 } +func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) { + family, ok := families["keda_scaled_object_paused"] + if !ok { + t.Errorf("keda_scaled_object_paused metric not available") + return + } + + metricValue := 0.0 + metrics := family.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + metricValue = *metric.Gauge.Value + } + } + } + + expectedMetricValue := 0 + if expected { + expectedMetricValue = 1 + } + assert.Equal(t, float64(expectedMetricValue), metricValue) +} + func testScalerMetricLatency(t *testing.T) { t.Log("--- testing scaler metric latency ---") @@ -536,6 +583,26 @@ func testScalerActiveMetric(t *testing.T) { } } +func testScaledObjectPausedMetric(t *testing.T, data templateData) { + t.Log("--- testing scaleobject pause metric ---") + + // Pause the ScaledObject + KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate) + time.Sleep(20 * time.Second) + + // Check that the paused metric is now true + families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + assertScaledObjectPausedMetric(t, families, scaledObjectName, true) + + // Unpause the ScaledObject + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + time.Sleep(20 * time.Second) + + // Check that the paused metric is back to false + families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + assertScaledObjectPausedMetric(t, families, scaledObjectName, false) +} + func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing operator metrics ---") testOperatorMetricValues(t, kc) From 4cbc74e51738eae35f9c8f476bdb3bc7fea03982 Mon Sep 17 00:00:00 2001 From: yuval weber Date: Tue, 31 Oct 2023 21:31:04 +0200 Subject: [PATCH 2/2] Support profiling for keda components (#5091) Signed-off-by: yuval weber Signed-off-by: Zbynek Roubalik Co-authored-by: Zbynek Roubalik --- CHANGELOG.md | 2 +- cmd/adapter/main.go | 9 ++++++--- cmd/operator/main.go | 3 +++ cmd/webhooks/main.go | 3 +++ 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4500926390c..e7aed57fdae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,7 +62,6 @@ Here is an overview of all new **experimental** features: - **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962)) - **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830)) -- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) - **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067)) - **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997)) - **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) @@ -92,6 +91,7 @@ New deprecation(s): - **General**: Fix CVE-2023-45142 in Opentelemetry ([#5089](https://github.com/kedacore/keda/issues/5089)) - **General**: Fix logger in Opentelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094)) +- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789)) ## v2.12.0 diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index cea7ff84238..c63a973c7f2 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -61,6 +61,7 @@ var ( metricsAPIServerPort int disableCompression bool metricsServiceAddr string + profilingAddr string ) func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) { @@ -111,9 +112,10 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro Cache: ctrlcache.Options{ DefaultNamespaces: namespaces, }, - LeaseDuration: leaseDuration, - RenewDeadline: renewDeadline, - RetryPeriod: retryPeriod, + PprofBindAddress: profilingAddr, + LeaseDuration: leaseDuration, + RenewDeadline: renewDeadline, + RetryPeriod: retryPeriod, }) if err != nil { logger.Error(err, "failed to setup manager") @@ -231,6 +233,7 @@ func main() { cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags cmd.Flags().IntVar(&metricsAPIServerPort, "port", 8080, "Set the port for the metrics API server") cmd.Flags().StringVar(&metricsServiceAddr, "metrics-service-address", generateDefaultMetricsServiceAddr(), "The address of the gRPRC Metrics Service Server.") + cmd.Flags().StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.") cmd.Flags().Float32Var(&adapterClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver") cmd.Flags().IntVar(&adapterClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver") cmd.Flags().BoolVar(&disableCompression, "disable-compression", true, "Disable response compression for k8s restAPI in client-go. ") diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 8a6d8043ad5..d72ff9b425b 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -67,6 +67,7 @@ func main() { var metricsAddr string var probeAddr string var metricsServiceAddr string + var profilingAddr string var enableLeaderElection bool var adapterClientRequestQPS float32 var adapterClientRequestBurst int @@ -84,6 +85,7 @@ func main() { pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the prometheus metric endpoint binds to.") pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") pflag.StringVar(&metricsServiceAddr, "metrics-service-bind-address", ":9666", "The address the gRPRC Metrics Service endpoint binds to.") + pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.") pflag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") @@ -151,6 +153,7 @@ func main() { DefaultNamespaces: namespaces, }, HealthProbeBindAddress: probeAddr, + PprofBindAddress: profilingAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "operator.keda.sh", LeaseDuration: leaseDuration, diff --git a/cmd/webhooks/main.go b/cmd/webhooks/main.go index 1ad2cbec2f1..0670081712f 100644 --- a/cmd/webhooks/main.go +++ b/cmd/webhooks/main.go @@ -55,6 +55,7 @@ func init() { func main() { var metricsAddr string var probeAddr string + var profilingAddr string var webhooksClientRequestQPS float32 var webhooksClientRequestBurst int var certDir string @@ -62,6 +63,7 @@ func main() { pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.") pflag.Float32Var(&webhooksClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver") pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver") pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs") @@ -96,6 +98,7 @@ func main() { }, }), HealthProbeBindAddress: probeAddr, + PprofBindAddress: profilingAddr, }) if err != nil { setupLog.Error(err, "unable to start admission webhooks")