diff --git a/internal/client/jetstream.go b/internal/client/jetstream.go index 6de7458..4b40520 100644 --- a/internal/client/jetstream.go +++ b/internal/client/jetstream.go @@ -208,11 +208,13 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) { latency := time.Since(publishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ + "subject": msg.Subject, "stream": streamName, "cluster": clusterName, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": msg.Subject, "type": successfulSubscribe, "stream": streamName, "cluster": clusterName, @@ -246,10 +248,14 @@ func (client *Client) coreSubscribe(subject string) { latency := time.Since(publishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ + "subject": subject, + "stream": "-", "cluster": clusterName, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, + "stream": "-", "type": successfulSubscribe, "cluster": clusterName, }).Add(1) @@ -269,6 +275,7 @@ func (client *Client) corePublish(subject string) { if err := client.connection.Publish(subject, t); err != nil { client.metrics.SuccessCounter.With(prometheus.Labels{ + "stream": "-", "subject": subject, "type": failedPublish, "cluster": clusterName, @@ -284,6 +291,7 @@ func (client *Client) corePublish(subject string) { } } else { client.metrics.SuccessCounter.With(prometheus.Labels{ + "stream": "-", "type": successfulPublish, "cluster": clusterName, "subject": subject, @@ -305,6 +313,7 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre if ack, err := client.jetstream.Publish(ctx, subject, t); err != nil { client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, "type": failedPublish, "stream": streamName, "cluster": clusterName, @@ -320,6 +329,7 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre } } else { client.metrics.SuccessCounter.With(prometheus.Labels{ + "subject": subject, "type": successfulPublish, "stream": streamName, "cluster": clusterName, diff --git a/internal/client/metric.go b/internal/client/metric.go index a2c5f33..f662738 100644 --- a/internal/client/metric.go +++ b/internal/client/metric.go @@ -115,7 +115,7 @@ func NewMetrics(conn string) Metrics { "conn": conn, }, Buckets: latencyBuckets, - }, []string{"stream", "cluster"}), + }, []string{"stream", "cluster", "subject"}), SuccessCounter: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Subsystem: Subsystem, @@ -124,6 +124,6 @@ func NewMetrics(conn string) Metrics { ConstLabels: prometheus.Labels{ "conn": conn, }, - }, []string{"type", "stream", "cluster"}), + }, []string{"type", "stream", "cluster", "subject"}), } }