Skip to content

Commit

Permalink
Merge branch 'main' into spiritzhou/imporveopentelemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik authored Nov 1, 2023
2 parents 53cd373 + 4cbc74e commit 258c09f
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here is an overview of all new **experimental** features:
- **Hashicorp Vault**: Add support to get secret that needs write operation (e.g. pki) ([#5067](https://github.com/kedacore/keda/issues/5067))
- **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
- **Prometheus Metrics**: Introduce paused ScaledObjects in Prometheus metrics ([#4430](https://github.com/kedacore/keda/issues/4430))
- **Pulsar Scaler**: support endpointParams in pulsar oauth ([#5069](https://github.com/kedacore/keda/issues/5069))

### Fixes
Expand All @@ -91,6 +92,7 @@ New deprecation(s):
- **General**: Fix CVE-2023-45142 in Opentelemetry ([#5089](https://github.com/kedacore/keda/issues/5089))
- **General**: Fix logger in Opentelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094))
- **General**: Reduce amount of gauge creations for OpenTelemetry metrics ([#5101](https://github.com/kedacore/keda/issues/5101))
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))

## v2.12.0

Expand Down
9 changes: 6 additions & 3 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
metricsAPIServerPort int
disableCompression bool
metricsServiceAddr string
profilingAddr string
)

func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) {
Expand Down Expand Up @@ -111,9 +112,10 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
Cache: ctrlcache.Options{
DefaultNamespaces: namespaces,
},
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
PprofBindAddress: profilingAddr,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
})
if err != nil {
logger.Error(err, "failed to setup manager")
Expand Down Expand Up @@ -231,6 +233,7 @@ func main() {
cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags
cmd.Flags().IntVar(&metricsAPIServerPort, "port", 8080, "Set the port for the metrics API server")
cmd.Flags().StringVar(&metricsServiceAddr, "metrics-service-address", generateDefaultMetricsServiceAddr(), "The address of the gRPRC Metrics Service Server.")
cmd.Flags().StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
cmd.Flags().Float32Var(&adapterClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver")
cmd.Flags().IntVar(&adapterClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
cmd.Flags().BoolVar(&disableCompression, "disable-compression", true, "Disable response compression for k8s restAPI in client-go. ")
Expand Down
3 changes: 3 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
var metricsAddr string
var probeAddr string
var metricsServiceAddr string
var profilingAddr string
var enableLeaderElection bool
var adapterClientRequestQPS float32
var adapterClientRequestBurst int
Expand All @@ -84,6 +85,7 @@ func main() {
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the prometheus metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&metricsServiceAddr, "metrics-service-bind-address", ":9666", "The address the gRPRC Metrics Service endpoint binds to.")
pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
pflag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand Down Expand Up @@ -151,6 +153,7 @@ func main() {
DefaultNamespaces: namespaces,
},
HealthProbeBindAddress: probeAddr,
PprofBindAddress: profilingAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "operator.keda.sh",
LeaseDuration: leaseDuration,
Expand Down
3 changes: 3 additions & 0 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ func init() {
func main() {
var metricsAddr string
var probeAddr string
var profilingAddr string
var webhooksClientRequestQPS float32
var webhooksClientRequestBurst int
var certDir string
var webhooksPort int

pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
pflag.Float32Var(&webhooksClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver")
pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs")
Expand Down Expand Up @@ -96,6 +98,7 @@ func main() {
},
}),
HealthProbeBindAddress: probeAddr,
PprofBindAddress: profilingAddr,
})
if err != nil {
setupLog.Error(err, "unable to start admission webhooks")
Expand Down
2 changes: 2 additions & 0 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,12 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
return msg, err
}
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionPausedReason, msg)
metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, true)
return msg, nil
}
} else if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledObjectUnpaused", "pause annotation removed for ScaledObject")
metricscollector.RecordScaledObjectPaused(scaledObject.Namespace, scaledObject.Name, false)
}

// Check scale target Name is specified
Expand Down
10 changes: 10 additions & 0 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type MetricsCollector interface {
// RecordScalerActive create a measurement of the activity of the scaler
RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool)

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
RecordScaledObjectPaused(namespace string, scaledObject string, active bool)

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error)

Expand Down Expand Up @@ -96,6 +99,13 @@ func RecordScalerActive(namespace string, scaledObject string, scaler string, sc
}
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
func RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
for _, element := range collectors {
element.RecordScaledObjectPaused(namespace, scaledObject, active)
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
for _, element := range collectors {
Expand Down
26 changes: 26 additions & 0 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,32 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
activeVal := 0
if active {
activeVal = 1
}

opt := api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledObject),
)

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaled.object.paused",
api.WithDescription("Indicates whether a ScaledObject is paused"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaled object paused metric", "namespace", namespace, "scaledObject", scaledObject)
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ var (
},
metricLabels,
)
scaledObjectPaused = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_object",
Name: "paused",
Help: "Indicates whether a ScaledObject is paused",
},
[]string{"namespace", "scaledObject"},
)
scalerErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Expand Down Expand Up @@ -134,6 +143,7 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(scalerActive)
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
metrics.Registry.MustRegister(scaledObjectPaused)

