diff --git a/cmd/crowdsec-cli/climetrics/statbouncer.go b/cmd/crowdsec-cli/climetrics/statbouncer.go index 1a803cefbd2..7d80e902961 100644 --- a/cmd/crowdsec-cli/climetrics/statbouncer.go +++ b/cmd/crowdsec-cli/climetrics/statbouncer.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "io" + "sort" + "strings" "time" "github.com/jedib0t/go-pretty/v6/table" @@ -15,12 +17,15 @@ import ( "github.com/crowdsecurity/crowdsec/cmd/crowdsec-cli/cstable" "github.com/crowdsecurity/crowdsec/pkg/database" + "github.com/crowdsecurity/crowdsec/pkg/database/ent" "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" "github.com/crowdsecurity/crowdsec/pkg/models" ) -// un-aggregated data, de-normalized. +// bouncerMetricItem represents unaggregated, denormalized metric data. +// Possibly not unique if a bouncer sent the same data multiple times. type bouncerMetricItem struct { + collectedAt time.Time bouncerName string ipType string origin string @@ -29,14 +34,82 @@ type bouncerMetricItem struct { value float64 } +// aggregationOverTime is the first level of aggregation: we aggregate +// over time, then over IP type, then over origin. We only sum values +// for non-gauge metrics, and take the last value for gauge metrics. +type aggregationOverTime map[string]map[string]map[string]map[string]map[string]int64 + +func (a aggregationOverTime) add(bouncerName, origin, name, unit, ipType string, value float64, isGauge bool) { + if _, ok := a[bouncerName]; !ok { + a[bouncerName] = make(map[string]map[string]map[string]map[string]int64) + } + + if _, ok := a[bouncerName][origin]; !ok { + a[bouncerName][origin] = make(map[string]map[string]map[string]int64) + } + + if _, ok := a[bouncerName][origin][name]; !ok { + a[bouncerName][origin][name] = make(map[string]map[string]int64) + } + + if _, ok := a[bouncerName][origin][name][unit]; !ok { + a[bouncerName][origin][name][unit] = make(map[string]int64) + } + + if isGauge { + a[bouncerName][origin][name][unit][ipType] = int64(value) + } else { + a[bouncerName][origin][name][unit][ipType] += int64(value) + } +} + +// aggregationOverIPType is the second level of aggregation: data is summed +// regardless of the metric type (gauge or not). This is used to display +// table rows; they won't differentiate IPv4 and IPv6. +type aggregationOverIPType map[string]map[string]map[string]map[string]int64 + +func (a aggregationOverIPType) add(bouncerName, origin, name, unit string, value int64) { + if _, ok := a[bouncerName]; !ok { + a[bouncerName] = make(map[string]map[string]map[string]int64) + } + + if _, ok := a[bouncerName][origin]; !ok { + a[bouncerName][origin] = make(map[string]map[string]int64) + } + + if _, ok := a[bouncerName][origin][name]; !ok { + a[bouncerName][origin][name] = make(map[string]int64) + } + + a[bouncerName][origin][name][unit] += value +} + +// aggregationOverOrigin is the third level of aggregation: these are +// the totals at the end of the table. Metrics without an origin will +// be added to the totals but not displayed in the rows, only in the footer.
+type aggregationOverOrigin map[string]map[string]map[string]int64 + +func (a aggregationOverOrigin) add(bouncerName, name, unit string, value int64) { + if _, ok := a[bouncerName]; !ok { + a[bouncerName] = make(map[string]map[string]int64) + } + + if _, ok := a[bouncerName][name]; !ok { + a[bouncerName][name] = make(map[string]int64) + } + + a[bouncerName][name][unit] += value +} + type statBouncer struct { // oldest collection timestamp for each bouncer - oldestTS map[string]*time.Time - // we keep de-normalized metrics so we can iterate - // over them multiple times and keep the aggregation code simple - rawMetrics []bouncerMetricItem - aggregated map[string]map[string]map[string]map[string]int64 - aggregatedAllOrigin map[string]map[string]map[string]int64 + oldestTS map[string]time.Time + // aggregate over ip type: always sum + // [bouncer][origin][name][unit]value + aggOverIPType aggregationOverIPType + // aggregate over origin: always sum + // [bouncer][name][unit]value + aggOverOrigin aggregationOverOrigin } var knownPlurals = map[string]string{ @@ -46,15 +119,15 @@ var knownPlurals = map[string]string{ } func (s *statBouncer) MarshalJSON() ([]byte, error) { - return json.Marshal(s.aggregated) + return json.Marshal(s.aggOverIPType) } -func (s *statBouncer) Description() (string, string) { +func (*statBouncer) Description() (string, string) { return "Bouncer Metrics", `Network traffic blocked by bouncers.` } -func warnOnce(warningsLogged map[string]bool, msg string) { +func logWarningOnce(warningsLogged map[string]bool, msg string) { if _, ok := warningsLogged[msg]; !ok { log.Warningf(msg) @@ -62,67 +135,58 @@ func warnOnce(warningsLogged map[string]bool, msg string) { } } -func (s *statBouncer) Fetch(ctx context.Context, db *database.Client) error { - if db == nil { - return nil - } - - // query all bouncer metrics that have not been flushed - - metrics, err := db.Ent.Metric.Query(). - Where( - metric.GeneratedTypeEQ(metric.GeneratedTypeRC), - ).All(ctx) - if err != nil { - return fmt.Errorf("unable to fetch metrics: %w", err) - } - - s.oldestTS = make(map[string]*time.Time) +// extractRawMetrics converts metrics from the database to a de-normalized, de-duplicated slice +// it returns the slice and the oldest timestamp for each bouncer +func (*statBouncer) extractRawMetrics(metrics []*ent.Metric) ([]bouncerMetricItem, map[string]time.Time) { + oldestTS := make(map[string]time.Time) // don't spam the user with the same warnings warningsLogged := make(map[string]bool) + // store raw metrics, de-duplicated in case some were sent multiple times + uniqueRaw := make(map[bouncerMetricItem]struct{}) + for _, met := range metrics { bouncerName := met.GeneratedBy - collectedAt := met.CollectedAt - if s.oldestTS[bouncerName] == nil || collectedAt.Before(*s.oldestTS[bouncerName]) { - s.oldestTS[bouncerName] = &collectedAt - } - - type bouncerMetrics struct { + var payload struct { Metrics []models.DetailedMetrics `json:"metrics"` } - payload := bouncerMetrics{} - - err := json.Unmarshal([]byte(met.Payload), &payload) - if err != nil { + if err := json.Unmarshal([]byte(met.Payload), &payload); err != nil { log.Warningf("while parsing metrics for %s: %s", bouncerName, err) continue } for _, m := range payload.Metrics { - for _, item := range m.Items { - labels := item.Labels + // fields like timestamp, name, etc. 
are mandatory but we got pointers, so we check anyway + if m.Meta.UtcNowTimestamp == nil { + logWarningOnce(warningsLogged, "missing 'utc_now_timestamp' field in metrics reported by "+bouncerName) + continue + } + + collectedAt := time.Unix(*m.Meta.UtcNowTimestamp, 0).UTC() - // these are mandatory but we got pointers, so... + if oldestTS[bouncerName].IsZero() || collectedAt.Before(oldestTS[bouncerName]) { + oldestTS[bouncerName] = collectedAt + } + for _, item := range m.Items { valid := true if item.Name == nil { - warnOnce(warningsLogged, "missing 'name' field in metrics reported by "+bouncerName) + logWarningOnce(warningsLogged, "missing 'name' field in metrics reported by "+bouncerName) // no continue - keep checking the rest valid = false } if item.Unit == nil { - warnOnce(warningsLogged, "missing 'unit' field in metrics reported by "+bouncerName) + logWarningOnce(warningsLogged, "missing 'unit' field in metrics reported by "+bouncerName) valid = false } if item.Value == nil { - warnOnce(warningsLogged, "missing 'value' field in metrics reported by "+bouncerName) + logWarningOnce(warningsLogged, "missing 'value' field in metrics reported by "+bouncerName) valid = false } @@ -130,94 +194,152 @@ func (s *statBouncer) Fetch(ctx context.Context, db *database.Client) error { continue } - name := *item.Name - unit := *item.Unit - value := *item.Value - rawMetric := bouncerMetricItem{ + collectedAt: collectedAt, bouncerName: bouncerName, - ipType: labels["ip_type"], - origin: labels["origin"], - name: name, - unit: unit, - value: value, + ipType: item.Labels["ip_type"], + origin: item.Labels["origin"], + name: *item.Name, + unit: *item.Unit, + value: *item.Value, } - s.rawMetrics = append(s.rawMetrics, rawMetric) + uniqueRaw[rawMetric] = struct{}{} } } } - s.aggregate() + // extract raw metric structs + keys := make([]bouncerMetricItem, 0, len(uniqueRaw)) + for key := range uniqueRaw { + keys = append(keys, key) + } - return nil + // order them by timestamp + sort.Slice(keys, func(i, j int) bool { + return keys[i].collectedAt.Before(keys[j].collectedAt) + }) + + return keys, oldestTS } -func (s *statBouncer) aggregate() { - // [bouncer][origin][name][unit]value - if s.aggregated == nil { - s.aggregated = make(map[string]map[string]map[string]map[string]int64) +func (s *statBouncer) Fetch(ctx context.Context, db *database.Client) error { + if db == nil { + return nil } - if s.aggregatedAllOrigin == nil { - s.aggregatedAllOrigin = make(map[string]map[string]map[string]int64) + // query all bouncer metrics that have not been flushed + + metrics, err := db.Ent.Metric.Query(). + Where(metric.GeneratedTypeEQ(metric.GeneratedTypeRC)). 
+ All(ctx) + if err != nil { + return fmt.Errorf("unable to fetch metrics: %w", err) } - for _, raw := range s.rawMetrics { - if _, ok := s.aggregated[raw.bouncerName]; !ok { - s.aggregated[raw.bouncerName] = make(map[string]map[string]map[string]int64) - } + // de-normalize, de-duplicate metrics and keep the oldest timestamp for each bouncer - if _, ok := s.aggregated[raw.bouncerName][raw.origin]; !ok { - s.aggregated[raw.bouncerName][raw.origin] = make(map[string]map[string]int64) - } + rawMetrics, oldestTS := s.extractRawMetrics(metrics) - if _, ok := s.aggregated[raw.bouncerName][raw.origin][raw.name]; !ok { - s.aggregated[raw.bouncerName][raw.origin][raw.name] = make(map[string]int64) - } + s.oldestTS = oldestTS + aggOverTime := s.newAggregationOverTime(rawMetrics) + s.aggOverIPType = s.newAggregationOverIPType(aggOverTime) + s.aggOverOrigin = s.newAggregationOverOrigin(s.aggOverIPType) - if _, ok := s.aggregated[raw.bouncerName][raw.origin][raw.name][raw.unit]; !ok { - s.aggregated[raw.bouncerName][raw.origin][raw.name][raw.unit] = 0 - } + return nil +} - s.aggregated[raw.bouncerName][raw.origin][raw.name][raw.unit] += int64(raw.value) +// return true if the metric is a gauge and should not be aggregated +func (*statBouncer) isGauge(name string) bool { + return name == "active_decisions" || strings.HasSuffix(name, "_gauge") +} - if _, ok := s.aggregatedAllOrigin[raw.bouncerName]; !ok { - s.aggregatedAllOrigin[raw.bouncerName] = make(map[string]map[string]int64) - } +// formatMetricName returns the metric name to display in the table header +func (*statBouncer) formatMetricName(name string) string { + return strings.TrimSuffix(name, "_gauge") +} - if _, ok := s.aggregatedAllOrigin[raw.bouncerName][raw.name]; !ok { - s.aggregatedAllOrigin[raw.bouncerName][raw.name] = make(map[string]int64) - } +// formatMetricOrigin returns the origin to display in the table rows +// (for example, some users don't know what capi is) +func (*statBouncer) formatMetricOrigin(origin string) string { + switch origin { + case "CAPI": + return origin + " (community blocklist)" + case "cscli": + return origin + " (manual decisions)" + case "crowdsec": + return origin + " (security engine)" + default: + return origin + } +} + +func (s *statBouncer) newAggregationOverTime(rawMetrics []bouncerMetricItem) aggregationOverTime { + ret := aggregationOverTime{} - if _, ok := s.aggregatedAllOrigin[raw.bouncerName][raw.name][raw.unit]; !ok { - s.aggregatedAllOrigin[raw.bouncerName][raw.name][raw.unit] = 0 + for _, raw := range rawMetrics { + ret.add(raw.bouncerName, raw.origin, raw.name, raw.unit, raw.ipType, raw.value, s.isGauge(raw.name)) + } + + return ret +} + +func (*statBouncer) newAggregationOverIPType(aggMetrics aggregationOverTime) aggregationOverIPType { + ret := aggregationOverIPType{} + + for bouncerName := range aggMetrics { + for origin := range aggMetrics[bouncerName] { + for name := range aggMetrics[bouncerName][origin] { + for unit := range aggMetrics[bouncerName][origin][name] { + for ipType := range aggMetrics[bouncerName][origin][name][unit] { + value := aggMetrics[bouncerName][origin][name][unit][ipType] + ret.add(bouncerName, origin, name, unit, value) + } + } + } } + } - s.aggregatedAllOrigin[raw.bouncerName][raw.name][raw.unit] += int64(raw.value) + return ret +} + +func (*statBouncer) newAggregationOverOrigin(aggMetrics aggregationOverIPType) aggregationOverOrigin { + ret := aggregationOverOrigin{} + + for bouncerName := range aggMetrics { + for origin := range aggMetrics[bouncerName] { + for 
name := range aggMetrics[bouncerName][origin] { + for unit := range aggMetrics[bouncerName][origin][name] { + val := aggMetrics[bouncerName][origin][name][unit] + ret.add(bouncerName, name, unit, val) + } + } + } } + + return ret } // bouncerTable displays a table of metrics for a single bouncer func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor string, noUnit bool) { - columns := make(map[string]map[string]bool) + columns := make(map[string]map[string]struct{}) - for _, item := range s.rawMetrics { - if item.bouncerName != bouncerName { - continue - } + bouncerData, ok := s.aggOverOrigin[bouncerName] + if !ok { + // no metrics for this bouncer, skip. how did we get here ? + // anyway we can't honor the "showEmpty" flag in this case, + // we don't even have the table headers + return + } + + for metricName, units := range bouncerData { // build a map of the metric names and units, to display dynamic columns - if _, ok := columns[item.name]; !ok { - columns[item.name] = make(map[string]bool) + columns[metricName] = make(map[string]struct{}) + for unit := range units { + columns[metricName][unit] = struct{}{} } - - columns[item.name][item.unit] = true } - // no metrics for this bouncer, skip. how did we get here ? - // anyway we can't honor the "showEmpty" flag in this case, - // we don't heven have the table headers - if len(columns) == 0 { return } @@ -238,11 +360,11 @@ func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor for _, unit := range maptools.SortedKeys(columns[name]) { colNum += 1 - header1 = append(header1, name) + header1 = append(header1, s.formatMetricName(name)) // we don't add "s" to random words - if knownPlurals[unit] != "" { - unit = knownPlurals[unit] + if plural, ok := knownPlurals[unit]; ok { + unit = plural } header2 = append(header2, unit) @@ -264,7 +386,7 @@ func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor // sort all the ranges for stable output - for _, origin := range maptools.SortedKeys(s.aggregated[bouncerName]) { + for _, origin := range maptools.SortedKeys(s.aggOverIPType[bouncerName]) { if origin == "" { // if the metric has no origin (i.e. 
processed bytes/packets) // we don't display it in the table body but it still gets aggregated @@ -272,21 +394,15 @@ func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor continue } - metrics := s.aggregated[bouncerName][origin] - - // some users don't know what capi is - if origin == "CAPI" { - origin += " (community blocklist)" - } + metrics := s.aggOverIPType[bouncerName][origin] - row := table.Row{origin} + row := table.Row{s.formatMetricOrigin(origin)} for _, name := range maptools.SortedKeys(columns) { for _, unit := range maptools.SortedKeys(columns[name]) { valStr := "-" - val, ok := metrics[name][unit] - if ok { + if val, ok := metrics[name][unit]; ok { valStr = formatNumber(val, !noUnit) } @@ -299,7 +415,7 @@ func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor numRows += 1 } - totals := s.aggregatedAllOrigin[bouncerName] + totals := s.aggOverOrigin[bouncerName] if numRows == 0 { t.Style().Options.SeparateFooter = false @@ -319,27 +435,20 @@ func (s *statBouncer) bouncerTable(out io.Writer, bouncerName string, wantColor title = fmt.Sprintf("%s (%s)", title, bouncerName) if s.oldestTS != nil { - // if we change this to .Local() beware of tests + // if you change this to .Local() beware of tests title = fmt.Sprintf("%s since %s", title, s.oldestTS[bouncerName].String()) } - title += ":" - // don't use SetTitle() because it draws the title inside table box - io.WriteString(out, title+"\n") - io.WriteString(out, t.Render() + "\n") + io.WriteString(out, title+":\n") + io.WriteString(out, t.Render()+"\n") // empty line between tables io.WriteString(out, "\n") } // Table displays a table of metrics for each bouncer func (s *statBouncer) Table(out io.Writer, wantColor string, noUnit bool, _ bool) { - bouncerNames := make(map[string]bool) - for _, item := range s.rawMetrics { - bouncerNames[item.bouncerName] = true - } - - for _, bouncerName := range maptools.SortedKeys(bouncerNames) { + for _, bouncerName := range maptools.SortedKeys(s.aggOverOrigin) { s.bouncerTable(out, bouncerName, wantColor, noUnit) } } diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 2be8a84fec0..5aafc6b0dfe 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -140,6 +140,19 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H }) outputWg.Wait() + mp := NewMetricsProvider( + apiClient, + lpMetricsDefaultInterval, + log.WithField("service", "lpmetrics"), + []string{}, + datasources, + hub, + ) + + lpMetricsTomb.Go(func() error { + return mp.Run(context.Background(), &lpMetricsTomb) + }) + if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled { aggregated := false if cConfig.Prometheus.Level == configuration.CFG_METRICS_AGGREGATE { diff --git a/cmd/crowdsec/lpmetrics.go b/cmd/crowdsec/lpmetrics.go new file mode 100644 index 00000000000..0fd27054071 --- /dev/null +++ b/cmd/crowdsec/lpmetrics.go @@ -0,0 +1,182 @@ +package main + +import ( + "context" + "errors" + "net/http" + "time" + + "github.com/sirupsen/logrus" + + "gopkg.in/tomb.v2" + + "github.com/crowdsecurity/go-cs-lib/ptr" + "github.com/crowdsecurity/go-cs-lib/trace" + "github.com/crowdsecurity/go-cs-lib/version" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition" + "github.com/crowdsecurity/crowdsec/pkg/apiclient" + "github.com/crowdsecurity/crowdsec/pkg/cwhub" + "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/models" +) + +const lpMetricsDefaultInterval = 30 * time.Minute + +//
MetricsProvider collects metrics from the LP and sends them to the LAPI +type MetricsProvider struct { + apic *apiclient.ApiClient + interval time.Duration + static staticMetrics + logger *logrus.Entry +} + +type staticMetrics struct { + osName string + osVersion string + startupTS int64 + featureFlags []string + consoleOptions []string + datasourceMap map[string]int64 + hubState models.HubItems +} + +func getHubState(hub *cwhub.Hub) models.HubItems { + ret := models.HubItems{} + + for _, itemType := range cwhub.ItemTypes { + ret[itemType] = []models.HubItem{} + items, _ := hub.GetInstalledItemsByType(itemType) + cwhub.SortItemSlice(items) + + for _, item := range items { + status := "official" + if item.State.IsLocal() { + status = "custom" + } + if item.State.Tainted { + status = "tainted" + } + ret[itemType] = append(ret[itemType], models.HubItem{ + Name: item.Name, + Status: status, + Version: item.Version, + }) + } + } + + return ret +} + +// newStaticMetrics is called when the process starts, or reloads the configuration +func newStaticMetrics(consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics { + datasourceMap := map[string]int64{} + + for _, ds := range datasources { + datasourceMap[ds.GetName()] += 1 + } + + osName, osVersion := version.DetectOS() + + return staticMetrics{ + osName: osName, + osVersion: osVersion, + startupTS: time.Now().UTC().Unix(), + featureFlags: fflag.Crowdsec.GetEnabledFeatures(), + consoleOptions: consoleOptions, + datasourceMap: datasourceMap, + hubState: getHubState(hub), + } +} + +func NewMetricsProvider(apic *apiclient.ApiClient, interval time.Duration, logger *logrus.Entry, + consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) *MetricsProvider { + return &MetricsProvider{ + apic: apic, + interval: interval, + logger: logger, + static: newStaticMetrics(consoleOptions, datasources, hub), + } +} + +func (m *MetricsProvider) metricsPayload() *models.AllMetrics { + os := &models.OSversion{ + Name: ptr.Of(m.static.osName), + Version: ptr.Of(m.static.osVersion), + } + + base := models.BaseMetrics{ + UtcStartupTimestamp: ptr.Of(m.static.startupTS), + Os: os, + Version: ptr.Of(version.String()), + FeatureFlags: m.static.featureFlags, + Metrics: make([]*models.DetailedMetrics, 0), + } + + met := &models.LogProcessorsMetrics{ + BaseMetrics: base, + Datasources: m.static.datasourceMap, + HubItems: m.static.hubState, + } + + met.Metrics = append(met.Metrics, &models.DetailedMetrics{ + Meta: &models.MetricsMeta{ + UtcNowTimestamp: ptr.Of(time.Now().Unix()), + WindowSizeSeconds: ptr.Of(int64(m.interval.Seconds())), + }, + Items: make([]*models.MetricsDetailItem, 0), + }) + + return &models.AllMetrics{ + LogProcessors: []*models.LogProcessorsMetrics{met}, + } +} + +func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { + defer trace.CatchPanic("crowdsec/MetricsProvider.Run") + + if m.interval == time.Duration(0) { + return nil + } + + ticker := time.NewTicker(1) // the 1ns first tick sends the metrics on start, before the regular interval + + for { + select { + case <-ticker.C: + // rebuild the payload on every tick so utc_now_timestamp is current + met := m.metricsPayload() + + ctxTime, cancel := context.WithTimeout(ctx, 10*time.Second) + + _, resp, err := m.apic.UsageMetrics.Add(ctxTime, met) + cancel() // release the timeout context right away; a deferred cancel would accumulate for the life of the loop + + switch { + case errors.Is(err, context.DeadlineExceeded): + m.logger.Warnf("timeout sending lp metrics") + ticker.Reset(m.interval) + continue + case err != nil && resp != nil && resp.Response.StatusCode == http.StatusNotFound: + m.logger.Warnf("metrics endpoint not found, older LAPI?") +
ticker.Reset(m.interval) + continue + case err != nil: + m.logger.Warnf("failed to send lp metrics: %s", err) + ticker.Reset(m.interval) + continue + } + + if resp.Response.StatusCode != http.StatusCreated { + m.logger.Warnf("failed to send lp metrics: %s", resp.Response.Status) + ticker.Reset(m.interval) + continue + } + + ticker.Reset(m.interval) + + m.logger.Tracef("lp usage metrics sent") + case <-myTomb.Dying(): + ticker.Stop() + return nil + } + } +} diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 26e39eb069c..18416e044e7 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -29,28 +29,29 @@ import ( ) var ( - /*tombs for the parser, buckets and outputs.*/ - acquisTomb tomb.Tomb - parsersTomb tomb.Tomb - bucketsTomb tomb.Tomb - outputsTomb tomb.Tomb - apiTomb tomb.Tomb - crowdsecTomb tomb.Tomb - pluginTomb tomb.Tomb + // tombs for the parser, buckets and outputs. + acquisTomb tomb.Tomb + parsersTomb tomb.Tomb + bucketsTomb tomb.Tomb + outputsTomb tomb.Tomb + apiTomb tomb.Tomb + crowdsecTomb tomb.Tomb + pluginTomb tomb.Tomb + lpMetricsTomb tomb.Tomb flags *Flags - /*the state of acquisition*/ + // the state of acquisition dataSources []acquisition.DataSource - /*the state of the buckets*/ + // the state of the buckets holders []leakybucket.BucketFactory buckets *leakybucket.Buckets inputLineChan chan types.Event inputEventChan chan types.Event outputEventChan chan types.Event // the buckets init returns its own chan that is used for multiplexing - /*settings*/ - lastProcessedItem time.Time /*keep track of last item timestamp in time-machine. it is used to GC buckets when we dump them.*/ + // settings + lastProcessedItem time.Time // keep track of last item timestamp in time-machine. it is used to GC buckets when we dump them. pluginBroker csplugin.PluginBroker ) @@ -307,7 +308,7 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo if cConfig.API != nil && cConfig.API.Server != nil { cConfig.API.Server.OnlineClient = nil } - /*if the api is disabled as well, just read file and exit, don't daemonize*/ + // if the api is disabled as well, just read file and exit, don't daemonize if cConfig.DisableAPI { cConfig.Common.Daemonize = false } diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 5fb7b86f181..f1a658e9512 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -60,6 +60,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) { apiTomb = tomb.Tomb{} crowdsecTomb = tomb.Tomb{} pluginTomb = tomb.Tomb{} + lpMetricsTomb = tomb.Tomb{} cConfig, err := LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false) if err != nil { @@ -179,6 +180,15 @@ func ShutdownCrowdsecRoutines() error { log.Warningf("Outputs didn't finish in time, some events may have not been flushed") } + lpMetricsTomb.Kill(nil) + + if err := lpMetricsTomb.Wait(); err != nil { + log.Warningf("Metrics returned error: %s", err) + reterr = err + } + + log.Debugf("metrics are done") + // He's dead, Jim.
crowdsecTomb.Kill(nil) @@ -322,6 +332,7 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error { apiTomb = tomb.Tomb{} crowdsecTomb = tomb.Tomb{} pluginTomb = tomb.Tomb{} + lpMetricsTomb = tomb.Tomb{} ctx := context.TODO() diff --git a/go.mod b/go.mod index f36bbcd996d..af9d7550b94 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/agext/levenshtein v1.2.3 github.com/alexliesenfeld/health v0.8.0 - github.com/antonmedv/expr v1.15.3 github.com/appleboy/gin-jwt/v2 v2.9.2 github.com/aws/aws-lambda-go v1.47.0 github.com/aws/aws-sdk-go v1.52.0 @@ -33,6 +32,7 @@ require ( github.com/dghubble/sling v1.4.2 github.com/docker/docker v24.0.9+incompatible github.com/docker/go-connections v0.4.0 + github.com/expr-lang/expr v1.16.9 github.com/fatih/color v1.16.0 github.com/fsnotify/fsnotify v1.7.0 github.com/gin-gonic/gin v1.9.1 @@ -111,7 +111,6 @@ require ( github.com/creack/pty v1.1.18 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/expr-lang/expr v1.16.9 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-logr/logr v1.2.4 // indirect diff --git a/go.sum b/go.sum index d4cc2651f0f..282f10d6367 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,6 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/alexliesenfeld/health v0.8.0 h1:lCV0i+ZJPTbqP7LfKG7p3qZBl5VhelwUFCIVWl77fgk= github.com/alexliesenfeld/health v0.8.0/go.mod h1:TfNP0f+9WQVWMQRzvMUjlws4ceXKEL3WR+6Hp95HUFc= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= -github.com/antonmedv/expr v1.15.3 h1:q3hOJZNvLvhqE8OHBs1cFRdbXFNKuA+bHmRaI+AmRmI= -github.com/antonmedv/expr v1.15.3/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/appleboy/gin-jwt/v2 v2.9.2 h1:GeS3lm9mb9HMmj7+GNjYUtpp3V1DAQ1TkUFa5poiZ7Y= diff --git a/pkg/apiclient/client.go b/pkg/apiclient/client.go index b702829efd3..3abd42cf009 100644 --- a/pkg/apiclient/client.go +++ b/pkg/apiclient/client.go @@ -39,6 +39,7 @@ type ApiClient struct { Metrics *MetricsService Signal *SignalService HeartBeat *HeartBeatService + UsageMetrics *UsageMetricsService } func (a *ApiClient) GetClient() *http.Client { @@ -108,6 +109,7 @@ func NewClient(config *Config) (*ApiClient, error) { c.Signal = (*SignalService)(&c.common) c.DecisionDelete = (*DecisionDeleteService)(&c.common) c.HeartBeat = (*HeartBeatService)(&c.common) + c.UsageMetrics = (*UsageMetricsService)(&c.common) return c, nil } @@ -144,6 +146,7 @@ func NewDefaultClient(URL *url.URL, prefix string, userAgent string, client *htt c.Signal = (*SignalService)(&c.common) c.DecisionDelete = (*DecisionDeleteService)(&c.common) c.HeartBeat = (*HeartBeatService)(&c.common) + c.UsageMetrics = (*UsageMetricsService)(&c.common) return c, nil } diff --git a/pkg/apiclient/client_test.go b/pkg/apiclient/client_test.go index 2adba170584..dd09811924f 100644 --- a/pkg/apiclient/client_test.go +++ b/pkg/apiclient/client_test.go @@ -348,5 +348,5 @@ func TestNewClientBadAnswer(t *testing.T) { URL: apiURL, VersionPrefix: "v1", }, &http.Client{}) - cstest.RequireErrorContains(t, err, "invalid body: invalid character 'b' looking 
for beginning of value") + cstest.RequireErrorContains(t, err, "invalid body: bad") } diff --git a/pkg/apiclient/resperr.go b/pkg/apiclient/resperr.go index ff954a73609..e8f12ee9f4e 100644 --- a/pkg/apiclient/resperr.go +++ b/pkg/apiclient/resperr.go @@ -34,12 +34,18 @@ func CheckResponse(r *http.Response) error { data, err := io.ReadAll(r.Body) if err != nil || len(data) == 0 { - ret.Message = ptr.Of(fmt.Sprintf("http code %d, no error message", r.StatusCode)) + ret.Message = ptr.Of(fmt.Sprintf("http code %d, no response body", r.StatusCode)) return ret } - if err := json.Unmarshal(data, ret); err != nil { - return fmt.Errorf("http code %d, invalid body: %w", r.StatusCode, err) + switch r.StatusCode { + case http.StatusUnprocessableEntity: + ret.Message = ptr.Of(fmt.Sprintf("http code %d, invalid request: %s", r.StatusCode, string(data))) + default: + if err := json.Unmarshal(data, ret); err != nil { + ret.Message = ptr.Of(fmt.Sprintf("http code %d, invalid body: %s", r.StatusCode, string(data))) + return ret + } } return ret diff --git a/pkg/apiclient/usagemetrics.go b/pkg/apiclient/usagemetrics.go new file mode 100644 index 00000000000..1d822bb5c1e --- /dev/null +++ b/pkg/apiclient/usagemetrics.go @@ -0,0 +1,29 @@ +package apiclient + +import ( + "context" + "fmt" + "net/http" + + "github.com/crowdsecurity/crowdsec/pkg/models" +) + +type UsageMetricsService service + +func (s *UsageMetricsService) Add(ctx context.Context, metrics *models.AllMetrics) (interface{}, *Response, error) { + u := fmt.Sprintf("%s/usage-metrics", s.client.URLPrefix) + + req, err := s.client.NewRequest(http.MethodPost, u, &metrics) + if err != nil { + return nil, nil, err + } + + var response interface{} + + resp, err := s.client.Do(ctx, req, &response) + if err != nil { + return nil, resp, err + } + + return &response, resp, nil +} diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index 68dc94367e2..284d0acdabf 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -35,26 +35,30 @@ import ( const ( // delta values must be smaller than the interval - pullIntervalDefault = time.Hour * 2 - pullIntervalDelta = 5 * time.Minute - pushIntervalDefault = time.Second * 10 - pushIntervalDelta = time.Second * 7 - metricsIntervalDefault = time.Minute * 30 - metricsIntervalDelta = time.Minute * 15 + pullIntervalDefault = time.Hour * 2 + pullIntervalDelta = 5 * time.Minute + pushIntervalDefault = time.Second * 10 + pushIntervalDelta = time.Second * 7 + metricsIntervalDefault = time.Minute * 30 + metricsIntervalDelta = time.Minute * 15 + usageMetricsInterval = time.Minute * 30 + usageMetricsIntervalFirst = time.Minute * 15 ) type apic struct { // when changing the intervals in tests, always set *First too // or they can be negative - pullInterval time.Duration - pullIntervalFirst time.Duration - pushInterval time.Duration - pushIntervalFirst time.Duration - metricsInterval time.Duration - metricsIntervalFirst time.Duration - dbClient *database.Client - apiClient *apiclient.ApiClient - AlertsAddChan chan []*models.Alert + pullInterval time.Duration + pullIntervalFirst time.Duration + pushInterval time.Duration + pushIntervalFirst time.Duration + metricsInterval time.Duration + metricsIntervalFirst time.Duration + usageMetricsInterval time.Duration + usageMetricsIntervalFirst time.Duration + dbClient *database.Client + apiClient *apiclient.ApiClient + AlertsAddChan chan []*models.Alert mu sync.Mutex pushTomb tomb.Tomb @@ -175,24 +179,26 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient 
*database.Client, con var err error ret := &apic{ - AlertsAddChan: make(chan []*models.Alert), - dbClient: dbClient, - mu: sync.Mutex{}, - startup: true, - credentials: config.Credentials, - pullTomb: tomb.Tomb{}, - pushTomb: tomb.Tomb{}, - metricsTomb: tomb.Tomb{}, - scenarioList: make([]string, 0), - consoleConfig: consoleConfig, - pullInterval: pullIntervalDefault, - pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta), - pushInterval: pushIntervalDefault, - pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta), - metricsInterval: metricsIntervalDefault, - metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta), - isPulling: make(chan bool, 1), - whitelists: apicWhitelist, + AlertsAddChan: make(chan []*models.Alert), + dbClient: dbClient, + mu: sync.Mutex{}, + startup: true, + credentials: config.Credentials, + pullTomb: tomb.Tomb{}, + pushTomb: tomb.Tomb{}, + metricsTomb: tomb.Tomb{}, + scenarioList: make([]string, 0), + consoleConfig: consoleConfig, + pullInterval: pullIntervalDefault, + pullIntervalFirst: randomDuration(pullIntervalDefault, pullIntervalDelta), + pushInterval: pushIntervalDefault, + pushIntervalFirst: randomDuration(pushIntervalDefault, pushIntervalDelta), + metricsInterval: metricsIntervalDefault, + metricsIntervalFirst: randomDuration(metricsIntervalDefault, metricsIntervalDelta), + usageMetricsInterval: usageMetricsInterval, + usageMetricsIntervalFirst: randomDuration(usageMetricsInterval, usageMetricsIntervalFirst), + isPulling: make(chan bool, 1), + whitelists: apicWhitelist, } password := strfmt.Password(config.Credentials.Password) diff --git a/pkg/apiserver/apic_metrics.go b/pkg/apiserver/apic_metrics.go index b8e23629e1e..54640afc2d0 100644 --- a/pkg/apiserver/apic_metrics.go +++ b/pkg/apiserver/apic_metrics.go @@ -2,7 +2,10 @@ package apiserver import ( "context" + "encoding/json" + "net/http" "slices" + "strings" "time" log "github.com/sirupsen/logrus" @@ -11,9 +14,170 @@ import ( "github.com/crowdsecurity/go-cs-lib/trace" "github.com/crowdsecurity/go-cs-lib/version" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" + "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/models" ) +type dbPayload struct { + Metrics []*models.DetailedMetrics `json:"metrics"` +} + +func (a *apic) GetUsageMetrics() (*models.AllMetrics, []int, error) { + allMetrics := &models.AllMetrics{} + metricsIds := make([]int, 0) + + lps, err := a.dbClient.ListMachines() + if err != nil { + return nil, nil, err + } + + bouncers, err := a.dbClient.ListBouncers() + if err != nil { + return nil, nil, err + } + + for _, bouncer := range bouncers { + dbMetrics, err := a.dbClient.GetBouncerUsageMetricsByName(bouncer.Name) + if err != nil { + log.Errorf("unable to get bouncer usage metrics: %s", err) + continue + } + + rcMetrics := models.RemediationComponentsMetrics{} + + rcMetrics.Os = &models.OSversion{ + Name: ptr.Of(bouncer.Osname), + Version: ptr.Of(bouncer.Osversion), + } + rcMetrics.Type = bouncer.Type + rcMetrics.FeatureFlags = strings.Split(bouncer.Featureflags, ",") + rcMetrics.Version = ptr.Of(bouncer.Version) + rcMetrics.Name = bouncer.Name + rcMetrics.LastPull = bouncer.LastPull.UTC().Unix() + + rcMetrics.Metrics = make([]*models.DetailedMetrics, 0) + + // Might seem weird, but we duplicate the bouncers if we have multiple unsent metrics + for _, dbMetric := range dbMetrics { + dbPayload := &dbPayload{} + // Append no matter what, if we cannot unmarshal, there's no way we'll be able 
to fix it automatically + metricsIds = append(metricsIds, dbMetric.ID) + + err := json.Unmarshal([]byte(dbMetric.Payload), dbPayload) + if err != nil { + log.Errorf("unable to unmarshal bouncer metric (%s)", err) + continue + } + + rcMetrics.Metrics = append(rcMetrics.Metrics, dbPayload.Metrics...) + } + + allMetrics.RemediationComponents = append(allMetrics.RemediationComponents, &rcMetrics) + } + + for _, lp := range lps { + dbMetrics, err := a.dbClient.GetLPUsageMetricsByMachineID(lp.MachineId) + if err != nil { + log.Errorf("unable to get LP usage metrics: %s", err) + continue + } + + lpMetrics := models.LogProcessorsMetrics{} + + lpMetrics.Os = &models.OSversion{ + Name: ptr.Of(lp.Osname), + Version: ptr.Of(lp.Osversion), + } + lpMetrics.FeatureFlags = strings.Split(lp.Featureflags, ",") + lpMetrics.Version = ptr.Of(lp.Version) + lpMetrics.Name = lp.MachineId + lpMetrics.LastPush = lp.LastPush.UTC().Unix() + lpMetrics.LastUpdate = lp.UpdatedAt.UTC().Unix() + + lpMetrics.Datasources = lp.Datasources + + if lp.Hubstate != nil { + // must carry over the hub state even if nothing is installed + hubItems := models.HubItems{} + for itemType, items := range lp.Hubstate { + hubItems[itemType] = []models.HubItem{} + for _, item := range items { + hubItems[itemType] = append(hubItems[itemType], models.HubItem{ + Name: item.Name, + Status: item.Status, + Version: item.Version, + }) + } + + lpMetrics.HubItems = hubItems + } + } else { + lpMetrics.HubItems = models.HubItems{} + } + + lpMetrics.Metrics = make([]*models.DetailedMetrics, 0) + + for _, dbMetric := range dbMetrics { + dbPayload := &dbPayload{} + // Append no matter what, if we cannot unmarshal, there's no way we'll be able to fix it automatically + metricsIds = append(metricsIds, dbMetric.ID) + + err := json.Unmarshal([]byte(dbMetric.Payload), dbPayload) + if err != nil { + log.Errorf("unable to unmarshal log processor metric (%s)", err) + continue + } + + lpMetrics.Metrics = append(lpMetrics.Metrics, dbPayload.Metrics...) 
+ } + + allMetrics.LogProcessors = append(allMetrics.LogProcessors, &lpMetrics) + } + + // FIXME: all of this should only be done once on startup/reload + consoleOptions := strings.Join(csconfig.GetConfig().API.Server.ConsoleConfig.EnabledOptions(), ",") + allMetrics.Lapi = &models.LapiMetrics{ + ConsoleOptions: models.ConsoleOptions{ + consoleOptions, + }, + } + + osName, osVersion := version.DetectOS() + + allMetrics.Lapi.Os = &models.OSversion{ + Name: ptr.Of(osName), + Version: ptr.Of(osVersion), + } + allMetrics.Lapi.Version = ptr.Of(version.String()) + allMetrics.Lapi.FeatureFlags = fflag.Crowdsec.GetEnabledFeatures() + + allMetrics.Lapi.Metrics = make([]*models.DetailedMetrics, 0) + + allMetrics.Lapi.Metrics = append(allMetrics.Lapi.Metrics, &models.DetailedMetrics{ + Meta: &models.MetricsMeta{ + UtcNowTimestamp: ptr.Of(time.Now().UTC().Unix()), + WindowSizeSeconds: ptr.Of(int64(a.metricsInterval.Seconds())), + }, + Items: make([]*models.MetricsDetailItem, 0), + }) + + // Force an actual slice to avoid non-existing fields in the JSON + if allMetrics.RemediationComponents == nil { + allMetrics.RemediationComponents = make([]*models.RemediationComponentsMetrics, 0) + } + + if allMetrics.LogProcessors == nil { + allMetrics.LogProcessors = make([]*models.LogProcessorsMetrics, 0) + } + + return allMetrics, metricsIds, nil +} + +func (a *apic) MarkUsageMetricsAsSent(ids []int) error { + return a.dbClient.MarkUsageMetricsAsSent(ids) +} + func (a *apic) GetMetrics() (*models.Metrics, error) { machines, err := a.dbClient.ListMachines() if err != nil { @@ -160,3 +324,51 @@ func (a *apic) SendMetrics(stop chan (bool)) { } } } + +func (a *apic) SendUsageMetrics() { + defer trace.CatchPanic("lapi/usageMetricsToAPIC") + + firstRun := true + + ticker := time.NewTicker(a.usageMetricsIntervalFirst) + + for { + select { + case <-a.metricsTomb.Dying(): + // The normal metrics routine also kills push/pull tombs, does that make sense? + ticker.Stop() + return + case <-ticker.C: + if firstRun { + firstRun = false + + ticker.Reset(a.usageMetricsInterval) + } + + metrics, metricsIds, err := a.GetUsageMetrics() + if err != nil { + log.Errorf("unable to get usage metrics: %s", err) + continue + } + + _, resp, err := a.apiClient.UsageMetrics.Add(context.Background(), metrics) + if err != nil { + log.Errorf("unable to send usage metrics: %s", err) + + if resp == nil || resp.Response == nil || (resp.Response.StatusCode >= http.StatusBadRequest && resp.Response.StatusCode != http.StatusUnprocessableEntity) { + // transport failure, or the API rejected the request: keep the metrics and retry on the next tick. + // In case of 422, mark the metrics as sent anyway, the API did not like what we sent, + // and it's unlikely we'll be able to fix it + continue + } + } + + err = a.MarkUsageMetricsAsSent(metricsIds) + if err != nil { + log.Errorf("unable to mark usage metrics as sent: %s", err) + continue + } + + log.Infof("Sent %d usage metrics", len(metricsIds)) + } + } +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index c6074801d7e..bd0b5d39bf4 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -367,6 +367,11 @@ func (s *APIServer) Run(apiReady chan bool) error { s.apic.SendMetrics(make(chan bool)) return nil }) + + s.apic.metricsTomb.Go(func() error { + s.apic.SendUsageMetrics() + return nil + }) } s.httpServerTomb.Go(func() error { @@ -375,7 +380,7 @@ func (s *APIServer) Run(apiReady chan bool) error { if err := s.httpServerTomb.Wait(); err != nil { return fmt.Errorf("local API server stopped with error: %w", err) - } + } return nil } diff --git a/pkg/apiserver/controllers/controller.go b/pkg/apiserver/controllers/controller.go index 51f359244be..8175f431384 100644 --- a/pkg/apiserver/controllers/controller.go +++ b/pkg/apiserver/controllers/controller.go @@ -4,12 +4,13 @@ import ( "context" "net" "net/http" + "strings" "github.com/alexliesenfeld/health" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" - "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1" + v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/csplugin" "github.com/crowdsecurity/crowdsec/pkg/database" @@ -59,6 +60,23 @@ func serveHealth() http.HandlerFunc { return health.NewHandler(checker) } +func eitherAuthMiddleware(jwtMiddleware gin.HandlerFunc, apiKeyMiddleware gin.HandlerFunc) gin.HandlerFunc { + return func(c *gin.Context) { + switch { + case c.GetHeader("X-Api-Key") != "": + apiKeyMiddleware(c) + case c.GetHeader("Authorization") != "": + jwtMiddleware(c) + // uh no auth header. is this TLS with mutual authentication?
+ case strings.HasPrefix(c.Request.UserAgent(), "crowdsec/"): + // guess log processors by sniffing user-agent + jwtMiddleware(c) + default: + apiKeyMiddleware(c) + } + } +} + func (c *Controller) NewV1() error { var err error @@ -117,6 +135,12 @@ func (c *Controller) NewV1() error { apiKeyAuth.HEAD("/decisions/stream", c.HandlerV1.StreamDecision) } + eitherAuth := groupV1.Group("") + eitherAuth.Use(eitherAuthMiddleware(c.HandlerV1.Middlewares.JWT.Middleware.MiddlewareFunc(), c.HandlerV1.Middlewares.APIKey.MiddlewareFunc())) + { + eitherAuth.POST("/usage-metrics", c.HandlerV1.UsageMetrics) + } + return nil } diff --git a/pkg/apiserver/controllers/v1/errors.go b/pkg/apiserver/controllers/v1/errors.go index 9004528e1b1..d661de44b0e 100644 --- a/pkg/apiserver/controllers/v1/errors.go +++ b/pkg/apiserver/controllers/v1/errors.go @@ -3,6 +3,7 @@ package v1 import ( "errors" "net/http" + "strings" "github.com/gin-gonic/gin" @@ -37,3 +38,32 @@ func (c *Controller) HandleDBErrors(gctx *gin.Context, err error) { return } } + +// collapseRepeatedPrefix collapses repeated occurrences of a given prefix in the text +func collapseRepeatedPrefix(text string, prefix string) string { + count := 0 + for strings.HasPrefix(text, prefix) { + count++ + text = strings.TrimPrefix(text, prefix) + } + + if count > 0 { + return prefix + text + } + + return text +} + +// RepeatedPrefixError wraps an error and removes the repeating prefix from its message +type RepeatedPrefixError struct { + OriginalError error + Prefix string +} + +func (e RepeatedPrefixError) Error() string { + return collapseRepeatedPrefix(e.OriginalError.Error(), e.Prefix) +} + +func (e RepeatedPrefixError) Unwrap() error { + return e.OriginalError +} diff --git a/pkg/apiserver/controllers/v1/errors_test.go b/pkg/apiserver/controllers/v1/errors_test.go new file mode 100644 index 00000000000..89c561f83bd --- /dev/null +++ b/pkg/apiserver/controllers/v1/errors_test.go @@ -0,0 +1,57 @@ +package v1 + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCollapseRepeatedPrefix(t *testing.T) { + tests := []struct { + input string + prefix string + want string + }{ + { + input: "aaabbbcccaaa", + prefix: "aaa", + want: "aaabbbcccaaa", + }, { + input: "hellohellohello world", + prefix: "hello", + want: "hello world", + }, { + input: "ababababxyz", + prefix: "ab", + want: "abxyz", + }, { + input: "xyzxyzxyzxyzxyz", + prefix: "xyz", + want: "xyz", + }, { + input: "123123123456", + prefix: "456", + want: "123123123456", + }, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + assert.Equal(t, tt.want, collapseRepeatedPrefix(tt.input, tt.prefix)) + }) + } +} + +func TestRepeatedPrefixError(t *testing.T) { + originalErr := errors.New("hellohellohello world") + wrappedErr := RepeatedPrefixError{OriginalError: originalErr, Prefix: "hello"} + + want := "hello world" + + assert.Equal(t, want, wrappedErr.Error()) + + assert.Equal(t, originalErr, errors.Unwrap(wrappedErr)) + require.ErrorIs(t, wrappedErr, originalErr) +} diff --git a/pkg/apiserver/controllers/v1/usagemetrics.go b/pkg/apiserver/controllers/v1/usagemetrics.go new file mode 100644 index 00000000000..74f27bb6cf4 --- /dev/null +++ b/pkg/apiserver/controllers/v1/usagemetrics.go @@ -0,0 +1,204 @@ +package v1 + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/go-openapi/strfmt" + log "github.com/sirupsen/logrus" + + "github.com/crowdsecurity/go-cs-lib/ptr" 
+ + "github.com/crowdsecurity/crowdsec/pkg/database/ent" + "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" + "github.com/crowdsecurity/crowdsec/pkg/models" +) + +// updateBaseMetrics updates the base metrics for a machine or bouncer +func (c *Controller) updateBaseMetrics(machineID string, bouncer *ent.Bouncer, baseMetrics models.BaseMetrics, hubItems models.HubItems, datasources map[string]int64) error { + switch { + case machineID != "": + c.DBClient.MachineUpdateBaseMetrics(machineID, baseMetrics, hubItems, datasources) + case bouncer != nil: + c.DBClient.BouncerUpdateBaseMetrics(bouncer.Name, bouncer.Type, baseMetrics) + default: + return errors.New("no machineID or bouncerName set") + } + + return nil +} + +// UsageMetrics receives metrics from log processors and remediation components +func (c *Controller) UsageMetrics(gctx *gin.Context) { + var input models.AllMetrics + + logger := log.WithField("func", "UsageMetrics") + + // parse the payload + + if err := gctx.ShouldBindJSON(&input); err != nil { + logger.Errorf("Failed to bind json: %s", err) + gctx.JSON(http.StatusBadRequest, gin.H{"message": err.Error()}) + + return + } + + if err := input.Validate(strfmt.Default); err != nil { + // work around a nuisance in the generated code + cleanErr := RepeatedPrefixError{ + OriginalError: err, + Prefix: "validation failure list:\n", + } + logger.Errorf("Failed to validate usage metrics: %s", cleanErr) + gctx.JSON(http.StatusUnprocessableEntity, gin.H{"message": cleanErr.Error()}) + + return + } + + var ( + generatedType metric.GeneratedType + generatedBy string + ) + + bouncer, _ := getBouncerFromContext(gctx) + if bouncer != nil { + logger.Tracef("Received usage metris for bouncer: %s", bouncer.Name) + + generatedType = metric.GeneratedTypeRC + generatedBy = bouncer.Name + } + + machineID, _ := getMachineIDFromContext(gctx) + if machineID != "" { + logger.Tracef("Received usage metrics for log processor: %s", machineID) + + generatedType = metric.GeneratedTypeLP + generatedBy = machineID + } + + if generatedBy == "" { + // how did we get here? 
+ logger.Error("No machineID or bouncer in request context after authentication") + gctx.JSON(http.StatusInternalServerError, gin.H{"message": "No machineID or bouncer in request context after authentication"}) + + return + } + + if machineID != "" && bouncer != nil { + logger.Errorf("Payload has both machineID and bouncer") + gctx.JSON(http.StatusBadRequest, gin.H{"message": "Payload has both LP and RC data"}) + + return + } + + var ( + payload map[string]any + baseMetrics models.BaseMetrics + hubItems models.HubItems + datasources map[string]int64 + ) + + switch len(input.LogProcessors) { + case 0: + if machineID != "" { + logger.Errorf("Missing log processor data") + gctx.JSON(http.StatusBadRequest, gin.H{"message": "Missing log processor data"}) + + return + } + case 1: + // the final slice can't have more than one item, + // guaranteed by the swagger schema + item0 := input.LogProcessors[0] + + err := item0.Validate(strfmt.Default) + if err != nil { + logger.Errorf("Failed to validate log processor data: %s", err) + gctx.JSON(http.StatusUnprocessableEntity, gin.H{"message": err.Error()}) + + return + } + + payload = map[string]any{ + "metrics": item0.Metrics, + } + baseMetrics = item0.BaseMetrics + hubItems = item0.HubItems + datasources = item0.Datasources + default: + logger.Errorf("Payload has more than one log processor") + // this is not checked in the swagger schema + gctx.JSON(http.StatusBadRequest, gin.H{"message": "Payload has more than one log processor"}) + + return + } + + switch len(input.RemediationComponents) { + case 0: + if bouncer != nil { + logger.Errorf("Missing remediation component data") + gctx.JSON(http.StatusBadRequest, gin.H{"message": "Missing remediation component data"}) + + return + } + case 1: + item0 := input.RemediationComponents[0] + + err := item0.Validate(strfmt.Default) + if err != nil { + logger.Errorf("Failed to validate remediation component data: %s", err) + gctx.JSON(http.StatusUnprocessableEntity, gin.H{"message": err.Error()}) + + return + } + + payload = map[string]any{ + "type": item0.Type, + "metrics": item0.Metrics, + } + baseMetrics = item0.BaseMetrics + default: + gctx.JSON(http.StatusBadRequest, gin.H{"message": "Payload has more than one remediation component"}) + return + } + + if baseMetrics.Os == nil { + baseMetrics.Os = &models.OSversion{ + Name: ptr.Of(""), + Version: ptr.Of(""), + } + } + + err := c.updateBaseMetrics(machineID, bouncer, baseMetrics, hubItems, datasources) + if err != nil { + logger.Errorf("Failed to update base metrics: %s", err) + c.HandleDBErrors(gctx, err) + + return + } + + jsonPayload, err := json.Marshal(payload) + if err != nil { + logger.Errorf("Failed to marshal usage metrics: %s", err) + c.HandleDBErrors(gctx, err) + + return + } + + receivedAt := time.Now().UTC() + + if _, err := c.DBClient.CreateMetric(generatedType, generatedBy, receivedAt, string(jsonPayload)); err != nil { + logger.Error(err) + c.HandleDBErrors(gctx, err) + + return + } + + // if CreateMetrics() returned nil, the metric was already there, we're good + // and don't split hair about 201 vs 200/204 + + gctx.Status(http.StatusCreated) +} diff --git a/pkg/apiserver/usage_metrics_test.go b/pkg/apiserver/usage_metrics_test.go new file mode 100644 index 00000000000..41dd0ccdc2c --- /dev/null +++ b/pkg/apiserver/usage_metrics_test.go @@ -0,0 +1,384 @@ +package apiserver + +import ( + "context" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/crowdsecurity/crowdsec/pkg/database" + 
"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" +) + +func TestLPMetrics(t *testing.T) { + tests := []struct { + name string + body string + expectedStatusCode int + expectedResponse string + expectedMetricsCount int + expectedOSName string + expectedOSVersion string + expectedFeatureFlags string + authType string + }{ + { + name: "empty metrics for LP", + body: `{ + }`, + expectedStatusCode: 400, + expectedResponse: "Missing log processor data", + authType: PASSWORD, + }, + { + name: "basic metrics with empty dynamic metrics for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedResponse: "", + expectedOSName: "foo", + expectedOSVersion: "42", + expectedFeatureFlags: "a,b,c", + authType: PASSWORD, + }, + { + name: "basic metrics with dynamic metrics for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [{"meta":{"utc_now_timestamp":42, "window_size_seconds": 42}, "items": [{"name": "foo", "value": 42, "unit": "bla"}] }, {"meta":{"utc_now_timestamp":43, "window_size_seconds": 42}, "items": [{"name": "foo", "value": 42, "unit": "bla"}] }], + "feature_flags": ["a", "b", "c"], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedResponse: "", + expectedOSName: "foo", + expectedOSVersion: "42", + expectedFeatureFlags: "a,b,c", + authType: PASSWORD, + }, + { + name: "wrong auth type for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + expectedStatusCode: 400, + expectedResponse: "Missing remediation component data", + authType: APIKEY, + }, + { + name: "missing OS field for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + expectedStatusCode: 201, + expectedResponse: "", + expectedMetricsCount: 1, + expectedFeatureFlags: "a,b,c", + authType: PASSWORD, + }, + { + name: "missing datasources for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"], + "hub_items": {} + } + ] +}`, + expectedStatusCode: 422, + expectedResponse: "log_processors.0.datasources in body is required", + authType: PASSWORD, + }, + { + name: "missing feature flags for LP", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedOSName: "foo", + expectedOSVersion: "42", + authType: PASSWORD, + }, + { + name: "missing OS name", + body: ` +{ + "log_processors": [ + { + "version": "1.42", + "os": {"version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"], + "datasources": {"file": 42}, + "hub_items": {} + } + ] +}`, + 
expectedStatusCode: 422, + expectedResponse: "log_processors.0.os.name in body is required", + authType: PASSWORD, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lapi := SetupLAPITest(t) + + dbClient, err := database.NewClient(context.Background(), lapi.DBConfig) + if err != nil { + t.Fatalf("unable to create database client: %s", err) + } + + w := lapi.RecordResponse(t, http.MethodPost, "/v1/usage-metrics", strings.NewReader(tt.body), tt.authType) + + assert.Equal(t, tt.expectedStatusCode, w.Code) + assert.Contains(t, w.Body.String(), tt.expectedResponse) + + machine, _ := dbClient.QueryMachineByID("test") + metrics, _ := dbClient.GetLPUsageMetricsByMachineID("test") + + assert.Len(t, metrics, tt.expectedMetricsCount) + assert.Equal(t, tt.expectedOSName, machine.Osname) + assert.Equal(t, tt.expectedOSVersion, machine.Osversion) + assert.Equal(t, tt.expectedFeatureFlags, machine.Featureflags) + + if len(metrics) > 0 { + assert.Equal(t, "test", metrics[0].GeneratedBy) + assert.Equal(t, metric.GeneratedType("LP"), metrics[0].GeneratedType) + } + }) + } +} + +func TestRCMetrics(t *testing.T) { + tests := []struct { + name string + body string + expectedStatusCode int + expectedResponse string + expectedMetricsCount int + expectedOSName string + expectedOSVersion string + expectedFeatureFlags string + authType string + }{ + { + name: "empty metrics for RC", + body: `{ + }`, + expectedStatusCode: 400, + expectedResponse: "Missing remediation component data", + authType: APIKEY, + }, + { + name: "basic metrics with empty dynamic metrics for RC", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"] + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedResponse: "", + expectedOSName: "foo", + expectedOSVersion: "42", + expectedFeatureFlags: "a,b,c", + authType: APIKEY, + }, + { + name: "basic metrics with dynamic metrics for RC", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [{"meta":{"utc_now_timestamp":42, "window_size_seconds": 42}, "items": [{"name": "foo", "value": 42, "unit": "bla"}] }, {"meta":{"utc_now_timestamp":43, "window_size_seconds": 42}, "items": [{"name": "foo", "value": 42, "unit": "bla"}] }], + "feature_flags": ["a", "b", "c"] + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedResponse: "", + expectedOSName: "foo", + expectedOSVersion: "42", + expectedFeatureFlags: "a,b,c", + authType: APIKEY, + }, + { + name: "wrong auth type for RC", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"] + } + ] +}`, + expectedStatusCode: 400, + expectedResponse: "Missing log processor data", + authType: PASSWORD, + }, + { + name: "missing OS field for RC", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"] + } + ] +}`, + expectedStatusCode: 201, + expectedResponse: "", + expectedMetricsCount: 1, + expectedFeatureFlags: "a,b,c", + authType: APIKEY, + }, + { + name: "missing feature flags for RC", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "os": {"name":"foo", "version": "42"}, + "utc_startup_timestamp": 42, + 
"metrics": [] + } + ] +}`, + expectedStatusCode: 201, + expectedMetricsCount: 1, + expectedOSName: "foo", + expectedOSVersion: "42", + authType: APIKEY, + }, + { + name: "missing OS name", + body: ` +{ + "remediation_components": [ + { + "version": "1.42", + "os": {"version": "42"}, + "utc_startup_timestamp": 42, + "metrics": [], + "feature_flags": ["a", "b", "c"] + } + ] +}`, + expectedStatusCode: 422, + expectedResponse: "remediation_components.0.os.name in body is required", + authType: APIKEY, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lapi := SetupLAPITest(t) + + dbClient, err := database.NewClient(context.Background(), lapi.DBConfig) + if err != nil { + t.Fatalf("unable to create database client: %s", err) + } + + w := lapi.RecordResponse(t, http.MethodPost, "/v1/usage-metrics", strings.NewReader(tt.body), tt.authType) + + assert.Equal(t, tt.expectedStatusCode, w.Code) + assert.Contains(t, w.Body.String(), tt.expectedResponse) + + bouncer, _ := dbClient.SelectBouncerByName("test") + metrics, _ := dbClient.GetBouncerUsageMetricsByName("test") + + assert.Len(t, metrics, tt.expectedMetricsCount) + assert.Equal(t, tt.expectedOSName, bouncer.Osname) + assert.Equal(t, tt.expectedOSVersion, bouncer.Osversion) + assert.Equal(t, tt.expectedFeatureFlags, bouncer.Featureflags) + + if len(metrics) > 0 { + assert.Equal(t, "test", metrics[0].GeneratedBy) + assert.Equal(t, metric.GeneratedType("RC"), metrics[0].GeneratedType) + } + }) + } +} diff --git a/pkg/csconfig/crowdsec_service_test.go b/pkg/csconfig/crowdsec_service_test.go index 2f41beaf55e..7570b63011e 100644 --- a/pkg/csconfig/crowdsec_service_test.go +++ b/pkg/csconfig/crowdsec_service_test.go @@ -61,9 +61,9 @@ func TestLoadCrowdsec(t *testing.T) { AcquisitionFiles: []string{acquisFullPath}, SimulationFilePath: "./testdata/simulation.yaml", // context is loaded in pkg/alertcontext -// ContextToSend: map[string][]string{ -// "source_ip": {"evt.Parsed.source_ip"}, -// }, + // ContextToSend: map[string][]string{ + // "source_ip": {"evt.Parsed.source_ip"}, + // }, SimulationConfig: &SimulationConfig{ Simulation: ptr.Of(false), }, @@ -100,9 +100,9 @@ func TestLoadCrowdsec(t *testing.T) { ConsoleContextValueLength: 0, AcquisitionFiles: []string{acquisFullPath, acquisInDirFullPath}, // context is loaded in pkg/alertcontext -// ContextToSend: map[string][]string{ -// "source_ip": {"evt.Parsed.source_ip"}, -// }, + // ContextToSend: map[string][]string{ + // "source_ip": {"evt.Parsed.source_ip"}, + // }, SimulationFilePath: "./testdata/simulation.yaml", SimulationConfig: &SimulationConfig{ Simulation: ptr.Of(false), @@ -139,9 +139,9 @@ func TestLoadCrowdsec(t *testing.T) { AcquisitionFiles: []string{}, SimulationFilePath: "", // context is loaded in pkg/alertcontext -// ContextToSend: map[string][]string{ -// "source_ip": {"evt.Parsed.source_ip"}, -// }, + // ContextToSend: map[string][]string{ + // "source_ip": {"evt.Parsed.source_ip"}, + // }, SimulationConfig: &SimulationConfig{ Simulation: ptr.Of(false), }, @@ -184,6 +184,7 @@ func TestLoadCrowdsec(t *testing.T) { t.Run(tc.name, func(t *testing.T) { err := tc.input.LoadCrowdsec() cstest.RequireErrorContains(t, err, tc.expectedErr) + if tc.expectedErr != "" { return } diff --git a/pkg/csconfig/database.go b/pkg/csconfig/database.go index a24eb9e13c3..4ca582cf576 100644 --- a/pkg/csconfig/database.go +++ b/pkg/csconfig/database.go @@ -50,9 +50,10 @@ type AuthGCCfg struct { type FlushDBCfg struct { MaxItems *int `yaml:"max_items,omitempty"` // We could 
unmarshal as time.Duration, but alert filters right now are a map of strings - MaxAge *string `yaml:"max_age,omitempty"` - BouncersGC *AuthGCCfg `yaml:"bouncers_autodelete,omitempty"` - AgentsGC *AuthGCCfg `yaml:"agents_autodelete,omitempty"` + MaxAge *string `yaml:"max_age,omitempty"` + BouncersGC *AuthGCCfg `yaml:"bouncers_autodelete,omitempty"` + AgentsGC *AuthGCCfg `yaml:"agents_autodelete,omitempty"` + MetricsMaxAge *time.Duration `yaml:"metrics_max_age,omitempty"` } func (c *Config) LoadDBConfig(inCli bool) error { @@ -80,9 +81,9 @@ func (c *Config) LoadDBConfig(inCli bool) error { case err != nil: log.Warnf("unable to determine if database is on network filesystem: %s", err) log.Warning( - "You are using sqlite without WAL, this can have a performance impact. " + - "If you do not store the database in a network share, set db_config.use_wal to true. " + - "Set explicitly to false to disable this warning.") + "You are using sqlite without WAL, this can have a performance impact. " + + "If you do not store the database in a network share, set db_config.use_wal to true. " + + "Set explicitly to false to disable this warning.") case isNetwork: log.Debugf("database is on network filesystem (%s), setting useWal to false", fsType) c.DbConfig.UseWal = ptr.Of(false) diff --git a/pkg/database/bouncers.go b/pkg/database/bouncers.go index 03a3227301d..ff750e63c59 100644 --- a/pkg/database/bouncers.go +++ b/pkg/database/bouncers.go @@ -2,14 +2,36 @@ package database import ( "fmt" + "strings" "time" "github.com/pkg/errors" "github.com/crowdsecurity/crowdsec/pkg/database/ent" "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer" + "github.com/crowdsecurity/crowdsec/pkg/models" ) +func (c *Client) BouncerUpdateBaseMetrics(bouncerName string, bouncerType string, baseMetrics models.BaseMetrics) error { + os := baseMetrics.Os + features := strings.Join(baseMetrics.FeatureFlags, ",") + + _, err := c.Ent.Bouncer. + Update(). + Where(bouncer.NameEQ(bouncerName)). + SetNillableVersion(baseMetrics.Version). + SetOsname(*os.Name). + SetOsversion(*os.Version). + SetFeatureflags(features). + SetType(bouncerType). + Save(c.CTX) + if err != nil { + return fmt.Errorf("unable to update base bouncer metrics in database: %w", err) + } + + return nil +} + func (c *Client) SelectBouncer(apiKeyHash string) (*ent.Bouncer, error) { result, err := c.Ent.Bouncer.Query().Where(bouncer.APIKeyEQ(apiKeyHash)).First(c.CTX) if err != nil { diff --git a/pkg/database/ent/metric.go b/pkg/database/ent/metric.go index 236d54da25d..47f3b4df4e5 100644 --- a/pkg/database/ent/metric.go +++ b/pkg/database/ent/metric.go @@ -22,8 +22,8 @@ type Metric struct { // Source of the metrics: machine id, bouncer name... // It must come from the auth middleware. 
GeneratedBy string `json:"generated_by,omitempty"` - // When the metrics are collected/calculated at the source - CollectedAt time.Time `json:"collected_at,omitempty"` + // When the metrics are received by LAPI + ReceivedAt time.Time `json:"received_at,omitempty"` // When the metrics are sent to the console PushedAt *time.Time `json:"pushed_at,omitempty"` // The actual metrics (item0) @@ -40,7 +40,7 @@ func (*Metric) scanValues(columns []string) ([]any, error) { values[i] = new(sql.NullInt64) case metric.FieldGeneratedType, metric.FieldGeneratedBy, metric.FieldPayload: values[i] = new(sql.NullString) - case metric.FieldCollectedAt, metric.FieldPushedAt: + case metric.FieldReceivedAt, metric.FieldPushedAt: values[i] = new(sql.NullTime) default: values[i] = new(sql.UnknownType) @@ -75,11 +75,11 @@ func (m *Metric) assignValues(columns []string, values []any) error { } else if value.Valid { m.GeneratedBy = value.String } - case metric.FieldCollectedAt: + case metric.FieldReceivedAt: if value, ok := values[i].(*sql.NullTime); !ok { - return fmt.Errorf("unexpected type %T for field collected_at", values[i]) + return fmt.Errorf("unexpected type %T for field received_at", values[i]) } else if value.Valid { - m.CollectedAt = value.Time + m.ReceivedAt = value.Time } case metric.FieldPushedAt: if value, ok := values[i].(*sql.NullTime); !ok { @@ -136,8 +136,8 @@ func (m *Metric) String() string { builder.WriteString("generated_by=") builder.WriteString(m.GeneratedBy) builder.WriteString(", ") - builder.WriteString("collected_at=") - builder.WriteString(m.CollectedAt.Format(time.ANSIC)) + builder.WriteString("received_at=") + builder.WriteString(m.ReceivedAt.Format(time.ANSIC)) builder.WriteString(", ") if v := m.PushedAt; v != nil { builder.WriteString("pushed_at=") diff --git a/pkg/database/ent/metric/metric.go b/pkg/database/ent/metric/metric.go index 879f1006d64..78e88982220 100644 --- a/pkg/database/ent/metric/metric.go +++ b/pkg/database/ent/metric/metric.go @@ -17,8 +17,8 @@ const ( FieldGeneratedType = "generated_type" // FieldGeneratedBy holds the string denoting the generated_by field in the database. FieldGeneratedBy = "generated_by" - // FieldCollectedAt holds the string denoting the collected_at field in the database. - FieldCollectedAt = "collected_at" + // FieldReceivedAt holds the string denoting the received_at field in the database. + FieldReceivedAt = "received_at" // FieldPushedAt holds the string denoting the pushed_at field in the database. FieldPushedAt = "pushed_at" // FieldPayload holds the string denoting the payload field in the database. @@ -32,7 +32,7 @@ var Columns = []string{ FieldID, FieldGeneratedType, FieldGeneratedBy, - FieldCollectedAt, + FieldReceivedAt, FieldPushedAt, FieldPayload, } @@ -88,9 +88,9 @@ func ByGeneratedBy(opts ...sql.OrderTermOption) OrderOption { return sql.OrderByField(FieldGeneratedBy, opts...).ToFunc() } -// ByCollectedAt orders the results by the collected_at field. -func ByCollectedAt(opts ...sql.OrderTermOption) OrderOption { - return sql.OrderByField(FieldCollectedAt, opts...).ToFunc() +// ByReceivedAt orders the results by the received_at field. +func ByReceivedAt(opts ...sql.OrderTermOption) OrderOption { + return sql.OrderByField(FieldReceivedAt, opts...).ToFunc() } // ByPushedAt orders the results by the pushed_at field. 
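Reviewer note, not part of the patch: the collected_at → received_at rename is mirrored across the whole generated ent API (field constant, equality and range predicates, ordering helper). A minimal caller-side sketch of the renamed helpers, assuming an initialized *ent.Client and a context; the function name unpushedSince is illustrative, the generated identifiers are the ones introduced in this diff:

package database

import (
	"context"
	"time"

	"github.com/crowdsecurity/crowdsec/pkg/database/ent"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent/metric"
)

// unpushedSince returns metric snapshots received after the cutoff that
// have not yet been pushed to the console, newest first.
func unpushedSince(ctx context.Context, client *ent.Client, cutoff time.Time) ([]*ent.Metric, error) {
	return client.Metric.Query().
		Where(
			metric.ReceivedAtGTE(cutoff), // was metric.CollectedAtGTE before this change
			metric.PushedAtIsNil(),
		).
		Order(ent.Desc(metric.FieldReceivedAt)).
		All(ctx)
}

Filtering and ordering on received_at (set by LAPI) rather than a source-reported timestamp keeps such queries meaningful even when a component sends backdated or duplicated batches.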
diff --git a/pkg/database/ent/metric/where.go b/pkg/database/ent/metric/where.go index e49f80f3411..72bd9d93cd7 100644 --- a/pkg/database/ent/metric/where.go +++ b/pkg/database/ent/metric/where.go @@ -59,9 +59,9 @@ func GeneratedBy(v string) predicate.Metric { return predicate.Metric(sql.FieldEQ(FieldGeneratedBy, v)) } -// CollectedAt applies equality check predicate on the "collected_at" field. It's identical to CollectedAtEQ. -func CollectedAt(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldEQ(FieldCollectedAt, v)) +// ReceivedAt applies equality check predicate on the "received_at" field. It's identical to ReceivedAtEQ. +func ReceivedAt(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldEQ(FieldReceivedAt, v)) } // PushedAt applies equality check predicate on the "pushed_at" field. It's identical to PushedAtEQ. @@ -159,44 +159,44 @@ func GeneratedByContainsFold(v string) predicate.Metric { return predicate.Metric(sql.FieldContainsFold(FieldGeneratedBy, v)) } -// CollectedAtEQ applies the EQ predicate on the "collected_at" field. -func CollectedAtEQ(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldEQ(FieldCollectedAt, v)) +// ReceivedAtEQ applies the EQ predicate on the "received_at" field. +func ReceivedAtEQ(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldEQ(FieldReceivedAt, v)) } -// CollectedAtNEQ applies the NEQ predicate on the "collected_at" field. -func CollectedAtNEQ(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldNEQ(FieldCollectedAt, v)) +// ReceivedAtNEQ applies the NEQ predicate on the "received_at" field. +func ReceivedAtNEQ(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldNEQ(FieldReceivedAt, v)) } -// CollectedAtIn applies the In predicate on the "collected_at" field. -func CollectedAtIn(vs ...time.Time) predicate.Metric { - return predicate.Metric(sql.FieldIn(FieldCollectedAt, vs...)) +// ReceivedAtIn applies the In predicate on the "received_at" field. +func ReceivedAtIn(vs ...time.Time) predicate.Metric { + return predicate.Metric(sql.FieldIn(FieldReceivedAt, vs...)) } -// CollectedAtNotIn applies the NotIn predicate on the "collected_at" field. -func CollectedAtNotIn(vs ...time.Time) predicate.Metric { - return predicate.Metric(sql.FieldNotIn(FieldCollectedAt, vs...)) +// ReceivedAtNotIn applies the NotIn predicate on the "received_at" field. +func ReceivedAtNotIn(vs ...time.Time) predicate.Metric { + return predicate.Metric(sql.FieldNotIn(FieldReceivedAt, vs...)) } -// CollectedAtGT applies the GT predicate on the "collected_at" field. -func CollectedAtGT(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldGT(FieldCollectedAt, v)) +// ReceivedAtGT applies the GT predicate on the "received_at" field. +func ReceivedAtGT(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldGT(FieldReceivedAt, v)) } -// CollectedAtGTE applies the GTE predicate on the "collected_at" field. -func CollectedAtGTE(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldGTE(FieldCollectedAt, v)) +// ReceivedAtGTE applies the GTE predicate on the "received_at" field. +func ReceivedAtGTE(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldGTE(FieldReceivedAt, v)) } -// CollectedAtLT applies the LT predicate on the "collected_at" field. -func CollectedAtLT(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldLT(FieldCollectedAt, v)) +// ReceivedAtLT applies the LT predicate on the "received_at" field. 
+func ReceivedAtLT(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldLT(FieldReceivedAt, v)) } -// CollectedAtLTE applies the LTE predicate on the "collected_at" field. -func CollectedAtLTE(v time.Time) predicate.Metric { - return predicate.Metric(sql.FieldLTE(FieldCollectedAt, v)) +// ReceivedAtLTE applies the LTE predicate on the "received_at" field. +func ReceivedAtLTE(v time.Time) predicate.Metric { + return predicate.Metric(sql.FieldLTE(FieldReceivedAt, v)) } // PushedAtEQ applies the EQ predicate on the "pushed_at" field. diff --git a/pkg/database/ent/metric_create.go b/pkg/database/ent/metric_create.go index 8fa656db427..973cddd41d0 100644 --- a/pkg/database/ent/metric_create.go +++ b/pkg/database/ent/metric_create.go @@ -32,9 +32,9 @@ func (mc *MetricCreate) SetGeneratedBy(s string) *MetricCreate { return mc } -// SetCollectedAt sets the "collected_at" field. -func (mc *MetricCreate) SetCollectedAt(t time.Time) *MetricCreate { - mc.mutation.SetCollectedAt(t) +// SetReceivedAt sets the "received_at" field. +func (mc *MetricCreate) SetReceivedAt(t time.Time) *MetricCreate { + mc.mutation.SetReceivedAt(t) return mc } @@ -103,8 +103,8 @@ func (mc *MetricCreate) check() error { if _, ok := mc.mutation.GeneratedBy(); !ok { return &ValidationError{Name: "generated_by", err: errors.New(`ent: missing required field "Metric.generated_by"`)} } - if _, ok := mc.mutation.CollectedAt(); !ok { - return &ValidationError{Name: "collected_at", err: errors.New(`ent: missing required field "Metric.collected_at"`)} + if _, ok := mc.mutation.ReceivedAt(); !ok { + return &ValidationError{Name: "received_at", err: errors.New(`ent: missing required field "Metric.received_at"`)} } if _, ok := mc.mutation.Payload(); !ok { return &ValidationError{Name: "payload", err: errors.New(`ent: missing required field "Metric.payload"`)} @@ -143,9 +143,9 @@ func (mc *MetricCreate) createSpec() (*Metric, *sqlgraph.CreateSpec) { _spec.SetField(metric.FieldGeneratedBy, field.TypeString, value) _node.GeneratedBy = value } - if value, ok := mc.mutation.CollectedAt(); ok { - _spec.SetField(metric.FieldCollectedAt, field.TypeTime, value) - _node.CollectedAt = value + if value, ok := mc.mutation.ReceivedAt(); ok { + _spec.SetField(metric.FieldReceivedAt, field.TypeTime, value) + _node.ReceivedAt = value } if value, ok := mc.mutation.PushedAt(); ok { _spec.SetField(metric.FieldPushedAt, field.TypeTime, value) diff --git a/pkg/database/ent/migrate/schema.go b/pkg/database/ent/migrate/schema.go index 60bf72a486b..986f5bc8c67 100644 --- a/pkg/database/ent/migrate/schema.go +++ b/pkg/database/ent/migrate/schema.go @@ -254,7 +254,7 @@ var ( {Name: "id", Type: field.TypeInt, Increment: true}, {Name: "generated_type", Type: field.TypeEnum, Enums: []string{"LP", "RC"}}, {Name: "generated_by", Type: field.TypeString}, - {Name: "collected_at", Type: field.TypeTime}, + {Name: "received_at", Type: field.TypeTime}, {Name: "pushed_at", Type: field.TypeTime, Nullable: true}, {Name: "payload", Type: field.TypeString, Size: 2147483647}, } @@ -263,13 +263,6 @@ var ( Name: "metrics", Columns: MetricsColumns, PrimaryKey: []*schema.Column{MetricsColumns[0]}, - Indexes: []*schema.Index{ - { - Name: "metric_generated_type_generated_by_collected_at", - Unique: true, - Columns: []*schema.Column{MetricsColumns[1], MetricsColumns[2], MetricsColumns[3]}, - }, - }, } // Tables holds all the tables in the schema. 
Tables = []*schema.Table{ diff --git a/pkg/database/ent/mutation.go b/pkg/database/ent/mutation.go index 5b70457c512..5c6596f3db4 100644 --- a/pkg/database/ent/mutation.go +++ b/pkg/database/ent/mutation.go @@ -8640,7 +8640,7 @@ type MetricMutation struct { id *int generated_type *metric.GeneratedType generated_by *string - collected_at *time.Time + received_at *time.Time pushed_at *time.Time payload *string clearedFields map[string]struct{} @@ -8819,40 +8819,40 @@ func (m *MetricMutation) ResetGeneratedBy() { m.generated_by = nil } -// SetCollectedAt sets the "collected_at" field. -func (m *MetricMutation) SetCollectedAt(t time.Time) { - m.collected_at = &t +// SetReceivedAt sets the "received_at" field. +func (m *MetricMutation) SetReceivedAt(t time.Time) { + m.received_at = &t } -// CollectedAt returns the value of the "collected_at" field in the mutation. -func (m *MetricMutation) CollectedAt() (r time.Time, exists bool) { - v := m.collected_at +// ReceivedAt returns the value of the "received_at" field in the mutation. +func (m *MetricMutation) ReceivedAt() (r time.Time, exists bool) { + v := m.received_at if v == nil { return } return *v, true } -// OldCollectedAt returns the old "collected_at" field's value of the Metric entity. +// OldReceivedAt returns the old "received_at" field's value of the Metric entity. // If the Metric object wasn't provided to the builder, the object is fetched from the database. // An error is returned if the mutation operation is not UpdateOne, or the database query fails. -func (m *MetricMutation) OldCollectedAt(ctx context.Context) (v time.Time, err error) { +func (m *MetricMutation) OldReceivedAt(ctx context.Context) (v time.Time, err error) { if !m.op.Is(OpUpdateOne) { - return v, errors.New("OldCollectedAt is only allowed on UpdateOne operations") + return v, errors.New("OldReceivedAt is only allowed on UpdateOne operations") } if m.id == nil || m.oldValue == nil { - return v, errors.New("OldCollectedAt requires an ID field in the mutation") + return v, errors.New("OldReceivedAt requires an ID field in the mutation") } oldValue, err := m.oldValue(ctx) if err != nil { - return v, fmt.Errorf("querying old value for OldCollectedAt: %w", err) + return v, fmt.Errorf("querying old value for OldReceivedAt: %w", err) } - return oldValue.CollectedAt, nil + return oldValue.ReceivedAt, nil } -// ResetCollectedAt resets all changes to the "collected_at" field. -func (m *MetricMutation) ResetCollectedAt() { - m.collected_at = nil +// ResetReceivedAt resets all changes to the "received_at" field. +func (m *MetricMutation) ResetReceivedAt() { + m.received_at = nil } // SetPushedAt sets the "pushed_at" field. 
@@ -8981,8 +8981,8 @@ func (m *MetricMutation) Fields() []string { if m.generated_by != nil { fields = append(fields, metric.FieldGeneratedBy) } - if m.collected_at != nil { - fields = append(fields, metric.FieldCollectedAt) + if m.received_at != nil { + fields = append(fields, metric.FieldReceivedAt) } if m.pushed_at != nil { fields = append(fields, metric.FieldPushedAt) @@ -9002,8 +9002,8 @@ func (m *MetricMutation) Field(name string) (ent.Value, bool) { return m.GeneratedType() case metric.FieldGeneratedBy: return m.GeneratedBy() - case metric.FieldCollectedAt: - return m.CollectedAt() + case metric.FieldReceivedAt: + return m.ReceivedAt() case metric.FieldPushedAt: return m.PushedAt() case metric.FieldPayload: @@ -9021,8 +9021,8 @@ func (m *MetricMutation) OldField(ctx context.Context, name string) (ent.Value, return m.OldGeneratedType(ctx) case metric.FieldGeneratedBy: return m.OldGeneratedBy(ctx) - case metric.FieldCollectedAt: - return m.OldCollectedAt(ctx) + case metric.FieldReceivedAt: + return m.OldReceivedAt(ctx) case metric.FieldPushedAt: return m.OldPushedAt(ctx) case metric.FieldPayload: @@ -9050,12 +9050,12 @@ func (m *MetricMutation) SetField(name string, value ent.Value) error { } m.SetGeneratedBy(v) return nil - case metric.FieldCollectedAt: + case metric.FieldReceivedAt: v, ok := value.(time.Time) if !ok { return fmt.Errorf("unexpected type %T for field %s", value, name) } - m.SetCollectedAt(v) + m.SetReceivedAt(v) return nil case metric.FieldPushedAt: v, ok := value.(time.Time) @@ -9135,8 +9135,8 @@ func (m *MetricMutation) ResetField(name string) error { case metric.FieldGeneratedBy: m.ResetGeneratedBy() return nil - case metric.FieldCollectedAt: - m.ResetCollectedAt() + case metric.FieldReceivedAt: + m.ResetReceivedAt() return nil case metric.FieldPushedAt: m.ResetPushedAt() diff --git a/pkg/database/ent/schema/metric.go b/pkg/database/ent/schema/metric.go index b47da78bdf3..319c67b7aa7 100644 --- a/pkg/database/ent/schema/metric.go +++ b/pkg/database/ent/schema/metric.go @@ -3,7 +3,6 @@ package schema import ( "entgo.io/ent" "entgo.io/ent/schema/field" - "entgo.io/ent/schema/index" ) // Metric is actually a set of metrics collected by a device @@ -21,9 +20,9 @@ func (Metric) Fields() []ent.Field { field.String("generated_by"). Immutable(). Comment("Source of the metrics: machine id, bouncer name...\nIt must come from the auth middleware."), - field.Time("collected_at"). + field.Time("received_at"). Immutable(). - Comment("When the metrics are collected/calculated at the source"), + Comment("When the metrics are received by LAPI"), field.Time("pushed_at"). Nillable(). Optional(). @@ -33,11 +32,3 @@ func (Metric) Fields() []ent.Field { Comment("The actual metrics (item0)"), } } - -func (Metric) Indexes() []ent.Index { - return []ent.Index{ - // Don't store the same metrics multiple times. - index.Fields("generated_type", "generated_by", "collected_at"). 
- Unique(), - } -} diff --git a/pkg/database/flush.go b/pkg/database/flush.go index 5a1f0bea5bf..5d53d10c942 100644 --- a/pkg/database/flush.go +++ b/pkg/database/flush.go @@ -8,15 +8,24 @@ import ( "github.com/go-co-op/gocron" log "github.com/sirupsen/logrus" + "github.com/crowdsecurity/go-cs-lib/ptr" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database/ent/alert" "github.com/crowdsecurity/crowdsec/pkg/database/ent/bouncer" "github.com/crowdsecurity/crowdsec/pkg/database/ent/decision" "github.com/crowdsecurity/crowdsec/pkg/database/ent/event" "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine" + "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" "github.com/crowdsecurity/crowdsec/pkg/types" ) +const ( + // how long to keep metrics in the local database + defaultMetricsMaxAge = 7 * 24 * time.Hour + flushInterval = 1 * time.Minute +) + func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) { maxItems := 0 maxAge := "" @@ -91,17 +100,46 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched } } - baJob, err := scheduler.Every(1).Minute().Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC) + baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC) if err != nil { return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err) } baJob.SingletonMode() + + metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge) + if err != nil { + return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err) + } + + metricsJob.SingletonMode() + scheduler.StartAsync() return scheduler, nil } +// flushMetrics deletes metrics older than maxAge, regardless of whether they have been pushed to CAPI or not +func (c *Client) flushMetrics(maxAge *time.Duration) { + if maxAge == nil { + maxAge = ptr.Of(defaultMetricsMaxAge) + } + + c.Log.Debugf("flushing metrics older than %s", maxAge) + + deleted, err := c.Ent.Metric.Delete().Where( + metric.ReceivedAtLTE(time.Now().UTC().Add(-*maxAge)), + ).Exec(c.CTX) + if err != nil { + c.Log.Errorf("while flushing metrics: %s", err) + return + } + + if deleted > 0 { + c.Log.Debugf("flushed %d metrics snapshots", deleted) + } +} + func (c *Client) FlushOrphans() { /* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */ /* We want to take care of orphaned events for which the parent alert/decision has been deleted */ @@ -117,7 +155,6 @@ func (c *Client) FlushOrphans() { eventsCount, err = c.Ent.Decision.Delete().Where( decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX) - if err != nil { c.Log.Warningf("error while deleting orphan decisions: %s", err) return @@ -138,7 +175,6 @@ func (c *Client) flushBouncers(authType string, duration *time.Duration) { ).Where( bouncer.AuthTypeEQ(authType), ).Exec(c.CTX) - if err != nil { c.Log.Errorf("while auto-deleting expired bouncers (%s): %s", authType, err) return @@ -159,7 +195,6 @@ func (c *Client) flushAgents(authType string, duration *time.Duration) { machine.Not(machine.HasAlerts()), machine.AuthTypeEQ(authType), ).Exec(c.CTX) - if err != nil { c.Log.Errorf("while auto-deleting expired machines (%s): %s", authType, err) return @@ -253,7 +288,6 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { if maxid > 0 { // This may lead to orphan alerts (at least on MySQL), but the next
time the flush job will run, they will be deleted deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX) - if err != nil { c.Log.Errorf("FlushAlerts: Could not delete alerts: %s", err) return fmt.Errorf("could not delete alerts: %w", err) diff --git a/pkg/database/machines.go b/pkg/database/machines.go index 18fd32fdd84..21349b8b687 100644 --- a/pkg/database/machines.go +++ b/pkg/database/machines.go @@ -2,6 +2,7 @@ package database import ( "fmt" + "strings" "time" "github.com/go-openapi/strfmt" @@ -10,6 +11,8 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/database/ent" "github.com/crowdsecurity/crowdsec/pkg/database/ent/machine" + "github.com/crowdsecurity/crowdsec/pkg/database/ent/schema" + "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -18,6 +21,48 @@ const ( CapiListsMachineID = types.ListOrigin ) +func (c *Client) MachineUpdateBaseMetrics(machineID string, baseMetrics models.BaseMetrics, hubItems models.HubItems, datasources map[string]int64) error { + os := baseMetrics.Os + features := strings.Join(baseMetrics.FeatureFlags, ",") + + var heartbeat time.Time + + if len(baseMetrics.Metrics) == 0 { // len() is 0 for a nil slice: no dynamic metrics yet, fall back to now + heartbeat = time.Now().UTC() + } else { + heartbeat = time.Unix(*baseMetrics.Metrics[0].Meta.UtcNowTimestamp, 0) + } + + hubState := map[string][]schema.ItemState{} + for itemType, items := range hubItems { + hubState[itemType] = []schema.ItemState{} + for _, item := range items { + hubState[itemType] = append(hubState[itemType], schema.ItemState{ + Name: item.Name, + Status: item.Status, + Version: item.Version, + }) + } + } + + _, err := c.Ent.Machine. + Update(). + Where(machine.MachineIdEQ(machineID)). + SetNillableVersion(baseMetrics.Version). + SetOsname(*os.Name). + SetOsversion(*os.Version). + SetFeatureflags(features). + SetLastHeartbeat(heartbeat). + SetHubstate(hubState). + SetDatasources(datasources). + Save(c.CTX) + if err != nil { + return fmt.Errorf("unable to update base machine metrics in database: %w", err) + } + + return nil +} + func (c *Client) CreateMachine(machineID *string, password *strfmt.Password, ipAddress string, isValidated bool, force bool, authType string) (*ent.Machine, error) { hashPassword, err := bcrypt.GenerateFromPassword([]byte(*password), bcrypt.DefaultCost) if err != nil { @@ -158,7 +203,7 @@ func (c *Client) UpdateMachineScenarios(scenarios string, ID int) error { SetScenarios(scenarios). Save(c.CTX) if err != nil { - return fmt.Errorf("unable to update machine in database: %s", err) + return fmt.Errorf("unable to update machine in database: %w", err) } return nil @@ -169,7 +214,7 @@ func (c *Client) UpdateMachineIP(ipAddr string, ID int) error { SetIpAddress(ipAddr). Save(c.CTX) if err != nil { - return fmt.Errorf("unable to update machine IP in database: %s", err) + return fmt.Errorf("unable to update machine IP in database: %w", err) } return nil @@ -180,7 +225,7 @@ func (c *Client) UpdateMachineVersion(ipAddr string, ID int) error { SetVersion(ipAddr).
Save(c.CTX) if err != nil { - return fmt.Errorf("unable to update machine version in database: %s", err) + return fmt.Errorf("unable to update machine version in database: %w", err) } return nil diff --git a/pkg/database/metrics.go b/pkg/database/metrics.go new file mode 100644 index 00000000000..3bc5e7b5d32 --- /dev/null +++ b/pkg/database/metrics.go @@ -0,0 +1,73 @@ +package database + +import ( + "fmt" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/database/ent" + "github.com/crowdsecurity/crowdsec/pkg/database/ent/metric" +) + +func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy string, receivedAt time.Time, payload string) (*ent.Metric, error) { + m, err := c.Ent.Metric. // a short name avoids shadowing the imported metric package + Create(). + SetGeneratedType(generatedType). + SetGeneratedBy(generatedBy). + SetReceivedAt(receivedAt). + SetPayload(payload). + Save(c.CTX) + if err != nil { + c.Log.Warningf("CreateMetric: %s", err) + return nil, fmt.Errorf("storing metrics snapshot for '%s' at %s: %w", generatedBy, receivedAt, InsertFail) + } + + return m, nil +} + +func (c *Client) GetLPUsageMetricsByMachineID(machineId string) ([]*ent.Metric, error) { + metrics, err := c.Ent.Metric.Query(). + Where( + metric.GeneratedTypeEQ(metric.GeneratedTypeLP), + metric.GeneratedByEQ(machineId), + metric.PushedAtIsNil(), + ). + // XXX: do we need to sort? + Order(ent.Desc(metric.FieldReceivedAt)). + All(c.CTX) + if err != nil { + c.Log.Warningf("GetLPUsageMetricsByMachineID: %s", err) + return nil, fmt.Errorf("getting LP usage metrics for machine %s: %w", machineId, err) + } + + return metrics, nil +} + +func (c *Client) GetBouncerUsageMetricsByName(bouncerName string) ([]*ent.Metric, error) { + metrics, err := c.Ent.Metric.Query(). + Where( + metric.GeneratedTypeEQ(metric.GeneratedTypeRC), + metric.GeneratedByEQ(bouncerName), + metric.PushedAtIsNil(), + ). + Order(ent.Desc(metric.FieldReceivedAt)). + All(c.CTX) + if err != nil { + c.Log.Warningf("GetBouncerUsageMetricsByName: %s", err) + return nil, fmt.Errorf("getting bouncer usage metrics by name %s: %w", bouncerName, err) + } + + return metrics, nil +} + +func (c *Client) MarkUsageMetricsAsSent(ids []int) error { + _, err := c.Ent.Metric.Update(). + Where(metric.IDIn(ids...)). + SetPushedAt(time.Now().UTC()).
+ Save(c.CTX) + if err != nil { + c.Log.Warningf("MarkUsageMetricsAsSent: %s", err) + return fmt.Errorf("marking usage metrics as sent: %w", err) + } + + return nil +} diff --git a/test/bats/08_metrics_bouncer.bats b/test/bats/08_metrics_bouncer.bats index 84a55dc88c1..1851ed0ac14 100644 --- a/test/bats/08_metrics_bouncer.bats +++ b/test/bats/08_metrics_bouncer.bats @@ -15,7 +15,6 @@ setup() { load "../lib/setup.sh" ./instance-data load ./instance-crowdsec start - skip "require the usage_metrics endpoint on apiserver" } teardown() { @@ -75,6 +74,18 @@ teardown() { payload=$(yq -o j '.remediation_components[0].utc_startup_timestamp = 1707399316' <<<"$payload") rune -0 curl-with-key '/v1/usage-metrics' -X POST --data "$payload" refute_output + + payload=$(yq -o j '.remediation_components[0].metrics = [{"meta": {}}]' <<<"$payload") + rune -22 curl-with-key '/v1/usage-metrics' -X POST --data "$payload" + assert_stderr --partial "error: 422" + rune -0 jq -r '.message' <(output) + assert_output - <<-EOT + validation failure list: + remediation_components.0.metrics.0.items in body is required + validation failure list: + remediation_components.0.metrics.0.meta.utc_now_timestamp in body is required + remediation_components.0.metrics.0.meta.window_size_seconds in body is required + EOT } @test "rc usage metrics (good payload)" { @@ -116,7 +127,7 @@ teardown() { rune -0 cscli metrics show bouncers -o json # aggregation is ok -- we are truncating, not rounding, because the float is mandated by swagger. # but without labels the origin string is empty - assert_json '{bouncers:{testbouncer:{"": {"foo": {"dogyear": 2, "pound": 5}}}}}' + assert_json '{bouncers:{testbouncer:{"": {foo: {dogyear: 2, pound: 5}}}}}' rune -0 cscli metrics show bouncers assert_output - <<-EOT @@ -137,7 +148,7 @@ teardown() { { "meta": {"utc_now_timestamp": 1707399916, "window_size_seconds":600}, "items":[ - {"name": "active_decisions", "unit": "ip", "value": 51936, "labels": {"ip_type": "ipv4", "origin": "lists:firehol_voipbl"}}, + {"name": "active_decisions", "unit": "ip", "value": 500, "labels": {"ip_type": "ipv4", "origin": "lists:firehol_voipbl"}}, {"name": "active_decisions", "unit": "ip", "value": 1, "labels": {"ip_type": "ipv6", "origin": "cscli"}}, {"name": "dropped", "unit": "byte", "value": 3800, "labels": {"ip_type": "ipv4", "origin": "CAPI"}}, {"name": "dropped", "unit": "byte", "value": 0, "labels": {"ip_type": "ipv4", "origin": "cscli"}}, @@ -191,7 +202,7 @@ teardown() { }, "lists:firehol_voipbl": { "active_decisions": { - "ip": 51936 + "ip": 500 }, "dropped": { "byte": 3847, @@ -219,14 +230,198 @@ teardown() { | cscli (manual decisions) | 1 | 380 | 10 | - | - | | lists:anotherlist | - | 0 | 0 | - | - | | lists:firehol_cruzit_web_attacks | - | 1.03k | 23 | - | - | - | lists:firehol_voipbl | 51.94k | 3.85k | 58 | - | - | + | lists:firehol_voipbl | 500 | 3.85k | 58 | - | - | + +----------------------------------+------------------+---------+---------+---------+-------+ + | Total | 501 | 9.06k | 191 | 2 | 5 | + +----------------------------------+------------------+---------+---------+---------+-------+ + EOT + + # active_decisions is actually a gauge: values should not be aggregated, keep only the latest one + + payload=$(yq -o j ' + .remediation_components[0].metrics = [ + { + "meta": {"utc_now_timestamp": 1707450000, "window_size_seconds":600}, + "items":[ + {"name": "active_decisions", "unit": "ip", "value": 250, "labels": {"ip_type": "ipv4", "origin": "lists:firehol_voipbl"}}, + {"name": "active_decisions", 
"unit": "ip", "value": 10, "labels": {"ip_type": "ipv6", "origin": "cscli"}} + ] + } + ] | + .remediation_components[0].type = "crowdsec-firewall-bouncer" + ' <<<"$payload") + + rune -0 curl-with-key '/v1/usage-metrics' -X POST --data "$payload" + rune -0 cscli metrics show bouncers -o json + assert_json '{ + "bouncers": { + "testbouncer": { + "": { + "foo": { + "dogyear": 2, + "pound": 5 + } + }, + "CAPI": { + "dropped": { + "byte": 3800, + "packet": 100 + } + }, + "cscli": { + "active_decisions": { + "ip": 10 + }, + "dropped": { + "byte": 380, + "packet": 10 + } + }, + "lists:firehol_cruzit_web_attacks": { + "dropped": { + "byte": 1034, + "packet": 23 + } + }, + "lists:firehol_voipbl": { + "active_decisions": { + "ip": 250 + }, + "dropped": { + "byte": 3847, + "packet": 58 + }, + }, + "lists:anotherlist": { + "dropped": { + "byte": 0, + "packet": 0 + } + } + } + } + }' + + rune -0 cscli metrics show bouncers + assert_output - <<-EOT + Bouncer Metrics (testbouncer) since 2024-02-08 13:35:16 +0000 UTC: + +----------------------------------+------------------+-------------------+-----------------+ + | Origin | active_decisions | dropped | foo | + | | IPs | bytes | packets | dogyear | pound | + +----------------------------------+------------------+---------+---------+---------+-------+ + | CAPI (community blocklist) | - | 3.80k | 100 | - | - | + | cscli (manual decisions) | 10 | 380 | 10 | - | - | + | lists:anotherlist | - | 0 | 0 | - | - | + | lists:firehol_cruzit_web_attacks | - | 1.03k | 23 | - | - | + | lists:firehol_voipbl | 250 | 3.85k | 58 | - | - | +----------------------------------+------------------+---------+---------+---------+-------+ - | Total | 51.94k | 9.06k | 191 | 2 | 5 | + | Total | 260 | 9.06k | 191 | 2 | 5 | +----------------------------------+------------------+---------+---------+---------+-------+ EOT +} - # TODO: multiple item lists +@test "rc usage metrics (unknown metrics)" { + # new metrics are introduced in a new bouncer version, unknown by this version of cscli: some are gauges, some are not + + API_KEY=$(cscli bouncers add testbouncer -o raw) + export API_KEY + + payload=$(yq -o j <<-EOT + remediation_components: + - version: "v1.0" + utc_startup_timestamp: 1707369316 + log_processors: [] + EOT + ) + + payload=$(yq -o j ' + .remediation_components[0].metrics = [ + { + "meta": {"utc_now_timestamp": 1707460000, "window_size_seconds":600}, + "items":[ + {"name": "ima_gauge", "unit": "second", "value": 30, "labels": {"origin": "cscli"}}, + {"name": "notagauge", "unit": "inch", "value": 15, "labels": {"origin": "cscli"}} + ] + }, { + "meta": {"utc_now_timestamp": 1707450000, "window_size_seconds":600}, + "items":[ + {"name": "ima_gauge", "unit": "second", "value": 20, "labels": {"origin": "cscli"}}, + {"name": "notagauge", "unit": "inch", "value": 10, "labels": {"origin": "cscli"}} + ] + } + ] | + .remediation_components[0].type = "crowdsec-firewall-bouncer" + ' <<<"$payload") + + rune -0 curl-with-key '/v1/usage-metrics' -X POST --data "$payload" + + rune -0 cscli metrics show bouncers -o json + assert_json '{bouncers: {testbouncer: {cscli: {ima_gauge: {second: 30}, notagauge: {inch: 25}}}}}' + rune -0 cscli metrics show bouncers + assert_output - <<-EOT + Bouncer Metrics (testbouncer) since 2024-02-09 03:40:00 +0000 UTC: + +--------------------------+--------+-----------+ + | Origin | ima | notagauge | + | | second | inch | + +--------------------------+--------+-----------+ + | cscli (manual decisions) | 30 | 25 | + 
+--------------------------+--------+-----------+ + | Total | 30 | 25 | + +--------------------------+--------+-----------+ + EOT +} + +@test "rc usage metrics (ipv4/ipv6)" { + # gauge metrics are not aggregated over time, but they are over ip type + + API_KEY=$(cscli bouncers add testbouncer -o raw) + export API_KEY + + payload=$(yq -o j <<-EOT + remediation_components: + - version: "v1.0" + utc_startup_timestamp: 1707369316 + log_processors: [] + EOT + ) + + payload=$(yq -o j ' + .remediation_components[0].metrics = [ + { + "meta": {"utc_now_timestamp": 1707460000, "window_size_seconds":600}, + "items":[ + {"name": "active_decisions", "unit": "ip", "value": 200, "labels": {"ip_type": "ipv4", "origin": "cscli"}}, + {"name": "active_decisions", "unit": "ip", "value": 30, "labels": {"ip_type": "ipv6", "origin": "cscli"}} + ] + }, { + "meta": {"utc_now_timestamp": 1707450000, "window_size_seconds":600}, + "items":[ + {"name": "active_decisions", "unit": "ip", "value": 400, "labels": {"ip_type": "ipv4", "origin": "cscli"}}, + {"name": "active_decisions", "unit": "ip", "value": 50, "labels": {"ip_type": "ipv6", "origin": "cscli"}} + ] + } + ] | + .remediation_components[0].type = "crowdsec-firewall-bouncer" + ' <<<"$payload") + + rune -0 curl-with-key '/v1/usage-metrics' -X POST --data "$payload" + + rune -0 cscli metrics show bouncers -o json + assert_json '{bouncers: {testbouncer: {cscli: {active_decisions: {ip: 230}}}}}' + + rune -0 cscli metrics show bouncers + assert_output - <<-EOT + Bouncer Metrics (testbouncer) since 2024-02-09 03:40:00 +0000 UTC: + +--------------------------+------------------+ + | Origin | active_decisions | + | | IPs | + +--------------------------+------------------+ + | cscli (manual decisions) | 230 | + +--------------------------+------------------+ + | Total | 230 | + +--------------------------+------------------+ + EOT } @test "rc usage metrics (multiple bouncers)" { diff --git a/test/bats/08_metrics_machines.bats b/test/bats/08_metrics_machines.bats index e63078124a9..3b73839e753 100644 --- a/test/bats/08_metrics_machines.bats +++ b/test/bats/08_metrics_machines.bats @@ -15,7 +15,6 @@ setup() { load "../lib/setup.sh" ./instance-data load ./instance-crowdsec start - skip "require the usage_metrics endpoint on apiserver" } teardown() { diff --git a/test/bats/11_bouncers_tls.bats b/test/bats/11_bouncers_tls.bats index 849b3a5b35c..554308ae962 100644 --- a/test/bats/11_bouncers_tls.bats +++ b/test/bats/11_bouncers_tls.bats @@ -162,6 +162,35 @@ teardown() { rune cscli bouncers delete localhost@127.0.0.1 } +@test "a bouncer authenticated with TLS can send metrics" { + payload=$(yq -o j <<-EOT + remediation_components: [] + log_processors: [] + EOT + ) + + # with mutual authentication there is no api key, so it's detected as RC if user agent != crowdsec + + rune -22 curl --fail-with-body -sS \ + --cert "$tmpdir/leaf.pem" \ + --key "$tmpdir/leaf-key.pem" \ + --cacert "$tmpdir/bundle.pem" \ + https://localhost:8080/v1/usage-metrics -X POST --data "$payload" + assert_stderr --partial 'error: 400' + assert_json '{message: "Missing remediation component data"}' + + rune -22 curl --fail-with-body -sS \ + --cert "$tmpdir/leaf.pem" \ + --key "$tmpdir/leaf-key.pem" \ + --cacert "$tmpdir/bundle.pem" \ + --user-agent "crowdsec/someversion" \ + https://localhost:8080/v1/usage-metrics -X POST --data "$payload" + assert_stderr --partial 'error: 401' + assert_json '{code:401, message: "cookie token is empty"}' + + rune cscli bouncers delete localhost@127.0.0.1 +} + @test 
"simulate a bouncer request with an invalid cert" { rune -77 curl --fail-with-body -sS \ --cert "$tmpdir/leaf_invalid.pem" \