Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve api consistency and use some server structs #626

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion audit/jetstream_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func checkStreamMetadataMonitoring(_ *Check, r *archive.Reader, examples *Exampl
}

opts.StreamName = streamName
monitor.StreamInfoHealthCheck(&streamDetails, check, *opts, log)
monitor.CheckStreamInfoHealth(&streamDetails, check, *opts, log)

for _, warning := range check.Warnings {
examples.Add("WARNING: stream %s in %s: %s", streamName, accountName, warning)
Expand Down
6 changes: 3 additions & 3 deletions monitor/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/nats-io/nats.go"
)

// ConnectionCheckOptions configures the NATS Connection check
type ConnectionCheckOptions struct {
// CheckConnectionOptions configures the NATS Connection check
type CheckConnectionOptions struct {
// ConnectTimeWarning warning threshold for time to establish the connection (seconds)
ConnectTimeWarning float64 `json:"connect_time_warning" yaml:"connect_time_warning"`
// ConnectTimeCritical critical threshold for time to establish the connection (seconds)
Expand All @@ -36,7 +36,7 @@ type ConnectionCheckOptions struct {
RequestRttCritical float64 `json:"request_rtt_critical" yaml:"request_rtt_critical"`
}

func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts ConnectionCheckOptions) error {
func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts CheckConnectionOptions) error {
connStart := time.Now()
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "connection failed: %v", err) {
Expand Down
6 changes: 3 additions & 3 deletions monitor/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/nats-io/nkeys"
)

// CredentialCheckOptions configures the credentials check
type CredentialCheckOptions struct {
// CheckCredentialOptions configures the credentials check
type CheckCredentialOptions struct {
// File is the file holding the credential
File string `json:"file" yaml:"file"`
// ValidityWarning is the warning threshold for credential validity (seconds)
Expand All @@ -33,7 +33,7 @@ type CredentialCheckOptions struct {
RequiresExpiry bool `json:"requires_expiry" yaml:"requires_expiry"`
}

func CheckCredential(check *Result, opts CredentialCheckOptions) error {
func CheckCredential(check *Result, opts CheckCredentialOptions) error {
ok, err := fileAccessible(opts.File)
if err != nil {
check.Critical("credential not accessible: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions monitor/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI
}

t.Run("no expiry", func(t *testing.T) {
opts := monitor.CredentialCheckOptions{
opts := monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
RequiresExpiry: true,
}
Expand All @@ -80,7 +80,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI

t.Run("critical", func(t *testing.T) {
check := &monitor.Result{}
assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{
assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
ValidityCritical: 100 * 24 * 365 * 60 * 60,
}))
Expand All @@ -91,7 +91,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI

t.Run("warning", func(t *testing.T) {
check := &monitor.Result{}
assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{
assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
ValidityWarning: 100 * 24 * 365 * 60 * 60,
}))
Expand Down
8 changes: 4 additions & 4 deletions monitor/js_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/nats-io/nats.go"
)

type JetStreamAccountOptions struct {
type CheckJetStreamAccountOptions struct {
MemoryWarning int `json:"memory_warning" yaml:"memory_warning"`
MemoryCritical int `json:"memory_critical" yaml:"memory_critical"`
FileWarning int `json:"file_warning" yaml:"file_warning"`
Expand All @@ -37,7 +37,7 @@ type JetStreamAccountOptions struct {
Resolver func() *api.JetStreamAccountStats `json:"-" yaml:"-"`
}

func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts JetStreamAccountOptions) error {
func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts CheckJetStreamAccountOptions) error {
var mgr *jsm.Manager
var err error

Expand Down Expand Up @@ -82,7 +82,7 @@ func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, op
return nil
}

func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info []*jsm.Stream) error {
func checkStreamClusterHealth(check *Result, opts *CheckJetStreamAccountOptions, info []*jsm.Stream) error {
var okCnt, noLeaderCnt, notEnoughReplicasCnt, critCnt, lagCritCnt, seenCritCnt int

for _, s := range info {
Expand Down Expand Up @@ -177,7 +177,7 @@ func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info
return nil
}

func checkJSAccountInfo(check *Result, opts *JetStreamAccountOptions, info *api.JetStreamAccountStats) error {
func checkJSAccountInfo(check *Result, opts *CheckJetStreamAccountOptions, info *api.JetStreamAccountStats) error {
if info == nil {
return fmt.Errorf("invalid account status")
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/js_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func TestCheckAccountInfo(t *testing.T) {
setDefaults := func() (*monitor.JetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) {
setDefaults := func() (*monitor.CheckJetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) {
info := &api.JetStreamAccountStats{
JetStreamTier: api.JetStreamTier{
Memory: 128,
Expand All @@ -38,7 +38,7 @@ func TestCheckAccountInfo(t *testing.T) {
}

// cli defaults
cmd := &monitor.JetStreamAccountOptions{
cmd := &monitor.CheckJetStreamAccountOptions{
ConsumersCritical: -1,
ConsumersWarning: -1,
StreamCritical: -1,
Expand Down
6 changes: 3 additions & 3 deletions monitor/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/nats-io/nats.go"
)

// KVCheckOptions configures the KV check
type KVCheckOptions struct {
// CheckKVBucketAndKeyOptions configures the KV check
type CheckKVBucketAndKeyOptions struct {
// Bucket is the bucket to check
Bucket string `json:"bucket" yaml:"bucket"`
// Key requires a key to have a non delete/purge value set
Expand All @@ -31,7 +31,7 @@ type KVCheckOptions struct {
ValuesCritical int64 `json:"values_critical" yaml:"values_critical"`
}

func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts KVCheckOptions) error {
func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts CheckKVBucketAndKeyOptions) error {
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "connection failed: %v", err) {
return nil
Expand Down
8 changes: 4 additions & 4 deletions monitor/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
t.Run("Bucket", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}
err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{
err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
})
checkErr(t, err, "check failed: %v", err)
Expand All @@ -121,7 +121,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
checkErr(t, err, "kv create failed")

check = &monitor.Result{}
err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{
err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
ValuesCritical: -1,
ValuesWarning: -1,
Expand All @@ -142,7 +142,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
checkErr(t, err, "kv create failed: %v", err)

opts := monitor.KVCheckOptions{
opts := monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
ValuesWarning: 1,
ValuesCritical: 2,
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
checkErr(t, err, "kv create failed")

opts := monitor.KVCheckOptions{
opts := monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
Key: "KEY",
ValuesWarning: -1,
Expand Down
30 changes: 19 additions & 11 deletions monitor/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,25 @@ package monitor

import (
"encoding/json"
"fmt"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

type CheckMetaOptions struct {
type CheckJetstreamMetaOptions struct {
// ExpectServers the expected number of known servers in the meta cluster
ExpectServers int `json:"expect_servers" yaml:"expect_servers"`
// LagCritical the critical threshold for how many operations behind a peer may be
LagCritical uint64 `json:"lag_critical" yaml:"lag_critical"`
// SeenCritical the critical threshold for how long ago a peer was seen (seconds)
SeenCritical float64 `json:"seen_critical" yaml:"seen_critical"`

Resolver func(*nats.Conn) (*JSZResponse, error) `json:"-" yaml:"-"`
Resolver func(*nats.Conn) (*server.ServerAPIJszResponse, error) `json:"-" yaml:"-"`
}

type JSZResponse struct {
Data server.JSInfo `json:"data"`
Server server.ServerInfo `json:"server"`
}

func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckMetaOptions) error {
func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckJetstreamMetaOptions) error {
var nc *nats.Conn
var err error

Expand All @@ -47,8 +43,8 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
return nil
}

opts.Resolver = func(conn *nats.Conn) (*JSZResponse, error) {
jszresp := &JSZResponse{}
opts.Resolver = func(conn *nats.Conn) (*server.ServerAPIJszResponse, error) {
jszresp := &server.ServerAPIJszResponse{}
jreq, err := json.Marshal(&server.JSzOptions{LeaderOnly: true})
if check.CriticalIfErr(err, "request failed: %v", err) {
return nil, err
Expand All @@ -60,7 +56,14 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
}

err = json.Unmarshal(res.Data, jszresp)
check.CriticalIfErr(err, "invalid result received: %s", err)
if check.CriticalIfErr(err, "invalid result received: %s", err) {
return nil, err
}

if jszresp.Error == nil {
check.Critical("invalid result received: %s", jszresp.Error.Error())
return nil, fmt.Errorf("invalid result received: %s", jszresp.Error.Error())
}

return jszresp, nil
}
Expand All @@ -71,6 +74,11 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
return nil
}

if jszresp.Data == nil {
check.Critical("no JSZ response received")
return nil
}

ci := jszresp.Data.Meta
if ci == nil {
check.Critical("no cluster information")
Expand Down
Loading