From ce514ea2a1cfa166af6b21faacab6c37f85aac1b Mon Sep 17 00:00:00 2001 From: Igor Shishkin Date: Sun, 15 Dec 2024 03:39:04 +0300 Subject: [PATCH] Fix exporter's observe run (#288) Some time after the start, the exporter stopped collecting metrics because of a timer leak Signed-off-by: Igor Shishkin --- exporter/service/service.go | 43 ++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/exporter/service/service.go b/exporter/service/service.go index 8bc645a..610ef51 100644 --- a/exporter/service/service.go +++ b/exporter/service/service.go @@ -3,9 +3,12 @@ package service import ( "context" "strconv" + "sync" "time" "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "github.com/teran/archived/repositories/metadata" ) @@ -22,13 +25,23 @@ type service struct { blobsSize *prometheus.GaugeVec blobsTotalRawSize *prometheus.GaugeVec - repo metadata.Repository + repo metadata.Repository + mutex *sync.Mutex } func New(repo metadata.Repository) (Service, error) { svc := &service{ repo: repo, + namespacesTotal: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "archived", + Name: "namespaces_amount", + Help: "Total amount of namespaces", + }, + []string{}, + ), + containersTotal: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "archived", @@ -80,9 +93,12 @@ func New(repo metadata.Repository) (Service, error) { Help: "Total effective size of blobs (i.e. 
after deduplication)", }, []string{}, ), + + mutex: &sync.Mutex{}, } for _, m := range []*prometheus.GaugeVec{ + svc.namespacesTotal, svc.containersTotal, svc.versionsTotal, svc.objectsTotal, @@ -99,11 +115,18 @@ func New(repo metadata.Repository) (Service, error) { } func (s *service) observe(ctx context.Context) error { + log.Trace("running observe() to gather metrics ...") + stats, err := s.repo.CountStats(ctx) if err != nil { return err } + log.WithFields(log.Fields{ + "namespaces": stats.NamespacesCount, + "containers": stats.ContainersCount, + }).Trace("publishing metrics ...") + s.namespacesTotal.WithLabelValues().Set(float64(stats.NamespacesCount)) s.containersTotal.WithLabelValues().Set(float64(stats.ContainersCount)) @@ -133,14 +156,24 @@ func (s *service) observe(ctx context.Context) error { } func (s *service) Run(ctx context.Context) error { + ticker := time.NewTicker(60 * time.Second) for { select { case <-ctx.Done(): + ticker.Stop() return ctx.Err() - case <-time.After(30 * time.Second): - if err := s.observe(ctx); err != nil { - return err - } + case <-ticker.C: + go func() { + if !s.mutex.TryLock() { + log.Warn("lock is already taken. Skipping the run ...") + return + } + defer s.mutex.Unlock() + + if err := s.observe(ctx); err != nil { + log.Warnf("error running observe(): %s", err) + } + }() } } }