From de499da9388ca17b49ca590009cbcffc9efc660c Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Wed, 8 Jan 2025 16:30:58 +0100 Subject: [PATCH] Add some check helpers Update server checks to use them Signed-off-by: R.I.Pienaar --- audit/archive/reader.go | 79 +++++++++ audit/archive/tag.go | 2 + audit/checks.go | 7 + audit/examples_collection.go | 3 +- audit/gather/gather.go | 2 +- audit/gather/logger.go | 2 +- audit/server_checks.go | 318 ++++++++++++++--------------------- 7 files changed, 222 insertions(+), 191 deletions(-) diff --git a/audit/archive/reader.go b/audit/archive/reader.go index 72e0108a..3147fb5e 100644 --- a/audit/archive/reader.go +++ b/audit/archive/reader.go @@ -17,6 +17,9 @@ import ( "archive/zip" "encoding/json" "fmt" + "reflect" + + "github.com/nats-io/nats-server/v2/server" "io" "os" "slices" @@ -375,3 +378,79 @@ func shrinkMapOfSets[T any](m map[string]map[string]T) ([]string, map[string][]s slices.Sort(keysList) return keysList, newMap } + +// EachClusterServerVarz iterates over all servers ordered by cluster and calls the callback function with the loaded Varz response +// +// The callback function will receive any error encountered during loading the server varz file and should check that and handle it +// If the callback returns an error iteration is stopped and that error is returned +// +// Errors returned match those documented in Load() otherwise any other error that are encountered +func (r *Reader) EachClusterServerVarz(cb func(clusterTag *Tag, serverTag *Tag, err error, vz *server.ServerAPIVarzResponse) error) (int, error) { + return r.eachClusterServer(TagServerVars(), server.ServerAPIVarzResponse{}, func(clusterTag *Tag, serverTag *Tag, err error, resp any) error { + vz := resp.(*server.ServerAPIVarzResponse) + if vz == nil || vz.Data == nil { + err = ErrNoMatches + } + + return cb(clusterTag, serverTag, err, vz) + }) +} + +// EachClusterServerHealthz iterates over all servers ordered by cluster and calls the callback 
function with the loaded Healthz response +// +// The callback function will receive any error encountered during loading the server healthz file and should check that and handle it +// If the callback returns an error iteration is stopped and that error is returned +// +// Errors returned match those documented in Load() otherwise any other errors that are encountered +func (r *Reader) EachClusterServerHealthz(cb func(clusterTag *Tag, serverTag *Tag, err error, vz *server.ServerAPIHealthzResponse) error) (int, error) { + return r.eachClusterServer(TagServerJetStream(), server.ServerAPIHealthzResponse{}, func(clusterTag *Tag, serverTag *Tag, err error, resp any) error { + hz := resp.(*server.ServerAPIHealthzResponse) + if hz == nil || hz.Data == nil { + err = ErrNoMatches + } + + return cb(clusterTag, serverTag, err, hz) + }) +} + +// EachClusterServerJsz iterates over all servers ordered by cluster and calls the callback function with the loaded Jsz response +// +// The callback function will receive any error encountered during loading the server jsz file and should check that and handle it +// If the callback returns an error iteration is stopped and that error is returned +// +// Errors returned match those documented in Load() otherwise any other errors that are encountered +func (r *Reader) EachClusterServerJsz(cb func(clusterTag *Tag, serverTag *Tag, err error, jsz *server.ServerAPIJszResponse) error) (int, error) { + return r.eachClusterServer(TagServerJetStream(), server.ServerAPIJszResponse{}, func(clusterTag *Tag, serverTag *Tag, err error, resp any) error { + jsz := resp.(*server.ServerAPIJszResponse) + if jsz == nil || jsz.Data == nil { + err = ErrNoMatches + } + + return cb(clusterTag, serverTag, err, jsz) + }) +} + +// helper to iterate all servers, creates instances of targetType based on tag. 
targetType must be a non pointer like server.ServerAPIJszResponse{} +func (r *Reader) eachClusterServer(tag *Tag, targetType any, cb func(clusterTag *Tag, serverTag *Tag, err error, resp any) error) (int, error) { + found := 0 + + for _, clusterName := range r.ClusterNames() { + clusterTag := TagCluster(clusterName) + servers := r.ClusterServerNames(clusterName) + found = len(servers) + + for _, serverName := range servers { + serverTag := TagServer(serverName) + + resp := reflect.New(reflect.TypeOf(targetType)).Interface() + err := r.Load(&resp, clusterTag, serverTag, tag) + + err = cb(clusterTag, serverTag, err, resp) + if err != nil { + return found, err + } + } + } + + return found, nil +} diff --git a/audit/archive/tag.go b/audit/archive/tag.go index 6df0de9d..b4463e1e 100644 --- a/audit/archive/tag.go +++ b/audit/archive/tag.go @@ -25,6 +25,8 @@ type Tag struct { Value string } +func (t *Tag) String() string { return t.Value } + const ( serverTagLabel TagLabel = "server" clusterTagLabel TagLabel = "cluster" diff --git a/audit/checks.go b/audit/checks.go index 1ff99759..24291617 100644 --- a/audit/checks.go +++ b/audit/checks.go @@ -229,6 +229,13 @@ func (c *CheckCollection) ConfigurationItems() []*CheckConfiguration { } sort.Slice(res, func(i, j int) bool { + switch strings.Compare(res[i].Check, res[j].Check) { + case -1: + return true + case 1: + return false + } + return res[i].Key < res[j].Key }) diff --git a/audit/examples_collection.go b/audit/examples_collection.go index be9473e1..b38f8de1 100644 --- a/audit/examples_collection.go +++ b/audit/examples_collection.go @@ -41,7 +41,8 @@ func (c *ExamplesCollection) Add(format string, a ...any) { c.Examples = append(c.Examples, fmt.Sprintf(format, a...)) } -func (c *ExamplesCollection) clear() { +// Clear removes all added examples +func (c *ExamplesCollection) Clear() { c.Examples = []string{} } diff --git a/audit/gather/gather.go b/audit/gather/gather.go index d265d637..0893af4c 100644 --- 
a/audit/gather/gather.go +++ b/audit/gather/gather.go @@ -695,7 +695,7 @@ func Gather(nc *nats.Conn, conf *Configuration) error { cfg: conf, nc: nc, capture: &captureLogBuffer, - log: NewLogger(&captureLogBuffer, conf.LogLevel), + log: newLogger(&captureLogBuffer, conf.LogLevel), } return g.start() diff --git a/audit/gather/logger.go b/audit/gather/logger.go index 7c5ede7d..0c67350f 100644 --- a/audit/gather/logger.go +++ b/audit/gather/logger.go @@ -14,7 +14,7 @@ type logger struct { logFunc func(format string, a ...any) } -func NewLogger(capture io.Writer, level api.Level) api.Logger { +func newLogger(capture io.Writer, level api.Level) api.Logger { l := &logger{ lvl: level, logFunc: log.Printf, diff --git a/audit/server_checks.go b/audit/server_checks.go index 7d168b10..5d11d854 100644 --- a/audit/server_checks.go +++ b/audit/server_checks.go @@ -16,12 +16,11 @@ package audit import ( "errors" "fmt" - "strings" - "github.com/dustin/go-humanize" "github.com/nats-io/jsm.go/api" "github.com/nats-io/jsm.go/audit/archive" "github.com/nats-io/nats-server/v2/server" + "strings" ) func RegisterServerChecks(collection *CheckCollection) error { @@ -101,163 +100,129 @@ func RegisterServerChecks(collection *CheckCollection) error { } func checkServerAuthRequired(_ *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - notHealthy, healthy := 0, 0 - - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIVarzResponse - var varz *server.Varz - err := r.Load(&resp, clusterTag, serverTag, archive.TagServerVars()) - if errors.Is(err, archive.ErrNoMatches) { - log.Warnf("Artifact 'VARZ' is missing for server %s", serverName) - continue - } else if err != nil { - return Skipped, fmt.Errorf("failed to load variables for server %s: %w", serverName, err) - } - varz = 
resp.Data + total, err := r.EachClusterServerVarz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, vz *server.ServerAPIVarzResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'VARZ' is missing for server %s", serverTag) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag, err) + } - if varz.AuthRequired { - healthy++ - } else { - examples.Add("%s: authentication not required", serverName) - notHealthy++ - } + if !vz.Data.AuthRequired { + examples.Add("%s: authentication not required", serverTag) } + + return nil + }) + if err != nil { + return Skipped, err } - if notHealthy > 0 { - log.Errorf("%d/%d servers do not require authentication", notHealthy, healthy+notHealthy) + if examples.Count() > 0 { + log.Errorf("%d/%d servers do not require authentication", examples.Count(), total) return PassWithIssues, nil } - log.Infof("%d/%d servers require authentication", healthy, healthy) + log.Infof("%d/%d servers require authentication", total, total) return Pass, nil } // checkServerHealth verify all known servers are reporting healthy func checkServerHealth(_ *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - notHealthy, healthy := 0, 0 - - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIHealthzResponse - var health *server.HealthStatus - err := r.Load(&resp, clusterTag, serverTag, archive.TagServerHealth()) - if errors.Is(err, archive.ErrNoMatches) { - log.Warnf("Artifact 'HEALTHZ' is missing for server %s", serverName) - continue - } else if err != nil { - return Skipped, fmt.Errorf("failed to load health for server %s: %w", serverName, err) - } - health = resp.Data + total, err := r.EachClusterServerHealthz(func(clusterTag 
*archive.Tag, serverTag *archive.Tag, err error, hz *server.ServerAPIHealthzResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'VARZ' is missing for server %s", serverTag) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag, err) + } - if health.Status != "ok" { - examples.Add("%s: %d - %s", serverName, health.StatusCode, health.Status) - notHealthy += 1 - } else { - healthy += 1 - } + if hz.Data.Status != "ok" { + examples.Add("%s: %d - %s", serverTag, hz.Data.StatusCode, hz.Data.Status) } + + return nil + }) + if err != nil { + return Skipped, err } - if notHealthy > 0 { - log.Errorf("%d/%d servers are not healthy", notHealthy, healthy+notHealthy) + if examples.Count() > 0 { + log.Errorf("%d/%d servers are not healthy", examples.Count(), total) return PassWithIssues, nil } - log.Infof("%d/%d servers are healthy", healthy, healthy) + log.Infof("%d/%d servers are healthy", total, total) + return Pass, nil } // checkServerVersions verify all known servers are running the same version func checkServerVersion(_ *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - versionsToServersMap := make(map[string][]string) - + seenVersions := make(map[string]struct{}) var lastVersionSeen string - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIVarzResponse - var serverVarz *server.Varz - err := r.Load(&resp, clusterTag, serverTag, archive.TagServerVars()) - if errors.Is(err, archive.ErrNoMatches) { - log.Warnf("Artifact 'VARZ' is missing for server %s", serverName) - continue - } else if err != nil { - return Skipped, fmt.Errorf("failed to load variables for server %s: %w", serverTag.Value, err) - } - serverVarz = resp.Data - version := serverVarz.Version + _, 
err := r.EachClusterServerVarz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, vz *server.ServerAPIVarzResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'VARZ' is missing for server %s", serverTag) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag, err) + } - _, exists := versionsToServersMap[version] - if !exists { - // First time encountering this version, create map entry - versionsToServersMap[version] = []string{} - // Add one example server for each version - examples.Add("%s - %s", serverName, version) - } - // Add this server to the list running this version - versionsToServersMap[version] = append(versionsToServersMap[version], serverName) - lastVersionSeen = version + lastVersionSeen = vz.Data.Version + _, exists := seenVersions[lastVersionSeen] + if !exists { + seenVersions[lastVersionSeen] = struct{}{} + examples.Add("%s - %s", serverTag, lastVersionSeen) } + + return nil + }) + if err != nil { + return Skipped, err } - if len(versionsToServersMap) == 0 { + if len(seenVersions) == 0 { log.Warnf("No servers version information found") return Skipped, nil - } else if len(versionsToServersMap) > 1 { - log.Errorf("Servers are running %d different versions", len(versionsToServersMap)) + } else if len(seenVersions) > 1 { + log.Errorf("Servers are running %d different versions", len(seenVersions)) return Fail, nil } // Map contains exactly one element (i.e. 
one version) - examples.clear() + examples.Clear() log.Infof("All servers are running version %s", lastVersionSeen) + return Pass, nil } // checkServerCPUUsage verify CPU usage is below the given threshold for each server func checkServerCPUUsage(check *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - severVarsTag := archive.TagServerVars() cpuThreshold := check.Configuration["cpu"].Value() - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIVarzResponse - var serverVarz *server.Varz - err := r.Load(&resp, serverTag, clusterTag, severVarsTag) - if errors.Is(err, archive.ErrNoMatches) { - log.Warnf("Artifact 'VARZ' is missing for server %s", serverName) - continue - } else if err != nil { - return Skipped, fmt.Errorf("failed to load VARZ for server %s: %w", serverName, err) - } - serverVarz = resp.Data + _, err := r.EachClusterServerVarz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, vz *server.ServerAPIVarzResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'VARZ' is missing for server %s", serverTag.Value) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag.Value, err) + } - // Example: 350% usage with 4 cores => 87.5% averaged - averageCpuUtilization := serverVarz.CPU / float64(serverVarz.Cores) + // Example: 350% usage with 4 cores => 87.5% averaged + averageCpuUtilization := vz.Data.CPU / float64(vz.Data.Cores) - if averageCpuUtilization > cpuThreshold { - examples.Add("%s - %s: %.1f%%", clusterName, serverName, averageCpuUtilization) - } + if averageCpuUtilization > cpuThreshold { + examples.Add("%s - %s: %.1f%%", clusterTag, serverTag, averageCpuUtilization) } + + return nil + }) + if err != nil { + return Skipped, err } if 
examples.Count() > 0 { @@ -270,85 +235,65 @@ func checkServerCPUUsage(check *Check, r *archive.Reader, examples *ExamplesColl // checkSlowConsumers verify that no server is reporting slow consumers func checkSlowConsumers(_ *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - totalSlowConsumers := int64(0) - - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIVarzResponse - var serverVarz *server.Varz - err := r.Load(&resp, clusterTag, serverTag, archive.TagServerVars()) - if err != nil { - return Skipped, fmt.Errorf("failed to load Varz for server %s: %w", serverName, err) - } - serverVarz = resp.Data + _, err := r.EachClusterServerVarz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, vz *server.ServerAPIVarzResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'VARZ' is missing for server %s", serverTag.Value) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag.Value, err) + } - if slowConsumers := serverVarz.SlowConsumers; slowConsumers > 0 { - examples.Add("%s/%s: %d slow consumers", clusterName, serverName, slowConsumers) - totalSlowConsumers += slowConsumers - } + if slowConsumers := vz.Data.SlowConsumers; slowConsumers > 0 { + examples.Add("%s/%s: %d slow consumers", clusterTag, serverTag, slowConsumers) } + + return nil + }) + if err != nil { + return Skipped, err } - if totalSlowConsumers > 0 { - log.Errorf("Total slow consumers: %d", totalSlowConsumers) + if examples.Count() > 0 { + log.Errorf("Total slow consumers: %d", examples.Count()) return PassWithIssues, nil } log.Infof("No slow consumers detected") + return Pass, nil } // checkServerResourceLimits verifies that the resource usage of memory and store is not approaching 
the reserved amount for each known server func checkServerResourceLimits(check *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - jsTag := archive.TagServerJetStream() - memoryUsageThreshold := check.Configuration["memory"].Value() storeUsageThreshold := check.Configuration["store"].Value() - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIJszResponse - var serverJSInfo *server.JSInfo - err := r.Load(&resp, clusterTag, serverTag, jsTag) - if errors.Is(err, archive.ErrNoMatches) { - log.Warnf("Artifact 'JSZ' is missing for server %s cluster %s", serverName, clusterName) - continue - } else if err != nil { - return Skipped, fmt.Errorf("failed to load JSZ for server %s: %w", serverName, err) - } - serverJSInfo = resp.Data - - if serverJSInfo.ReservedMemory > 0 { - threshold := uint64(float64(serverJSInfo.ReservedMemory) * memoryUsageThreshold) - if serverJSInfo.Memory > threshold { - examples.Add( - "%s memory usage: %s of %s", - serverName, - humanize.IBytes(serverJSInfo.Memory), - humanize.IBytes(serverJSInfo.ReservedMemory), - ) - } + _, err := r.EachClusterServerJsz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, jsz *server.ServerAPIJszResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'JSZ' is missing for server %s", serverTag.Value) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag.Value, err) + } + + if jsz.Data.ReservedMemory > 0 { + threshold := uint64(float64(jsz.Data.ReservedMemory) * memoryUsageThreshold) + if jsz.Data.Memory > threshold { + examples.Add("%s memory usage: %s of %s", serverTag, humanize.IBytes(jsz.Data.Memory), humanize.IBytes(jsz.Data.ReservedMemory)) } + } - if serverJSInfo.ReservedStore > 0 { - 
threshold := uint64(float64(serverJSInfo.ReservedStore) * storeUsageThreshold) - if serverJSInfo.Store > threshold { - examples.Add( - "%s store usage: %s of %s", - serverName, - humanize.IBytes(serverJSInfo.Store), - humanize.IBytes(serverJSInfo.ReservedStore), - ) - } + if jsz.Data.ReservedStore > 0 { + threshold := uint64(float64(jsz.Data.ReservedStore) * storeUsageThreshold) + if jsz.Data.Store > threshold { + examples.Add("%s store usage: %s of %s", serverTag, humanize.IBytes(jsz.Data.Store), humanize.IBytes(jsz.Data.ReservedStore)) } } + + return nil + }) + if err != nil { + return Skipped, err } if examples.Count() > 0 { @@ -360,31 +305,28 @@ func checkServerResourceLimits(check *Check, r *archive.Reader, examples *Exampl } func checkJetStreamDomainsForWhitespace(_ *Check, r *archive.Reader, examples *ExamplesCollection, log api.Logger) (Outcome, error) { - for _, clusterName := range r.ClusterNames() { - clusterTag := archive.TagCluster(clusterName) - - for _, serverName := range r.ClusterServerNames(clusterName) { - serverTag := archive.TagServer(serverName) - - var resp server.ServerAPIJszResponse - var serverJsz *server.JSInfo - err := r.Load(&resp, clusterTag, serverTag, archive.TagServerJetStream()) - if err != nil { - log.Warnf("Artifact 'JSZ' is missing for server %s", serverName) - continue - } - serverJsz = resp.Data + _, err := r.EachClusterServerJsz(func(clusterTag *archive.Tag, serverTag *archive.Tag, err error, jsz *server.ServerAPIJszResponse) error { + if errors.Is(err, archive.ErrNoMatches) { + log.Warnf("Artifact 'JSZ' is missing for server %s", serverTag.Value) + return nil + } else if err != nil { + return fmt.Errorf("failed to load variables for server %s: %w", serverTag.Value, err) + } - // check if jetstream domain contains whitespace - if strings.ContainsAny(serverJsz.Config.Domain, " \n") { - examples.Add("Cluster %s Server %s Domain %s", clusterName, serverName, serverJsz.Config.Domain) - } + // check if jetstream domain contains 
whitespace + if strings.ContainsAny(jsz.Data.Config.Domain, " \n") { + examples.Add("Cluster %s Server %s Domain %s", clusterTag, serverTag, jsz.Data.Config.Domain) } + + return nil + }) + if err != nil { + return Skipped, err } if examples.Count() > 0 { log.Errorf("Found %d servers with JetStream domains containing whitespace", examples.Count()) - return Fail, nil + return PassWithIssues, nil } return Pass, nil