Skip to content

Commit

Permalink
Merge pull request #608 from ripienaar/check_helpers_2
Browse files Browse the repository at this point in the history
Add more helpers and adjust checks
  • Loading branch information
ripienaar authored Jan 8, 2025
2 parents 8f24f76 + 7b788b8 commit e4ccd32
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 204 deletions.
154 changes: 70 additions & 84 deletions audit/account_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package audit
import (
"errors"
"fmt"

"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/audit/archive"
"github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -50,6 +49,7 @@ func RegisterAccountChecks(collection *CheckCollection) error {
func checkAccountLimits(check *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) {
connectionsThreshold := check.Configuration["connections"].Value()
subscriptionsThreshold := check.Configuration["subscriptions"].Value()
accountDetailsTag := archive.TagAccountInfo()

// Check value against limit threshold, create example if exceeded
checkLimit := func(limitName, serverName, accountName string, value, limit int64, percentThreshold float64) {
Expand All @@ -60,100 +60,86 @@ func checkAccountLimits(check *Check, r *archive.Reader, examples *ExamplesColle

threshold := int64(float64(limit) * percentThreshold)
if value > threshold {
examples.Add(
"account %s (on %s) using %.1f%% of %s limit (%d/%d)",
accountName,
serverName,
float64(value)*100/float64(limit),
limitName,
value,
limit,
)
examples.Add("account %s (on %s) using %.1f%% of %s limit (%d/%d)", accountName, serverName, float64(value)*100/float64(limit), limitName, value, limit)
}
}

accountsTag := archive.TagServerAccounts()
accountDetailsTag := archive.TagAccountInfo()
for _, clusterName := range r.ClusterNames() {
clusterTag := archive.TagCluster(clusterName)
_, err := r.EachClusterServerAccountz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, az *server.ServerAPIAccountzResponse) error {
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf("Artifact 'ACCOUNTZ' is missing for server %s", serverTag)
return nil
} else if err != nil {
return fmt.Errorf("failed to load variables for server %s: %w", serverTag, err)
}

for _, serverName := range r.ClusterServerNames(clusterName) {
serverTag := archive.TagServer(serverName)
accountz := az.Data
for _, accountName := range accountz.Accounts {
accountTag := archive.TagAccount(accountName)

var resp server.ServerAPIAccountzResponse
var accountz *server.Accountz
err := r.Load(&resp, clusterTag, serverTag, accountsTag)
var accountInfo server.AccountInfo
err := r.Load(&accountInfo, serverTag, accountTag, accountDetailsTag)
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf("Artifact 'ACCOUNTZ' is missing for server %s cluster %s", serverName, clusterName)
log.Warnf("Account details is missing for account %s, server %s", accountName, serverTag)
continue
} else if err != nil {
return Skipped, fmt.Errorf("failed to load ACCOUNTZ for server %s: %w", serverName, err)
return fmt.Errorf("failed to load Account details from server %s for account %s, error: %w", serverTag, accountName, err)
}
accountz = resp.Data

for _, accountName := range accountz.Accounts {
accountTag := archive.TagAccount(accountName)

var accountInfo server.AccountInfo
err := r.Load(&accountInfo, serverTag, accountTag, accountDetailsTag)
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf("Account details is missing for account %s, server %s", accountName, serverName)
continue
} else if err != nil {
return Skipped, fmt.Errorf("failed to load Account details from server %s for account %s, error: %w", serverTag.Value, accountName, err)
}

if accountInfo.Claim == nil {
// Can't check limits without a claim
continue
}

checkLimit(
"client connections",
serverTag.Value,
accountName,
int64(accountInfo.ClientCnt),
accountInfo.Claim.Limits.Conn,
connectionsThreshold,
)

checkLimit(
"client connections (account)",
serverTag.Value,
accountName,
int64(accountInfo.ClientCnt),
accountInfo.Claim.Limits.AccountLimits.Conn,
connectionsThreshold,
)

checkLimit(
"leaf connections",
serverTag.Value,
accountName,
int64(accountInfo.LeafCnt),
accountInfo.Claim.Limits.LeafNodeConn,
connectionsThreshold,
)

checkLimit(
"leaf connections (account)",
serverTag.Value,
accountName,
int64(accountInfo.LeafCnt),
accountInfo.Claim.Limits.AccountLimits.LeafNodeConn,
connectionsThreshold,
)

checkLimit(
"subscriptions",
serverTag.Value,
accountName,
int64(accountInfo.SubCnt),
accountInfo.Claim.Limits.Subs,
subscriptionsThreshold,
)

if accountInfo.Claim == nil {
// Can't check limits without a claim
continue
}

checkLimit(
"client connections",
serverTag.Value,
accountName,
int64(accountInfo.ClientCnt),
accountInfo.Claim.Limits.Conn,
connectionsThreshold,
)

checkLimit(
"client connections (account)",
serverTag.Value,
accountName,
int64(accountInfo.ClientCnt),
accountInfo.Claim.Limits.AccountLimits.Conn,
connectionsThreshold,
)

checkLimit(
"leaf connections",
serverTag.Value,
accountName,
int64(accountInfo.LeafCnt),
accountInfo.Claim.Limits.LeafNodeConn,
connectionsThreshold,
)

checkLimit(
"leaf connections (account)",
serverTag.Value,
accountName,
int64(accountInfo.LeafCnt),
accountInfo.Claim.Limits.AccountLimits.LeafNodeConn,
connectionsThreshold,
)

checkLimit(
"subscriptions",
serverTag.Value,
accountName,
int64(accountInfo.SubCnt),
accountInfo.Claim.Limits.Subs,
subscriptionsThreshold,
)
}

return nil
})
if err != nil {
return Skipped, err
}

if examples.Count() > 0 {
Expand Down
34 changes: 34 additions & 0 deletions audit/archive/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,40 @@ func (r *Reader) EachClusterServerJsz(cb func(clusterTag *Tag, serverTag *Tag, e
})
}

// EachClusterServerAccountz iterates over all servers ordered by cluster and calls the callback function with the loaded Accountz response
//
// The callback function will receive any error encountered during loading the server accountz file and should check that and handle it.
// When loading reported no error but the response carries no data the callback receives ErrNoMatches instead.
// If the callback returns an error iteration is stopped and that error is returned
//
// Errors returned match those documented in Load() otherwise any other error that are encountered
func (r *Reader) EachClusterServerAccountz(cb func(clusterTag *Tag, serverTag *Tag, err error, az *server.ServerAPIAccountzResponse) error) (int, error) {
	return r.eachClusterServer(TagServerAccounts(), server.ServerAPIAccountzResponse{}, func(clusterTag *Tag, serverTag *Tag, err error, resp any) error {
		az := resp.(*server.ServerAPIAccountzResponse)

		// Only synthesize ErrNoMatches when loading itself did not fail, a
		// genuine load error must reach the callback unchanged rather than
		// being reported as a missing artifact.
		if err == nil && (az == nil || az.Data == nil) {
			err = ErrNoMatches
		}

		return cb(clusterTag, serverTag, err, az)
	})
}

// EachClusterServerLeafz iterates over all servers ordered by cluster and calls the callback function with the loaded Leafz response
//
// The callback function will receive any error encountered during loading the server leafz file and should check that and handle it.
// When loading reported no error but the response carries no data the callback receives ErrNoMatches instead.
// If the callback returns an error iteration is stopped and that error is returned
//
// Errors returned match those documented in Load() otherwise any other error that are encountered
func (r *Reader) EachClusterServerLeafz(cb func(clusterTag *Tag, serverTag *Tag, err error, lz *server.ServerAPILeafzResponse) error) (int, error) {
	// The target type must match the type asserted below: the helper creates
	// instances of it, so passing the Accountz response type here would make
	// the Leafz assertion fail.
	return r.eachClusterServer(TagServerLeafs(), server.ServerAPILeafzResponse{}, func(clusterTag *Tag, serverTag *Tag, err error, resp any) error {
		lz := resp.(*server.ServerAPILeafzResponse)

		// Only synthesize ErrNoMatches when loading itself did not fail, a
		// genuine load error must reach the callback unchanged rather than
		// being reported as a missing artifact.
		if err == nil && (lz == nil || lz.Data == nil) {
			err = ErrNoMatches
		}

		return cb(clusterTag, serverTag, err, lz)
	})
}

// helper to iterate all servers, creates instances of targetType based on tag. targetType must be a non pointer like server.ServerAPIJszResponse{}
func (r *Reader) eachClusterServer(tag *Tag, targetType any, cb func(clusterTag *Tag, serverTag *Tag, err error, resp any) error) (int, error) {
found := 0
Expand Down
53 changes: 17 additions & 36 deletions audit/cluster_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,7 @@ func checkClusterMemoryUsageOutliers(check *Check, r *archive.Reader, examples *

for serverName, serverMemoryUsage := range clusterMemoryUsageMap {
if serverMemoryUsage > threshold {
examples.Add(
"Cluster %s avg: %s, server %s: %s",
clusterName,
humanize.IBytes(uint64(clusterMemoryUsageMean)),
serverName,
humanize.IBytes(uint64(serverMemoryUsage)),
)
examples.Add("Cluster %s avg: %s, server %s: %s", clusterName, humanize.IBytes(uint64(clusterMemoryUsageMean)), serverName, humanize.IBytes(uint64(serverMemoryUsage)))
clustersWithIssuesMap[clusterName] = nil
}
}
Expand Down Expand Up @@ -202,15 +196,7 @@ func checkClusterUniformGatewayConfig(_ *Check, r *archive.Reader, examples *Exa
var previousTargetClusterNames []string
for serverName, targetClusterNames := range t.configuredGateways {
if previousTargetClusterNames != nil {
log.Debugf(
"Cluster %s - Comparing configured %s gateways of %s (%d) to %s (%d)",
clusterName,
t.gatewayType,
serverName,
len(targetClusterNames),
previousServerName,
len(previousTargetClusterNames),
)
log.Debugf("Cluster %s - Comparing configured %s gateways of %s (%d) to %s (%d)", clusterName, t.gatewayType, serverName, len(targetClusterNames), previousServerName, len(previousTargetClusterNames))
if !reflect.DeepEqual(targetClusterNames, previousTargetClusterNames) {
examples.Add(
"Cluster %s, %s gateways server %s: %v != server %s: %v",
Expand Down Expand Up @@ -238,33 +224,28 @@ func checkClusterUniformGatewayConfig(_ *Check, r *archive.Reader, examples *Exa

// checkClusterHighHAAssets verifies the number of HA assets is below the given number for each known server in each known cluster
func checkClusterHighHAAssets(check *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) {
jsTag := archive.TagServerJetStream()
haAssetsThreshold := check.Configuration["assets"].Value()

for _, clusterName := range r.ClusterNames() {
clusterTag := archive.TagCluster(clusterName)
for _, serverName := range r.ClusterServerNames(clusterName) {
serverTag := archive.TagServer(serverName)

var resp server.ServerAPIJszResponse
var serverJSInfo *server.JSInfo
err := r.Load(&resp, clusterTag, serverTag, jsTag)
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf("Artifact 'JSZ' is missing for server %s cluster %s", serverName, clusterName)
continue
} else if err != nil {
return Skipped, fmt.Errorf("failed to load JSZ for server %s: %w", serverName, err)
}
serverJSInfo = resp.Data
_, err := r.EachClusterServerJsz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, jsz *server.ServerAPIJszResponse) error {
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf("Artifact 'JSZ' is missing for server %s", serverTag)
return nil
} else if err != nil {
return fmt.Errorf("failed to load variables for server %s: %w", serverTag, err)
}

if float64(serverJSInfo.HAAssets) > haAssetsThreshold {
examples.Add("%s: %d HA assets", serverName, serverJSInfo.HAAssets)
}
if float64(jsz.Data.HAAssets) > haAssetsThreshold {
examples.Add("%s: %d HA assets", serverTag, jsz.Data.HAAssets)
}

return nil
})
if err != nil {
return Skipped, err
}

if examples.Count() > 0 {
log.Errorf("Found %d servers with >%d HA assets", examples.Count(), haAssetsThreshold)
log.Errorf("Found %d servers with JetStream domains containing whitespace", examples.Count())
return PassWithIssues, nil
}

Expand Down
45 changes: 5 additions & 40 deletions audit/jetstream_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,7 @@ func checkStreamLaggingReplicas(check *Check, r *archive.Reader, examples *Examp
streamTag := archive.TagStream(streamName)
serverNames := r.StreamServerNames(accountName, streamName)

log.Debugf(
"Inspecting account '%s' stream '%s', found %d servers: %v",
accountName,
streamName,
len(serverNames),
serverNames,
)
log.Debugf("Inspecting account '%s' stream '%s', found %d servers: %v", accountName, streamName, len(serverNames), serverNames)

// Create map server->streamDetails
replicasStreamDetails := make(map[string]*api.StreamInfo, len(serverNames))
Expand All @@ -144,13 +138,7 @@ func checkStreamLaggingReplicas(check *Check, r *archive.Reader, examples *Examp
streamDetails := &api.StreamInfo{}
err := r.Load(streamDetails, accountTag, streamTag, serverTag, typeTag)
if errors.Is(err, archive.ErrNoMatches) {
log.Warnf(
"Artifact not found: %s for stream %s in account %s by server %s",
typeTag.Value,
streamName,
accountName,
serverName,
)
log.Warnf("Artifact not found: %s for stream %s in account %s by server %s", typeTag, streamName, accountName, serverName)
continue
} else if err != nil {
return Skipped, fmt.Errorf("failed to lookup stream artifact: %w", err)
Expand All @@ -176,13 +164,7 @@ func checkStreamLaggingReplicas(check *Check, r *archive.Reader, examples *Examp
highestLastSeqServer = serverName
}
}
log.Debugf(
"Stream %s / %s highest last sequence: %d @ %s",
accountName,
streamName,
highestLastSeq,
highestLastSeqServer,
)
log.Debugf("Stream %s / %s highest last sequence: %d @ %s", accountName, streamName, highestLastSeq, highestLastSeqServer)

// Check if some server's sequence is below warning threshold
maxDelta := uint64(float64(highestLastSeq) * lastSequenceLagThreshold)
Expand All @@ -193,15 +175,7 @@ func checkStreamLaggingReplicas(check *Check, r *archive.Reader, examples *Examp
for serverName, streamDetail := range replicasStreamDetails {
lastSeq := streamDetail.State.LastSeq
if lastSeq < threshold {
examples.Add(
"%s/%s server %s lastSequence: %d is behind highest lastSequence: %d on server: %s",
accountName,
streamName,
serverName,
lastSeq,
highestLastSeq,
highestLastSeqServer,
)
examples.Add("%s/%s server %s lastSequence: %d is behind highest lastSequence: %d on server: %s", accountName, streamName, serverName, lastSeq, highestLastSeq, highestLastSeqServer)
laggingReplicas += 1
}
}
Expand Down Expand Up @@ -272,16 +246,7 @@ func checkStreamLimits(check *Check, r *archive.Reader, examples *ExamplesCollec
}
threshold := int64(float64(limit) * percentThreshold)
if value > threshold {
examples.Add(
"stream %s (in %s on %s) using %.1f%% of %s limit (%d/%d)",
streamName,
accountName,
serverName,
float64(value)*100/float64(limit),
limitName,
value,
limit,
)
examples.Add("stream %s (in %s on %s) using %.1f%% of %s limit (%d/%d)", streamName, accountName, serverName, float64(value)*100/float64(limit), limitName, value, limit)
}
}

Expand Down
Loading

0 comments on commit e4ccd32

Please sign in to comment.