From e5d385b62f7a7381d583ce39312756e7c1b7da5c Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 8 Mar 2024 13:35:10 +0100 Subject: [PATCH] ignore duplicate data points --- pkg/apiserver/controllers/v1/usagemetrics.go | 6 ++- pkg/csconfig/crowdsec_service.go | 10 ++--- pkg/database/flush.go | 43 ++++++++++++++++++-- pkg/database/metrics.go | 13 ++++-- 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/pkg/apiserver/controllers/v1/usagemetrics.go b/pkg/apiserver/controllers/v1/usagemetrics.go index 8fda148cee44..f77239e56c65 100644 --- a/pkg/apiserver/controllers/v1/usagemetrics.go +++ b/pkg/apiserver/controllers/v1/usagemetrics.go @@ -131,11 +131,13 @@ func (c *Controller) UsageMetrics(gctx *gin.Context) { } if _, err := c.DBClient.CreateMetric(generatedType, generatedBy, collectedAt, string(jsonPayload)); err != nil { - log.Errorf("Failed to store usage metrics: %s", err) + log.Error(err) c.HandleDBErrors(gctx, err) return } - // empty body + // if CreateMetrics() returned nil, the metric was already there, we're good + // and don't split hair about 201 vs 200/204 + gctx.Status(http.StatusCreated) } diff --git a/pkg/csconfig/crowdsec_service.go b/pkg/csconfig/crowdsec_service.go index f4103293f0d9..c54b850f622f 100644 --- a/pkg/csconfig/crowdsec_service.go +++ b/pkg/csconfig/crowdsec_service.go @@ -12,6 +12,11 @@ import ( "github.com/crowdsecurity/go-cs-lib/ptr" ) +const ( + defaultMetricsInterval = 30 * time.Minute + minimumMetricsInterval = 15 * time.Minute +) + // CrowdsecServiceCfg contains the location of parsers/scenarios/... and acquisition files type CrowdsecServiceCfg struct { Enable *bool `yaml:"enable"` @@ -143,11 +148,6 @@ func (c *Config) LoadCrowdsec() error { return nil } -const ( - defaultMetricsInterval = 30 * time.Second - minimumMetricsInterval = 15 * time.Second -) - func (c *CrowdsecServiceCfg) setMetricsInterval() { switch { case c.MetricsInterval == nil: diff --git a/pkg/database/flush.go b/pkg/database/flush.go index 4828ebac907e..73c9adb9d481 100644 --- a/pkg/database/flush.go +++ b/pkg/database/flush.go @@ -7,15 +7,24 @@ import ( "github.com/go-co-op/gocron" log "github.com/sirupsen/logrus" + "github.com/crowdsecurity/go-cs-lib/ptr" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert" "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer" "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision" "github.com/crowdsecurity/crowdsec/pkg/database/ent/event" "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine" + "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" "github.com/crowdsecurity/crowdsec/pkg/types" ) +const ( + // how long to keep metrics in the local database + defaultMetricsMaxAge = 7 * 24 * time.Hour + flushInterval = 1 * time.Minute +) + func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) { maxItems := 0 @@ -32,7 +41,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched // Init & Start cronjob every minute for alerts scheduler := gocron.NewScheduler(time.UTC) - job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems) + job, err := scheduler.Every(flushInterval).Do(c.FlushAlerts, maxAge, maxItems) if err != nil { return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err) } @@ -77,19 +86,45 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)") } } - baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC) + baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC) if err != nil { return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err) } baJob.SingletonMode() - scheduler.StartAsync() - // TODO: flush metrics here (MetricsMaxAge) + metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge) + if err != nil { + return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err) + } + + metricsJob.SingletonMode() + + scheduler.StartAsync() return scheduler, nil } +// flushMetrics deletes metrics older than maxAge, regardless if they have been pushed to CAPI or not +func (c *Client) flushMetrics(maxAge *time.Duration) { + if maxAge == nil { + maxAge = ptr.Of(defaultMetricsMaxAge) + } + + c.Log.Debugf("flushing metrics older than %s", maxAge) + + deleted, err := c.Ent.Metric.Delete().Where( + metric.CollectedAtLTE(time.Now().UTC().Add(-*maxAge)), + ).Exec(c.CTX) + if err != nil { + c.Log.Errorf("while flushing metrics: %s", err) + return + } + + if deleted > 0 { + c.Log.Debugf("flushed %d metrics snapshots", deleted) + } +} func (c *Client) FlushOrphans() { /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */ diff --git a/pkg/database/metrics.go b/pkg/database/metrics.go index 4a75f2c32e84..bd525449741f 100644 --- a/pkg/database/metrics.go +++ b/pkg/database/metrics.go @@ -1,10 +1,9 @@ package database import ( + "fmt" "time" - "github.com/pkg/errors" - "github.com/crowdsecurity/crowdsec/pkg/database/ent" "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" ) @@ -26,9 +25,15 @@ func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy st SetPayload(payload). Save(c.CTX) - if err != nil { + switch { + case ent.IsConstraintError(err): + // pretty safe guess, it's the unique index + c.Log.Infof("storing metrics snapshot for '%s' at %s: already exists", generatedBy, collectedAt) + // it's polite to accept a duplicate snapshot without any error + return nil, nil + case err != nil: c.Log.Warningf("CreateMetric: %s", err) - return nil, errors.Wrapf(InsertFail, "creating metrics set for '%s' at %s", generatedBy, collectedAt) + return nil, fmt.Errorf("storing metrics snapshot for '%s' at %s: %w", generatedBy, collectedAt, InsertFail) } return metric, nil