metrics.Registry.MustRegister(triggerTotalsGaugeVec)
metrics.Registry.MustRegister(crdTotalsGaugeVec)
Expand Down Expand Up @@ -177,6 +187,18 @@ func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string,
scalerActive.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(float64(activeVal))
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject string, active bool) {
labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}

activeVal := 0
if active {
activeVal = 1
}

scaledObjectPaused.With(labels).Set(float64(activeVal))
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,28 @@ spec:
name: {{.TestName}}-secret
key: key
---
`
scaledObjectPausedTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
annotations:
autoscaling.keda.sh/paused-replicas: "2"
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 5
idleReplicaCount: 0
minReplicaCount: 1
maxReplicaCount: 2
cooldownPeriod: 10
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'app={{.MonitoredDeploymentName}}'
value: '1'
`
)

Expand All @@ -266,6 +288,7 @@ func TestPrometheusMetrics(t *testing.T) {
testScalerErrors(t, data)
testOperatorMetrics(t, kc, data)
testScalableObjectMetrics(t)
testScaledObjectPausedMetric(t, data)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
Expand Down Expand Up @@ -499,6 +522,26 @@ func testScalerActiveMetric(t *testing.T) {
}
}

func testScaledObjectPausedMetric(t *testing.T, data templateData) {
t.Log("--- testing scaleobject pause metric ---")

// Pause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate)

time.Sleep(20 * time.Second)
// Check that the paused metric is now true
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, true)

// Unpause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

time.Sleep(20 * time.Second)
// Check that the paused metric is back to false
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
}

func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing operator metrics ---")
testOperatorMetricValues(t, kc)
Expand Down Expand Up @@ -676,3 +719,27 @@ func checkCRTotalValues(t *testing.T, families map[string]*prommodel.MetricFamil
expectedMetricValue, metricValue, crType, namespace)
}
}

func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
family, ok := families["keda_scaled_object_paused"]
if !ok {
t.Errorf("keda_scaled_object_paused metric not available")
return
}

metricValue := 0.0
metrics := family.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
metricValue = *metric.Gauge.Value
}
}
}
expectedMetricValue := 0
if expected {
expectedMetricValue = 1
}
assert.Equal(t, float64(expectedMetricValue), metricValue)
}
69 changes: 68 additions & 1 deletion tests/sequential/prometheus_metrics/prometheus_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,28 @@ spec:
name: {{.TestName}}-secret
key: key
---
`
scaledObjectPausedTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
annotations:
autoscaling.keda.sh/paused-replicas: "2"
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 5
idleReplicaCount: 0
minReplicaCount: 1
maxReplicaCount: 2
cooldownPeriod: 10
triggers:
- type: kubernetes-workload
metadata:
podSelector: 'app={{.MonitoredDeploymentName}}'
value: '1'
`
)

Expand Down Expand Up @@ -271,7 +293,7 @@ func TestPrometheusMetrics(t *testing.T) {
testMetricServerMetrics(t)
testWebhookMetrics(t, data)
testScalableObjectMetrics(t)

testScaledObjectPausedMetric(t, data)
// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}
Expand Down Expand Up @@ -453,6 +475,31 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
return 0
}

func assertScaledObjectPausedMetric(t *testing.T, families map[string]*prommodel.MetricFamily, scaledObjectName string, expected bool) {
family, ok := families["keda_scaled_object_paused"]
if !ok {
t.Errorf("keda_scaled_object_paused metric not available")
return
}

metricValue := 0.0
metrics := family.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == labelScaledObject && *label.Value == scaledObjectName {
metricValue = *metric.Gauge.Value
}
}
}

expectedMetricValue := 0
if expected {
expectedMetricValue = 1
}
assert.Equal(t, float64(expectedMetricValue), metricValue)
}

func testScalerMetricLatency(t *testing.T) {
t.Log("--- testing scaler metric latency ---")

Expand Down Expand Up @@ -536,6 +583,26 @@ func testScalerActiveMetric(t *testing.T) {
}
}

func testScaledObjectPausedMetric(t *testing.T, data templateData) {
t.Log("--- testing scaleobject pause metric ---")

// Pause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectPausedTemplate", scaledObjectPausedTemplate)
time.Sleep(20 * time.Second)

// Check that the paused metric is now true
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, true)

// Unpause the ScaledObject
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
time.Sleep(20 * time.Second)

// Check that the paused metric is back to false
families = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))
assertScaledObjectPausedMetric(t, families, scaledObjectName, false)
}

func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing operator metrics ---")
testOperatorMetricValues(t, kc)
Expand Down

0 comments on commit 258c09f

Please sign in to comment.