Skip to content

Commit

Permalink
feat: register collectors without error handling and no panic; split …
Browse files Browse the repository at this point in the history
…reconcileChecks
  • Loading branch information
y-eight committed Dec 18, 2023
1 parent d3f8985 commit cdde2fa
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/sparrow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// Metrics provides access to the prometheus registry in which metric
// collectors are registered and from which they are unregistered.
//
//go:generate moq -out metrics_moq.go . Metrics
type Metrics interface {
// GetRegistry returns the prometheus registry instance
GetRegistry() *prometheus.Registry
}

Expand Down
118 changes: 69 additions & 49 deletions pkg/sparrow/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (s *Sparrow) ReconcileChecks(ctx context.Context) {
for name, checkCfg := range s.cfg.Checks {
name := name
log := logger.FromContext(ctx).With("name", name)

if existingCheck, ok := s.checks[name]; ok {
// Check already registered, reset config
err := existingCheck.SetConfig(ctx, checkCfg)
Expand All @@ -118,72 +119,91 @@ func (s *Sparrow) ReconcileChecks(ctx context.Context) {
}
continue
}
// Check is a new Check and needs to be registered
getRegisteredCheck := checks.RegisteredChecks[name]
if getRegisteredCheck == nil {
log.WarnContext(ctx, "Check is not registered")
continue
}
check := getRegisteredCheck()
s.checks[name] = check

// Create a fan in a channel for the check
checkChan := make(chan checks.Result, 1)
s.resultFanIn[name] = checkChan
// Check is a new Check and needs to be registered
s.registerCheck(ctx, name, checkCfg)
}

check.SetClient(s.client)
err := check.SetConfig(ctx, checkCfg)
if err != nil {
log.ErrorContext(ctx, "Failed to set config for check", "name", name, "error", err)
}
go fanInResults(checkChan, s.cResult, name)
err = check.Startup(ctx, checkChan)
if err != nil {
log.ErrorContext(ctx, "Failed to startup check", "name", name, "error", err)
close(checkChan)
for existingCheckName, existingCheck := range s.checks {
if _, ok := s.cfg.Checks[existingCheckName]; ok {
// Check is still present in the config
continue
}
check.RegisterHandler(ctx, s.routingTree)

// Register prometheus collectors of check in registry
s.metrics.GetRegistry().MustRegister(check.GetMetricCollectors()...)
// Check has been removed from config
s.unregisterCheck(ctx, existingCheckName, existingCheck)
}
}

// registerCheck registers and executes a new check
func (s *Sparrow) registerCheck(ctx context.Context, name string, checkCfg any) {
log := logger.FromContext(ctx).With("name", name)

go func() {
err := check.Run(ctx)
if err != nil {
log.ErrorContext(ctx, "Failed to run check", "name", name, "error", err)
}
}()
getRegisteredCheck := checks.RegisteredChecks[name]
if getRegisteredCheck == nil {
log.WarnContext(ctx, "Check is not registered")
return
}
check := getRegisteredCheck()
s.checks[name] = check

for existingCheckName, existingCheck := range s.checks {
log := logger.FromContext(ctx).With("checkName", existingCheckName)
if _, ok := s.cfg.Checks[existingCheckName]; ok {
continue
}
// Create a fan-in channel for the check
checkChan := make(chan checks.Result, 1)
s.resultFanIn[name] = checkChan

// Check has been removed from config; shutdown and remove
existingCheck.DeregisterHandler(ctx, s.routingTree)
check.SetClient(s.client)
err := check.SetConfig(ctx, checkCfg)
if err != nil {
log.ErrorContext(ctx, "Failed to set config for check", "error", err)
}
go fanInResults(checkChan, s.cResult, name)
err = check.Startup(ctx, checkChan)
if err != nil {
log.ErrorContext(ctx, "Failed to startup check", "error", err)
close(checkChan)
return
}
check.RegisterHandler(ctx, s.routingTree)

// Remove prometheus collectors of check from registry
for _, metricsCollector := range existingCheck.GetMetricCollectors() {
if !s.metrics.GetRegistry().Unregister(metricsCollector) {
log.ErrorContext(ctx, "Could not remove collector from registry")
}
// Add prometheus collectors of check to registry
for _, collector := range check.GetMetricCollectors() {
if err := s.metrics.GetRegistry().Register(collector); err != nil {
log.ErrorContext(ctx, "Could not add metrics collector to registry")
}
}

err := existingCheck.Shutdown(ctx)
go func() {
err := check.Run(ctx)
if err != nil {
log.ErrorContext(ctx, "Failed to shutdown check", "error", err)
log.ErrorContext(ctx, "Failed to run check", "error", err)
}
if c, ok := s.resultFanIn[existingCheckName]; ok {
// close fan in the channel if it exists
close(c)
delete(s.resultFanIn, existingCheckName)
}()
}

// unregisterCheck removes the check from sparrow and performs a soft shutdown for the check
func (s *Sparrow) unregisterCheck(ctx context.Context, name string, check checks.Check) {
log := logger.FromContext(ctx).With("name", name)
// Check has been removed from config; shutdown and remove
check.DeregisterHandler(ctx, s.routingTree)

// Remove prometheus collectors of check from registry
for _, metricsCollector := range check.GetMetricCollectors() {
if !s.metrics.GetRegistry().Unregister(metricsCollector) {
log.ErrorContext(ctx, "Could not remove metrics collector from registry")
}
}

delete(s.checks, existingCheckName)
err := check.Shutdown(ctx)
if err != nil {
log.ErrorContext(ctx, "Failed to shutdown check", "error", err)
}
if c, ok := s.resultFanIn[name]; ok {
// close the fan-in channel if it exists
close(c)
delete(s.resultFanIn, name)
}

delete(s.checks, name)
}

// This is a fan in for the checks.
Expand Down

0 comments on commit cdde2fa

Please sign in to comment.