Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the implementation of OpenTelemetry integration #5116

Merged
merged 9 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ New deprecation(s):

- **General**: Fix CVE-2023-45142 in OpenTelemetry ([#5089](https://github.com/kedacore/keda/issues/5089))
- **General**: Fix logger in OpenTelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094))
- **General**: Reduce amount of gauge creations for OpenTelemetry metrics ([#5101](https://github.com/kedacore/keda/issues/5101))
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))

## v2.12.0
Expand Down
186 changes: 119 additions & 67 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,28 @@ var (
otScaledObjectErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otelScalerActiveVal OtelMetricFloat64Val
)

// OtelMetrics is the receiver type for the OpenTelemetry recording methods in
// this file (RecordBuildInfo, RecordScalerMetric, RecordScalerLatency, ...).
// It has no fields; the methods shown here store their measurements in
// package-level variables.
type OtelMetrics struct {
}

// OtelMetricInt64Val holds a pending int64 measurement together with the
// measurement option to report it with. The callbacks in this file treat a nil
// measurementOption as "nothing to observe".
type OtelMetricInt64Val struct {
// val is the value handed to the observer.
val int64
// measurementOption is passed to Observe alongside val; nil means unset.
measurementOption api.MeasurementOption
}

// OtelMetricFloat64Val holds a pending float64 measurement together with the
// measurement option to report it with. The callbacks in this file treat a nil
// measurementOption as "nothing to observe".
type OtelMetricFloat64Val struct {
// val is the value handed to the observer.
val float64
// measurementOption is passed to Observe alongside val; nil means unset.
measurementOption api.MeasurementOption
}

func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
// create default options with env
if options == nil {
Expand All @@ -48,14 +65,14 @@ func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
otel.SetMeterProvider(meterProvider)

meter = meterProvider.Meter(meterName)
initCounter()
initMeters()

otel := &OtelMetrics{}
otel.RecordBuildInfo()
return otel
}

func initCounter() {
func initMeters() {
var err error
msg := "create opentelemetry counter failed"

Expand All @@ -78,37 +95,107 @@ func initCounter() {
if err != nil {
otLog.Error(err, msg)
}
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.value",
api.WithDescription("Metric Value used for HPA"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricValueCallback),
)
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency",
api.WithDescription("Scaler Metrics Latency"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricsLatencyCallback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(ScalableObjectLatencyCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(ScalerActiveCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Int64ObservableGauge(
"keda.build.info",
api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
api.WithInt64Callback(BuildInfoCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

// BuildInfoCallback is the callback registered for the "keda.build.info"
// observable gauge. It observes the build-info value stored by
// RecordBuildInfo, provided a measurement option has been set, and always
// returns nil.
//
// The stored value is intentionally NOT cleared after it has been observed.
// In this file RecordBuildInfo is invoked from NewOtelMetrics at construction
// time, so resetting the value here would leave nothing to observe on any
// later invocation of this callback and the constant build-info gauge would
// only ever be reported once.
func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
	if opt := otelBuildInfoVal.measurementOption; opt != nil {
		obsrv.Observe(otelBuildInfoVal.val, opt)
	}
	return nil
}

// RecordBuildInfo stores the build-info measurement (constant value 1) in
// otelBuildInfoVal, attributed with the KEDA version, git commit, Go version,
// GOOS and GOARCH, so that BuildInfoCallback can observe it.
func (o *OtelMetrics) RecordBuildInfo() {
	otelBuildInfoVal.val = 1
	otelBuildInfoVal.measurementOption = api.WithAttributes(
		attribute.Key("version").String(version.Version),
		attribute.Key("git_commit").String(version.GitCommit),
		attribute.Key("goversion").String(runtime.Version()),
		attribute.Key("goos").String(runtime.GOOS),
		attribute.Key("goarch").String(runtime.GOARCH),
	)
}

// ScalerMetricValueCallback is the callback registered for the
// "keda.scaler.metrics.value" observable gauge. It observes the value most
// recently stored by RecordScalerMetric, if one is pending, and then resets
// the stored value so the same measurement is not observed again by a later
// invocation. It always returns nil.
func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerMetricVal
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	// Clear the slot whether or not anything was observed.
	otelScalerMetricVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerMetric stores value, together with the measurement option built
// from the scaler's identifying arguments, in otelScalerMetricVal so that
// ScalerMetricValueCallback can observe it. Any previously stored value is
// overwritten.
func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
	otelScalerMetricVal = OtelMetricFloat64Val{
		val:               value,
		measurementOption: getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric),
	}
}

// ScalerMetricsLatencyCallback is the callback registered for the
// "keda.scaler.metrics.latency" observable gauge. It observes the latency most
// recently stored by RecordScalerLatency, if one is pending, and then resets
// the stored value so the same measurement is not observed again by a later
// invocation. It always returns nil.
func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerMetricsLatencyVal
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	// Clear the slot whether or not anything was observed.
	otelScalerMetricsLatencyVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerLatency stores the latency value, together with the measurement
// option built from the scaler's identifying arguments, in
// otelScalerMetricsLatencyVal so that ScalerMetricsLatencyCallback can observe
// it. Any previously stored value is overwritten.
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
	otelScalerMetricsLatencyVal = OtelMetricFloat64Val{
		val:               value,
		measurementOption: getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric),
	}
}

// ScalableObjectLatencyCallback is the callback registered for the
// "keda.internal.scale.loop.latency" observable gauge. It observes the value
// currently held in otelInternalLoopLatencyVal, if one is pending, and then
// resets that variable so the same measurement is not observed again by a
// later invocation. It always returns nil.
func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelInternalLoopLatencyVal
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	// Clear the slot whether or not anything was observed.
	otelInternalLoopLatencyVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
Expand All @@ -123,18 +210,16 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register internal scale loop latency", "namespace", namespace, resourceType, name)
otelInternalLoopLatencyVal.val = value
otelInternalLoopLatencyVal.measurementOption = opt
}

// ScalerActiveCallback is the callback registered for the "keda.scaler.active"
// observable gauge. It observes the value currently held in
// otelScalerActiveVal, if one is pending, and then resets that variable so the
// same measurement is not observed again by a later invocation. It always
// returns nil.
func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerActiveVal
	if opt := pending.measurementOption; opt != nil {
		obsrv.Observe(pending.val, opt)
	}
	// Clear the slot whether or not anything was observed.
	otelScalerActiveVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerActive creates a measurement of the activity of the scaler
Expand All @@ -144,18 +229,8 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
activeVal = 1
}

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}
otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand Down Expand Up @@ -204,29 +279,6 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

// RecordBuildInfo registers the "keda.build.info" observable gauge. The
// gauge's callback observes the constant value 1, attributed with the KEDA
// version, git commit, Go version, GOOS and GOARCH. A registration failure is
// logged and not returned.
func (o *OtelMetrics) RecordBuildInfo() {
	buildAttrs := api.WithAttributes(
		attribute.Key("version").String(version.Version),
		attribute.Key("git_commit").String(version.GitCommit),
		attribute.Key("goversion").String(runtime.Version()),
		attribute.Key("goos").String(runtime.GOOS),
		attribute.Key("goarch").String(runtime.GOARCH),
	)

	observeBuildInfo := func(_ context.Context, obsrv api.Int64Observer) error {
		obsrv.Observe(1, buildAttrs)
		return nil
	}

	if _, err := meter.Int64ObservableGauge(
		"keda.build.info",
		api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
		api.WithInt64Callback(observeBuildInfo),
	); err != nil {
		otLog.Error(err, "failed to register build info")
	}
}

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
Expand Down