From cdde2fa357bb029382d468cb16e2ca3205f1c21a Mon Sep 17 00:00:00 2001 From: "maximilian.schubert@telekom.de" Date: Mon, 18 Dec 2023 11:25:24 +0100 Subject: [PATCH] feat: register collectors without error handling and no panic; split reconcileChecks --- pkg/sparrow/metrics.go | 1 + pkg/sparrow/run.go | 118 ++++++++++++++++++++++++----------------- 2 files changed, 70 insertions(+), 49 deletions(-) diff --git a/pkg/sparrow/metrics.go b/pkg/sparrow/metrics.go index 4b4e45d2..496b3919 100644 --- a/pkg/sparrow/metrics.go +++ b/pkg/sparrow/metrics.go @@ -7,6 +7,7 @@ import ( //go:generate moq -out metrics_moq.go . Metrics type Metrics interface { + // GetRegistry returns the prometheus registry instance GetRegistry() *prometheus.Registry } diff --git a/pkg/sparrow/run.go b/pkg/sparrow/run.go index 8547e01c..94627d5b 100644 --- a/pkg/sparrow/run.go +++ b/pkg/sparrow/run.go @@ -110,6 +110,7 @@ func (s *Sparrow) ReconcileChecks(ctx context.Context) { for name, checkCfg := range s.cfg.Checks { name := name log := logger.FromContext(ctx).With("name", name) + if existingCheck, ok := s.checks[name]; ok { // Check already registered, reset config err := existingCheck.SetConfig(ctx, checkCfg) @@ -118,72 +119,91 @@ func (s *Sparrow) ReconcileChecks(ctx context.Context) { } continue } - // Check is a new Check and needs to be registered - getRegisteredCheck := checks.RegisteredChecks[name] - if getRegisteredCheck == nil { - log.WarnContext(ctx, "Check is not registered") - continue - } - check := getRegisteredCheck() - s.checks[name] = check - // Create a fan in a channel for the check - checkChan := make(chan checks.Result, 1) - s.resultFanIn[name] = checkChan + // Check is a new Check and needs to be registered + s.registerCheck(ctx, name, checkCfg) + } - check.SetClient(s.client) - err := check.SetConfig(ctx, checkCfg) - if err != nil { - log.ErrorContext(ctx, "Failed to set config for check", "name", name, "error", err) - } - go fanInResults(checkChan, s.cResult, name) - err = 
check.Startup(ctx, checkChan) - if err != nil { - log.ErrorContext(ctx, "Failed to startup check", "name", name, "error", err) - close(checkChan) + for existingCheckName, existingCheck := range s.checks { + if _, ok := s.cfg.Checks[existingCheckName]; ok { + // Check is still present in the config continue } - check.RegisterHandler(ctx, s.routingTree) - // Register prometheus collectors of check in registry - s.metrics.GetRegistry().MustRegister(check.GetMetricCollectors()...) + // Check has been removed from config + s.unregisterCheck(ctx, existingCheckName, existingCheck) + } +} + +// registerCheck registers and executes a new check +func (s *Sparrow) registerCheck(ctx context.Context, name string, checkCfg any) { + log := logger.FromContext(ctx).With("name", name) - go func() { - err := check.Run(ctx) - if err != nil { - log.ErrorContext(ctx, "Failed to run check", "name", name, "error", err) - } - }() + getRegisteredCheck := checks.RegisteredChecks[name] + if getRegisteredCheck == nil { + log.WarnContext(ctx, "Check is not registered") + return } + check := getRegisteredCheck() + s.checks[name] = check - for existingCheckName, existingCheck := range s.checks { - log := logger.FromContext(ctx).With("checkName", existingCheckName) - if _, ok := s.cfg.Checks[existingCheckName]; ok { - continue - } + // Create a fan-in channel for the check + checkChan := make(chan checks.Result, 1) + s.resultFanIn[name] = checkChan - // Check has been removed from config; shutdown and remove - existingCheck.DeregisterHandler(ctx, s.routingTree) + check.SetClient(s.client) + err := check.SetConfig(ctx, checkCfg) + if err != nil { + log.ErrorContext(ctx, "Failed to set config for check", "error", err) + } + go fanInResults(checkChan, s.cResult, name) + err = check.Startup(ctx, checkChan) + if err != nil { + log.ErrorContext(ctx, "Failed to startup check", "error", err) + close(checkChan) + return + } + check.RegisterHandler(ctx, s.routingTree) - // Remove prometheus collectors of check from 
registry - for _, metricsCollector := range existingCheck.GetMetricCollectors() { - if !s.metrics.GetRegistry().Unregister(metricsCollector) { - log.ErrorContext(ctx, "Could not remove collector from registry") - } + // Add prometheus collectors of check to registry + for _, collector := range check.GetMetricCollectors() { + if err := s.metrics.GetRegistry().Register(collector); err != nil { + log.ErrorContext(ctx, "Could not add metrics collector to registry") } + } - err := existingCheck.Shutdown(ctx) + go func() { + err := check.Run(ctx) if err != nil { - log.ErrorContext(ctx, "Failed to shutdown check", "error", err) + log.ErrorContext(ctx, "Failed to run check", "error", err) } - if c, ok := s.resultFanIn[existingCheckName]; ok { - // close fan in the channel if it exists - close(c) - delete(s.resultFanIn, existingCheckName) + }() +} + +// unregisterCheck removes the check from sparrow and performs a soft shutdown for the check +func (s *Sparrow) unregisterCheck(ctx context.Context, name string, check checks.Check) { + log := logger.FromContext(ctx).With("name", name) + // Check has been removed from config; shutdown and remove + check.DeregisterHandler(ctx, s.routingTree) + + // Remove prometheus collectors of check from registry + for _, metricsCollector := range check.GetMetricCollectors() { + if !s.metrics.GetRegistry().Unregister(metricsCollector) { + log.ErrorContext(ctx, "Could not remove metrics collector from registry") } + } - delete(s.checks, existingCheckName) + err := check.Shutdown(ctx) + if err != nil { + log.ErrorContext(ctx, "Failed to shutdown check", "error", err) } + if c, ok := s.resultFanIn[name]; ok { + // close fan in the channel if it exists + close(c) + delete(s.resultFanIn, name) + } + + delete(s.checks, name) } // This is a fan in for the checks.