Skip to content

Commit

Permalink
feat: add region field to messages
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Oct 6, 2024
1 parent 5411f7d commit 85d934d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 14 deletions.
1 change: 1 addition & 0 deletions charts/nats-blackbox-exporter/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ data:
queue_subscription_group: {{ .Values.nats.queue_subscription_group | quote }}
flush_timeout: {{ .Values.nats.flush_timeout | quote }}
new_stream_allow: {{ .Values.nats.new_stream_allow }}
region: {{ .Values.nats.region }}
metric:
server:
address: ":{{ .Values.metrics.port | default 8080 }}"
Expand Down
1 change: 1 addition & 0 deletions charts/nats-blackbox-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ nats:
queue_subscription_group: "group"
flush_timeout: 2s
new_stream_allow: false
region: "-"

metrics:
port: 8080
1 change: 1 addition & 0 deletions internal/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
Region string `json:"region" koanf:"region"`
}

type Stream struct {
Expand Down
44 changes: 32 additions & 12 deletions internal/client/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"encoding/json"
"errors"
"slices"
"time"
Expand All @@ -19,6 +20,11 @@ const (
subjectSuffix = "_blackbox_exporter"
)

type Payload struct {
PublishTime time.Time `json:"publish_time"`
Region string `json:"region"`
}

type Message struct {
Subject string
Data []byte
Expand Down Expand Up @@ -195,30 +201,32 @@ func (client *Client) jetstreamSubscribe(h <-chan *Message, streamName string) {
clusterName := client.connection.ConnectedClusterName()

for msg := range h {
var publishTime time.Time
var payload Payload

if err := publishTime.UnmarshalBinary(msg.Data); err != nil {
client.logger.Error("unable to unmarshal binary data for publishTime.")
client.logger.Info("received message but could not calculate latency due to unmarshalling error.",
if err := json.Unmarshal(msg.Data, &payload); err != nil {
client.logger.Error("received message but could not calculate latency due to unmarshalling error.",
zap.String("subject", msg.Subject),
zap.Error(err),
)

return
}

latency := time.Since(publishTime).Seconds()
latency := time.Since(payload.PublishTime).Seconds()

client.metrics.Latency.With(prometheus.Labels{
"subject": msg.Subject,
"stream": streamName,
"cluster": clusterName,
"region": payload.Region,
}).Observe(latency)

client.metrics.SuccessCounter.With(prometheus.Labels{
"subject": msg.Subject,
"type": successfulSubscribe,
"stream": streamName,
"cluster": clusterName,
"region": payload.Region,
}).Add(1)

client.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
Expand All @@ -235,30 +243,32 @@ func (client *Client) coreSubscribe(subject string) {
}

for msg := range h {
var publishTime time.Time
var payload Payload

if err := publishTime.UnmarshalBinary(msg.Data); err != nil {
client.logger.Error("unable to unmarshal binary data for publishTime.")
client.logger.Info("received message but could not calculate latency due to unmarshalling error.",
if err := json.Unmarshal(msg.Data, &payload); err != nil {
client.logger.Error("received message but could not calculate latency due to unmarshalling error.",
zap.String("subject", msg.Subject),
zap.Error(err),
)

return
}

latency := time.Since(publishTime).Seconds()
latency := time.Since(payload.PublishTime).Seconds()

client.metrics.Latency.With(prometheus.Labels{
"subject": subject,
"stream": "-",
"cluster": clusterName,
"region": payload.Region,
}).Observe(latency)

client.metrics.SuccessCounter.With(prometheus.Labels{
"subject": subject,
"stream": "-",
"type": successfulSubscribe,
"cluster": clusterName,
"region": payload.Region,
}).Add(1)

client.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
Expand All @@ -269,9 +279,14 @@ func (client *Client) corePublish(subject string) {
clusterName := client.connection.ConnectedClusterName()

for {
t, err := time.Now().MarshalBinary()
t, err := json.Marshal(Payload{
PublishTime: time.Now(),
Region: client.config.Region,
})
if err != nil {
client.logger.Error("could not marshal current time.", zap.Error(err))

continue
}

if err := client.connection.Publish(subject, t); err != nil {
Expand Down Expand Up @@ -307,9 +322,14 @@ func (client *Client) jetstreamPublish(ctx context.Context, subject string, stre
clusterName := client.connection.ConnectedClusterName()

for {
t, err := time.Now().MarshalBinary()
t, err := json.Marshal(Payload{
PublishTime: time.Now(),
Region: client.config.Region,
})
if err != nil {
client.logger.Error("could not marshal current time.", zap.Error(err))

continue
}

if ack, err := client.jetstream.Publish(ctx, subject, t); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/client/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewMetrics(conn string) Metrics {
"conn": conn,
},
Buckets: latencyBuckets,
}, []string{"stream", "cluster", "subject"}),
}, []string{"stream", "cluster", "subject", "region"}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Expand All @@ -124,6 +124,6 @@ func NewMetrics(conn string) Metrics {
ConstLabels: prometheus.Labels{
"conn": conn,
},
}, []string{"type", "stream", "cluster", "subject"}),
}, []string{"type", "stream", "cluster", "subject", "region"}),
}
}
1 change: 1 addition & 0 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Default() Config {
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
ClientName: "localhost",
Region: "-",
},
Metric: metric.Config{
Server: metric.Server{Address: ":8080"},
Expand Down

0 comments on commit 85d934d

Please sign in to comment.