From 9d6ccb0f08b3afd067c9a858e6ff04212f03b7f1 Mon Sep 17 00:00:00 2001 From: mmetc <92726601+mmetc@users.noreply.github.com> Date: Fri, 25 Oct 2024 15:43:03 +0200 Subject: [PATCH] context propagation: appsec, docker, kafka, k8s datasources (#3284) --- .github/codecov.yml | 164 ++++++++++++++++++ .gitignore | 3 - pkg/acquisition/modules/appsec/appsec.go | 2 +- pkg/acquisition/modules/docker/docker.go | 47 ++--- pkg/acquisition/modules/kafka/kafka.go | 12 +- .../modules/kubernetesaudit/k8s_audit.go | 2 +- 6 files changed, 196 insertions(+), 34 deletions(-) create mode 100644 .github/codecov.yml diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 00000000000..e3a81070324 --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,164 @@ +# we measure coverage but don't enforce it +# https://docs.codecov.com/docs/codecov-yaml +codecov: + require_ci_to_pass: false + +coverage: + status: + patch: + default: + target: 0% + project: + default: + target: 0% + +# if a directory is ignored, there is no way to un-ignore files like pkg/models/helpers.go +# so we make a full list +ignore: + - "./pkg/modelscapi/success_response.go" + - "./pkg/modelscapi/get_decisions_stream_response_deleted.go" + - "./pkg/modelscapi/login_request.go" + - "./pkg/modelscapi/get_decisions_stream_response_links.go" + - "./pkg/modelscapi/login_response.go" + - "./pkg/modelscapi/add_signals_request_item.go" + - "./pkg/modelscapi/blocklist_link.go" + - "./pkg/modelscapi/get_decisions_stream_response_deleted_item.go" + - "./pkg/modelscapi/decisions_sync_request.go" + - "./pkg/modelscapi/get_decisions_stream_response.go" + - "./pkg/modelscapi/metrics_request_machines_item.go" + - "./pkg/modelscapi/metrics_request.go" + - "./pkg/modelscapi/get_decisions_stream_response_new.go" + - "./pkg/modelscapi/add_signals_request_item_decisions_item.go" + - "./pkg/modelscapi/metrics_request_bouncers_item.go" + - "./pkg/modelscapi/decisions_sync_request_item_decisions_item.go" + - "./pkg/modelscapi/decisions_delete_request_item.go" + - "./pkg/modelscapi/get_decisions_stream_response_new_item.go" + - "./pkg/modelscapi/decisions_sync_request_item.go" + - "./pkg/modelscapi/add_signals_request.go" + - "./pkg/modelscapi/reset_password_request.go" + - "./pkg/modelscapi/add_signals_request_item_decisions.go" + - "./pkg/modelscapi/decisions_sync_request_item_source.go" + - "./pkg/modelscapi/error_response.go" + - "./pkg/modelscapi/decisions_delete_request.go" + - "./pkg/modelscapi/decisions_sync_request_item_decisions.go" + - "./pkg/modelscapi/enroll_request.go" + - "./pkg/modelscapi/register_request.go" + - "./pkg/modelscapi/add_signals_request_item_source.go" + - "./pkg/models/success_response.go" + - "./pkg/models/hub_items.go" + - "./pkg/models/alert.go" + - "./pkg/models/metrics_bouncer_info.go" + - "./pkg/models/add_signals_request_item.go" + - "./pkg/models/metrics_meta.go" + - "./pkg/models/metrics_detail_item.go" + - "./pkg/models/add_signals_request_item_decisions_item.go" + - "./pkg/models/hub_item.go" + - "./pkg/models/get_alerts_response.go" + - "./pkg/models/metrics_labels.go" + - "./pkg/models/watcher_auth_request.go" + - "./pkg/models/add_alerts_request.go" + - "./pkg/models/event.go" + - "./pkg/models/decisions_delete_request_item.go" + - "./pkg/models/meta.go" + - "./pkg/models/detailed_metrics.go" + - "./pkg/models/delete_alerts_response.go" + - "./pkg/models/remediation_components_metrics.go" + - "./pkg/models/console_options.go" + - "./pkg/models/topx_response.go" + - "./pkg/models/add_signals_request.go" + - "./pkg/models/delete_decision_response.go" + - "./pkg/models/get_decisions_response.go" + - "./pkg/models/add_signals_request_item_decisions.go" + - "./pkg/models/source.go" + - "./pkg/models/decisions_stream_response.go" + - "./pkg/models/error_response.go" + - "./pkg/models/all_metrics.go" + - "./pkg/models/o_sversion.go" + - "./pkg/models/decision.go" + - "./pkg/models/decisions_delete_request.go" + - "./pkg/models/flush_decision_response.go" + - "./pkg/models/watcher_auth_response.go" + - "./pkg/models/lapi_metrics.go" + - "./pkg/models/watcher_registration_request.go" + - "./pkg/models/metrics_agent_info.go" + - "./pkg/models/log_processors_metrics.go" + - "./pkg/models/add_signals_request_item_source.go" + - "./pkg/models/base_metrics.go" + - "./pkg/models/add_alerts_response.go" + - "./pkg/models/metrics.go" + - "./pkg/protobufs/notifier.pb.go" + - "./pkg/protobufs/notifier_grpc.pb.go" + - "./pkg/database/ent/metric_update.go" + - "./pkg/database/ent/machine_delete.go" + - "./pkg/database/ent/decision_query.go" + - "./pkg/database/ent/meta_query.go" + - "./pkg/database/ent/metric/where.go" + - "./pkg/database/ent/metric/metric.go" + - "./pkg/database/ent/machine_create.go" + - "./pkg/database/ent/alert.go" + - "./pkg/database/ent/event_update.go" + - "./pkg/database/ent/alert_create.go" + - "./pkg/database/ent/alert_query.go" + - "./pkg/database/ent/metric_delete.go" + - "./pkg/database/ent/lock_create.go" + - "./pkg/database/ent/bouncer_update.go" + - "./pkg/database/ent/meta_update.go" + - "./pkg/database/ent/decision_create.go" + - "./pkg/database/ent/configitem_update.go" + - "./pkg/database/ent/machine_query.go" + - "./pkg/database/ent/client.go" + - "./pkg/database/ent/predicate/predicate.go" + - "./pkg/database/ent/lock/where.go" + - "./pkg/database/ent/lock/lock.go" + - "./pkg/database/ent/mutation.go" + - "./pkg/database/ent/migrate/migrate.go" + - "./pkg/database/ent/migrate/schema.go" + - "./pkg/database/ent/configitem.go" + - "./pkg/database/ent/metric_query.go" + - "./pkg/database/ent/event.go" + - "./pkg/database/ent/event_query.go" + - "./pkg/database/ent/lock_update.go" + - "./pkg/database/ent/meta.go" + - "./pkg/database/ent/configitem_query.go" + - "./pkg/database/ent/bouncer.go" + - "./pkg/database/ent/alert_update.go" + - "./pkg/database/ent/meta/meta.go" + - "./pkg/database/ent/meta/where.go" + - "./pkg/database/ent/decision_update.go" + - "./pkg/database/ent/alert_delete.go" + - "./pkg/database/ent/lock.go" + - "./pkg/database/ent/runtime/runtime.go" + - "./pkg/database/ent/alert/alert.go" + - "./pkg/database/ent/alert/where.go" + - "./pkg/database/ent/runtime.go" + - "./pkg/database/ent/bouncer/bouncer.go" + - "./pkg/database/ent/bouncer/where.go" + - "./pkg/database/ent/hook/hook.go" + - "./pkg/database/ent/metric.go" + - "./pkg/database/ent/configitem_create.go" + - "./pkg/database/ent/configitem_delete.go" + - "./pkg/database/ent/tx.go" + - "./pkg/database/ent/decision.go" + - "./pkg/database/ent/lock_delete.go" + - "./pkg/database/ent/decision_delete.go" + - "./pkg/database/ent/machine/where.go" + - "./pkg/database/ent/machine/machine.go" + - "./pkg/database/ent/event_create.go" + - "./pkg/database/ent/metric_create.go" + - "./pkg/database/ent/decision/where.go" + - "./pkg/database/ent/decision/decision.go" + - "./pkg/database/ent/enttest/enttest.go" + - "./pkg/database/ent/lock_query.go" + - "./pkg/database/ent/bouncer_create.go" + - "./pkg/database/ent/event_delete.go" + - "./pkg/database/ent/bouncer_delete.go" + - "./pkg/database/ent/event/event.go" + - "./pkg/database/ent/event/where.go" + - "./pkg/database/ent/machine.go" + - "./pkg/database/ent/ent.go" + - "./pkg/database/ent/meta_create.go" + - "./pkg/database/ent/bouncer_query.go" + - "./pkg/database/ent/meta_delete.go" + - "./pkg/database/ent/machine_update.go" + - "./pkg/database/ent/configitem/configitem.go" + - "./pkg/database/ent/configitem/where.go" diff --git a/.gitignore b/.gitignore index d76efcbfc48..6e6624fd282 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,3 @@ msi __pycache__ *.py[cod] *.egg-info - -# automatically generated before running codecov -.github/codecov.yml diff --git a/pkg/acquisition/modules/appsec/appsec.go b/pkg/acquisition/modules/appsec/appsec.go index 5161b631c33..4ab980ee860 100644 --- a/pkg/acquisition/modules/appsec/appsec.go +++ b/pkg/acquisition/modules/appsec/appsec.go @@ -294,7 +294,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types. w.logger.Info("Shutting down Appsec server") // xx let's clean up the appsec runners :) appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails) - w.server.Shutdown(context.TODO()) + w.server.Shutdown(ctx) return nil }) return nil diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 874b1556fd5..57ec7c7abda 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -287,8 +287,9 @@ func (d *DockerSource) SupportedModes() []string { // OneShotAcquisition reads a set of file and returns when done func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + ctx := context.TODO() d.logger.Debug("In oneshot") - runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) + runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{}) if err != nil { return err } @@ -298,10 +299,10 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er d.logger.Debugf("container with id %s is already being read from", container.ID) continue } - if containerConfig := d.EvalContainer(container); containerConfig != nil { + if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { d.logger.Infof("reading logs from container %s", containerConfig.Name) d.logger.Debugf("logs options: %+v", *d.containerLogsOptions) - dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions) + dockerReader, err := d.Client.ContainerLogs(ctx, containerConfig.ID, *d.containerLogsOptions) if err != nil { d.logger.Errorf("unable to read logs from container: %+v", err) return err @@ -372,26 +373,26 @@ func (d *DockerSource) CanRun() error { return nil } -func (d *DockerSource) getContainerTTY(containerId string) bool { - containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId) +func (d *DockerSource) getContainerTTY(ctx context.Context, containerId string) bool { + containerDetails, err := d.Client.ContainerInspect(ctx, containerId) if err != nil { return false } return containerDetails.Config.Tty } -func (d *DockerSource) getContainerLabels(containerId string) map[string]interface{} { - containerDetails, err := d.Client.ContainerInspect(context.Background(), containerId) +func (d *DockerSource) getContainerLabels(ctx context.Context, containerId string) map[string]interface{} { + containerDetails, err := d.Client.ContainerInspect(ctx, containerId) if err != nil { return map[string]interface{}{} } return parseLabels(containerDetails.Config.Labels) } -func (d *DockerSource) EvalContainer(container dockerTypes.Container) *ContainerConfig { +func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.Container) *ContainerConfig { for _, containerID := range d.Config.ContainerID { if containerID == container.ID { - return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)} + return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)} } } @@ -401,27 +402,27 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) *Container name = name[1:] } if name == containerName { - return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)} + return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)} } } } for _, cont := range d.compiledContainerID { if matched := cont.MatchString(container.ID); matched { - return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)} + return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)} } } for _, cont := range d.compiledContainerName { for _, name := range container.Names { if matched := cont.MatchString(name); matched { - return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(container.ID)} + return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels, Tty: d.getContainerTTY(ctx, container.ID)} } } } if d.Config.UseContainerLabels { - parsedLabels := d.getContainerLabels(container.ID) + parsedLabels := d.getContainerLabels(ctx, container.ID) if len(parsedLabels) == 0 { d.logger.Tracef("container has no 'crowdsec' labels set, ignoring container: %s", container.ID) return nil @@ -458,13 +459,13 @@ func (d *DockerSource) EvalContainer(container dockerTypes.Container) *Container } d.logger.Errorf("label %s is not a string", k) } - return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(container.ID)} + return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: labels, Tty: d.getContainerTTY(ctx, container.ID)} } return nil } -func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { +func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { ticker := time.NewTicker(d.CheckIntervalDuration) d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String()) for { @@ -475,7 +476,7 @@ func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteCha case <-ticker.C: // to track for garbage collection runningContainersID := make(map[string]bool) - runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) + runningContainer, err := d.Client.ContainerList(ctx, dockerTypes.ContainerListOptions{}) if err != nil { if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { for idx, container := range d.runningContainerState { @@ -501,7 +502,7 @@ func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteCha if _, ok := d.runningContainerState[container.ID]; ok { continue } - if containerConfig := d.EvalContainer(container); containerConfig != nil { + if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { monitChan <- containerConfig } } @@ -524,10 +525,10 @@ func (d *DockerSource) StreamingAcquisition(ctx context.Context, out chan types. deleteChan := make(chan *ContainerConfig) d.logger.Infof("Starting docker acquisition") t.Go(func() error { - return d.DockerManager(monitChan, deleteChan, out) + return d.DockerManager(ctx, monitChan, deleteChan, out) }) - return d.WatchContainer(monitChan, deleteChan) + return d.WatchContainer(ctx, monitChan, deleteChan) } func (d *DockerSource) Dump() interface{} { @@ -541,9 +542,9 @@ func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) erro return scanner.Err() } -func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error { +func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error { container.logger.Infof("start tail for container %s", container.Name) - dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions) + dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *d.containerLogsOptions) if err != nil { container.logger.Errorf("unable to read logs from container: %+v", err) return err @@ -601,7 +602,7 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types } } -func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error { +func (d *DockerSource) DockerManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error { d.logger.Info("DockerSource Manager started") for { select { @@ -610,7 +611,7 @@ func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan * newContainer.t = &tomb.Tomb{} newContainer.logger = d.logger.WithField("container_name", newContainer.Name) newContainer.t.Go(func() error { - return d.TailDocker(newContainer, outChan, deleteChan) + return d.TailDocker(ctx, newContainer, outChan, deleteChan) }) d.runningContainerState[newContainer.ID] = newContainer } diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index 9fd5fc2a035..d08a0ae4e4d 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -147,12 +147,12 @@ func (k *KafkaSource) Dump() interface{} { return k } -func (k *KafkaSource) ReadMessage(out chan types.Event) error { +func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) error { // Start processing from latest Offset - k.Reader.SetOffsetAt(context.Background(), time.Now()) + k.Reader.SetOffsetAt(ctx, time.Now()) for { k.logger.Tracef("reading message from topic '%s'", k.Config.Topic) - m, err := k.Reader.ReadMessage(context.Background()) + m, err := k.Reader.ReadMessage(ctx) if err != nil { if errors.Is(err, io.EOF) { return nil @@ -184,10 +184,10 @@ func (k *KafkaSource) ReadMessage(out chan types.Event) error { } } -func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error { +func (k *KafkaSource) RunReader(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config) t.Go(func() error { - return k.ReadMessage(out) + return k.ReadMessage(ctx, out) }) //nolint //fp for { @@ -207,7 +207,7 @@ func (k *KafkaSource) StreamingAcquisition(ctx context.Context, out chan types.E t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/kafka/live") - return k.RunReader(out, t) + return k.RunReader(ctx, out, t) }) return nil diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index f979b044dcc..0d64345a4a0 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -149,7 +149,7 @@ func (ka *KubernetesAuditSource) StreamingAcquisition(ctx context.Context, out c }) <-t.Dying() ka.logger.Infof("Stopping k8s-audit server on %s:%d%s", ka.config.ListenAddr, ka.config.ListenPort, ka.config.WebhookPath) - ka.server.Shutdown(context.TODO()) + ka.server.Shutdown(ctx) return nil }) return nil