Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INFOPLAT-1575 Implement Prometheus to OTel metrics conversion and export #997

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 191 additions & 38 deletions go.mod

Large diffs are not rendered by default.

995 changes: 904 additions & 91 deletions go.sum

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions pkg/promotel/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
help: ## Print this help text
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}'

.PHONY: \
tidy \
fmt \
lint \
test \
run-example

tidy: fmt ## run go mod tidy
go mod tidy

fmt: ## run go fmt
go fmt ./...

lint: ## run golangci-lint
golangci-lint run ./...

test: ## run unit tests
cd otel/collector-gateway && go test -v ./beholder_kafka/... ./tokenauthextension/...

build: tidy ## build the demo
go build ./cmd/example.go

run-example: ## run the example
go run ./cmd/example.go
157 changes: 157 additions & 0 deletions pkg/promotel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Package Overview
The package provides components for performing Prometheus to OTel metrics conversion.

Main components: MetricsReceiver, MetricsExporter

## Receiver
- Wraps [prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Fetches prometheus metrics data via `prometheus.Gatherer` (same process memory, no HTTP calls)
- Uses custom implementation of `prometheus.scraper` (from here https://github.com/pkcll/prometheus/pull/1) to shortcut HTTP request calls and fetch data from `prometheus.Gatherer`
- Converts Prometheus metrics into OTel format using [prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Passes OTel metrics data to downstream OTel [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)

## Exporter
- Wraps [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)
- Receives metric data from the receiver
- Export OTel metrics data to otel collector endpoint via [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)

## OTel collector prometheusreceiver

[prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver).
is a component of otel-collector which collects metrics from Prometheus endpoints. It scrapes the metrics at regular intervals and converts them into a format that can be processed by the rest of the collector pipeline.

`promotel` is a wrapper around `prometheusreceiver` which provides a simple API to start and stop the receiver and process the metrics data.

`promotel` uses `prometheusreceiver` factory to create an instance of the receiver via `factory.CreateMetrics` with provided configuration. It also provides a callback function which is called every time new metrics data is received. The metrics data is a `pmetric.Metrics` object which contains the metrics data received from the Prometheus endpoint.

`promotel/inernal` contains implementations for `consumer.Metrics`, `component.Host`, `receiver.Settings`, `component.TelemetrySettings` which are dependencies required for `factory.CreateMetrics`.

`metrics.Consumer` is an interface which is used to process the metrics data. The `prometheusreceiver` calls `Consumer.ConsumeMetrics` function every time new metrics data is received.

`prometheusreceiver` has Start and Shutdown methods.

`github.com/pkcll/prometheus v0.54.1-promotel` fork overrides the `prometheus` package to provide a way to scrape metrics directly from `prometheus.DefaultGatherer` without making HTTP requests to the Prometheus endpoint. This is useful when the Prometheus endpoint is not accessible from the collector.

Example configuration:


```yaml
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'example'
static_configs:
- targets: ['localhost:9090']

```

## OTel collector otlpexporter

[otlpexporter](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlpexporter) is a component of the OpenTelemetry Collector that exports telemetry data (metrics, logs, and traces) using the OpenTelemetry Protocol (OTLP). It supports both gRPC and HTTP transport protocols.

Example configuration:

```yaml
exporters:
otlp:
endpoint: "localhost:4317"
tls:
insecure: true
retry_on_failure:
enabled: true
initial_interval: 5s
max_interval: 30s
max_elapsed_time: 300s
sending_queue:
enabled: true
queue_size: 5000
```

### `promotel` usage example:

```go
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/smartcontractkit/chainlink-common/pkg/promotel"
)

func main() {
exporterConfig, _ := promotel.NewDefaultExporterConfig()
exporter, _ := promotel.NewMetricExporter(exporterConfig, logger)
receiverConfig, _ := promotel.NewDefaultReceiverConfig()
// Fetches metrics data directly from DefaultGatherer without making HTTP requests to 127.0.0.1:8888
receiver, _ := promotel.NewMetricReceiver(receiverConfig, prometheus.DefaultGatherer, exporter.Consumer().ConsumeMetrics, logger)
fmt.Println("Starting promotel pipeline")
exporter.Start(context.Background())
receiver.Start(context.Background())
defer receiver.Close()
defer exporter.Close()
time.Sleep(1 * time.Minute)
}
```

### Debug Metric Receiver

`DebugMetricReceiver` is an implementation of `metrics.Consumer` which prints formatted metrics data to stdout. It is useful for testing purposes.

### `Debug Metric Receiver` usage example:

```go
...
// Debug metric receiver prints fetched metrics to stdout
receiver, err := promotel.NewDebugMetricReceiver(config, prometheus.DefaultGatherer, logger)
// Start metric receiver
receiver.Start(context.Background())
...
```

Output example

```
NumberDataPoints #0
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
Value: 44.000000
Metric #18
Descriptor:
-> Name: otelcol_exporter_sent_metric_points
-> Description: Number of metric points successfully sent to destination.
-> Unit:
-> DataType: Sum
-> IsMonotonic: true
-> AggregationTemporality: Cumulative
NumberDataPoints #0
Data point attributes:
-> exporter: Str(debug)
-> service_version: Str(0.108.1)
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
Value: 137.000000
NumberDataPoints #1
Data point attributes:
-> exporter: Str(otlphttp)
-> service_version: Str(0.108.1)
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
Value: 137.000000
Metric #19
Descriptor:
-> Name: otelcol_process_cpu_seconds
-> Description: Total CPU user and system time in seconds
-> Unit:
-> DataType: Sum
-> IsMonotonic: true
-> AggregationTemporality: Cumulative
NumberDataPoints #0
Data point attributes:
-> service_version: Str(0.108.1)
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
Value: 0.930000
```
148 changes: 148 additions & 0 deletions pkg/promotel/cmd/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
dto "github.com/prometheus/client_model/go"

"github.com/smartcontractkit/chainlink-common/pkg/promotel"
)

const testCounterMetricName = "test_counter_metric"

func reportMetrics(reg prometheus.Registerer, logger *zap.Logger) {
testCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: testCounterMetricName,
ConstLabels: prometheus.Labels{
"app": "promotel-demo",
},
})
for {
testCounter.Inc()
m := &dto.Metric{}
_ = testCounter.Write(m)
logger.Info("Reported Prometheus metric ", zap.Any("name", testCounterMetricName), zap.Any("value", m.GetCounter().GetValue()))
time.Sleep(1 * time.Second)
}
}

