Skip to content

Commit

Permalink
context propagation: appsec, docker, kafka, k8s datasources (#3284)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc authored Oct 25, 2024
1 parent d00a6a6 commit 9d6ccb0
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 34 deletions.
164 changes: 164 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# we measure coverage but don't enforce it
# https://docs.codecov.com/docs/codecov-yaml
codecov:
require_ci_to_pass: false

coverage:
status:
patch:
default:
target: 0%
project:
default:
target: 0%

# if a directory is ignored, there is no way to un-ignore files like pkg/models/helpers.go
# so we make a full list
ignore:
- "./pkg/modelscapi/success_response.go"
- "./pkg/modelscapi/get_decisions_stream_response_deleted.go"
- "./pkg/modelscapi/login_request.go"
- "./pkg/modelscapi/get_decisions_stream_response_links.go"
- "./pkg/modelscapi/login_response.go"
- "./pkg/modelscapi/add_signals_request_item.go"
- "./pkg/modelscapi/blocklist_link.go"
- "./pkg/modelscapi/get_decisions_stream_response_deleted_item.go"
- "./pkg/modelscapi/decisions_sync_request.go"
- "./pkg/modelscapi/get_decisions_stream_response.go"
- "./pkg/modelscapi/metrics_request_machines_item.go"
- "./pkg/modelscapi/metrics_request.go"
- "./pkg/modelscapi/get_decisions_stream_response_new.go"
- "./pkg/modelscapi/add_signals_request_item_decisions_item.go"
- "./pkg/modelscapi/metrics_request_bouncers_item.go"
- "./pkg/modelscapi/decisions_sync_request_item_decisions_item.go"
- "./pkg/modelscapi/decisions_delete_request_item.go"
- "./pkg/modelscapi/get_decisions_stream_response_new_item.go"
- "./pkg/modelscapi/decisions_sync_request_item.go"
- "./pkg/modelscapi/add_signals_request.go"
- "./pkg/modelscapi/reset_password_request.go"
- "./pkg/modelscapi/add_signals_request_item_decisions.go"
- "./pkg/modelscapi/decisions_sync_request_item_source.go"
- "./pkg/modelscapi/error_response.go"
- "./pkg/modelscapi/decisions_delete_request.go"
- "./pkg/modelscapi/decisions_sync_request_item_decisions.go"
- "./pkg/modelscapi/enroll_request.go"
- "./pkg/modelscapi/register_request.go"
- "./pkg/modelscapi/add_signals_request_item_source.go"
- "./pkg/models/success_response.go"
- "./pkg/models/hub_items.go"
- "./pkg/models/alert.go"
- "./pkg/models/metrics_bouncer_info.go"
- "./pkg/models/add_signals_request_item.go"
- "./pkg/models/metrics_meta.go"
- "./pkg/models/metrics_detail_item.go"
- "./pkg/models/add_signals_request_item_decisions_item.go"
- "./pkg/models/hub_item.go"
- "./pkg/models/get_alerts_response.go"
- "./pkg/models/metrics_labels.go"
- "./pkg/models/watcher_auth_request.go"
- "./pkg/models/add_alerts_request.go"
- "./pkg/models/event.go"
- "./pkg/models/decisions_delete_request_item.go"
- "./pkg/models/meta.go"
- "./pkg/models/detailed_metrics.go"
- "./pkg/models/delete_alerts_response.go"
- "./pkg/models/remediation_components_metrics.go"
- "./pkg/models/console_options.go"
- "./pkg/models/topx_response.go"
- "./pkg/models/add_signals_request.go"
- "./pkg/models/delete_decision_response.go"
- "./pkg/models/get_decisions_response.go"
- "./pkg/models/add_signals_request_item_decisions.go"
- "./pkg/models/source.go"
- "./pkg/models/decisions_stream_response.go"
- "./pkg/models/error_response.go"
- "./pkg/models/all_metrics.go"
- "./pkg/models/o_sversion.go"
- "./pkg/models/decision.go"
- "./pkg/models/decisions_delete_request.go"
- "./pkg/models/flush_decision_response.go"
- "./pkg/models/watcher_auth_response.go"
- "./pkg/models/lapi_metrics.go"
- "./pkg/models/watcher_registration_request.go"
- "./pkg/models/metrics_agent_info.go"
- "./pkg/models/log_processors_metrics.go"
- "./pkg/models/add_signals_request_item_source.go"
- "./pkg/models/base_metrics.go"
- "./pkg/models/add_alerts_response.go"
- "./pkg/models/metrics.go"
- "./pkg/protobufs/notifier.pb.go"
- "./pkg/protobufs/notifier_grpc.pb.go"
- "./pkg/database/ent/metric_update.go"
- "./pkg/database/ent/machine_delete.go"
- "./pkg/database/ent/decision_query.go"
- "./pkg/database/ent/meta_query.go"
- "./pkg/database/ent/metric/where.go"
- "./pkg/database/ent/metric/metric.go"
- "./pkg/database/ent/machine_create.go"
- "./pkg/database/ent/alert.go"
- "./pkg/database/ent/event_update.go"
- "./pkg/database/ent/alert_create.go"
- "./pkg/database/ent/alert_query.go"
- "./pkg/database/ent/metric_delete.go"
- "./pkg/database/ent/lock_create.go"
- "./pkg/database/ent/bouncer_update.go"
- "./pkg/database/ent/meta_update.go"
- "./pkg/database/ent/decision_create.go"
- "./pkg/database/ent/configitem_update.go"
- "./pkg/database/ent/machine_query.go"
- "./pkg/database/ent/client.go"
- "./pkg/database/ent/predicate/predicate.go"
- "./pkg/database/ent/lock/where.go"
- "./pkg/database/ent/lock/lock.go"
- "./pkg/database/ent/mutation.go"
- "./pkg/database/ent/migrate/migrate.go"
- "./pkg/database/ent/migrate/schema.go"
- "./pkg/database/ent/configitem.go"
- "./pkg/database/ent/metric_query.go"
- "./pkg/database/ent/event.go"
- "./pkg/database/ent/event_query.go"
- "./pkg/database/ent/lock_update.go"
- "./pkg/database/ent/meta.go"
- "./pkg/database/ent/configitem_query.go"
- "./pkg/database/ent/bouncer.go"
- "./pkg/database/ent/alert_update.go"
- "./pkg/database/ent/meta/meta.go"
- "./pkg/database/ent/meta/where.go"
- "./pkg/database/ent/decision_update.go"
- "./pkg/database/ent/alert_delete.go"
- "./pkg/database/ent/lock.go"
- "./pkg/database/ent/runtime/runtime.go"
- "./pkg/database/ent/alert/alert.go"
- "./pkg/database/ent/alert/where.go"
- "./pkg/database/ent/runtime.go"
- "./pkg/database/ent/bouncer/bouncer.go"
- "./pkg/database/ent/bouncer/where.go"
- "./pkg/database/ent/hook/hook.go"
- "./pkg/database/ent/metric.go"
- "./pkg/database/ent/configitem_create.go"
- "./pkg/database/ent/configitem_delete.go"
- "./pkg/database/ent/tx.go"
- "./pkg/database/ent/decision.go"
- "./pkg/database/ent/lock_delete.go"
- "./pkg/database/ent/decision_delete.go"
- "./pkg/database/ent/machine/where.go"
- "./pkg/database/ent/machine/machine.go"
- "./pkg/database/ent/event_create.go"
- "./pkg/database/ent/metric_create.go"
- "./pkg/database/ent/decision/where.go"
- "./pkg/database/ent/decision/decision.go"
- "./pkg/database/ent/enttest/enttest.go"
- "./pkg/database/ent/lock_query.go"
- "./pkg/database/ent/bouncer_create.go"
- "./pkg/database/ent/event_delete.go"
- "./pkg/database/ent/bouncer_delete.go"
- "./pkg/database/ent/event/event.go"
- "./pkg/database/ent/event/where.go"
- "./pkg/database/ent/machine.go"
- "./pkg/database/ent/ent.go"
- "./pkg/database/ent/meta_create.go"
- "./pkg/database/ent/bouncer_query.go"
- "./pkg/database/ent/meta_delete.go"
- "./pkg/database/ent/machine_update.go"
- "./pkg/database/ent/configitem/configitem.go"
- "./pkg/database/ent/configitem/where.go"
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,3 @@ msi
__pycache__
*.py[cod]
*.egg-info

# automatically generated before running codecov
.github/codecov.yml
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
w.logger.Info("Shutting down Appsec server")
// xx let's clean up the appsec runners :)
appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails)
w.server.Shutdown(context.TODO())
w.server.Shutdown(ctx)
return nil
})
return nil
Expand Down
47 changes: 24 additions & 23 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@ func (d *DockerSource) SupportedModes() []string {

// OneShotAcquisition reads a set of file and returns when done
func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
ctx := context.TODO()
d.logger.Debug("In oneshot")
runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{})
if err != nil {
return err
}
Expand All @@ -298,10 +299,10 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
d.logger.Debugf("container with id %s is already being read from", container.ID)
continue
}
if containerConfig := d.EvalContainer(container); containerConfig != nil {
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
d.logger.Infof("reading logs from container %s", containerConfig.Name)
d.logger.Debugf("logs options: %+v", *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(ctx, containerConfig.ID, *d.containerLogsOptions)
if err != nil {
d.logger.Errorf("unable to read logs from container: %+v", err)
return err
Expand Down Expand Up @@ -372,26 +373,26 @@ func (d *DockerSource) CanRun() error {
return nil
}

func (d *DockerSource) getContainerTTY(containerId string) bool {
containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
func (d *DockerSource) getContainerTTY(ctx context.Context, containerId string) bool {
containerDetails, err := d.Client.ContainerInspect(ctx, containerId)
if err != nil {
return false
}
return containerDetails.Config.Tty
}

func (d *DockerSource) getContainerLabels(containerId string) map[string]interface{} {
containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId)
func (d *DockerSource) getContainerLabels(ctx context.Context, containerId string) map[string]interface{} {
containerDetails, err := d.Client.ContainerInspect(ctx, containerId)
if err != nil {
return map[string]interface{}{}
}
return parseLabels(containerDetails.Config.Labels)
}

func (d *DockerSource) EvalContainer(container dockerTypes.Container) *ContainerConfig {
func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.Container) *ContainerConfig {
for _, containerID := range d.Config.ContainerID {
if containerID == container.ID {
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}

Expand All @@ -401,27 +402,27 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) *Container
name = name[1:]
}
if name == containerName {
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}
}

for _, cont := range d.compiledContainerID {
if matched := cont.MatchString(container.ID); matched {
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}

for _, cont := range d.compiledContainerName {
for _, name := range container.Names {
if matched := cont.MatchString(name); matched {
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)}
}
}
}

if d.Config.UseContainerLabels {
parsedLabels := d.getContainerLabels(container.ID)
parsedLabels := d.getContainerLabels(ctx, container.ID)
if len(parsedLabels) == 0 {
d.logger.Tracef("container has no 'crowdsec' labels set, ignoring container: %s", container.ID)
return nil
Expand Down Expand Up @@ -458,13 +459,13 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) *Container
}
d.logger.Errorf("label %s is not a string", k)
}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(container.ID)}
return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(ctx, container.ID)}
}

