diff --git a/.golangci.yml b/.golangci.yml index 4a753ef..5915abd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,8 +1,24 @@ --- linters: - enable: - - gofmt - - govet - - revive - - staticcheck - - errcheck + enable-all: true + disable: + - depguard + # we don't use json with camel-case + - tagliatelle + - nolintlint + # it should improve to support more known patterns + - varnamelen + - ireturn + # deprecated linters + - maligned + - scopelint + - golint + - ifshort + - interfacer + - exhaustivestruct + - nosnakecase + - varcheck + - deadcode + - structcheck + - gomnd + - execinquery diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 3c3d498..e85848b 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -13,9 +13,9 @@ import ( // ExitFailure status code. const ExitFailure = 1 -var configPath string - func Execute() { + var configPath string + pflag.StringVar(&configPath, "configPath", "./config.yaml", "Path to config file") pflag.Parse() diff --git a/internal/config/config.go b/internal/config/config.go index d0e1ea4..d0bfcc7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -25,7 +25,7 @@ type ( // Config holds all configurations. Config struct { Logger logger.Config `json:"logger,omitempty" koanf:"logger"` - NATS natsclient.Config `json:"nats,omitempty" koanf:"nats"` + NATS natsclient.Config `json:"nats,omitempty" koanf:"nats"` Metric metric.Config `json:"metric,omitempty" koanf:"metric"` } ) diff --git a/internal/config/default.go b/internal/config/default.go index 267bebc..512667e 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -18,10 +18,11 @@ func Default() Config { NATS: natsclient.Config{ AllExistingStreams: false, NewStreamAllow: true, - Streams: []natsclient.Stream{{ - Name: "test", - Subject: "test", - }, + Streams: []natsclient.Stream{ + { + Name: "test", + Subject: "test", + }, }, URL: "localhost:4222", PublishInterval: 2 * time.Second, diff --git a/internal/metric/config.go b/internal/metric/config.go index a5e3aea..ed5c5fe 100644 --- a/internal/metric/config.go +++ b/internal/metric/config.go @@ -1,7 +1,7 @@ package metric type Config struct { - Server Server `json:"server,omitempty" koanf:"server"` + Server Server `json:"server,omitempty" koanf:"server"` Enabled bool `json:"enabled,omitempty" koanf:"enabled"` } diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index fc4f6ed..fe74780 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,19 +3,19 @@ package natsclient import "time" type Config struct { - AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` - NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` - Streams []Stream `json:"streams,omitempty" koanf:"streams"` + AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` + NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` + Streams []Stream `json:"streams,omitempty" koanf:"streams"` URL string `json:"url,omitempty" koanf:"url"` - PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` - RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` - MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` + PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"` + RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"` + MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"` QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"` - FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` - ClientName string `json:"client_name" koanf:"client_name"` + FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"` + ClientName string `json:"client_name" koanf:"client_name"` } type Stream struct { - Name string `json:"name,omitempty" koanf:"name"` - Subject string `json:"subject,omitempty" koanf:"subject"` + Name string `json:"name,omitempty" koanf:"name"` + Subject string `json:"subject,omitempty" koanf:"subject"` } diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 86f27aa..0a1ca5d 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -1,6 +1,7 @@ package natsclient import ( + "errors" "slices" "time" @@ -9,7 +10,7 @@ import ( "go.uber.org/zap" ) -var ( +const ( successfulSubscribe = "successful subscribe" failedPublish = "failed publish" successfulPublish = "successful publish" @@ -21,7 +22,7 @@ type Message struct { Data []byte } -// Jetstream represents the NATS core handler +// Jetstream represents the NATS core handler. type Jetstream struct { connection *nats.Conn jetstream nats.JetStreamContext @@ -30,12 +31,14 @@ type Jetstream struct { metrics Metrics } -// NewJetstream initializes NATS JetStream connection +// NewJetstream initializes NATS JetStream connection. func NewJetstream(config Config, logger *zap.Logger) *Jetstream { j := &Jetstream{ - config: &config, - logger: logger, - metrics: NewMetrics(), + config: &config, + logger: logger, + jetstream: nil, + connection: nil, + metrics: NewMetrics(), } j.connect() @@ -49,8 +52,8 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream { func (j *Jetstream) connect() { var err error - j.connection, err = nats.Connect(j.config.URL) - if err != nil { + + if j.connection, err = nats.Connect(j.config.URL); err != nil { j.logger.Panic("could not connect to nats", zap.Error(err)) } @@ -67,8 +70,8 @@ func (j *Jetstream) connect() { func (j *Jetstream) createJetstreamContext() { var err error - j.jetstream, err = j.connection.JetStream() - if err != nil { + + if j.jetstream, err = j.connection.JetStream(); err != nil { j.logger.Panic("could not connect to jetstream", zap.Error(err)) } } @@ -77,43 +80,52 @@ func (j *Jetstream) UpdateOrCreateStream() { if j.config.AllExistingStreams { streamNames := j.jetstream.StreamNames() for stream := range streamNames { - j.config.Streams = append(j.config.Streams, Stream{Name: stream}) + j.config.Streams = append(j.config.Streams, Stream{Name: stream, Subject: ""}) } } + for i, stream := range j.config.Streams { if stream.Subject == "" { j.config.Streams[i].Subject = stream.Name + subjectSuffix } info, err := j.jetstream.StreamInfo(stream.Name) - if err == nil { + if err == nil { // nolint: gocritic j.updateStream(j.config.Streams[i], info) - } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { + } else if errors.Is(err, nats.ErrStreamNotFound) && j.config.NewStreamAllow { j.createStream(j.config.Streams[i]) } else { j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) } } } + func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) { - subjects := append(info.Config.Subjects, stream.Subject) + subjects := make([]string, len(info.Config.Subjects)+1) + copy(subjects, info.Config.Subjects) + subjects[len(info.Config.Subjects)] = stream.Subject + + // remove duplicate subjects in the list. slices.Sort(subjects) info.Config.Subjects = slices.Compact(subjects) + _, err := j.jetstream.UpdateStream(&info.Config) if err != nil { j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err)) } + j.logger.Info("stream updated") } func (j *Jetstream) createStream(stream Stream) { - _, err := j.jetstream.AddStream(&nats.StreamConfig{ + // nolint: exhaustruct + if _, err := j.jetstream.AddStream(&nats.StreamConfig{ Name: stream.Name, Subjects: []string{stream.Subject}, - }) - if err != nil { + }); err != nil { j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err)) } + j.logger.Info("add new stream") } @@ -121,6 +133,7 @@ func (j *Jetstream) StartBlackboxTest() { if j.config.Streams == nil { j.logger.Panic("at least one stream is required.") } + for _, stream := range j.config.Streams { messageChannel := j.createSubscribe(stream.Subject) go j.jetstreamPublish(stream.Subject, stream.Name) @@ -128,70 +141,80 @@ func (j *Jetstream) StartBlackboxTest() { } } -// Subscribe subscribes to a list of subjects and returns a channel with incoming messages +// Subscribe subscribes to a list of subjects and returns a channel with incoming messages. func (j *Jetstream) createSubscribe(subject string) chan *Message { - messageHandler, h := j.messageHandlerFactoryJetstream() - _, err := j.jetstream.Subscribe( + + if _, err := j.jetstream.Subscribe( subject, messageHandler, nats.DeliverNew(), nats.ReplayInstant(), nats.AckExplicit(), nats.MaxAckPending(j.config.MaxPubAcksInflight), - ) - if err != nil { + ); err != nil { j.logger.Panic("could not Subscribe", zap.Error(err)) - } else { - j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject)) } - return h + j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject)) + return h } func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) { clusterName := j.connection.ConnectedClusterName() + for msg := range h { var publishTime time.Time - err := publishTime.UnmarshalBinary(msg.Data) - if err != nil { + + if err := publishTime.UnmarshalBinary(msg.Data); err != nil { j.logger.Error("unable to unmarshal binary data for publishTime.") - j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject)) + j.logger.Info("received message but could not calculate latency due to unmarshalling error.", + zap.String("subject", msg.Subject), + ) + return } + latency := time.Since(publishTime).Seconds() + j.metrics.Latency.With(prometheus.Labels{ "stream": streamName, "cluster": clusterName, }).Observe(latency) + j.metrics.SuccessCounter.With(prometheus.Labels{ "type": successfulSubscribe, "stream": streamName, "cluster": clusterName, }).Add(1) + j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency)) } } func (j *Jetstream) jetstreamPublish(subject string, streamName string) { clusterName := j.connection.ConnectedClusterName() + for { t, err := time.Now().MarshalBinary() if err != nil { j.logger.Error("could not marshal current time.", zap.Error(err)) } + if ack, err := j.jetstream.Publish(subject, t); err != nil { j.metrics.SuccessCounter.With(prometheus.Labels{ "type": failedPublish, "stream": streamName, "cluster": clusterName, }).Add(1) - if err == nats.ErrTimeout { + + switch { + case errors.Is(err, nats.ErrTimeout): j.logger.Error("Request timeout: No response received within the timeout period.") - } else if err == nats.ErrNoStreamResponse { + case errors.Is(err, nats.ErrNoStreamResponse): j.logger.Error("Request failed: No Stream available for the subject.") - } else { + default: j.logger.Error("Request failed: %v", zap.Error(err)) } } else { @@ -209,19 +232,20 @@ func (j *Jetstream) jetstreamPublish(subject string, streamName string) { func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { ch := make(chan *Message) + return func(msg *nats.Msg) { ch <- &Message{ Subject: msg.Subject, Data: msg.Data, } - err := msg.Ack() - if err != nil { + + if err := msg.Ack(); err != nil { j.logger.Error("Failed to acknowledge the message", zap.Error(err)) } }, ch } -// Close closes NATS connection +// Close closes NATS connection. func (j *Jetstream) Close() { if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { j.logger.Error("could not flush", zap.Error(err)) diff --git a/internal/natsclient/metric.go b/internal/natsclient/metric.go index ca64ed8..9005e02 100644 --- a/internal/natsclient/metric.go +++ b/internal/natsclient/metric.go @@ -11,43 +11,6 @@ const ( Subsystem = "client" ) -var latencyBuckets = []float64{ - 0.001, - 0.0015, - 0.002, - 0.0025, - 0.003, - 0.0035, - 0.004, - 0.0045, - 0.005, - 0.0055, - 0.006, - 0.0065, - 0.007, - 0.0075, - 0.008, - 0.0085, - 0.009, - 0.0095, - 0.01, - 0.015, - 0.02, - 0.025, - 0.03, - 0.045, - 0.05, - 0.065, - 0.07, - 0.08, - 0.09, - 0.1, - 0.2, - 0.3, - 0.5, - 1, -} - // Metrics has all the client metrics. type Metrics struct { Connection prometheus.CounterVec @@ -93,7 +56,45 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom return *ev } +// nolint: funlen func NewMetrics() Metrics { + latencyBuckets := []float64{ + 0.001, + 0.0015, + 0.002, + 0.0025, + 0.003, + 0.0035, + 0.004, + 0.0045, + 0.005, + 0.0055, + 0.006, + 0.0065, + 0.007, + 0.0075, + 0.008, + 0.0085, + 0.009, + 0.0095, + 0.01, + 0.015, + 0.02, + 0.025, + 0.03, + 0.045, + 0.05, + 0.065, + 0.07, + 0.08, + 0.09, + 0.1, + 0.2, + 0.3, + 0.5, + 1, + } + return Metrics{ Connection: newCounterVec(prometheus.CounterOpts{ Namespace: Namespace,