From 20b8d79418284d9a2b347950246503b8f4785966 Mon Sep 17 00:00:00 2001 From: Jaydip Gabani Date: Tue, 16 Jan 2024 19:38:39 +0000 Subject: [PATCH] fix for running multiple metrics backend Signed-off-by: Jaydip Gabani --- pkg/metrics/exporter.go | 2 + pkg/metrics/exporters/common/common.go | 63 +++++++++++++++++++ .../exporters/opentelemetry/opentelemetry.go | 27 +++----- .../prometheus/prometheus_exporter.go | 15 ++--- .../exporters/stackdriver/stackdriver.go | 17 +++-- 5 files changed, 88 insertions(+), 36 deletions(-) create mode 100644 pkg/metrics/exporters/common/common.go diff --git a/pkg/metrics/exporter.go b/pkg/metrics/exporter.go index 89dec9fb130..80e39b77e9f 100644 --- a/pkg/metrics/exporter.go +++ b/pkg/metrics/exporter.go @@ -3,6 +3,7 @@ package metrics import ( "context" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common" "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/registry" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -34,6 +35,7 @@ func (r *runner) Start(ctx context.Context) error { defer log.Info("Stopping metrics runner workers") errCh := make(chan error) exporters := registry.Exporters() + common.SetRequiredReaders(len(exporters)) for i := range exporters { startExporter := exporters[i] go func() { diff --git a/pkg/metrics/exporters/common/common.go b/pkg/metrics/exporters/common/common.go new file mode 100644 index 00000000000..b245fdf9b36 --- /dev/null +++ b/pkg/metrics/exporters/common/common.go @@ -0,0 +1,63 @@ +package common + +import ( + "sync" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" +) + +var ( + opts []metric.Option + res *resource.Resource + mutex sync.Mutex + requiredReaders int +) + +// SetRequiredReaders sets the number of required readers for the MeterProvider. +func SetRequiredReaders(num int) { + requiredReaders = num +} + +// AddReader adds a reader to the options and updates the MeterProvider if the required conditions are met. +func AddReader(opt metric.Option) { + mutex.Lock() + defer mutex.Unlock() + if opt == nil { + requiredReaders-- + } else { + opts = append(opts, opt) + } + setMeterProvider() +} + +// SetResource sets the resource to be used by the MeterProvider. +func SetResource(r *resource.Resource) { + mutex.Lock() + defer mutex.Unlock() + res = r +} + +// setMeterProvider sets the MeterProvider if the required conditions are met. +func setMeterProvider() { + // Check if we have the required number of readers and at least one reader. + if len(opts) != requiredReaders || len(opts) == 0 { + return + } + + // Start with the existing options. + options := opts + + // Add views to the options. + options = append(options, metric.WithView(view.Views()...)) + + // If a resource is available, add it to the options. + if res != nil { + options = append(options, metric.WithResource(res)) + } + + meterProvider := metric.NewMeterProvider(options...) + otel.SetMeterProvider(meterProvider) +} diff --git a/pkg/metrics/exporters/opentelemetry/opentelemetry.go b/pkg/metrics/exporters/opentelemetry/opentelemetry.go index 29ab161ba7f..138b7eaaae9 100644 --- a/pkg/metrics/exporters/opentelemetry/opentelemetry.go +++ b/pkg/metrics/exporters/opentelemetry/opentelemetry.go @@ -6,8 +6,7 @@ import ( "fmt" "time" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view" - "go.opentelemetry.io/otel" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/sdk/metric" ) @@ -25,27 +24,21 @@ var ( func Start(ctx context.Context) error { if *otlpEndPoint == "" { + common.AddReader(nil) return fmt.Errorf("otlp-endpoint must be specified") } + var err error exp, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithInsecure(), otlpmetrichttp.WithEndpoint(*otlpEndPoint)) if err != nil { + common.AddReader(nil) return err } - meterProvider := metric.NewMeterProvider( - metric.WithReader(metric.NewPeriodicReader( - exp, - metric.WithTimeout(defaultMetricsTimeout), - metric.WithInterval(*metricInterval), - )), - metric.WithView(view.Views()...), - ) - - otel.SetMeterProvider(meterProvider) - defer func() { - if err := meterProvider.Shutdown(ctx); err != nil { - panic(err) - } - }() + reader := metric.WithReader(metric.NewPeriodicReader( + exp, + metric.WithTimeout(defaultMetricsTimeout), + metric.WithInterval(*metricInterval), + )) + common.AddReader(reader) <-ctx.Done() return nil diff --git a/pkg/metrics/exporters/prometheus/prometheus_exporter.go b/pkg/metrics/exporters/prometheus/prometheus_exporter.go index 38863989efc..9761e09b35f 100644 --- a/pkg/metrics/exporters/prometheus/prometheus_exporter.go +++ b/pkg/metrics/exporters/prometheus/prometheus_exporter.go @@ -7,9 +7,8 @@ import ( "net/http" "time" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/sdk/metric" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -27,21 +26,19 @@ var ( ) func Start(ctx context.Context) error { + var err error e, err := prometheus.New( prometheus.WithNamespace(namespace), prometheus.WithoutScopeInfo(), ) if err != nil { + common.AddReader(nil) return err } - meterProvider := metric.NewMeterProvider( - metric.WithReader(e), - metric.WithView(view.Views()...), - ) - server := newPromSrv(*prometheusPort) - otel.SetMeterProvider(meterProvider) - otel.SetLogger(logf.Log.WithName("metrics")) + reader := metric.WithReader(e) + common.AddReader(reader) + server := newPromSrv(*prometheusPort) errCh := make(chan error) srv := func() { err := server.ListenAndServe() diff --git a/pkg/metrics/exporters/stackdriver/stackdriver.go b/pkg/metrics/exporters/stackdriver/stackdriver.go index db755e2b5e8..fdf8c5e1d57 100644 --- a/pkg/metrics/exporters/stackdriver/stackdriver.go +++ b/pkg/metrics/exporters/stackdriver/stackdriver.go @@ -8,10 +8,9 @@ import ( traceapi "cloud.google.com/go/trace/apiv2" stackdriver "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common" "go.opentelemetry.io/contrib/detectors/aws/ec2" "go.opentelemetry.io/contrib/detectors/gcp" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" @@ -35,9 +34,11 @@ func Start(ctx context.Context) error { // Verify that default stackdriver credentials are available if _, err := google.FindDefaultCredentials(ctx, traceapi.DefaultAuthScopes()...); err != nil { if *ignoreMissingCreds { + common.AddReader(nil) log.Error(err, "Missing credentials, cannot start stackdriver exporter") return nil } + common.AddReader(nil) return err } @@ -49,6 +50,7 @@ func Start(ctx context.Context) error { log.Error(err, "Error initializing stackdriver exporter, not exporting stackdriver metrics") return nil } + common.AddReader(nil) return err } res, err := resource.New(ctx, @@ -57,17 +59,12 @@ func Start(ctx context.Context) error { resource.WithFromEnv(), ) if err != nil { + common.AddReader(nil) return err } reader := metric.NewPeriodicReader(e, metric.WithInterval(*metricInterval)) - meterProvider := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithView(view.Views()...), - metric.WithResource(res), - ) - - otel.SetMeterProvider(meterProvider) - otel.SetLogger(logf.Log.WithName("metrics")) + common.SetResource(res) + common.AddReader(metric.WithReader(reader)) <-ctx.Done() return nil