Skip to content

Commit

Permalink
(wip) lp metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Feb 13, 2024
1 parent 8315daf commit b6120d5
Show file tree
Hide file tree
Showing 19 changed files with 309 additions and 35 deletions.
29 changes: 18 additions & 11 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,36 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)

func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, error) {
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, []acquisition.DataSource, error) {
var err error

if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading context: %w", err)
return nil, nil, fmt.Errorf("while loading context: %w", err)
}

// Start loading configs
csParsers := parser.NewParsers(hub)
if csParsers, err = parser.LoadParsers(cConfig, csParsers); err != nil {
return nil, fmt.Errorf("while loading parsers: %w", err)
return nil, nil, fmt.Errorf("while loading parsers: %w", err)
}

if err := LoadBuckets(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading scenarios: %w", err)
return nil, nil, fmt.Errorf("while loading scenarios: %w", err)
}

if err := appsec.LoadAppsecRules(hub); err != nil {
return nil, fmt.Errorf("while loading appsec rules: %w", err)
return nil, nil, fmt.Errorf("while loading appsec rules: %w", err)
}

if err := LoadAcquisition(cConfig); err != nil {
return nil, fmt.Errorf("while loading acquisition config: %w", err)
datasources, err := LoadAcquisition(cConfig)
if err != nil {
return nil, nil, fmt.Errorf("while loading acquisition config: %w", err)
}

return csParsers, nil
return csParsers, datasources, nil
}

func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub) error {
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
inputEventChan = make(chan types.Event)
inputLineChan = make(chan types.Event)

Expand Down Expand Up @@ -141,6 +142,12 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
})
outputWg.Wait()

lpMetricsTomb.Go(func() error {
// in case of reload, we send a new startup time
// (use crowdsecT0 as a reference for the first startup time)
return lpMetrics(apiClient, cConfig.API.Server.ConsoleConfig, datasources)
})

if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
aggregated := false
if cConfig.Prometheus.Level == "aggregated" {
Expand All @@ -161,7 +168,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
return nil
}

func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")

Expand All @@ -171,7 +178,7 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true

if err := runCrowdsec(cConfig, parsers, hub); err != nil {
if err := runCrowdsec(cConfig, parsers, hub, datasources); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
Expand Down
113 changes: 113 additions & 0 deletions cmd/crowdsec/lpmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"context"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/blackfireio/osinfo"
"time"

"github.com/crowdsecurity/go-cs-lib/ptr"
"github.com/crowdsecurity/go-cs-lib/trace"

"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
"github.com/crowdsecurity/crowdsec/pkg/fflag"
"github.com/crowdsecurity/crowdsec/pkg/models"
)

func detectOs() (string, string) {
if cwversion.System == "docker" {
return "docker", ""
}

osInfo, err := osinfo.GetOSInfo()
if err != nil {
return cwversion.System, "???"
}

return osInfo.Name, osInfo.Version
}

func lpMetricsPayload(consoleCfg *csconfig.ConsoleConfig, datasources []acquisition.DataSource, windowSize int, utcStartupTimestamp int64) models.AllMetrics {
meta := &models.MetricsMeta{
UtcStartupTimestamp: float64(utcStartupTimestamp),
WindowSizeSeconds: int64(windowSize),
}

osName, osVersion := detectOs()

os := &models.OSversion{
Name: osName,
Version: osVersion,
}

features := fflag.Crowdsec.GetEnabledFeatures()

datasourceMap := map[string]int64{}

for _, ds := range datasources {
datasourceMap[ds.GetName()] += 1
}

return models.AllMetrics{
LogProcessors: []models.LogProcessorsMetrics{
{
&models.LogProcessorsMetricsItems0{
BaseMetrics: models.BaseMetrics{
Meta: meta,
Os: os,
Version: ptr.Of(cwversion.VersionStr()),
FeatureFlags: features,
},
ConsoleOptions: consoleCfg.EnabledOptions(),
Datasources: datasourceMap,
},
},
},
}
}

