Skip to content

Commit

Permalink
ignore duplicate data points
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Mar 8, 2024
1 parent 47816cf commit e5d385b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 15 deletions.
6 changes: 4 additions & 2 deletions pkg/apiserver/controllers/v1/usagemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ func (c *Controller) UsageMetrics(gctx *gin.Context) {
}

if _, err := c.DBClient.CreateMetric(generatedType, generatedBy, collectedAt, string(jsonPayload)); err != nil {
log.Errorf("Failed to store usage metrics: %s", err)
log.Error(err)
c.HandleDBErrors(gctx, err)
return
}

// empty body
// if CreateMetric() returned a nil metric and no error, the metric was already there; we're good
// and we don't split hairs about 201 vs 200/204

gctx.Status(http.StatusCreated)
}
10 changes: 5 additions & 5 deletions pkg/csconfig/crowdsec_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/crowdsecurity/go-cs-lib/ptr"
)

// Bounds for the metrics interval setting.
// NOTE(review): presumably applied by setMetricsInterval when
// MetricsInterval is unset or too small — confirm against its full body.
const (
// value intended as the default metrics interval (30 minutes)
defaultMetricsInterval = 30 * time.Minute
// value intended as the lower bound for the metrics interval (15 minutes)
minimumMetricsInterval = 15 * time.Minute
)

// CrowdsecServiceCfg contains the location of parsers/scenarios/... and acquisition files
type CrowdsecServiceCfg struct {
Enable *bool `yaml:"enable"`
Expand Down Expand Up @@ -143,11 +148,6 @@ func (c *Config) LoadCrowdsec() error {
return nil
}

// Bounds for the metrics interval setting.
// NOTE(review): presumably applied by setMetricsInterval when
// MetricsInterval is unset or too small — confirm against its full body.
const (
// value intended as the default metrics interval (30 seconds)
defaultMetricsInterval = 30 * time.Second
// value intended as the lower bound for the metrics interval (15 seconds)
minimumMetricsInterval = 15 * time.Second
)

func (c *CrowdsecServiceCfg) setMetricsInterval() {
switch {
case c.MetricsInterval == nil:
Expand Down
43 changes: 39 additions & 4 deletions pkg/database/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,24 @@ import (
"github.com/go-co-op/gocron"
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/go-cs-lib/ptr"

"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/event"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/machine"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
"github.com/crowdsecurity/crowdsec/pkg/types"
)

// Timing constants for the database flush jobs.
const (
// how long to keep metrics in the local database
// (used by flushMetrics when it is called with a nil maxAge)
defaultMetricsMaxAge = 7 * 24 * time.Hour
// period passed to scheduler.Every for the flush jobs registered in StartFlushScheduler
flushInterval = 1 * time.Minute
)


func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
maxItems := 0
Expand All @@ -32,7 +41,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched

// Init & Start cronjob every minute for alerts
scheduler := gocron.NewScheduler(time.UTC)
job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
job, err := scheduler.Every(flushInterval).Do(c.FlushAlerts, maxAge, maxItems)
if err != nil {
return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
}
Expand Down Expand Up @@ -77,19 +86,45 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
log.Warning("bouncers auto-delete for login/password auth is not supported (use cert or api)")
}
}
baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
if err != nil {
return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
}

baJob.SingletonMode()
scheduler.StartAsync()

// TODO: flush metrics here (MetricsMaxAge)
metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge)
if err != nil {
return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err)
}

metricsJob.SingletonMode()

scheduler.StartAsync()

return scheduler, nil
}

// flushMetrics removes every stored metric whose collection time is at or
// before "now minus maxAge", regardless of whether it has been pushed to CAPI.
// A nil maxAge falls back to defaultMetricsMaxAge. Errors are logged and not
// returned, since the function is run as a scheduled job.
func (c *Client) flushMetrics(maxAge *time.Duration) {
	retention := defaultMetricsMaxAge
	if maxAge != nil {
		retention = *maxAge
	}

	c.Log.Debugf("flushing metrics older than %s", retention)

	// Anything collected at or before this instant is considered expired.
	cutoff := time.Now().UTC().Add(-retention)
	expired := metric.CollectedAtLTE(cutoff)

	count, err := c.Ent.Metric.Delete().Where(expired).Exec(c.CTX)

	switch {
	case err != nil:
		c.Log.Errorf("while flushing metrics: %s", err)
	case count > 0:
		// Stay quiet when nothing was removed.
		c.Log.Debugf("flushed %d metrics snapshots", count)
	}
}

func (c *Client) FlushOrphans() {
/* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
Expand Down
13 changes: 9 additions & 4 deletions pkg/database/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package database

import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/crowdsecurity/crowdsec/pkg/database/ent"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
)
Expand All @@ -26,9 +25,15 @@ func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy st
SetPayload(payload).
Save(c.CTX)

if err != nil {
switch {
case ent.IsConstraintError(err):
// pretty safe guess, it's the unique index
c.Log.Infof("storing metrics snapshot for '%s' at %s: already exists", generatedBy, collectedAt)
// it's polite to accept a duplicate snapshot without any error
return nil, nil
case err != nil:
c.Log.Warningf("CreateMetric: %s", err)
return nil, errors.Wrapf(InsertFail, "creating metrics set for '%s' at %s", generatedBy, collectedAt)
return nil, fmt.Errorf("storing metrics snapshot for '%s' at %s: %w", generatedBy, collectedAt, InsertFail)
}

return metric, nil
Expand Down

0 comments on commit e5d385b

Please sign in to comment.