From a3fdad14161380e1d717b11fd3dc4085cc3da08f Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 26 Aug 2023 18:35:12 +0900 Subject: [PATCH] Expose prometheus metrics at ScaledJob like ScaledObject --- CHANGELOG.md | 1 + pkg/prommetrics/prommetrics.go | 70 ++++++++++++++++++------- pkg/scaling/scale_handler.go | 95 ++++++++++++++++++++++------------ 3 files changed, 115 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0064b171a2f..b78b5f4c86d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ New deprecation(s): - **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902)) - **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) +- **General**: Expose prometheus metrics at ScaledJob like ScaledObject ([#4798](https://github.com/kedacore/keda/issues/4798)) ## v2.11.2 diff --git a/pkg/prommetrics/prommetrics.go b/pkg/prommetrics/prommetrics.go index 00d2a3a74d6..717ff3d6144 100644 --- a/pkg/prommetrics/prommetrics.go +++ b/pkg/prommetrics/prommetrics.go @@ -39,7 +39,7 @@ const ( ) var ( - metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"} + metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex", "type"} buildInfo = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: DefaultPromMetricsNamespace, @@ -102,6 +102,15 @@ var ( }, []string{"namespace", "scaledObject"}, ) + scaledJobErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_job", + Name: "errors", + Help: "Number of scaled job errors", + }, + []string{"namespace", "scaledJob"}, + ) triggerTotalsGaugeVec = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -140,6 +149,7 @@ func init() { metrics.Registry.MustRegister(scalerActive) metrics.Registry.MustRegister(scalerErrors) metrics.Registry.MustRegister(scaledObjectErrors) + metrics.Registry.MustRegister(scaledJobErrors) metrics.Registry.MustRegister(triggerTotalsGaugeVec) metrics.Registry.MustRegister(crdTotalsGaugeVec) @@ -149,44 +159,44 @@ func init() { } // RecordScalerMetric create a measurement of the external metric used by the HPA -func RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +func RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value) } // RecordScalerLatency create a measurement of the latency to external metric -func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +func RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value) } // RecordScaledObjectLatency create a measurement of the latency executing scalable object loop func 
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
-	resourceType := "scaledjob"
-	if isScaledObject {
-		resourceType = "scaledobject"
-	}
-	internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value)
+	internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
 }
 
 // RecordScalerActive create a measurement of the activity of the scaler
-func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) {
+func RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) {
 	activeVal := 0
 	if active {
 		activeVal = 1
 	}
 
-	scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal))
+	scalerActive.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(float64(activeVal))
 }
 
-// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
-func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
+// RecordScalerError counts the number of errors that occurred while trying to get an external metric used by the HPA
+func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) {
 	if err != nil {
-		scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
-		RecordScaledObjectError(namespace, scaledObject, err)
+		scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Inc()
+		if isScaledObject {
+			RecordScaledObjectError(namespace, scaledObject, err)
+		} else {
+			RecordScaledJobError(namespace, scaledObject, err)
+		}
 		scalerErrorsTotal.With(prometheus.Labels{}).Inc()
 		return
 	}
 	// initialize metric with 0 if not already set
-	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
+	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject)))
 	if errscaler != nil {
 		log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
 	}
@@ -207,13 +217,28 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) {
 	}
 }
 
+// RecordScaledJobError counts the number of errors for the scaled job
+func RecordScaledJobError(namespace string, scaledJob string, err error) {
+	labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
+	if err != nil {
+		scaledJobErrors.With(labels).Inc()
+		return
+	}
+	// initialize metric with 0 if not already set
+	_, errscaledjob := scaledJobErrors.GetMetricWith(labels)
+	if errscaledjob != nil {
+		log.Error(errscaledjob, "Unable to write to metrics to Prometheus Server: %v")
+		return
+	}
+}
+
 // RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge).
func RecordBuildInfo() { buildInfo.WithLabelValues(version.Version, version.GitCommit, runtime.Version(), runtime.GOOS, runtime.GOARCH).Set(1) } -func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels { - return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric} +func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, resourceType string) prometheus.Labels { + return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric, "type": resourceType} } func IncrementTriggerTotal(triggerType string) { @@ -243,3 +268,10 @@ func DecrementCRDTotal(crdType, namespace string) { crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec() } + +func getResourceType(isScaledObject bool) string { + if isScaledObject { + return "scaledobject" + } + return "scaledjob" +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 4999bd01eb6..66f392ecc8c 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -466,7 +466,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN var latency int64 metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { - prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency)) } logger.V(1).Info("Getting metrics from scaler", "scaler", scalerName, "metricName", spec.External.Metric.Name, "metrics", metrics, "scalerError", err) } @@ -480,11 +480,11 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue) + prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, true, metricValue) } matchingMetrics = append(matchingMetrics, metrics...) 
} - prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err) + prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, true, err) } } } @@ -567,7 +567,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k var latency int64 metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { - prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency)) } logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) @@ -586,7 +586,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) + prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, true, metricValue) } if isMetricActive { @@ -599,8 +599,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } } } - prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err) - prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive) + prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, err) + prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, isMetricActive) } } @@ -624,46 +624,77 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. 
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
+	logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
+
 	cache, err := h.GetScalersCache(ctx, scaledJob)
+	prommetrics.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
 	if err != nil {
 		log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
 		return nil
 	}
 	var scalersMetrics []scaledjob.ScalerMetrics
-	scalers, _ := cache.GetScalers()
-	for i, s := range scalers {
+	scalers, scalerConfigs := cache.GetScalers()
+	for scalerIndex, scaler := range scalers {
+		scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1)
+		if scalerConfigs[scalerIndex].TriggerName != "" {
+			scalerName = scalerConfigs[scalerIndex].TriggerName
+		}
+
 		isActive := false
-		scalerType := fmt.Sprintf("%T:", s)
+		scalerType := fmt.Sprintf("%T:", scaler)
 		scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)
 
-		metricSpecs := s.GetMetricSpecForScaling(ctx)
+		metricSpecs := scaler.GetMetricSpecForScaling(ctx)
 
-		// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
-		// or skip cpu/memory resource scaler
-		if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
-			continue
-		}
+		for _, spec := range metricSpecs {
+			// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
+			// or skip cpu/memory resource scaler
+			if len(metricSpecs) < 1 || spec.External == nil {
+				continue
+			}
 
-		metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name)
-		if err != nil {
-			scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
-			cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
-			continue
-		}
-		if isTriggerActive {
-			isActive = true
-		}
+			metricName := spec.External.Metric.Name
+			metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
+			if latency != -1 {
+				prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency))
+			}
+
+			if err != nil {
+				scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
+				cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
+				continue
+			}
+			if isTriggerActive {
+				isActive = true
+			}
 
-		queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())
-
-		scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)
-
-		scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
-			QueueLength: queueLength,
-			MaxValue:    maxValue,
-			IsActive:    isActive,
-		})
+			queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())
+
+			scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricName, queueLength, "targetAverageValue", targetAverageValue)
+
+			scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
+				QueueLength: queueLength,
+				MaxValue:    maxValue,
+				IsActive:    isActive,
+			})
+
+			for _, metric := range metrics {
+				metricValue := metric.Value.AsApproximateFloat64()
+				prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, false, metricValue)
+			}
+
+			if isTriggerActive {
+				if spec.External != nil {
+					logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", metricName)
+				}
+				if spec.Resource != nil {
+					logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", spec.Resource.Name)
+				}
+			}
+
+			prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, err)
+			prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
+		}
 	}
 	return scalersMetrics
 }
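
Note on the resulting metric series (not part of the patch): with the new "type" label, a trigger on a ScaledJob keeps the existing scaledObject label key (it now carries the ScaledJob name) and additionally gets type="scaledjob", while errors on the job itself are counted by the new scaled_job errors counter. The standalone Go sketch below only illustrates the expected label sets; the namespace, resource name, trigger name, scaler index, and metric name are made up, and the "keda_" prefix assumes DefaultPromMetricsNamespace is "keda".

package main

import "fmt"

// getResourceType mirrors the helper added in this patch: the "type" label is
// "scaledobject" for ScaledObjects and "scaledjob" for ScaledJobs.
func getResourceType(isScaledObject bool) string {
	if isScaledObject {
		return "scaledobject"
	}
	return "scaledjob"
}

func main() {
	// Hypothetical ScaledJob "consumer" in namespace "default" with one trigger.
	namespace, scaledJob := "default", "consumer"
	scaler, scalerIndex, metric := "rabbitMQScaler", "0", "s0-rabbitmq-queueLength"

	// Series written by the patched RecordScalerMetric for this trigger; the
	// label key stays "scaledObject" even though it holds the ScaledJob name,
	// and the new "type" label is what distinguishes the two resource kinds.
	fmt.Printf("keda_scaler_metrics_value{namespace=%q,metric=%q,scaledObject=%q,scaler=%q,scalerIndex=%q,type=%q}\n",
		namespace, metric, scaledJob, scaler, scalerIndex, getResourceType(false))

	// Per-job errors are additionally counted by the new counter registered in init().
	fmt.Printf("keda_scaled_job_errors{namespace=%q,scaledJob=%q}\n", namespace, scaledJob)
}

Dashboards or alerts that previously filtered only on scaledObject can add a type="scaledjob" selector to separate job-driven triggers from deployment-driven ones, and job error rates can be tracked via the new counter.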