Support e2e for scaledJob prometheus
Signed-off-by: Yoon Park <[email protected]>
yoongon committed Nov 6, 2023
1 parent 2990192 commit 9d3b856
Showing 3 changed files with 290 additions and 23 deletions.
51 changes: 38 additions & 13 deletions pkg/metricscollector/opentelemetry.go
@@ -26,6 +26,7 @@ var (
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter
)
@@ -69,6 +70,11 @@ func initCounter() {
otLog.Error(err, msg)
}

otScaledJobErrorsCounter, err = meter.Int64Counter("keda.scaledjob.errors", api.WithDescription("Number of scaled job errors"))
if err != nil {
otLog.Error(err, msg)
}

otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
if err != nil {
otLog.Error(err, msg)
@@ -80,9 +86,9 @@ func initCounter() {
}
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject))
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject))
return nil
}
_, err := meter.Float64ObservableGauge(
@@ -91,14 +97,19 @@ func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string,
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
if isScaledObject {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
} else {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}

}
}

// RecordScalerLatency creates a measurement of the latency to the external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject))
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject))
return nil
}
_, err := meter.Float64ObservableGauge(
@@ -107,7 +118,12 @@ func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string,
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
if isScaledObject {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
} else {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}

}
}

@@ -138,14 +154,14 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
}

// RecordScalerActive creates a measurement of the activity of the scaler
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) {
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := -1
if active {
activeVal = 1
}

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject))
obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject))
return nil
}
_, err := meter.Float64ObservableGauge(
@@ -154,7 +170,12 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
if isScaledObject {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
} else {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledJob", scaledResource, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}

}
}

@@ -185,10 +206,14 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
}

// RecordScalerError counts the number of errors that occurred while trying to get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) {
func (o *OtelMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, scalerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric, isScaledObject))
o.RecordScaledObjectError(namespace, scaledObject, err)
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledResource, scaler, scalerIndex, metric, isScaledObject))
if isScaledObject {
o.RecordScaledObjectError(namespace, scaledResource, err)
} else {
o.RecordScaledJobError(namespace, scaledResource, err)
}
return
}
}
@@ -210,7 +235,7 @@ func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, e
attribute.Key("namespace").String(namespace),
attribute.Key("scaledJob").String(scaledJob))
if err != nil {
otScaledObjectErrorsCounter.Add(context.Background(), 1, opt)
otScaledJobErrorsCounter.Add(context.Background(), 1, opt)
return
}
}
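
For orientation only (not part of this commit): a minimal sketch of how the new ScaledJob error path could be exercised by calling code. The import path and the RecordScalerError signature come from the diff above; the package name and the namespace, resource, scaler, and metric values are placeholder assumptions.

package example

import (
	"errors"

	"github.com/kedacore/keda/v2/pkg/metricscollector"
)

// reportScaledJobTriggerError is a hypothetical caller. Passing
// isScaledObject=false now routes the error to RecordScaledJobError and the
// dedicated keda.scaledjob.errors counter instead of the ScaledObject counter.
func reportScaledJobTriggerError(o *metricscollector.OtelMetrics) {
	err := errors.New("prometheus query failed") // placeholder error
	o.RecordScalerError(
		"demo-namespace",           // namespace (placeholder)
		"demo-scaledjob",           // ScaledJob name (placeholder)
		"prometheus",               // scaler type (placeholder)
		0,                          // scalerIndex
		"keda_scaler_errors_total", // metric name (placeholder)
		false,                      // isScaledObject=false -> ScaledJob path
		err,
	)
}

The sketch only illustrates the routing added above; the actual call sites in the operator are unchanged.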
133 changes: 128 additions & 5 deletions tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
@@ -20,12 +20,12 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/kedacore/keda/v2/pkg/metricscollector"
. "github.com/kedacore/keda/v2/tests/helper"
)

const (
testName = "opentelemetry-metrics-test"
labelScaledObject = "scaledObject"
labelScaledJob = "scaledJob"
labelType = "type"
)

@@ -34,7 +34,9 @@ var (
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName)
wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName)
wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName)
cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName)
clientName = fmt.Sprintf("%s-client", testName)
@@ -47,7 +49,9 @@ type templateData struct {
TestNamespace string
DeploymentName string
ScaledObjectName string
ScaledJobName string
WrongScaledObjectName string
WrongScaledJobName string
WrongScalerName string
CronScaledJobName string
MonitoredDeploymentName string
@@ -146,6 +150,69 @@ spec:
query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}'
`

scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
maxReplicaCount: 3
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'app={{.MonitoredDeploymentName}}'
value: '1'
`

wrongScaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.WrongScaledJobName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 2
maxReplicaCount: 3
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: prometheus
name: {{.WrongScalerName}}
metadata:
serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080
metricName: keda_scaler_errors_total
threshold: '1'
query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}'
`

cronScaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
@@ -285,6 +352,7 @@ func TestPrometheusMetrics(t *testing.T) {
testScalerMetricLatency(t)
testScalerActiveMetric(t)
testScaledObjectErrors(t, data)
testScaledJobErrors(t, data)
testScalerErrors(t, data)
testOperatorMetrics(t, kc, data)
testScalableObjectMetrics(t)
Expand All @@ -301,6 +369,8 @@ func getTemplateData() (templateData, []Template) {
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
WrongScaledObjectName: wrongScaledObjectName,
ScaledJobName: scaledJobName,
WrongScaledJobName: wrongScaledJobName,
WrongScalerName: wrongScalerName,
MonitoredDeploymentName: monitoredDeploymentName,
ClientName: clientName,
@@ -309,6 +379,7 @@ func getTemplateData() (templateData, []Template) {
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
{Name: "scaledJobTemplate", Config: scaledJobTemplate},
{Name: "clientTemplate", Config: clientTemplate},
{Name: "authenticatioNTemplate", Config: authenticationTemplate},
}
@@ -337,7 +408,8 @@ func testScalerMetricValue(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(4), *metric.Gauge.Value)
found = true
}
@@ -382,12 +454,48 @@ func testScaledObjectErrors(t *testing.T, data templateData) {
time.Sleep(10 * time.Second)
}

func testScaledJobErrors(t *testing.T, data templateData) {
t.Log("--- testing scaled job errors ---")

KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)

time.Sleep(20 * time.Second)

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaledjob_errors_total"]; ok {
errCounterVal1 := getErrorMetricsValue(val)

// wait a few seconds so at least one more poll can fail (pollingInterval is 2)
time.Sleep(5 * time.Second)

family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaledjob_errors_total"]; ok {
errCounterVal2 := getErrorMetricsValue(val)
assert.NotEqual(t, errCounterVal2, float64(0))
assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1)
} else {
t.Errorf("metric not available")
}
} else {
t.Errorf("metric not available")
}

KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)
KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
// wait for 10 seconds to correctly fetch metrics.
time.Sleep(10 * time.Second)
}

func testScalerErrors(t *testing.T, data templateData) {
t.Log("--- testing scaler errors ---")

KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)

KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)

time.Sleep(15 * time.Second)

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
Expand All @@ -409,6 +517,9 @@ func testScalerErrors(t *testing.T, data templateData) {
t.Errorf("metric not available")
}

KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)
KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)

KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
}
@@ -425,6 +536,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
}
}
}
case "keda_scaledjob_errors_total":
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName {
return *metric.Counter.Value
}
}
}
case "keda_scaler_errors_total":
metrics := val.GetMetric()
for _, metric := range metrics {
@@ -450,7 +571,8 @@ func testScalerMetricLatency(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
found = true
}
@@ -510,7 +632,8 @@ func testScalerActiveMetric(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
found = true
}