Skip to content

Commit

Permalink
fix for running multiple metrics backend
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Jan 16, 2024
1 parent 8919e95 commit 20b8d79
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 36 deletions.
2 changes: 2 additions & 0 deletions pkg/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"context"

"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/registry"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -34,6 +35,7 @@ func (r *runner) Start(ctx context.Context) error {
defer log.Info("Stopping metrics runner workers")
errCh := make(chan error)
exporters := registry.Exporters()
common.SetRequiredReaders(len(exporters))
for i := range exporters {
startExporter := exporters[i]
go func() {
Expand Down
63 changes: 63 additions & 0 deletions pkg/metrics/exporters/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package common

import (
"sync"

"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

var (
opts []metric.Option
res *resource.Resource
mutex sync.Mutex
requiredReaders int
)

// SetRequiredReaders sets the number of required readers for the MeterProvider.
func SetRequiredReaders(num int) {
requiredReaders = num
}

// AddReader adds a reader to the options and updates the MeterProvider if the required conditions are met.
func AddReader(opt metric.Option) {
mutex.Lock()
defer mutex.Unlock()
if opt == nil {
requiredReaders--
} else {
opts = append(opts, opt)
}
setMeterProvider()
}

// SetResource sets the resource to be used by the MeterProvider.
func SetResource(r *resource.Resource) {
mutex.Lock()
defer mutex.Unlock()
res = r
}

// setMeterProvider sets the MeterProvider if the required conditions are met.
func setMeterProvider() {
// Check if we have the required number of readers and at least one reader.
if len(opts) != requiredReaders || len(opts) == 0 {
return
}

// Start with the existing options.
options := opts

// Add views to the options.
options = append(options, metric.WithView(view.Views()...))

// If a resource is available, add it to the options.
if res != nil {
options = append(options, metric.WithResource(res))
}

meterProvider := metric.NewMeterProvider(options...)
otel.SetMeterProvider(meterProvider)
}
27 changes: 10 additions & 17 deletions pkg/metrics/exporters/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"fmt"
"time"

"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view"
"go.opentelemetry.io/otel"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/sdk/metric"
)
Expand All @@ -25,27 +24,21 @@ var (

func Start(ctx context.Context) error {
if *otlpEndPoint == "" {
common.AddReader(nil)
return fmt.Errorf("otlp-endpoint must be specified")
}
var err error
exp, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithInsecure(), otlpmetrichttp.WithEndpoint(*otlpEndPoint))
if err != nil {
common.AddReader(nil)
return err
}
meterProvider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(
exp,
metric.WithTimeout(defaultMetricsTimeout),
metric.WithInterval(*metricInterval),
)),
metric.WithView(view.Views()...),
)

otel.SetMeterProvider(meterProvider)
defer func() {
if err := meterProvider.Shutdown(ctx); err != nil {
panic(err)
}
}()
reader := metric.WithReader(metric.NewPeriodicReader(
exp,
metric.WithTimeout(defaultMetricsTimeout),
metric.WithInterval(*metricInterval),
))
common.AddReader(reader)

<-ctx.Done()
return nil
Expand Down
15 changes: 6 additions & 9 deletions pkg/metrics/exporters/prometheus/prometheus_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"net/http"
"time"

"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -27,21 +26,19 @@ var (
)

func Start(ctx context.Context) error {
var err error
e, err := prometheus.New(
prometheus.WithNamespace(namespace),
prometheus.WithoutScopeInfo(),
)
if err != nil {
common.AddReader(nil)
return err
}
meterProvider := metric.NewMeterProvider(
metric.WithReader(e),
metric.WithView(view.Views()...),
)
server := newPromSrv(*prometheusPort)
otel.SetMeterProvider(meterProvider)
otel.SetLogger(logf.Log.WithName("metrics"))
reader := metric.WithReader(e)
common.AddReader(reader)

server := newPromSrv(*prometheusPort)
errCh := make(chan error)
srv := func() {
err := server.ListenAndServe()
Expand Down
17 changes: 7 additions & 10 deletions pkg/metrics/exporters/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (

traceapi "cloud.google.com/go/trace/apiv2"
stackdriver "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/view"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics/exporters/common"
"go.opentelemetry.io/contrib/detectors/aws/ec2"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -35,9 +34,11 @@ func Start(ctx context.Context) error {
// Verify that default stackdriver credentials are available
if _, err := google.FindDefaultCredentials(ctx, traceapi.DefaultAuthScopes()...); err != nil {
if *ignoreMissingCreds {
common.AddReader(nil)
log.Error(err, "Missing credentials, cannot start stackdriver exporter")
return nil
}
common.AddReader(nil)
return err
}

Expand All @@ -49,6 +50,7 @@ func Start(ctx context.Context) error {
log.Error(err, "Error initializing stackdriver exporter, not exporting stackdriver metrics")
return nil
}
common.AddReader(nil)
return err
}
res, err := resource.New(ctx,
Expand All @@ -57,17 +59,12 @@ func Start(ctx context.Context) error {
resource.WithFromEnv(),
)
if err != nil {
common.AddReader(nil)
return err
}
reader := metric.NewPeriodicReader(e, metric.WithInterval(*metricInterval))
meterProvider := metric.NewMeterProvider(
metric.WithReader(reader),
metric.WithView(view.Views()...),
metric.WithResource(res),
)

otel.SetMeterProvider(meterProvider)
otel.SetLogger(logf.Log.WithName("metrics"))
common.SetResource(res)
common.AddReader(metric.WithReader(reader))

<-ctx.Done()
return nil
Expand Down

0 comments on commit 20b8d79

Please sign in to comment.