func gatherMetricsDirectly(reg prometheus.Gatherer, logger *zap.Logger) {
for {
mf, err := reg.Gather()
if err != nil {
fmt.Printf("Error gathering metrics: %v\n", err)
}
for _, metricFamily := range mf {
if *metricFamily.Name == testCounterMetricName {
for _, metric := range metricFamily.Metric {
logger.Info("Received Prometheus metric ", zap.Any("name", testCounterMetricName), zap.Any("value", metric.Counter.GetValue()))
}
}
}
time.Sleep(1 * time.Second)
}
}

func startExporter(ctx context.Context, logger *zap.Logger) promotel.MetricExporter {
expConfig, err := promotel.NewExporterConfig(map[string]any{
"endpoint": "localhost:4317",
"tls": map[string]any{
"insecure": true,
},
})
if err != nil {
logger.Fatal("Failed to create exporter config", zap.Error(err))
}
// Sends metrics data in OTLP format to otel-collector endpoint
exporter, err := promotel.NewMetricExporter(expConfig, logger)
if err != nil {
logger.Fatal("Failed to create metric exporter", zap.Error(err))
}
err = exporter.Start(ctx)
if err != nil {
logger.Fatal("Failed to start exporter", zap.Error(err))
}
return exporter
}

func startMetricReceiver(reg prometheus.Gatherer, logger *zap.Logger, next consumer.ConsumeMetricsFunc) promotel.Runnable {
logger.Info("Starting promotel metric receiver")
config, err := promotel.NewDefaultReceiverConfig()
if err != nil {
logger.Fatal("Failed to create config", zap.Error(err))
}

// Gather metrics via promotel
// MetricReceiver fetches metrics from pormetheus.Gatherer, then converts it to OTel format and writes formatted metrics to stdout
receiver, err := promotel.NewMetricReceiver(config, reg, next, logger)

if err != nil {
logger.Fatal("Failed to create debug metric receiver", zap.Error(err))
}
// Starts the promotel
if err := receiver.Start(context.Background()); err != nil {
logger.Fatal("Failed to start metric receiver", zap.Error(err))
}
return receiver
}

func main() {
logger, _ := zap.NewDevelopment()

go reportMetrics(prometheus.DefaultRegisterer, logger)
// Gather metrics directly from DefaultGatherer to verify that the metrics are being reported
go gatherMetricsDirectly(prometheus.DefaultGatherer, logger)

exporter := startExporter(context.Background(), logger)
// Fetches metrics from in memory prometheus.Gatherer and converts to OTel format
receiver := startMetricReceiver(prometheus.DefaultGatherer, logger, func(ctx context.Context, md pmetric.Metrics) error {
// Logs the converted OTel metric
logOtelMetric(md, testCounterMetricName, logger)
// Exports the converted OTel metric
return exporter.Consumer().ConsumeMetrics(ctx, md)
})

// Wait for a signal to exit
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

// Block until a signal is received
<-signalChan
logger.Info("Exiting promotel")
// Gracefully shuts down promotel
if err := receiver.Close(); err != nil {
logger.Fatal("Failed to close scraper", zap.Error(err))
}
if err := exporter.Close(); err != nil {
logger.Fatal("Failed to close exporter", zap.Error(err))
}
}

func logOtelMetric(md pmetric.Metrics, name string, logger *zap.Logger) {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
ilms := rm.ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
metrics := ilm.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
if metric.Name() == name {
logger.Info("Exporting OTel metric ", zap.Any("name", metric.Name()), zap.Any("value", metric.Sum().DataPoints().At(0).DoubleValue()))
}
}
}
}
}
Loading
Loading