Skip to content

Commit

Permalink
feat: add more linting rules
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Jul 29, 2024
1 parent c75a59c commit 22c00bd
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 95 deletions.
28 changes: 22 additions & 6 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
---
linters:
enable:
- gofmt
- govet
- revive
- staticcheck
- errcheck
enable-all: true
disable:
- depguard
# we don't use json with camel-case
- tagliatelle
- nolintlint
# it should improve to support more known patterns
- varnamelen
- ireturn
# deprecated linters
- maligned
- scopelint
- golint
- ifshort
- interfacer
- exhaustivestruct
- nosnakecase
- varcheck
- deadcode
- structcheck
- gomnd
- execinquery
4 changes: 2 additions & 2 deletions internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
// ExitFailure status code.
const ExitFailure = 1

var configPath string

func Execute() {
var configPath string

pflag.StringVar(&configPath, "configPath", "./config.yaml", "Path to config file")
pflag.Parse()

Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type (
// Config holds all configurations.
Config struct {
Logger logger.Config `json:"logger,omitempty" koanf:"logger"`
NATS natsclient.Config `json:"nats,omitempty" koanf:"nats"`
NATS natsclient.Config `json:"nats,omitempty" koanf:"nats"`
Metric metric.Config `json:"metric,omitempty" koanf:"metric"`
}
)
Expand Down
9 changes: 5 additions & 4 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ func Default() Config {
NATS: natsclient.Config{
AllExistingStreams: false,
NewStreamAllow: true,
Streams: []natsclient.Stream{{
Name: "test",
Subject: "test",
},
Streams: []natsclient.Stream{
{
Name: "test",
Subject: "test",
},
},
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion internal/metric/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package metric

type Config struct {
Server Server `json:"server,omitempty" koanf:"server"`
Server Server `json:"server,omitempty" koanf:"server"`
Enabled bool `json:"enabled,omitempty" koanf:"enabled"`
}

Expand Down
20 changes: 10 additions & 10 deletions internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package natsclient
import "time"

type Config struct {
AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"`
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Streams []Stream `json:"streams,omitempty" koanf:"streams"`
AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"`
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Streams []Stream `json:"streams,omitempty" koanf:"streams"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
}

type Stream struct {
Name string `json:"name,omitempty" koanf:"name"`
Subject string `json:"subject,omitempty" koanf:"subject"`
Name string `json:"name,omitempty" koanf:"name"`
Subject string `json:"subject,omitempty" koanf:"subject"`
}
92 changes: 58 additions & 34 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package natsclient

import (
"errors"
"slices"
"time"

Expand All @@ -9,7 +10,7 @@ import (
"go.uber.org/zap"
)

var (
const (
successfulSubscribe = "successful subscribe"
failedPublish = "failed publish"
successfulPublish = "successful publish"
Expand All @@ -21,7 +22,7 @@ type Message struct {
Data []byte
}

// Jetstream represents the NATS core handler
// Jetstream represents the NATS core handler.
type Jetstream struct {
connection *nats.Conn
jetstream nats.JetStreamContext
Expand All @@ -30,12 +31,14 @@ type Jetstream struct {
metrics Metrics
}

// NewJetstream initializes NATS JetStream connection
// NewJetstream initializes NATS JetStream connection.
func NewJetstream(config Config, logger *zap.Logger) *Jetstream {
j := &Jetstream{
config: &config,
logger: logger,
metrics: NewMetrics(),
config: &config,
logger: logger,
jetstream: nil,
connection: nil,
metrics: NewMetrics(),
}

j.connect()
Expand All @@ -49,8 +52,8 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream {

func (j *Jetstream) connect() {
var err error
j.connection, err = nats.Connect(j.config.URL)
if err != nil {

if j.connection, err = nats.Connect(j.config.URL); err != nil {
j.logger.Panic("could not connect to nats", zap.Error(err))
}

Expand All @@ -67,8 +70,8 @@ func (j *Jetstream) connect() {

func (j *Jetstream) createJetstreamContext() {
var err error
j.jetstream, err = j.connection.JetStream()
if err != nil {

if j.jetstream, err = j.connection.JetStream(); err != nil {
j.logger.Panic("could not connect to jetstream", zap.Error(err))
}
}
Expand All @@ -77,121 +80,141 @@ func (j *Jetstream) UpdateOrCreateStream() {
if j.config.AllExistingStreams {
streamNames := j.jetstream.StreamNames()
for stream := range streamNames {
j.config.Streams = append(j.config.Streams, Stream{Name: stream})
j.config.Streams = append(j.config.Streams, Stream{Name: stream, Subject: ""})
}
}

for i, stream := range j.config.Streams {
if stream.Subject == "" {
j.config.Streams[i].Subject = stream.Name + subjectSuffix
}

info, err := j.jetstream.StreamInfo(stream.Name)
if err == nil {
if err == nil { // nolint: gocritic
j.updateStream(j.config.Streams[i], info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
} else if errors.Is(err, nats.ErrStreamNotFound) && j.config.NewStreamAllow {
j.createStream(j.config.Streams[i])
} else {
j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err))
}
}
}

func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
subjects := append(info.Config.Subjects, stream.Subject)
subjects := make([]string, len(info.Config.Subjects)+1)
copy(subjects, info.Config.Subjects)
subjects[len(info.Config.Subjects)] = stream.Subject

// remove duplicate subjects in the list.
slices.Sort(subjects)
info.Config.Subjects = slices.Compact(subjects)

_, err := j.jetstream.UpdateStream(&info.Config)
if err != nil {
j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err))
}

j.logger.Info("stream updated")
}

func (j *Jetstream) createStream(stream Stream) {
_, err := j.jetstream.AddStream(&nats.StreamConfig{
// nolint: exhaustruct
if _, err := j.jetstream.AddStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: []string{stream.Subject},
})
if err != nil {
}); err != nil {
j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err))
}

j.logger.Info("add new stream")
}

func (j *Jetstream) StartBlackboxTest() {
if j.config.Streams == nil {
j.logger.Panic("at least one stream is required.")
}

for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
go j.jetstreamSubscribe(messageChannel, stream.Name)
}
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
// Subscribe subscribes to a list of subjects and returns a channel with incoming messages.
func (j *Jetstream) createSubscribe(subject string) chan *Message {

messageHandler, h := j.messageHandlerFactoryJetstream()
_, err := j.jetstream.Subscribe(

if _, err := j.jetstream.Subscribe(
subject,
messageHandler,
nats.DeliverNew(),
nats.ReplayInstant(),
nats.AckExplicit(),
nats.MaxAckPending(j.config.MaxPubAcksInflight),
)
if err != nil {
); err != nil {
j.logger.Panic("could not Subscribe", zap.Error(err))
} else {
j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject))
}

return h
j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject))

return h
}

func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {
clusterName := j.connection.ConnectedClusterName()

for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
if err != nil {

if err := publishTime.UnmarshalBinary(msg.Data); err != nil {
j.logger.Error("unable to unmarshal binary data for publishTime.")
j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject))
j.logger.Info("received message but could not calculate latency due to unmarshalling error.",
zap.String("subject", msg.Subject),
)

return
}

latency := time.Since(publishTime).Seconds()

j.metrics.Latency.With(prometheus.Labels{
"stream": streamName,
"cluster": clusterName,
}).Observe(latency)

j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"stream": streamName,
"cluster": clusterName,
}).Add(1)

j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (j *Jetstream) jetstreamPublish(subject string, streamName string) {
clusterName := j.connection.ConnectedClusterName()

for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}

if ack, err := j.jetstream.Publish(subject, t); err != nil {
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
"cluster": clusterName,
}).Add(1)
if err == nats.ErrTimeout {

switch {
case errors.Is(err, nats.ErrTimeout):
j.logger.Error("Request timeout: No response received within the timeout period.")
} else if err == nats.ErrNoStreamResponse {
case errors.Is(err, nats.ErrNoStreamResponse):
j.logger.Error("Request failed: No Stream available for the subject.")
} else {
default:
j.logger.Error("Request failed: %v", zap.Error(err))
}
} else {
Expand All @@ -209,19 +232,20 @@ func (j *Jetstream) jetstreamPublish(subject string, streamName string) {

func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) {
ch := make(chan *Message)

return func(msg *nats.Msg) {
ch <- &Message{
Subject: msg.Subject,
Data: msg.Data,
}
err := msg.Ack()
if err != nil {

if err := msg.Ack(); err != nil {
j.logger.Error("Failed to acknowledge the message", zap.Error(err))
}
}, ch
}

// Close closes NATS connection
// Close closes NATS connection.
func (j *Jetstream) Close() {
if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil {
j.logger.Error("could not flush", zap.Error(err))
Expand Down
Loading

0 comments on commit 22c00bd

Please sign in to comment.