Skip to content

Commit

Permalink
Use prometheus label to diffentiate scaledObject and scaledJob
Browse files Browse the repository at this point in the history
  • Loading branch information
yoongon committed Aug 26, 2023
1 parent 9bd1160 commit d664edf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
67 changes: 49 additions & 18 deletions pkg/prommetrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
)

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"}
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex", "type"}
buildInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Expand Down Expand Up @@ -102,6 +102,15 @@ var (
},
[]string{"namespace", "scaledObject"},
)
scaledJobErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_job",
Name: "errors",
Help: "Number of scaled job errors",
},
[]string{"namespace", "scaledJob"},
)

triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -149,44 +158,44 @@ func init() {
}

// RecordScalerMetric create a measurement of the external metric used by the HPA
func RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
func RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value)
}

// RecordScalerLatency create a measurement of the latency to external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
func RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value)
}

// RecordScaledObjectLatency create a measurement of the latency executing scalable object loop
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
}
internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value)
internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
}

// RecordScalerActive create a measurement of the activity of the scaler
func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) {
func RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := 0
if active {
activeVal = 1
}

scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal))
scalerActive.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(float64(activeVal))
}

// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
RecordScaledObjectError(namespace, scaledObject, err)
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Inc()
if isScaledObject {
RecordScaledObjectError(namespace, scaledObject, err)
} else {
RecordScaledJobError(namespace, scaledObject, err)
}
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject)))
if errscaler != nil {
log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
}
Expand All @@ -207,13 +216,28 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) {
}
}

// RecordScaleJobError counts the number of errors with the scaled job
func RecordScaledJobError(namespace string, scaledJob string, err error) {
labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
if err != nil {
scaledObjectErrors.With(labels).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaledjob := scaledObjectErrors.GetMetricWith(labels)
if errscaledjob != nil {
log.Error(err, "Unable to write to metrics to Prometheus Server: %v")
return
}
}

// RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge).
func RecordBuildInfo() {
buildInfo.WithLabelValues(version.Version, version.GitCommit, runtime.Version(), runtime.GOOS, runtime.GOARCH).Set(1)
}

func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric}
func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, resourceType string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric, "type": resourceType}
}

func IncrementTriggerTotal(triggerType string) {
Expand Down Expand Up @@ -243,3 +267,10 @@ func DecrementCRDTotal(crdType, namespace string) {

crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
}

func getResourceType(isScaledObject bool) string {
if isScaledObject {
return "scaledobject"
}
return "scaledjob"
}
24 changes: 12 additions & 12 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
var latency int64
metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency))
}
logger.V(1).Info("Getting metrics from scaler", "scaler", scalerName, "metricName", spec.External.Metric.Name, "metrics", metrics, "scalerError", err)
}
Expand All @@ -479,11 +479,11 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue)
prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, true, metricValue)
}
matchingMetrics = append(matchingMetrics, metrics...)
}
prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, true, err)
}
}
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
var latency int64
metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency))
}
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

Expand All @@ -585,7 +585,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, true, metricValue)
}

if isMetricActive {
Expand All @@ -598,8 +598,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}
}
}
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive)
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, isMetricActive)
}
}

Expand All @@ -626,7 +626,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)

cache, err := h.GetScalersCache(ctx, scaledJob)
prommetrics.RecordScaledObjectError(scaledJob.Namespace, scaledJob.Name, err)
prommetrics.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil
Expand Down Expand Up @@ -655,7 +655,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricName := spec.External.Metric.Name
metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, float64(latency))
prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency))
}

if err != nil {
Expand All @@ -679,7 +679,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav

for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, false, metricValue)
}

if isTriggerActive {
Expand All @@ -691,8 +691,8 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
}
}

prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, isTriggerActive)
prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, err)
prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
}
}
return scalersMetrics
Expand Down

0 comments on commit d664edf

Please sign in to comment.