return nil
}

func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
ticker := time.NewTicker(d.CheckIntervalDuration)
d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
for {
Expand All @@ -475,7 +476,7 @@ func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteCha
case <-ticker.C:
// to track for garbage collection
runningContainersID := make(map[string]bool)
runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{})
runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
for idx, container := range d.runningContainerState {
Expand All @@ -501,7 +502,7 @@ func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteCha
if _, ok := d.runningContainerState[container.ID]; ok {
continue
}
if containerConfig := d.EvalContainer(container); containerConfig != nil {
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
monitChan <- containerConfig
}
}
Expand All @@ -524,10 +525,10 @@ func (d *DockerSource) StreamingAcquisition(ctx context.Context, out chan types.
deleteChan := make(chan *ContainerConfig)
d.logger.Infof("Starting docker acquisition")
t.Go(func() error {
return d.DockerManager(monitChan, deleteChan, out)
return d.DockerManager(ctx, monitChan, deleteChan, out)
})

return d.WatchContainer(monitChan, deleteChan)
return d.WatchContainer(ctx, monitChan, deleteChan)
}

func (d *DockerSource) Dump() interface{} {
Expand All @@ -541,9 +542,9 @@ func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) erro
return scanner.Err()
}

func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
container.logger.Infof("start tail for container %s", container.Name)
dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *d.containerLogsOptions)
if err != nil {
container.logger.Errorf("unable to read logs from container: %+v", err)
return err
Expand Down Expand Up @@ -601,7 +602,7 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types
}
}

