diff --git a/openmeter/watermill/driver/kafka/broker.go b/openmeter/watermill/driver/kafka/broker.go index 1f445e7a0..d50a07888 100644 --- a/openmeter/watermill/driver/kafka/broker.go +++ b/openmeter/watermill/driver/kafka/broker.go @@ -15,8 +15,7 @@ import ( ) const ( - defaultMeterPrefix = "sarama." - defaultKeepalive = time.Minute + defaultKeepalive = time.Minute ) type BrokerOptions struct { @@ -24,7 +23,6 @@ type BrokerOptions struct { ClientID string Logger *slog.Logger MetricMeter otelmetric.Meter - MeterPrefix string DebugLogging bool } @@ -53,11 +51,6 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) { if role == "" { return nil, errors.New("role is required") } - - if o.MeterPrefix == "" { - o.MeterPrefix = defaultMeterPrefix - } - if o.KafkaConfig.SocketKeepAliveEnabled { config.Net.KeepAlive = defaultKeepalive } @@ -100,7 +93,7 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) { meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{ MetricMeter: o.MetricMeter, - NameTransformFn: metrics.MetricAddNamePrefix(fmt.Sprintf("%s%s.", o.MeterPrefix, role)), + NameTransformFn: SaramaMetricRenamer(role), ErrorHandler: metrics.LoggingErrorHandler(o.Logger), }) if err != nil { diff --git a/openmeter/watermill/driver/kafka/metrics.go b/openmeter/watermill/driver/kafka/metrics.go new file mode 100644 index 000000000..a2a177bcf --- /dev/null +++ b/openmeter/watermill/driver/kafka/metrics.go @@ -0,0 +1,83 @@ +package kafka + +import ( + "regexp" + "slices" + "strings" + + "go.opentelemetry.io/otel/attribute" + + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka/metrics" +) + +var ( + forBrokerMetricRegex = regexp.MustCompile("(.*)-for-broker-(.*)") + forTopicMetricRegex = regexp.MustCompile("(.*)-for-topic-(.*)") + + ignoreMetrics = []string{ + "batch-size", // we have batch-size-for-topic + + "consumer-batch-size", // we have batch-size-for-topic + "consumer-fetch-rate", // we have for topic metric + "incoming-byte-rate", // we have for broker metric + "outgoing-byte-rate", // we have for broker metric + "record-send-rate", // we have for broker metric + "request-latency-in-ms", // we have for broker metric + "request-size", // we have for broker metric + "request-rate-total", // we have for broker metric + "records-per-request", // we have for topic metric + "requests-in-flight", // we have for broker metric + "response-rate", // we have for broker metric + "response-size", // we have for broker metric + } + + ingorePrefixes = []string{ + "protocol-requests-rate", // too low level, we don't need it for now + "compression-", // don't care + } +) + +func SaramaMetricRenamer(role string) metrics.TransformMetricsNameToOtel { + return func(name string) metrics.TransformedMetric { + res := metrics.TransformedMetric{ + Name: "sarama." + name, + } + + if slices.Contains(ignoreMetrics, name) { + res.Drop = true + return res + } + + for _, prefix := range ingorePrefixes { + if strings.HasPrefix(name, prefix) { + res.Drop = true + return res + } + } + + attributes := []attribute.KeyValue{ + attribute.String("role", role), + } + + if matches := forBrokerMetricRegex.FindStringSubmatch(name); len(matches) == 3 { + res.Name = "sarama." + matches[1] + "_for_broker" + + attributes = append(attributes, attribute.String("broker_id", matches[2])) + + res.Attributes = attribute.NewSet(attributes...) + return res + } + + if matches := forTopicMetricRegex.FindStringSubmatch(name); len(matches) == 3 { + res.Name = "sarama." + matches[1] + "_for_topic" + + attributes = append(attributes, attribute.String("topic", matches[2])) + + res.Attributes = attribute.NewSet(attributes...) + return res + } + + res.Attributes = attribute.NewSet(attributes...) + return res + } +} diff --git a/openmeter/watermill/driver/kafka/metrics/adapter.go b/openmeter/watermill/driver/kafka/metrics/adapter.go index 0980e4beb..acfd57b30 100644 --- a/openmeter/watermill/driver/kafka/metrics/adapter.go +++ b/openmeter/watermill/driver/kafka/metrics/adapter.go @@ -9,11 +9,18 @@ import ( "sync" "github.com/rcrowley/go-metrics" + "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" ) +type TransformedMetric struct { + Name string + Attributes attribute.Set + Drop bool +} + type ( - TransformMetricsNameToOtel func(string) string + TransformMetricsNameToOtel func(string) TransformedMetric ErrorHandler func(error) ) @@ -23,12 +30,6 @@ func LoggingErrorHandler(dest *slog.Logger) ErrorHandler { } } -func MetricAddNamePrefix(prefix string) TransformMetricsNameToOtel { - return func(name string) string { - return prefix + name - } -} - type NewRegistryOptions struct { MetricMeter otelmetric.Meter NameTransformFn TransformMetricsNameToOtel @@ -41,8 +42,11 @@ func NewRegistry(opts NewRegistryOptions) (metrics.Registry, error) { } if opts.NameTransformFn == nil { - opts.NameTransformFn = func(name string) string { - return name + opts.NameTransformFn = func(name string) TransformedMetric { + return TransformedMetric{ + Name: name, + Attributes: attribute.NewSet(), + } } } @@ -110,43 +114,50 @@ func (r *registry) getWrappedMeter(name string, def interface{}) (interface{}, e def = v.Call(nil)[0].Interface() } + transfomedMetric := r.nameTransformFn(name) + + if transfomedMetric.Drop { + // If we are not interested in the metric, let's just return the original metric + return def, nil + } + switch meterDef := def.(type) { case metrics.Meter: - otelMeter, err := r.meticMeter.Int64Counter(r.nameTransformFn(name)) + otelMeter, err := r.meticMeter.Int64Counter(transfomedMetric.Name) if err != nil { return def, err } - return &wrappedMeter{Meter: meterDef, otelMeter: otelMeter}, nil + return &wrappedMeter{Meter: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil case metrics.Counter: - otelMeter, err := r.meticMeter.Int64UpDownCounter(r.nameTransformFn(name)) + otelMeter, err := r.meticMeter.Int64UpDownCounter(transfomedMetric.Name) if err != nil { return def, err } - return &wrappedCounter{Counter: meterDef, otelMeter: otelMeter}, nil + return &wrappedCounter{Counter: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil case metrics.GaugeFloat64: - otelMeter, err := r.meticMeter.Float64Gauge(r.nameTransformFn(name)) + otelMeter, err := r.meticMeter.Float64Gauge(transfomedMetric.Name) if err != nil { return def, err } - return &wrappedGaugeFloat64{GaugeFloat64: meterDef, otelMeter: otelMeter}, nil + return &wrappedGaugeFloat64{GaugeFloat64: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil case metrics.Gauge: - otelMeter, err := r.meticMeter.Int64Gauge(r.nameTransformFn(name)) + otelMeter, err := r.meticMeter.Int64Gauge(transfomedMetric.Name) if err != nil { return def, err } - return &wrappedGauge{Gauge: meterDef, otelMeter: otelMeter}, nil + return &wrappedGauge{Gauge: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil case metrics.Histogram: - otelMeter, err := r.meticMeter.Int64Histogram(r.nameTransformFn(name)) + otelMeter, err := r.meticMeter.Int64Histogram(transfomedMetric.Name) if err != nil { r.errorHandler(err) break } - return &wrappedHistogram{Histogram: meterDef, otelMeter: otelMeter}, nil + return &wrappedHistogram{Histogram: meterDef, otelMeter: otelMeter, attributes: transfomedMetric.Attributes}, nil default: // this is just a safety net, as we should have handled all the cases above (based on the lib) r.errorHandler(fmt.Errorf("unsupported metric type (name=%s): %v", name, def)) @@ -157,55 +168,60 @@ func (r *registry) getWrappedMeter(name string, def interface{}) (interface{}, e type wrappedMeter struct { metrics.Meter - otelMeter otelmetric.Int64Counter + otelMeter otelmetric.Int64Counter + attributes attribute.Set } func (m *wrappedMeter) Mark(n int64) { - m.otelMeter.Add(context.Background(), n) + m.otelMeter.Add(context.Background(), n, otelmetric.WithAttributeSet(m.attributes)) m.Meter.Mark(n) } type wrappedCounter struct { metrics.Counter - otelMeter otelmetric.Int64UpDownCounter + otelMeter otelmetric.Int64UpDownCounter + attributes attribute.Set } func (m *wrappedCounter) Inc(n int64) { - m.otelMeter.Add(context.Background(), n) + m.otelMeter.Add(context.Background(), n, otelmetric.WithAttributeSet(m.attributes)) m.Counter.Inc(n) } func (m *wrappedCounter) Dec(n int64) { - m.otelMeter.Add(context.Background(), -n) + m.otelMeter.Add(context.Background(), -n, otelmetric.WithAttributeSet(m.attributes)) m.Counter.Dec(n) } type wrappedGaugeFloat64 struct { metrics.GaugeFloat64 - otelMeter otelmetric.Float64Gauge + otelMeter otelmetric.Float64Gauge + attributes attribute.Set } func (m *wrappedGaugeFloat64) Update(newVal float64) { - m.otelMeter.Record(context.Background(), newVal) + m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes)) m.GaugeFloat64.Update(newVal) } type wrappedGauge struct { metrics.Gauge - otelMeter otelmetric.Int64Gauge + otelMeter otelmetric.Int64Gauge + attributes attribute.Set } func (m *wrappedGauge) Update(newVal int64) { - m.otelMeter.Record(context.Background(), newVal) + m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes)) m.Gauge.Update(newVal) } type wrappedHistogram struct { metrics.Histogram - otelMeter otelmetric.Int64Histogram + otelMeter otelmetric.Int64Histogram + attributes attribute.Set } func (m *wrappedHistogram) Update(newVal int64) { - m.otelMeter.Record(context.Background(), newVal) + m.otelMeter.Record(context.Background(), newVal, otelmetric.WithAttributeSet(m.attributes)) m.Histogram.Update(newVal) }