Skip to content

Commit

Permalink
bin/crowdsec: avoid writing errors twice when log_media=stdout (#2876)
Browse files Browse the repository at this point in the history
* bin/crowdsec: avoid writing errors twice when log_media=stdout
simpler, correct hook usage
* lint
  • Loading branch information
mmetc authored Mar 7, 2024
1 parent e611d01 commit 98560d0
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 69 deletions.
2 changes: 1 addition & 1 deletion cmd/crowdsec/api.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"errors"
"fmt"
"runtime"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/go-cs-lib/trace"
Expand Down
28 changes: 28 additions & 0 deletions cmd/crowdsec/fatalhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"io"

log "github.com/sirupsen/logrus"
)

// FatalHook is used to log fatal messages to stderr when the rest goes to a file
type FatalHook struct {
Writer io.Writer
LogLevels []log.Level
}

func (hook *FatalHook) Fire(entry *log.Entry) error {
line, err := entry.String()
if err != nil {
return err
}

_, err = hook.Writer.Write([]byte(line))

return err
}

func (hook *FatalHook) Levels() []log.Level {
return hook.LogLevels
}
43 changes: 0 additions & 43 deletions cmd/crowdsec/hook.go

This file was deleted.

21 changes: 14 additions & 7 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Flags struct {
DisableCAPI bool
Transform string
OrderEvent bool
CpuProfile string
CPUProfile string
}

type labelsMap map[string]string
Expand Down Expand Up @@ -181,7 +181,7 @@ func (f *Flags) Parse() {
}

flag.StringVar(&dumpFolder, "dump-data", "", "dump parsers/buckets raw outputs")
flag.StringVar(&f.CpuProfile, "cpu-profile", "", "write cpu profile to file")
flag.StringVar(&f.CPUProfile, "cpu-profile", "", "write cpu profile to file")
flag.Parse()
}

Expand Down Expand Up @@ -249,7 +249,12 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
return nil, err
}

primalHook.Enabled = (cConfig.Common.LogMedia != "stdout")
if cConfig.Common.LogMedia != "stdout" {
log.AddHook(&FatalHook{
Writer: os.Stderr,
LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
})
}

