diff --git a/cmd/server/main.go b/cmd/server/main.go index 0e9a5b1e1..5cf92a7aa 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -36,6 +36,7 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/openmeter/debug" "github.com/openmeterio/openmeter/openmeter/ingest" + "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" "github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" @@ -573,17 +574,23 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger return producer, nil } -func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) { +func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer) (ingest.Collector, *kafkaingest.NamespaceHandler, error) { + var collector ingest.Collector + collector, err := kafkaingest.NewCollector( producer, serializer, config.Ingest.Kafka.EventsTopicTemplate, - metricMeter, ) if err != nil { return nil, nil, fmt.Errorf("init kafka ingest: %w", err) } + collector, err = ingestadapter.WithMetrics(collector, metricMeter) + if err != nil { + return nil, nil, fmt.Errorf("init kafka ingest: %w", err) + } + kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer) if err != nil { return nil, nil, err diff --git a/openmeter/ingest/kafkaingest/collector.go b/openmeter/ingest/kafkaingest/collector.go index c6fb37968..e8daa9884 100644 --- a/openmeter/ingest/kafkaingest/collector.go +++ b/openmeter/ingest/kafkaingest/collector.go @@ -9,8 +9,6 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" @@ -25,15 +23,12 @@ type Collector struct { // NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf. // For example: "om_%s_events" NamespacedTopicTemplate string - - ingestEventCounter metric.Int64Counter } func NewCollector( producer *kafka.Producer, serializer serializer.Serializer, namespacedTopicTemplate string, - metricMeter metric.Meter, ) (*Collector, error) { if producer == nil { return nil, fmt.Errorf("producer is required") @@ -44,25 +39,11 @@ func NewCollector( if namespacedTopicTemplate == "" { return nil, fmt.Errorf("namespaced topic template is required") } - if metricMeter == nil { - return nil, fmt.Errorf("metric meter is required") - } - - // Initialize OTel metrics - ingestEventCounter, err := metricMeter.Int64Counter( - "ingest.events", - metric.WithDescription("The number of events ingested"), - metric.WithUnit("{event}"), - ) - if err != nil { - return nil, fmt.Errorf("failed to create events counter: %w", err) - } return &Collector{ Producer: producer, Serializer: serializer, NamespacedTopicTemplate: namespacedTopicTemplate, - ingestEventCounter: ingestEventCounter, }, nil } @@ -96,10 +77,6 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event) return fmt.Errorf("producing kafka message: %w", err) } - // Increment the ingest event counter metric - namespaceAttr := attribute.String("namespace", namespace) - s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr)) - return nil }