From 85d934dfe2e965cd3b17bf6d967113c269c82b29 Mon Sep 17 00:00:00 2001 From: Parham Alvani Date: Sun, 6 Oct 2024 21:53:34 +0000 Subject: [PATCH] feat: add region field to messages --- .../templates/configmap.yaml | 1 + charts/nats-blackbox-exporter/values.yaml | 1 + internal/client/config.go | 1 + internal/client/jetstream.go | 44 ++++++++++++++----- internal/client/metric.go | 4 +- internal/config/default.go | 1 + 6 files changed, 38 insertions(+), 14 deletions(-) diff --git a/charts/nats-blackbox-exporter/templates/configmap.yaml b/charts/nats-blackbox-exporter/templates/configmap.yaml index 5d41b68..6e512ba 100644 --- a/charts/nats-blackbox-exporter/templates/configmap.yaml +++ b/charts/nats-blackbox-exporter/templates/configmap.yaml @@ -16,6 +16,7 @@ data: queue_subscription_group: {{ .Values.nats.queue_subscription_group | quote }} flush_timeout: {{ .Values.nats.flush_timeout | quote }} new_stream_allow: {{ .Values.nats.new_stream_allow }} + region: {{ .Values.nats.region }} metric: server: address: ":{{ .Values.metrics.port | default 8080 }}" diff --git a/charts/nats-blackbox-exporter/values.yaml b/charts/nats-blackbox-exporter/values.yaml index 039d298..9e5a321 100644 --- a/charts/nats-blackbox-exporter/values.yaml +++ b/charts/nats-blackbox-exporter/values.yaml @@ -60,6 +60,7 @@ nats: queue_subscription_group: "group" flush_timeout: 2s new_stream_allow: false + region: "-" metrics: port: 8080 diff --git a/internal/client/config.go b/internal/client/config.go index 43206a1..2fc3a84 100644 --- a/internal/client/config.go +++ b/internal/client/config.go @@ -13,6 +13,7 @@ type Config struct { QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` ClientName string `json:"client_name" koanf:"client_name"` + Region string `json:"region" koanf:"region"` } type Stream struct { diff --git a/internal/client/jetstream.go b/internal/client/jetstream.go index bea4aad..0d3ac0b 100644 --- a/internal/client/jetstream.go +++ b/internal/client/jetstream.go @@ -2,6 +2,7 @@ package client import ( "context" + "encoding/json" "errors" "slices" "time" @@ -19,6 +20,11 @@ const ( subjectSuffix = "_blackbox_exporter" ) +type Payload struct { + PublishTime time.Time `json:"publish_time"` + Region string `json:"region"` +} + type Message struct { Subject string Data []byte @@ -195,23 +201,24 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) { clusterName := client.connection.ConnectedClusterName() for msg := range h { - var publishTime time.Time + var payload Payload - if err := publishTime.UnmarshalBinary(msg.Data); err != nil { - client.logger.Error("unable to unmarshal binary data for publishTime.") - client.logger.Info("received message but could not calculate latency due to unmarshalling error.", + if err := json.Unmarshal(msg.Data, &payload); err != nil { + client.logger.Error("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject), + zap.Error(err), ) return } - latency := time.Since(publishTime).Seconds() + latency := time.Since(payload.PublishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ "subject": msg.Subject, "stream": streamName, "cluster": clusterName, + "region": payload.Region, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ @@ -219,6 +226,7 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) { "type": successfulSubscribe, "stream": streamName, "cluster": clusterName, + "region": payload.Region, }).Add(1) client.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) @@ -235,23 +243,24 @@ func (client *Client) coreSubscribe(subject string) { } for msg := range h { - var publishTime time.Time + var payload Payload - if err := publishTime.UnmarshalBinary(msg.Data); err != nil { - client.logger.Error("unable to unmarshal binary data for publishTime.") - client.logger.Info("received message but could not calculate latency due to unmarshalling error.", + if err := json.Unmarshal(msg.Data, &payload); err != nil { + client.logger.Error("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject), + zap.Error(err), ) return } - latency := time.Since(publishTime).Seconds() + latency := time.Since(payload.PublishTime).Seconds() client.metrics.Latency.With(prometheus.Labels{ "subject": subject, "stream": "-", "cluster": clusterName, + "region": payload.Region, }).Observe(latency) client.metrics.SuccessCounter.With(prometheus.Labels{ @@ -259,6 +268,7 @@ func (client *Client) coreSubscribe(subject string) { "stream": "-", "type": successfulSubscribe, "cluster": clusterName, + "region": payload.Region, }).Add(1) client.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) @@ -269,9 +279,14 @@ func (client *Client) corePublish(subject string) { clusterName := client.connection.ConnectedClusterName() for { - t, err := time.Now().MarshalBinary() + t, err := json.Marshal(Payload{ + PublishTime: time.Now(), + Region: client.config.Region, + }) if err != nil { client.logger.Error("could not marshal current time.", zap.Error(err)) + + continue } if err := client.connection.Publish(subject, t); err != nil { @@ -307,9 +322,14 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre clusterName := client.connection.ConnectedClusterName() for { - t, err := time.Now().MarshalBinary() + t, err := json.Marshal(Payload{ + PublishTime: time.Now(), + Region: client.config.Region, + }) if err != nil { client.logger.Error("could not marshal current time.", zap.Error(err)) + + continue } if ack, err := client.jetstream.Publish(ctx, subject, t); err != nil { diff --git a/internal/client/metric.go b/internal/client/metric.go index f662738..26c056a 100644 --- a/internal/client/metric.go +++ b/internal/client/metric.go @@ -115,7 +115,7 @@ func NewMetrics(conn string) Metrics { "conn": conn, }, Buckets: latencyBuckets, - }, []string{"stream", "cluster", "subject"}), + }, []string{"stream", "cluster", "subject", "region"}), SuccessCounter: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Subsystem: Subsystem, @@ -124,6 +124,6 @@ func NewMetrics(conn string) Metrics { ConstLabels: prometheus.Labels{ "conn": conn, }, - }, []string{"type", "stream", "cluster", "subject"}), + }, []string{"type", "stream", "cluster", "subject", "region"}), } } diff --git a/internal/config/default.go b/internal/config/default.go index 9ce9b97..7572f43 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -31,6 +31,7 @@ func Default() Config { QueueSubscriptionGroup: "group", FlushTimeout: 2 * time.Second, ClientName: "localhost", + Region: "-", }, Metric: metric.Config{ Server: metric.Server{Address: ":8080"},