From becf56a3c28522decdbe992a4ecac944a6ecd89f Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Tue, 23 Jan 2024 15:11:40 -0500 Subject: [PATCH] Add subscribed topics gauge metric (#339) * Add subscribe/unsubscribe gauge metric * Emit unsubscribe in defer --- pkg/api/message/v1/service.go | 15 +++++++++++---- pkg/metrics/api-subscribe.go | 36 +++++++++++++++++++++++++++++++++++ pkg/metrics/api.go | 14 -------------- pkg/metrics/metrics.go | 1 + 4 files changed, 48 insertions(+), 18 deletions(-) create mode 100644 pkg/metrics/api-subscribe.go diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index bb1b256a..28cccdad 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -178,7 +178,7 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) - metrics.EmitSubscribeTopicsLength(stream.Context(), log, len(req.ContentTopics)) + metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics)) var streamLock sync.Mutex for _, topic := range req.ContentTopics { @@ -211,17 +211,20 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi } defer func() { _ = sub.Unsubscribe() + metrics.EmitUnsubscribeTopics(stream.Context(), log, 1) }() } select { case <-stream.Context().Done(): log.Debug("stream closed") - return nil + break case <-s.ctx.Done(): log.Info("service closed") - return nil + break } + + return nil } func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { @@ -264,6 +267,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { for _, sub := range subs { _ = sub.Unsubscribe() } + metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs)) }() var streamLock sync.Mutex for { @@ -280,7 +284,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { } log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics))) - metrics.EmitSubscribeTopicsLength(stream.Context(), log, len(req.ContentTopics)) + metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics)) topics := map[string]bool{} for _, topic := range req.ContentTopics { @@ -313,13 +317,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { } // If subscription not in topic, then unsubscribe. + var count int for topic, sub := range subs { if topics[topic] { continue } _ = sub.Unsubscribe() delete(subs, topic) + count++ } + metrics.EmitUnsubscribeTopics(stream.Context(), log, count) } } } diff --git a/pkg/metrics/api-subscribe.go b/pkg/metrics/api-subscribe.go new file mode 100644 index 00000000..94acb081 --- /dev/null +++ b/pkg/metrics/api-subscribe.go @@ -0,0 +1,36 @@ +package metrics + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +var subscribedTopics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmtp_subscribed_topics", + Help: "Number of subscribed topics", + }, + appClientVersionTagKeys, +) + +var subscribeTopicsLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "xmtp_subscribe_topics_length", + Help: "Number of subscribed topics per request", + Buckets: []float64{1, 2, 4, 8, 16, 50, 100, 10000, 100000}, + }, + appClientVersionTagKeys, +) + +func EmitSubscribeTopics(ctx context.Context, log *zap.Logger, topics int) { + labels := contextLabels(ctx) + subscribeTopicsLength.With(labels).Observe(float64(topics)) + subscribedTopics.With(labels).Add(float64(topics)) +} + +func EmitUnsubscribeTopics(ctx context.Context, log *zap.Logger, topics int) { + labels := contextLabels(ctx) + subscribedTopics.With(labels).Add(-float64(topics)) +} diff --git a/pkg/metrics/api.go b/pkg/metrics/api.go index 5eb95ee8..dd5cf73d 100644 --- a/pkg/metrics/api.go +++ b/pkg/metrics/api.go @@ -67,20 +67,6 @@ func EmitAPIRequest(ctx context.Context, log *zap.Logger, fields []zapcore.Field apiRequestsDuration.With(labels).Observe(float64(duration / time.Millisecond)) } -var subscribeTopicsLength = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "xmtp_subscribe_topics_length", - Help: "Number of subscribed topics per request", - Buckets: []float64{1, 2, 4, 8, 16, 50, 100, 10000, 100000}, - }, - appClientVersionTagKeys, -) - -func EmitSubscribeTopicsLength(ctx context.Context, log *zap.Logger, topics int) { - labels := contextLabels(ctx) - subscribeTopicsLength.With(labels).Observe(float64(topics)) -} - var publishedEnvelopeSize = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "xmtp_published_envelope", diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 4b005000..3b0cd0e8 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -59,6 +59,7 @@ func registerCollectors(reg prometheus.Registerer) { apiRequests, apiRequestsDuration, subscribeTopicsLength, + subscribedTopics, publishedEnvelopeSize, publishedEnvelopeCount, queryDuration,