Skip to content

Commit

Permalink
Merge branch 'master' into fix_rpm
Browse files Browse the repository at this point in the history
  • Loading branch information
sabban authored Mar 14, 2024
2 parents b45a1cf + 2a7e838 commit 0926120
Show file tree
Hide file tree
Showing 34 changed files with 531 additions and 129 deletions.
3 changes: 2 additions & 1 deletion cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/crowdsecurity/go-cs-lib/trace"

"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
"github.com/crowdsecurity/crowdsec/pkg/appsec"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
Expand Down Expand Up @@ -147,7 +148,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H

if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
aggregated := false
if cConfig.Prometheus.Level == "aggregated" {
if cConfig.Prometheus.Level == configuration.CFG_METRICS_AGGREGATE {
aggregated = true
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error)
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/crowdsec/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/crowdsecurity/go-cs-lib/trace"
"github.com/crowdsecurity/go-cs-lib/version"

"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
"github.com/crowdsecurity/crowdsec/pkg/cache"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
Expand Down Expand Up @@ -161,7 +162,7 @@ func registerPrometheus(config *csconfig.PrometheusCfg) {

// Registering prometheus
// If in aggregated mode, do not register events associated with a source, to keep the cardinality low
if config.Level == "aggregated" {
if config.Level == configuration.CFG_METRICS_AGGREGATE {
log.Infof("Loading aggregated prometheus collectors")
prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
globalCsInfo, globalParsingHistogram, globalPourHistogram,
Expand Down
30 changes: 25 additions & 5 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks.
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
Expand Down Expand Up @@ -94,7 +94,7 @@ func GetDataSourceIface(dataSourceType string) DataSource {
// if the configuration is not valid it returns an error.
// If the datasource can't be run (eg. journalctl not available), it still returns an error which
// can be checked for the appropriate action.
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg, metricsLevel int) (*DataSource, error) {
// we dump it back to []byte, because we want to decode the yaml blob twice:
// once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
yamlConfig, err := yaml.Marshal(commonConfig)
Expand Down Expand Up @@ -122,7 +122,7 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS
return nil, &DataSourceUnavailableError{Name: commonConfig.Source, Err: err}
}
/* configure the actual datasource */
if err := dataSrc.Configure(yamlConfig, subLogger); err != nil {
if err := dataSrc.Configure(yamlConfig, subLogger, metricsLevel); err != nil {
return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err)

}
Expand Down Expand Up @@ -180,10 +180,30 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr
return sources, nil
}

func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int {
if prom == nil {
return configuration.METRICS_FULL

}
if !prom.Enabled {
return configuration.METRICS_NONE
}
if prom.Level == configuration.CFG_METRICS_AGGREGATE {
return configuration.METRICS_AGGREGATE
}

if prom.Level == configuration.CFG_METRICS_FULL {
return configuration.METRICS_FULL
}
return configuration.METRICS_FULL

}

// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
var sources []DataSource

metrics_level := GetMetricsLevelFromPromCfg(prom)
for _, acquisFile := range config.AcquisitionFiles {
log.Infof("loading acquisition file : %s", acquisFile)
yamlFile, err := os.Open(acquisFile)
Expand Down Expand Up @@ -225,7 +245,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
}
uniqueId := uuid.NewString()
sub.UniqueId = uniqueId
src, err := DataSourceConfigure(sub)
src, err := DataSourceConfigure(sub, metrics_level)
if err != nil {
var dserr *DataSourceUnavailableError
if errors.As(err, &dserr) {
Expand Down
16 changes: 9 additions & 7 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (f *MockSource) UnmarshalConfig(cfg []byte) error {
return nil
}

func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if err := f.UnmarshalConfig(cfg); err != nil {
return err
Expand Down Expand Up @@ -182,7 +182,7 @@ wowo: ajsajasjas
t.Run(tc.TestName, func(t *testing.T) {
common := configuration.DataSourceCommonCfg{}
yaml.Unmarshal([]byte(tc.String), &common)
ds, err := DataSourceConfigure(common)
ds, err := DataSourceConfigure(common, configuration.METRICS_NONE)
cstest.RequireErrorContains(t, err, tc.ExpectedError)
if tc.ExpectedError != "" {
return
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.TestName, func(t *testing.T) {
dss, err := LoadAcquisitionFromFile(&tc.Config)
dss, err := LoadAcquisitionFromFile(&tc.Config, nil)
cstest.RequireErrorContains(t, err, tc.ExpectedError)
if tc.ExpectedError != "" {
return
Expand All @@ -305,7 +305,7 @@ type MockCat struct {
logger *log.Entry
}

func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error {
func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if f.Mode == "" {
f.Mode = configuration.CAT_MODE
Expand Down Expand Up @@ -349,7 +349,7 @@ type MockTail struct {
logger *log.Entry
}

func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error {
func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if f.Mode == "" {
f.Mode = configuration.TAIL_MODE
Expand Down Expand Up @@ -497,8 +497,10 @@ type MockSourceByDSN struct {
logger *log.Entry //nolint: unused
}

func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry) error { return nil }
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
return nil
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
Expand Down
11 changes: 11 additions & 0 deletions pkg/acquisition/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,14 @@ type DataSourceCommonCfg struct {
var TAIL_MODE = "tail"
var CAT_MODE = "cat"
var SERVER_MODE = "server" // No difference with tail, just a bit more verbose

const (
METRICS_NONE = iota
METRICS_AGGREGATE
METRICS_FULL
)

const (
CFG_METRICS_AGGREGATE = "aggregated"
CFG_METRICS_FULL = "full"
)
5 changes: 3 additions & 2 deletions pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type AppsecSourceConfig struct {

// runtime structure of AppsecSourceConfig
type AppsecSource struct {
metricsLevel int
config AppsecSourceConfig
logger *log.Entry
mux *http.ServeMux
Expand Down Expand Up @@ -149,13 +150,13 @@ func (w *AppsecSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram}
}

func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry) error {
func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
err := w.UnmarshalConfig(yamlConfig)
if err != nil {
return errors.Wrap(err, "unable to parse appsec configuration")
}
w.logger = logger

w.metricsLevel = MetricsLevel
w.logger.Tracef("Appsec configuration: %+v", w.config)

if w.config.AuthCacheDuration == nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/acquisition/modules/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var linesRead = prometheus.NewCounterVec(

// CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
type CloudwatchSource struct {
Config CloudwatchSourceConfiguration
metricsLevel int
Config CloudwatchSourceConfiguration
/*runtime stuff*/
logger *log.Entry
t *tomb.Tomb
Expand Down Expand Up @@ -152,11 +153,12 @@ func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}

func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry) error {
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
err := cw.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
cw.metricsLevel = MetricsLevel

cw.logger = logger.WithField("group", cw.Config.GroupName)

Expand Down Expand Up @@ -385,7 +387,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
if !stream.t.Alive() {
cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName)
cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec()
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec()
}
break
}
shouldCreate = false
Expand All @@ -395,7 +399,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha

//let's start watching this stream
if shouldCreate {
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc()
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc()
}
newStream.t = tomb.Tomb{}
newStream.logger = cw.logger.WithFields(log.Fields{"stream": newStream.StreamName})
cw.logger.Debugf("starting tail of stream %s", newStream.StreamName)
Expand All @@ -409,7 +415,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
for idx, stream := range cw.monitoredStreams {
if !cw.monitoredStreams[idx].t.Alive() {
cw.logger.Debugf("remove dead stream %s", stream.StreamName)
openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
}
} else {
newMonitoredStreams = append(newMonitoredStreams, stream)
}
Expand Down Expand Up @@ -485,7 +493,9 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err)
} else {
cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
if cw.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
}
outChan <- evt
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/acquisition/modules/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -427,7 +428,7 @@ stream_name: test_stream`),
dbgLogger.Logger.SetLevel(log.DebugLevel)
dbgLogger.Infof("starting test")
cw := CloudwatchSource{}
err := cw.Configure(tc.config, dbgLogger)
err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE)
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)

if tc.expectedCfgErr != "" {
Expand Down Expand Up @@ -559,7 +560,7 @@ stream_name: test_stream`),
dbgLogger := log.New().WithField("test", tc.name)
dbgLogger.Logger.SetLevel(log.DebugLevel)
cw := CloudwatchSource{}
err := cw.Configure(tc.config, dbgLogger)
err := cw.Configure(tc.config, dbgLogger, configuration.METRICS_NONE)
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
if tc.expectedCfgErr != "" {
return
Expand Down
9 changes: 6 additions & 3 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type DockerConfiguration struct {
}

type DockerSource struct {
metricsLevel int
Config DockerConfiguration
runningContainerState map[string]*ContainerConfig
compiledContainerName []*regexp.Regexp
Expand Down Expand Up @@ -128,9 +129,9 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}

func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error {
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
d.logger = logger

d.metricsLevel = MetricsLevel
err := d.UnmarshalConfig(yamlConfig)
if err != nil {
return err
Expand Down Expand Up @@ -325,7 +326,9 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
l.Src = containerConfig.Name
l.Process = true
l.Module = d.GetName()
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
if d.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
}
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
Expand Down
9 changes: 5 additions & 4 deletions pkg/acquisition/modules/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/crowdsecurity/go-cs-lib/cstest"

"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
dockerTypes "github.com/docker/docker/api/types"
dockerContainer "github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -60,7 +61,7 @@ container_name:

for _, test := range tests {
f := DockerSource{}
err := f.Configure([]byte(test.config), subLogger)
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
cstest.AssertErrorContains(t, err, test.expectedErr)
}
}
Expand Down Expand Up @@ -162,7 +163,7 @@ container_name_regexp:

for _, ts := range tests {
var (
logger *log.Logger
logger *log.Logger
subLogger *log.Entry
)

Expand All @@ -182,7 +183,7 @@ container_name_regexp:
out := make(chan types.Event)
dockerSource := DockerSource{}

err := dockerSource.Configure([]byte(ts.config), subLogger)
err := dockerSource.Configure([]byte(ts.config), subLogger, configuration.METRICS_NONE)
if err != nil {
t.Fatalf("Unexpected error : %s", err)
}
Expand Down Expand Up @@ -304,7 +305,7 @@ func TestOneShot(t *testing.T) {
for _, ts := range tests {
var (
subLogger *log.Entry
logger *log.Logger
logger *log.Logger
)

if ts.expectedOutput != "" {
Expand Down
Loading

0 comments on commit 0926120

Please sign in to comment.