From d664edfd1634c772f1874b40cd35edd0ed7d8538 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 26 Aug 2023 16:50:45 +0900 Subject: [PATCH] Use prometheus label to diffentiate scaledObject and scaledJob --- pkg/prommetrics/prommetrics.go | 67 +++++++++++++++++++++++++--------- pkg/scaling/scale_handler.go | 24 ++++++------ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/pkg/prommetrics/prommetrics.go b/pkg/prommetrics/prommetrics.go index 2540d0480e5..4f7400599bd 100644 --- a/pkg/prommetrics/prommetrics.go +++ b/pkg/prommetrics/prommetrics.go @@ -39,7 +39,7 @@ const ( ) var ( - metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"} + metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex", "type"} buildInfo = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: DefaultPromMetricsNamespace, @@ -102,6 +102,15 @@ var ( }, []string{"namespace", "scaledObject"}, ) + scaledJobErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_job", + Name: "errors", + Help: "Number of scaled job errors", + }, + []string{"namespace", "scaledJob"}, + ) triggerTotalsGaugeVec = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -149,44 +158,44 @@ func init() { } // RecordScalerMetric create a measurement of the external metric used by the HPA -func RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +func RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value) } // RecordScalerLatency create a measurement of the latency to external metric -func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +func RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value) } // RecordScaledObjectLatency create a measurement of the latency executing scalable object loop func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) { - resourceType := "scaledjob" - if isScaledObject { - resourceType = "scaledobject" - } - internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value) + internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value) } // RecordScalerActive create a measurement of the activity of the scaler -func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) { +func RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) { activeVal := 0 if active { activeVal = 1 } - scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal)) + scalerActive.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(float64(activeVal)) } // RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA -func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { +func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) { if err != nil { - scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc() - RecordScaledObjectError(namespace, scaledObject, err) + scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Inc() + if isScaledObject { + RecordScaledObjectError(namespace, scaledObject, err) + } else { + RecordScaledJobError(namespace, scaledObject, err) + } scalerErrorsTotal.With(prometheus.Labels{}).Inc() return } // initialize metric with 0 if not already set - _, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)) + _, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject))) if errscaler != nil { log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v") } @@ -207,13 +216,28 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) { } } +// RecordScaleJobError counts the number of errors with the scaled job +func RecordScaledJobError(namespace string, scaledJob string, err error) { + labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob} + if err != nil { + scaledObjectErrors.With(labels).Inc() + return + } + // initialize metric with 0 if not already set + _, errscaledjob := scaledObjectErrors.GetMetricWith(labels) + if errscaledjob != nil { + log.Error(err, "Unable to write to metrics to Prometheus Server: %v") + return + } +} + // RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge). func RecordBuildInfo() { buildInfo.WithLabelValues(version.Version, version.GitCommit, runtime.Version(), runtime.GOOS, runtime.GOARCH).Set(1) } -func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels { - return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric} +func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, resourceType string) prometheus.Labels { + return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric, "type": resourceType} } func IncrementTriggerTotal(triggerType string) { @@ -243,3 +267,10 @@ func DecrementCRDTotal(crdType, namespace string) { crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec() } + +func getResourceType(isScaledObject bool) string { + if isScaledObject { + return "scaledobject" + } + return "scaledjob" +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index db3f0423c00..05240325cc1 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -465,7 +465,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN var latency int64 metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { - prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency)) } logger.V(1).Info("Getting metrics from scaler", "scaler", scalerName, "metricName", spec.External.Metric.Name, "metrics", metrics, "scalerError", err) } @@ -479,11 +479,11 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue) + prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, true, metricValue) } matchingMetrics = append(matchingMetrics, metrics...) } - prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err) + prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, true, err) } } } @@ -566,7 +566,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k var latency int64 metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { - prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency)) } logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) @@ -585,7 +585,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) + prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, true, metricValue) } if isMetricActive { @@ -598,8 +598,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k } } } - prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err) - prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive) + prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, err) + prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, isMetricActive) } } @@ -626,7 +626,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) cache, err := h.GetScalersCache(ctx, scaledJob) - prommetrics.RecordScaledObjectError(scaledJob.Namespace, scaledJob.Name, err) + prommetrics.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) return nil @@ -655,7 +655,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav metricName := spec.External.Metric.Name metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { - prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, float64(latency)) + prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency)) } if err != nil { @@ -679,7 +679,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, metricValue) + prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, false, metricValue) } if isTriggerActive { @@ -691,8 +691,8 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav } } - prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, err) - prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, isTriggerActive) + prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, err) + prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive) } } return scalersMetrics