func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error {
d.logger.Info("DockerSource Manager started")
for {
select {
Expand All @@ -610,7 +611,7 @@ func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *
newContainer.t = &tomb.Tomb{}
newContainer.logger = d.logger.WithField("container_name", newContainer.Name)
newContainer.t.Go(func() error {
return d.TailDocker(newContainer, outChan, deleteChan)
return d.TailDocker(ctx, newContainer, outChan, deleteChan)
})
d.runningContainerState[newContainer.ID] = newContainer
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/acquisition/modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ func (k *KafkaSource) Dump() interface{} {
return k
}

func (k *KafkaSource) ReadMessage(out chan types.Event) error {
func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) error {
// Start processing from latest Offset
k.Reader.SetOffsetAt(context.Background(), time.Now())
k.Reader.SetOffsetAt(ctx, time.Now())
for {
k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)
m, err := k.Reader.ReadMessage(context.Background())
m, err := k.Reader.ReadMessage(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
Expand Down Expand Up @@ -184,10 +184,10 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error {
}
}

func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
func (k *KafkaSource) RunReader(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)
t.Go(func() error {
return k.ReadMessage(out)
return k.ReadMessage(ctx, out)
})
//nolint //fp
for {
Expand All @@ -207,7 +207,7 @@ func (k *KafkaSource) StreamingAcquisition(ctx context.Context, out chan types.E

t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/kafka/live")
return k.RunReader(out, t)
return k.RunReader(ctx, out, t)
})

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kubernetesaudit/k8s_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (ka *KubernetesAuditSource) StreamingAcquisition(ctx context.Context, out c
})
<-t.Dying()
ka.logger.Infof("Stopping k8s-audit server on %s:%d%s", ka.config.ListenAddr, ka.config.ListenPort, ka.config.WebhookPath)
ka.server.Shutdown(context.TODO())
ka.server.Shutdown(ctx)
return nil
})
return nil
Expand Down

0 comments on commit 9d6ccb0

Please sign in to comment.