Skip to content

Commit

Permalink
Support e2e for scaledJob prometheus
Browse files Browse the repository at this point in the history
Signed-off-by: Yoon Park <[email protected]>
  • Loading branch information
yoongon committed Nov 6, 2023
1 parent 2990192 commit 18f4adc
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 13 deletions.
139 changes: 131 additions & 8 deletions tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/kedacore/keda/v2/pkg/metricscollector"
. "github.com/kedacore/keda/v2/tests/helper"
)

const (
testName = "opentelemetry-metrics-test"
labelScaledObject = "scaledObject"
labelScaledJob = "scaledJob"
labelType = "type"
)

Expand All @@ -34,7 +34,9 @@ var (
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName)
wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName)
wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName)
cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName)
clientName = fmt.Sprintf("%s-client", testName)
Expand All @@ -47,7 +49,9 @@ type templateData struct {
TestNamespace string
DeploymentName string
ScaledObjectName string
ScaledJobName string
WrongScaledObjectName string
WrongScaledJobName string
WrongScalerName string
CronScaledJobName string
MonitoredDeploymentName string
Expand Down Expand Up @@ -146,6 +150,69 @@ spec:
query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}'
`

scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 5
maxReplicaCount: 3
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'app={{.MonitoredDeploymentName}}'
value: '1'
`

wrongScaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "30"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
pollingInterval: 2
maxReplicaCount: 3
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
triggers:
- type: prometheus
name: {{.WrongScalerName}}
metadata:
serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080
metricName: keda_scaler_errors_total
threshold: '1'
query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}'
`

cronScaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
Expand Down Expand Up @@ -285,6 +352,7 @@ func TestPrometheusMetrics(t *testing.T) {
testScalerMetricLatency(t)
testScalerActiveMetric(t)
testScaledObjectErrors(t, data)
testScaledJobErrors(t, data)
testScalerErrors(t, data)
testOperatorMetrics(t, kc, data)
testScalableObjectMetrics(t)
Expand All @@ -301,6 +369,8 @@ func getTemplateData() (templateData, []Template) {
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
WrongScaledObjectName: wrongScaledObjectName,
ScaledJobName: scaledJobName,
WrongScalerName: wrongScalerName,
WrongScalerName: wrongScalerName,
MonitoredDeploymentName: monitoredDeploymentName,
ClientName: clientName,
Expand All @@ -309,6 +379,7 @@ func getTemplateData() (templateData, []Template) {
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
{Name: "scaledJobTemplate", Config: scaledJobTemplate},
{Name: "clientTemplate", Config: clientTemplate},
{Name: "authenticatioNTemplate", Config: authenticationTemplate},
}
Expand Down Expand Up @@ -337,7 +408,8 @@ func testScalerMetricValue(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(4), *metric.Gauge.Value)
found = true
}
Expand All @@ -358,14 +430,14 @@ func testScaledObjectErrors(t *testing.T, data templateData) {
time.Sleep(20 * time.Second)

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaledobject_errors_total"]; ok {
if val, ok := family["keda_scaled_object_errors_total"]; ok {
errCounterVal1 := getErrorMetricsValue(val)

// wait for 2 seconds as pollinginterval is 2
time.Sleep(5 * time.Second)

family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaledobject_errors_total"]; ok {
if val, ok := family["keda_scaled_object_errors_total"]; ok {
errCounterVal2 := getErrorMetricsValue(val)
assert.NotEqual(t, errCounterVal2, float64(0))
assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1)
Expand All @@ -382,12 +454,48 @@ func testScaledObjectErrors(t *testing.T, data templateData) {
time.Sleep(10 * time.Second)
}

func testScaledJobErrors(t *testing.T, data templateData) {
t.Log("--- testing scaled job errors ---")

KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)

time.Sleep(20 * time.Second)

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaled_job_errors_total"]; ok {
errCounterVal1 := getErrorMetricsValue(val)

// wait for 2 seconds as pollinginterval is 2
time.Sleep(5 * time.Second)

family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
if val, ok := family["keda_scaled_job_errors_total"]; ok {
errCounterVal2 := getErrorMetricsValue(val)
assert.NotEqual(t, errCounterVal2, float64(0))
assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1)
} else {
t.Errorf("metric not available")
}
} else {
t.Errorf("metric not available")
}

KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)
KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
// wait for 10 seconds to correctly fetch metrics.
time.Sleep(10 * time.Second)
}

func testScalerErrors(t *testing.T, data templateData) {
t.Log("--- testing scaler errors ---")

KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)

KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)
KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)

time.Sleep(15 * time.Second)

family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
Expand All @@ -409,13 +517,16 @@ func testScalerErrors(t *testing.T, data templateData) {
t.Errorf("metric not available")
}

KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate)
KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate)

KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
}

func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
switch val.GetName() {
case "keda_scaledobject_errors_total":
case "keda_scaled_object_errors_total":
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
Expand All @@ -425,6 +536,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
}
}
}
case "keda_scaled_job_errors_total":
metrics := val.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName {
return *metric.Counter.Value
}
}
}
case "keda_scaler_errors_total":
metrics := val.GetMetric()
for _, metric := range metrics {
Expand All @@ -450,7 +571,8 @@ func testScalerMetricLatency(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
found = true
}
Expand Down Expand Up @@ -510,7 +632,8 @@ func testScalerActiveMetric(t *testing.T) {
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) ||
(*label.Name == labelScaledJob && *label.Value == scaledJobName) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
found = true
}
Expand Down
Loading

0 comments on commit 18f4adc

Please sign in to comment.