diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 8261f6c4c..2d9425a84 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -217,6 +217,7 @@ func main() { conf.Postgres.URL, pgdriver.WithTracerProvider(otelTracerProvider), pgdriver.WithMeterProvider(otelMeterProvider), + pgdriver.WithMetricMeter(metricMeter), ) if err != nil { logger.Error("failed to initialize postgres driver", "error", err) diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index bc1cffa20..dfdaba314 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -219,6 +219,7 @@ func main() { conf.Postgres.URL, pgdriver.WithTracerProvider(otelTracerProvider), pgdriver.WithMeterProvider(otelMeterProvider), + pgdriver.WithMetricMeter(metricMeter), ) if err != nil { logger.Error("failed to initialize postgres driver", "error", err) diff --git a/cmd/server/main.go b/cmd/server/main.go index 5cf92a7aa..f9c28895e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -329,6 +329,7 @@ func main() { conf.Postgres.URL, pgdriver.WithTracerProvider(otelTracerProvider), pgdriver.WithMeterProvider(otelMeterProvider), + pgdriver.WithMetricMeter(metricMeter), ) if err != nil { logger.Error("failed to initialize postgres driver", "error", err) diff --git a/go.mod b/go.mod index 88bc25eb7..71934122f 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 - go.opentelemetry.io/otel/exporters/prometheus v0.50.0 + go.opentelemetry.io/otel/exporters/prometheus v0.51.0 go.opentelemetry.io/otel/metric v1.29.0 go.opentelemetry.io/otel/sdk v1.29.0 go.opentelemetry.io/otel/sdk/metric v1.29.0 diff --git a/go.sum b/go.sum index d9096a79f..9e7a0a2f1 100644 --- a/go.sum +++ b/go.sum @@ -1372,8 +1372,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6Z go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 h1:cfuy3bXmLJS7M1RZmAL6SuhGtKUp2KEsrm00OlAXkq4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1/go.mod h1:22jr92C6KwlwItJmQzfixzQM3oyyuYLCfHiMY+rpsPU= -go.opentelemetry.io/otel/exporters/prometheus v0.50.0 h1:2Ewsda6hejmbhGFyUvWZjUThC98Cf8Zy6g0zkIimOng= -go.opentelemetry.io/otel/exporters/prometheus v0.50.0/go.mod h1:pMm5PkUo5YwbLiuEf7t2xg4wbP0/eSJrMxIMxKosynY= +go.opentelemetry.io/otel/exporters/prometheus v0.51.0 h1:G7uexXb/K3T+T9fNLCCKncweEtNEBMTO+46hKX5EdKw= +go.opentelemetry.io/otel/exporters/prometheus v0.51.0/go.mod h1:v0mFe5Kk7woIh938mrZBJBmENYquyA0IICrlYm4Y0t4= go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= diff --git a/pkg/framework/pgdriver/driver.go b/pkg/framework/pgdriver/driver.go index 9708fddc0..6e212e2c6 100644 --- a/pkg/framework/pgdriver/driver.go +++ b/pkg/framework/pgdriver/driver.go @@ -11,6 +11,8 @@ import ( "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.20.0" "go.opentelemetry.io/otel/trace" + + "github.com/openmeterio/openmeter/pkg/pgxpoolobserver" ) type Option interface { @@ -35,9 +37,16 @@ func WithMeterProvider(p metric.MeterProvider) Option { }) } +func WithMetricMeter(m metric.Meter) Option { + return optionFunc(func(o *options) { + o.metricMeter = m + }) +} + type options struct { connConfig *pgxpool.Config otelOptions []otelsql.Option + metricMeter metric.Meter } type Driver struct { @@ -79,6 +88,12 @@ func NewPostgresDriver(ctx context.Context, url string, opts ...Option) (*Driver return nil, fmt.Errorf("failed to create postgres pool: %w", err) } + if o.metricMeter != nil { + if err := pgxpoolobserver.ObservePoolMetrics(o.metricMeter, pool); err != nil { + return nil, err + } + } + db := otelsql.OpenDB(pgxstdlib.GetPoolConnector(pool), o.otelOptions...) // Set maximum idle connections to 0 as connections are managed from pgx.Pool. diff --git a/pkg/pgxpoolobserver/observer.go b/pkg/pgxpoolobserver/observer.go new file mode 100644 index 000000000..49ee3bc64 --- /dev/null +++ b/pkg/pgxpoolobserver/observer.go @@ -0,0 +1,158 @@ +package pgxpoolobserver + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// ObservePoolMetrics registers a callback that observes the metrics of the provided pgxpool.Pool. +// the implementation is based on https://github.com/cmackenzie1/pgxpool-prometheus +func ObservePoolMetrics(meter metric.Meter, pool *pgxpool.Pool, additionalAttributes ...attribute.KeyValue) error { + allMetrics := []metric.Observable{} + + acquireCountMetric, err := meter.Int64ObservableCounter( + "pgxpool.acquire_count", + metric.WithDescription("The cumulative count of successful acquires from the pool."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, acquireCountMetric) + + acquiredDurationMetric, err := meter.Int64ObservableGauge( + "pgxpool.acquire_duration", + metric.WithDescription("The total duration of all successful acquires from the pool in ms."), + metric.WithUnit("ms"), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, acquiredDurationMetric) + + avgAcquiredDurationMetric, err := meter.Int64ObservableGauge( + "pgxpool.acquire_duration_avg", + metric.WithDescription("The average duration of all successful acquires from the pool in ms."), + metric.WithUnit("ms"), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, avgAcquiredDurationMetric) + + acquiredConnsMetric, err := meter.Int64ObservableGauge( + "pgxpool.acquired_conns", + metric.WithDescription("The number of currently acquired connections in the pool."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, acquiredConnsMetric) + + canceledAcquireCountMetric, err := meter.Int64ObservableCounter( + "pgxpool.canceled_acquire_count", + metric.WithDescription("The cumulative count of acquires from the pool that were canceled by a context."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, canceledAcquireCountMetric) + + constructingConnsMetric, err := meter.Int64ObservableGauge( + "pgxpool.constructing_conns", + metric.WithDescription("The number of conns with construction in progress in the pool."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, constructingConnsMetric) + + emptyAcquireCountMetric, err := meter.Int64ObservableCounter( + "pgxpool.empty_acquire_count", + metric.WithDescription("The cumulative count of successful acquires from the pool that waited for a resource to be released or constructed because the pool was empty."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, emptyAcquireCountMetric) + + idleConnsMetric, err := meter.Int64ObservableGauge( + "pgxpool.idle_conns", + metric.WithDescription("The number of currently idle conns in the pool."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, idleConnsMetric) + + maxConns, err := meter.Int64ObservableGauge( + "pgxpool.max_conns", + metric.WithDescription("The maximum size of the pool."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, maxConns) + + totalConns, err := meter.Int64ObservableGauge( + "pgxpool.total_conns", + metric.WithDescription("The total number of resources currently in the pool. The value is the sum of ConstructingConns, AcquiredConns, and IdleConns."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, totalConns) + + newConnsCount, err := meter.Int64ObservableCounter( + "pgxpool.new_conns_count", + metric.WithDescription("The cumulative count of new connections opened."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, newConnsCount) + + maxLifetimeDestroyCount, err := meter.Int64ObservableCounter( + "pgxpool.max_lifetime_destroy_count", + metric.WithDescription("The cumulative count of connections closed due to reaching their maximum lifetime (MaxConnLifetime)."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, maxLifetimeDestroyCount) + + maxIdleDestroyCount, err := meter.Int64ObservableCounter( + "pgxpool.max_idle_destroy_count", + metric.WithDescription("The cumulative count of connections closed due to reaching their maximum idle time (MaxConnIdleTime)."), + ) + if err != nil { + return err + } + allMetrics = append(allMetrics, maxIdleDestroyCount) + + _, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + stat := pool.Stat() + o.ObserveInt64(acquireCountMetric, stat.AcquireCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(acquiredDurationMetric, stat.AcquireDuration().Milliseconds(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(avgAcquiredDurationMetric, stat.AcquireDuration().Milliseconds()/stat.AcquireCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(acquiredConnsMetric, int64(stat.AcquiredConns()), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(canceledAcquireCountMetric, stat.CanceledAcquireCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(constructingConnsMetric, int64(stat.ConstructingConns()), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(emptyAcquireCountMetric, stat.EmptyAcquireCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(idleConnsMetric, int64(stat.IdleConns()), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(maxConns, int64(stat.MaxConns()), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(totalConns, int64(stat.TotalConns()), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(newConnsCount, stat.NewConnsCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(maxLifetimeDestroyCount, stat.MaxLifetimeDestroyCount(), metric.WithAttributes(additionalAttributes...)) + o.ObserveInt64(maxIdleDestroyCount, stat.MaxIdleDestroyCount(), metric.WithAttributes(additionalAttributes...)) + + return nil + }, allMetrics...) + if err != nil { + return err + } + + return nil +}