if err := csconfig.LoadFeatureFlagsFile(configFile, log.StandardLogger()); err != nil {
return nil, err
Expand Down Expand Up @@ -323,7 +328,9 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
var crowdsecT0 time.Time

func main() {
log.AddHook(primalHook)
// The initial log level is INFO, even if the user provided an -error or -warning flag
// because we need feature flags before parsing cli flags
log.SetFormatter(&log.TextFormatter{TimestampFormat: time.RFC3339, FullTimestamp: true})

if err := fflag.RegisterAllFeatures(); err != nil {
log.Fatalf("failed to register features: %s", err)
Expand Down Expand Up @@ -355,13 +362,13 @@ func main() {
os.Exit(0)
}

if flags.CpuProfile != "" {
f, err := os.Create(flags.CpuProfile)
if flags.CPUProfile != "" {
f, err := os.Create(flags.CPUProfile)
if err != nil {
log.Fatalf("could not create CPU profile: %s", err)
}

log.Infof("CPU profile will be written to %s", flags.CpuProfile)
log.Infof("CPU profile will be written to %s", flags.CPUProfile)

if err := pprof.StartCPUProfile(f); err != nil {
f.Close()
Expand Down
6 changes: 3 additions & 3 deletions cmd/crowdsec/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// catch panics here because they are not handled by servePrometheus
defer trace.CatchPanic("crowdsec/computeDynamicMetrics")
//update cache metrics (stash)
// update cache metrics (stash)
cache.UpdateCacheMetrics()
//update cache metrics (regexp)
// update cache metrics (regexp)
exprhelpers.UpdateRegexpCacheMetrics()

//decision metrics are only relevant for LAPI
// decision metrics are only relevant for LAPI
if dbClient == nil {
next.ServeHTTP(w, r)
return
Expand Down
8 changes: 4 additions & 4 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
}

for k, src := range alert.Sources {
refsrc := *alert.Alert //copy
refsrc := *alert.Alert // copy

log.Tracef("source[%s]", k)

Expand Down Expand Up @@ -81,7 +81,7 @@ LOOP:
cacheMutex.Unlock()
if err := PushAlerts(cachecopy, client); err != nil {
log.Errorf("while pushing to api : %s", err)
//just push back the events to the queue
// just push back the events to the queue
cacheMutex.Lock()
cache = append(cache, cachecopy...)
cacheMutex.Unlock()
Expand Down Expand Up @@ -110,8 +110,8 @@ LOOP:
return fmt.Errorf("postoverflow failed: %w", err)
}
log.Printf("%s", *event.Overflow.Alert.Message)
//if the Alert is nil, it's to signal bucket is ready for GC, don't track this
//dump after postoveflow processing to avoid missing whitelist info
// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
// dump after postoveflow processing to avoid missing whitelist info
if dumpStates && event.Overflow.Alert != nil {
if bucketOverflows == nil {
bucketOverflows = make([]types.Event, 0)
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func runParse(input chan types.Event, output chan types.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error {

LOOP:
for {
select {
Expand Down Expand Up @@ -56,5 +55,6 @@ LOOP:
output <- parsed
}
}

return nil
}
19 changes: 13 additions & 6 deletions cmd/crowdsec/pour.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,61 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/crowdsec/pkg/csconfig"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error {
count := 0

for {
//bucket is now ready
// bucket is now ready
select {
case <-bucketsTomb.Dying():
log.Infof("Bucket routine exiting")
return nil
case parsed := <-input:
startTime := time.Now()

count++
if count%5000 == 0 {
log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
//when in forensics mode, garbage collect buckets
// when in forensics mode, garbage collect buckets
if cConfig.Crowdsec.BucketsGCEnabled {
if parsed.MarshaledTime != "" {
z := &time.Time{}
if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
log.Warningf("Failed to unmarshal time from event '%s' : %s", parsed.MarshaledTime, err)
} else {
log.Warning("Starting buckets garbage collection ...")

if err = leaky.GarbageCollectBuckets(*z, buckets); err != nil {
return fmt.Errorf("failed to start bucket GC : %s", err)
return fmt.Errorf("failed to start bucket GC : %w", err)
}
}
}
}
}
//here we can bucketify with parsed
// here we can bucketify with parsed
poured, err := leaky.PourItemToHolders(parsed, holders, buckets)
if err != nil {
log.Errorf("bucketify failed for: %v", parsed)
continue
}

elapsed := time.Since(startTime)
globalPourHistogram.With(prometheus.Labels{"type": parsed.Line.Module, "source": parsed.Line.Src}).Observe(elapsed.Seconds())

if poured {
globalBucketPourOk.Inc()
} else {
globalBucketPourKo.Inc()
}

if len(parsed.MarshaledTime) != 0 {
if err := lastProcessedItem.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
log.Warningf("failed to unmarshal time from event : %s", err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/crowdsec/run_in_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func StartRunSvc() error {

defer trace.CatchPanic("crowdsec/StartRunSvc")

//Always try to stop CPU profiling to avoid passing flags around
//It's a noop if profiling is not enabled
// Always try to stop CPU profiling to avoid passing flags around
// It's a noop if profiling is not enabled
defer pprof.StopCPUProfile()

if cConfig, err = LoadConfig(flags.ConfigFile, flags.DisableAgent, flags.DisableAPI, false); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/crowdsec/run_in_svc_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func StartRunSvc() error {

defer trace.CatchPanic("crowdsec/StartRunSvc")

//Always try to stop CPU profiling to avoid passing flags around
//It's a noop if profiling is not enabled
// Always try to stop CPU profiling to avoid passing flags around
// It's a noop if profiling is not enabled
defer pprof.StopCPUProfile()

isRunninginService, err := svc.IsWindowsService()
Expand Down

0 comments on commit 98560d0

Please sign in to comment.