Skip to content

Commit

Permalink
context propagation: StreamingAcquisition() (#3274)
Browse files Browse the repository at this point in the history
* context propagation: StreamingAcquisition()
* lint
* ship with codecov.yml
  • Loading branch information
mmetc authored Oct 11, 2024
1 parent 8ff58ee commit 9976616
Show file tree
Hide file tree
Showing 29 changed files with 235 additions and 154 deletions.
3 changes: 3 additions & 0 deletions .github/generate-codecov-yml.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
cat <<EOT
# we measure coverage but don't enforce it
# https://docs.codecov.com/docs/codecov-yaml
codecov:
require_ci_to_pass: false
coverage:
status:
patch:
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/go-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ jobs:
set -o pipefail
make go-acc | sed 's/ *coverage:.*of statements in.*//' | richgo testfilter
- name: Generate codecov configuration
run: |
.github/generate-codecov-yml.sh >> .github/codecov.yml
- name: Upload unit coverage to Codecov
uses: codecov/codecov-action@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H

log.Info("Starting processing data")

if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
if err := acquisition.StartAcquisition(context.TODO(), dataSources, inputLineChan, &acquisTomb); err != nil {
return fmt.Errorf("starting acquisition error: %w", err)
}

Expand Down
56 changes: 42 additions & 14 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -39,17 +40,17 @@ func (e *DataSourceUnavailableError) Unwrap() error {

// The interface each datasource must implement
type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
Dump() interface{}
}

Expand Down Expand Up @@ -242,8 +243,10 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig

for {
var sub configuration.DataSourceCommonCfg
err = dec.Decode(&sub)

idx += 1

err = dec.Decode(&sub)
if err != nil {
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("failed to yaml decode %s: %w", acquisFile, err)
Expand Down Expand Up @@ -283,36 +286,44 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig

uniqueId := uuid.NewString()
sub.UniqueId = uniqueId

src, err := DataSourceConfigure(sub, metrics_level)
if err != nil {
var dserr *DataSourceUnavailableError
if errors.As(err, &dserr) {
log.Error(err)
continue
}

return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err)
}

if sub.TransformExpr != "" {
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
if err != nil {
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position: %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
}

transformRuntimes[uniqueId] = vm
}

sources = append(sources, *src)
}
}

return sources, nil
}

func GetMetrics(sources []DataSource, aggregated bool) error {
var metrics []prometheus.Collector

for i := range sources {
if aggregated {
metrics = sources[i].GetMetrics()
} else {
metrics = sources[i].GetAggregMetrics()
}

for _, metric := range metrics {
if err := prometheus.Register(metric); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
Expand All @@ -322,47 +333,56 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
}
}
}

return nil
}

func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
defer trace.CatchPanic("crowdsec/acquis")
logger.Infof("transformer started")

for {
select {
case <-AcquisTomb.Dying():
logger.Debugf("transformer is dying")
return
case evt := <-transformChan:
logger.Tracef("Received event %s", evt.Line.Raw)

out, err := expr.Run(transformRuntime, map[string]interface{}{"evt": &evt})
if err != nil {
logger.Errorf("while running transform expression: %s, sending event as-is", err)
output <- evt
}

if out == nil {
logger.Errorf("transform expression returned nil, sending event as-is")
output <- evt
}

switch v := out.(type) {
case string:
logger.Tracef("transform expression returned %s", v)
evt.Line.Raw = v
output <- evt
case []interface{}:
logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content

for _, line := range v {
l, ok := line.(string)
if !ok {
logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string")
output <- evt

continue
}

evt.Line.Raw = l
output <- evt
}
case []string:
logger.Tracef("transform expression returned %v", v)

for _, line := range v {
evt.Line.Raw = line
output <- evt
Expand All @@ -375,7 +395,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
}
}

func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
// Don't wait if we have no sources, as it will hang forever
if len(sources) == 0 {
return nil
Expand All @@ -387,32 +407,40 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb

AcquisTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis")

var err error

outChan := output

log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid())

if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok {
log.Infof("transform expression found for datasource %s", subsrc.GetName())

transformChan := make(chan types.Event)
outChan = transformChan
transformLogger := log.WithFields(log.Fields{
"component": "transform",
"datasource": subsrc.GetName(),
})

AcquisTomb.Go(func() error {
transform(outChan, output, AcquisTomb, transformRuntime, transformLogger)
return nil
})
}

if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.StreamingAcquisition(outChan, AcquisTomb)
err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(outChan, AcquisTomb)
}

if err != nil {
// if one of the acqusition returns an error, we kill the others to properly shutdown
AcquisTomb.Kill(err)
}

return nil
})
}
Expand Down
52 changes: 30 additions & 22 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -56,14 +57,16 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int)

return nil
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return errors.New("not supported")
}
Expand Down Expand Up @@ -327,7 +330,7 @@ func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) erro
return nil
}

func (f *MockCat) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
func (f *MockCat) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return errors.New("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
Expand Down Expand Up @@ -366,7 +369,7 @@ func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) err
return errors.New("can't run in cat mode")
}

func (f *MockTail) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTail) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -389,14 +392,15 @@ func (f *MockTail) GetUuid() string { return "" }
// func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {

func TestStartAcquisitionCat(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockCat{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -416,14 +420,15 @@ READLOOP:
}

func TestStartAcquisitionTail(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTail{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -450,7 +455,7 @@ type MockTailError struct {
MockTail
}

func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTailError) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -463,14 +468,15 @@ func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb)
}

func TestStartAcquisitionTailError(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTailError{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
t.Errorf("expected error, got '%s'", err)
}
}()
Expand Down Expand Up @@ -501,14 +507,16 @@ func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
return nil
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) Dump() interface{} { return f }
func (f *MockSourceByDSN) GetName() string { return "mockdsn" }
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) Dump() interface{} { return f }
func (f *MockSourceByDSN) GetName() string { return "mockdsn" }
func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
dsn = strings.TrimPrefix(dsn, "mockdsn://")
if dsn != "test_expect" {
Expand Down
Loading

0 comments on commit 9976616

Please sign in to comment.