Expose prometheus metrics at ScaledJob like ScaledObject
Signed-off-by: Yoon Park <[email protected]>
yoongon committed Aug 26, 2023
1 parent ae716c7 commit dddc7e5
Showing 3 changed files with 115 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -80,6 +80,7 @@ New deprecation(s):

- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))
- **General**: Expose prometheus metrics at ScaledJob like ScaledObject ([#4798](https://github.com/kedacore/keda/issues/4798))

## v2.11.2

70 changes: 51 additions & 19 deletions pkg/prommetrics/prommetrics.go
@@ -39,7 +39,7 @@ const (
)

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"}
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex", "type"}
buildInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
@@ -102,6 +102,15 @@ var (
},
[]string{"namespace", "scaledObject"},
)
scaledJobErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_job",
Name: "errors",
Help: "Number of scaled job errors",
},
[]string{"namespace", "scaledJob"},
)

triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -140,6 +149,7 @@ func init() {
metrics.Registry.MustRegister(scalerActive)
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
metrics.Registry.MustRegister(scaledJobErrors)

metrics.Registry.MustRegister(triggerTotalsGaugeVec)
metrics.Registry.MustRegister(crdTotalsGaugeVec)
@@ -149,44 +159,44 @@ func init() {
}

// RecordScalerMetric creates a measurement of the external metric used by the HPA
func RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
func RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value)
}
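
A caller-side sketch of the widened helper (the module path, resource names, and values here are illustrative, not taken from this commit):

    package main

    import "github.com/kedacore/keda/v2/pkg/prommetrics"

    // Illustrative only: the same helper now records for both kinds of scaled
    // resource; the new boolean selects the "type" label value.
    func main() {
        // ScaledObject trigger metric -> type="scaledobject"
        prommetrics.RecordScalerMetric("default", "web-app", "prometheusScaler", 0, "s0-http-requests", true, 42)
        // ScaledJob trigger metric -> type="scaledjob"
        prommetrics.RecordScalerMetric("default", "email-worker", "rabbitMQScaler", 0, "s0-queue-depth", false, 17)
    }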

// RecordScalerLatency creates a measurement of the latency to the external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
func RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(value)
}

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
func RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
}
internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value)
internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
}

// RecordScalerActive creates a measurement of the activity of the scaler
func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) {
func RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := 0
if active {
activeVal = 1
}

scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal))
scalerActive.With(getLabels(namespace, scaledResource, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Set(float64(activeVal))
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
RecordScaledObjectError(namespace, scaledObject, err)
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject))).Inc()
if isScaledObject {
RecordScaledObjectError(namespace, scaledObject, err)
} else {
RecordScaledJobError(namespace, scaledObject, err)
}
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric, getResourceType(isScaledObject)))
if errscaler != nil {
log.Error(errscaler, "Unable to write metrics to Prometheus Server")
}
@@ -207,13 +217,28 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) {
}
}

// RecordScaledJobError counts the number of errors with the scaled job
func RecordScaledJobError(namespace string, scaledJob string, err error) {
labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
if err != nil {
scaledJobErrors.With(labels).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaledjob := scaledJobErrors.GetMetricWith(labels)
if errscaledjob != nil {
log.Error(errscaledjob, "Unable to write metrics to Prometheus Server")
return
}
}
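
A minimal sketch of the intended error routing, written as a same-package test (package-internal access, label values, and the default "keda" metrics namespace are assumptions):

    package prommetrics

    import (
        "errors"
        "testing"

        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    // One scaler error reported with isScaledObject=false should bump the new
    // per-job counter (exposed as keda_scaled_job_errors under the assumed
    // "keda" namespace).
    func TestScaledJobErrorRouting(t *testing.T) {
        RecordScalerError("default", "email-worker", "rabbitMQScaler", 0, "s0-queue-depth", false, errors.New("boom"))

        got := testutil.ToFloat64(scaledJobErrors.WithLabelValues("default", "email-worker"))
        if got != 1 {
            t.Fatalf("scaled job error count = %v, want 1", got)
        }
    }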

// RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge).
func RecordBuildInfo() {
buildInfo.WithLabelValues(version.Version, version.GitCommit, runtime.Version(), runtime.GOOS, runtime.GOARCH).Set(1)
}

func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric}
func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, resourceType string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric, "type": resourceType}
}

func IncrementTriggerTotal(triggerType string) {
Expand Down Expand Up @@ -243,3 +268,10 @@ func DecrementCRDTotal(crdType, namespace string) {

crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
}

func getResourceType(isScaledObject bool) string {
if isScaledObject {
return "scaledobject"
}
return "scaledjob"
}
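
A quick illustration of the label set these helpers now stamp onto every scaler series, as a hypothetical Example test with same-package access assumed. Note that the pre-existing "scaledObject" label name is reused for ScaledJob resources; only the new "type" label disambiguates:

    package prommetrics

    import "fmt"

    func Example_getLabels() {
        l := getLabels("default", "email-worker", "rabbitMQScaler", 0, "s0-queue-depth", getResourceType(false))
        fmt.Println(l["scaledObject"], l["scalerIndex"], l["type"])
        // Output: email-worker 0 scaledjob
    }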
95 changes: 63 additions & 32 deletions pkg/scaling/scale_handler.go
@@ -466,7 +466,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
var latency int64
metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency))
}
logger.V(1).Info("Getting metrics from scaler", "scaler", scalerName, "metricName", spec.External.Metric.Name, "metrics", metrics, "scalerError", err)
}
Expand All @@ -480,11 +480,11 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue)
prommetrics.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, true, metricValue)
}
matchingMetrics = append(matchingMetrics, metrics...)
}
prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, true, err)
}
}
}
@@ -567,7 +567,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
var latency int64
metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, float64(latency))
}
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

@@ -586,7 +586,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, true, metricValue)
}

if isMetricActive {
@@ -599,8 +599,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}
}
}
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive)
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, true, isMetricActive)
}
}

@@ -624,46 +624,77 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)

cache, err := h.GetScalersCache(ctx, scaledJob)
prommetrics.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil
}
var scalersMetrics []scaledjob.ScalerMetrics
scalers, _ := cache.GetScalers()
for i, s := range scalers {
scalers, scalerConfigs := cache.GetScalers()
for scalerIndex, scaler := range scalers {
scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1)
if scalerConfigs[scalerIndex].TriggerName != "" {
scalerName = scalerConfigs[scalerIndex].TriggerName
}
isActive := false
scalerType := fmt.Sprintf("%T:", s)
scalerType := fmt.Sprintf("%T:", scaler)

scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)

metricSpecs := s.GetMetricSpecForScaling(ctx)
metricSpecs := scaler.GetMetricSpecForScaling(ctx)

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}
for _, spec := range metricSpecs {
// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || spec.External == nil {
continue
}

metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
}
if isTriggerActive {
isActive = true
}
metricName := spec.External.Metric.Name
metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName)
if latency != -1 {
prommetrics.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency))
}

if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
}
if isTriggerActive {
isActive = true
}

queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())

queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)

scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)
scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
QueueLength: queueLength,
MaxValue: maxValue,
IsActive: isActive,
})

scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
QueueLength: queueLength,
MaxValue: maxValue,
IsActive: isActive,
})
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, false, metricValue)
}

if isTriggerActive {
if spec.External != nil {
logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", metricName)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", spec.Resource.Name)
}
}

prommetrics.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, err)
prommetrics.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
}
}
return scalersMetrics
}
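
For operators, a sketch of the per-trigger series a ScaledJob now feeds after this change (full metric names assume the default "keda" namespace and the subsystem names in prommetrics.go; verify against your build):

    // Sketch, not authoritative:
    //
    //   keda_scaler_metrics_value{type="scaledjob", ...}     // RecordScalerMetric
    //   keda_scaler_metrics_latency{type="scaledjob", ...}   // RecordScalerLatency
    //   keda_scaler_active{type="scaledjob", ...}            // RecordScalerActive
    //   keda_scaler_errors{type="scaledjob", ...}            // RecordScalerError
    //   keda_scaled_job_errors{namespace="...", scaledJob="..."} // RecordScaledJobError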