// lpMetrics collects metrics from the LP and sends them to the LAPI
func lpMetrics(client *apiclient.ApiClient, consoleCfg *csconfig.ConsoleConfig, datasources []acquisition.DataSource) error {
defer trace.CatchPanic("crowdsec/runLpMetrics")
log.Trace("Starting lpMetrics goroutine")

windowSize := 6
utcStartupEpoch := time.Now().Unix()

met := lpMetricsPayload(consoleCfg, datasources, windowSize, utcStartupEpoch)

ticker := time.NewTicker(time.Duration(windowSize) * time.Second)

log.Tracef("Sending lp metrics every %d seconds", windowSize)

LOOP:
for {
select {
case <-ticker.C:
met.LogProcessors[0][0].Meta.UtcNowTimestamp = float64(time.Now().Unix())

_, resp, err := client.UsageMetrics.Add(context.Background(), &met)
if err != nil {
log.Errorf("failed to send lp metrics: %s", err)
continue
}

if resp.Response.StatusCode != http.StatusCreated {
log.Errorf("failed to send lp metrics: %s", resp.Response.Status)
continue
}

log.Tracef("lp usage metrics sent")
case <-lpMetricsTomb.Dying():
break LOOP
}
}

ticker.Stop()

return nil
}
25 changes: 13 additions & 12 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (

var (
/*tombs for the parser, buckets and outputs.*/
acquisTomb tomb.Tomb
parsersTomb tomb.Tomb
bucketsTomb tomb.Tomb
outputsTomb tomb.Tomb
apiTomb tomb.Tomb
crowdsecTomb tomb.Tomb
pluginTomb tomb.Tomb
acquisTomb tomb.Tomb
parsersTomb tomb.Tomb
bucketsTomb tomb.Tomb
outputsTomb tomb.Tomb
apiTomb tomb.Tomb
crowdsecTomb tomb.Tomb
pluginTomb tomb.Tomb
lpMetricsTomb tomb.Tomb

flags *Flags

Expand Down Expand Up @@ -107,7 +108,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
return nil
}

func LoadAcquisition(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error

if flags.SingleFileType != "" && flags.OneShotDSN != "" {
Expand All @@ -116,20 +117,20 @@ func LoadAcquisition(cConfig *csconfig.Config) error {

dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
return nil, errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
return err
return nil, err
}
}

if len(dataSources) == 0 {
return fmt.Errorf("no datasource enabled")
return nil, fmt.Errorf("no datasource enabled")
}

return nil
return dataSources, nil
}

var (
Expand Down
8 changes: 4 additions & 4 deletions cmd/crowdsec/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
return nil, fmt.Errorf("while loading hub index: %w", err)
}

csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return nil, fmt.Errorf("unable to init crowdsec: %w", err)
}
Expand All @@ -103,7 +103,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
}

agentReady := make(chan bool, 1)
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
}

log.Printf("Reload is finished")
Expand Down Expand Up @@ -369,14 +369,14 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
return fmt.Errorf("while loading hub index: %w", err)
}

csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return fmt.Errorf("crowdsec init: %w", err)
}

// if it's just linting, we're done
if !flags.TestMode {
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
} else {
agentReady <- true
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apiclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ApiClient struct {
Metrics *MetricsService
Signal *SignalService
HeartBeat *HeartBeatService
UsageMetrics *UsageMetricsService
}

func (a *ApiClient) GetClient() *http.Client {
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewClient(config *Config) (*ApiClient, error) {
c.Signal = (*SignalService)(&c.common)
c.DecisionDelete = (*DecisionDeleteService)(&c.common)
c.HeartBeat = (*HeartBeatService)(&c.common)
c.UsageMetrics = (*UsageMetricsService)(&c.common)

return c, nil
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/apiclient/usagemetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package apiclient

import (
"context"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/models"
)

type UsageMetricsService service

func (s *UsageMetricsService) Add(ctx context.Context, metrics *models.AllMetrics) (interface{}, *Response, error) {
log.Warnf("prefix: %s", s.client.URLPrefix)
u := fmt.Sprintf("%s/usage-metrics/", s.client.URLPrefix)

req, err := s.client.NewRequest(http.MethodPost, u, &metrics)
if err != nil {
return nil, nil, err
}

Check warning on line 22 in pkg/apiclient/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiclient/usagemetrics.go#L15-L22

Added lines #L15 - L22 were not covered by tests

var response interface{}

resp, err := s.client.Do(ctx, req, &response)
if err != nil {
return nil, resp, err
}

Check warning on line 29 in pkg/apiclient/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiclient/usagemetrics.go#L24-L29

Added lines #L24 - L29 were not covered by tests

return &response, resp, nil

Check warning on line 31 in pkg/apiclient/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiclient/usagemetrics.go#L31

Added line #L31 was not covered by tests
}
2 changes: 2 additions & 0 deletions pkg/apiserver/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (c *Controller) NewV1() error {
jwtAuth.DELETE("/decisions", c.HandlerV1.DeleteDecisions)
jwtAuth.DELETE("/decisions/:decision_id", c.HandlerV1.DeleteDecisionById)
jwtAuth.GET("/heartbeat", c.HandlerV1.HeartBeat)
jwtAuth.POST("/usage-metrics", c.HandlerV1.UsageMetrics)
}

apiKeyAuth := groupV1.Group("")
Expand All @@ -115,6 +116,7 @@ func (c *Controller) NewV1() error {
apiKeyAuth.HEAD("/decisions", c.HandlerV1.GetDecision)
apiKeyAuth.GET("/decisions/stream", c.HandlerV1.StreamDecision)
apiKeyAuth.HEAD("/decisions/stream", c.HandlerV1.StreamDecision)
// apiKeyAuth.POST("/usage-metrics", c.HandlerV1.UsageMetrics)
}

return nil
Expand Down
41 changes: 41 additions & 0 deletions pkg/apiserver/controllers/v1/usagemetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package v1

import (
"net/http"

jwt "github.com/appleboy/gin-jwt/v2"
"github.com/gin-gonic/gin"
"github.com/sanity-io/litter"
"github.com/go-openapi/strfmt"
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/models"
)

// UsageMetrics receives metrics from log processors and remediation components
func (c *Controller) UsageMetrics(gctx *gin.Context) {
var input models.AllMetrics

claims := jwt.ExtractClaims(gctx)
// TBD: use defined rather than hardcoded key to find back owner
machineID := claims["id"].(string)

if err := gctx.ShouldBindJSON(&input); err != nil {
log.Errorf("Failed to bind json: %s", err)
gctx.JSON(http.StatusBadRequest, gin.H{"message": err.Error()})
return
}

Check warning on line 27 in pkg/apiserver/controllers/v1/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiserver/controllers/v1/usagemetrics.go#L16-L27

Added lines #L16 - L27 were not covered by tests

if err := input.Validate(strfmt.Default); err != nil {
log.Errorf("Failed to validate input: %s", err)
c.HandleDBErrors(gctx, err)
return
}

Check warning on line 33 in pkg/apiserver/controllers/v1/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiserver/controllers/v1/usagemetrics.go#L29-L33

Added lines #L29 - L33 were not covered by tests

log.Infof("Received all metrics from %s", machineID)

inputStr := litter.Sdump(input)
log.Info(inputStr)

gctx.JSON(http.StatusCreated, "FOOBAR")

Check warning on line 40 in pkg/apiserver/controllers/v1/usagemetrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiserver/controllers/v1/usagemetrics.go#L35-L40

Added lines #L35 - L40 were not covered by tests
}
Loading

0 comments on commit b6120d5

Please sign in to comment.