From 145f627f72977a2159d7ce98d7e8091196590003 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Jan 2024 12:36:00 -0800 Subject: [PATCH] Use single channel from epbevents ebpfevents library has been updated to use a single channel. Updated to use latest ebpfevents library and the single channel. --- NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- .../provider/ebpf_provider/ebpf_provider.go | 211 +++++++++--------- 4 files changed, 109 insertions(+), 112 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index f8dbafc4959..b0b06b031e7 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12257,11 +12257,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/elastic/ebpfevents -Version: v0.1.0 +Version: v0.3.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/ebpfevents@v0.1.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/ebpfevents@v0.3.0/LICENSE.txt: The https://github.com/elastic/ebpfevents repository contains source code under various licenses: diff --git a/go.mod b/go.mod index 7c78327ea0d..6456d66f747 100644 --- a/go.mod +++ b/go.mod @@ -200,7 +200,7 @@ require ( github.com/aws/smithy-go v1.13.5 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 - github.com/elastic/ebpfevents v0.1.0 + github.com/elastic/ebpfevents v0.3.0 github.com/elastic/elastic-agent-autodiscover v0.6.7 github.com/elastic/elastic-agent-libs v0.7.3 github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 diff --git a/go.sum b/go.sum index 939923a6e2e..149598ca524 100644 --- a/go.sum +++ b/go.sum @@ -660,8 +660,8 @@ github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0= github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= -github.com/elastic/ebpfevents v0.1.0 h1:Kr62fVcDSrPYpwsW3FXUOcmImHqbiRfwmb6fZOw5PgI= -github.com/elastic/ebpfevents v0.1.0/go.mod h1:o21z5xup/9dK8u0Hg9bZRflSqqj1Zu5h2dg2hSTcUPQ= +github.com/elastic/ebpfevents v0.3.0 h1:tN2X+FNyV2o1x81tX1anAyVwXOTd1iRNxlkhf6zVY24= +github.com/elastic/ebpfevents v0.3.0/go.mod h1:o21z5xup/9dK8u0Hg9bZRflSqqj1Zu5h2dg2hSTcUPQ= github.com/elastic/elastic-agent-autodiscover v0.6.7 h1:+KVjltN0rPsBrU8b156gV4lOTBgG/vt0efFCFARrf3g= github.com/elastic/elastic-agent-autodiscover v0.6.7/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4= github.com/elastic/elastic-agent-client/v7 v7.6.0 h1:FEn6FjzynW4TIQo5G096Tr7xYK/P5LY9cSS6wRbXZTc= diff --git a/x-pack/auditbeat/processors/add_session_metadata/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/add_session_metadata/provider/ebpf_provider/ebpf_provider.go index 1b3e39b0c87..69f28df20f3 100644 --- a/x-pack/auditbeat/processors/add_session_metadata/provider/ebpf_provider/ebpf_provider.go +++ b/x-pack/auditbeat/processors/add_session_metadata/provider/ebpf_provider/ebpf_provider.go @@ -41,122 +41,119 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr return prvdr{}, err } - events := make(chan ebpfevents.Event) - errors := make(chan error) + records := make(chan ebpfevents.Record) - go l.EventLoop(p.ctx, events, errors) + go l.EventLoop(p.ctx, records) go func(logger logp.Logger) { for { - select { - case err := <-errors: + r := <-records + if r.Error != nil { logger.Errorf("recv'd error: %w", err) continue - case ev := <-events: - if err != nil { - logger.Errorf("marshal event: %w", err) + } + if r.Event == nil { + continue + } + ev := r.Event + switch ev.Type { + case ebpfevents.EventTypeProcessFork: + body, ok := ev.Body.(*ebpfevents.ProcessFork) + if !ok { + logger.Errorf("unexpected event body") continue } - switch ev.Type { - case ebpfevents.EventTypeProcessFork: - body, ok := ev.Body.(*ebpfevents.ProcessFork) - if !ok { - logger.Errorf("unexpected event body") - continue - } - pe := types.ProcessForkEvent{ - ParentPids: types.PidInfo{ - Tid: body.ParentPids.Tid, - Tgid: body.ParentPids.Tgid, - Ppid: body.ParentPids.Ppid, - Pgid: body.ParentPids.Pgid, - Sid: body.ParentPids.Sid, - StartTimeNs: body.ParentPids.StartTimeNs, - }, - ChildPids: types.PidInfo{ - Tid: body.ChildPids.Tid, - Tgid: body.ChildPids.Tgid, - Ppid: body.ChildPids.Ppid, - Pgid: body.ChildPids.Pgid, - Sid: body.ChildPids.Sid, - StartTimeNs: body.ChildPids.StartTimeNs, - }, - Creds: types.CredInfo{ - Ruid: body.Creds.Ruid, - Rgid: body.Creds.Rgid, - Euid: body.Creds.Euid, - Egid: body.Creds.Egid, - Suid: body.Creds.Suid, - Sgid: body.Creds.Sgid, - CapPermitted: body.Creds.CapPermitted, - CapEffective: body.Creds.CapEffective, - }, - } - if err := p.db.InsertFork(pe); err != nil { - p.logger.Errorf("insert fork: %w", err) - continue - } - case ebpfevents.EventTypeProcessExec: - body, ok := ev.Body.(*ebpfevents.ProcessExec) - if !ok { - logger.Errorf("unexpected event body") - continue - } - pe := types.ProcessExecEvent{ - Pids: types.PidInfo{ - Tid: body.Pids.Tid, - Tgid: body.Pids.Tgid, - Ppid: body.Pids.Ppid, - Pgid: body.Pids.Pgid, - Sid: body.Pids.Sid, - StartTimeNs: body.Pids.StartTimeNs, - }, - Creds: types.CredInfo{ - Ruid: body.Creds.Ruid, - Rgid: body.Creds.Rgid, - Euid: body.Creds.Euid, - Egid: body.Creds.Egid, - Suid: body.Creds.Suid, - Sgid: body.Creds.Sgid, - CapPermitted: body.Creds.CapPermitted, - CapEffective: body.Creds.CapEffective, - }, - CTty: types.TtyDev{ - Major: body.CTTY.Major, - Minor: body.CTTY.Minor, - }, - Cwd: body.Cwd, - Argv: deepcopy.Copy(body.Argv).([]string), - Env: deepcopy.Copy(body.Env).(map[string]string), - Filename: body.Filename, - } - if err := p.db.InsertExec(pe); err != nil { - p.logger.Errorf("insert exec: %w", err) - continue - } - case ebpfevents.EventTypeProcessExit: - body, ok := ev.Body.(*ebpfevents.ProcessExit) - if !ok { - logger.Errorf("unexpected event body") - continue - } - pe := types.ProcessExitEvent{ - Pids: types.PidInfo{ - Tid: body.Pids.Tid, - Tgid: body.Pids.Tgid, - Ppid: body.Pids.Ppid, - Pgid: body.Pids.Pgid, - Sid: body.Pids.Sid, - StartTimeNs: body.Pids.StartTimeNs, - }, - ExitCode: body.ExitCode, - } - if err := p.db.InsertExit(pe); err != nil { - p.logger.Errorf("insert exit: %w", err) - continue - } + pe := types.ProcessForkEvent{ + ParentPids: types.PidInfo{ + Tid: body.ParentPids.Tid, + Tgid: body.ParentPids.Tgid, + Ppid: body.ParentPids.Ppid, + Pgid: body.ParentPids.Pgid, + Sid: body.ParentPids.Sid, + StartTimeNs: body.ParentPids.StartTimeNs, + }, + ChildPids: types.PidInfo{ + Tid: body.ChildPids.Tid, + Tgid: body.ChildPids.Tgid, + Ppid: body.ChildPids.Ppid, + Pgid: body.ChildPids.Pgid, + Sid: body.ChildPids.Sid, + StartTimeNs: body.ChildPids.StartTimeNs, + }, + Creds: types.CredInfo{ + Ruid: body.Creds.Ruid, + Rgid: body.Creds.Rgid, + Euid: body.Creds.Euid, + Egid: body.Creds.Egid, + Suid: body.Creds.Suid, + Sgid: body.Creds.Sgid, + CapPermitted: body.Creds.CapPermitted, + CapEffective: body.Creds.CapEffective, + }, + } + if err := p.db.InsertFork(pe); err != nil { + p.logger.Errorf("insert fork: %w", err) + continue + } + case ebpfevents.EventTypeProcessExec: + body, ok := ev.Body.(*ebpfevents.ProcessExec) + if !ok { + logger.Errorf("unexpected event body") + continue + } + pe := types.ProcessExecEvent{ + Pids: types.PidInfo{ + Tid: body.Pids.Tid, + Tgid: body.Pids.Tgid, + Ppid: body.Pids.Ppid, + Pgid: body.Pids.Pgid, + Sid: body.Pids.Sid, + StartTimeNs: body.Pids.StartTimeNs, + }, + Creds: types.CredInfo{ + Ruid: body.Creds.Ruid, + Rgid: body.Creds.Rgid, + Euid: body.Creds.Euid, + Egid: body.Creds.Egid, + Suid: body.Creds.Suid, + Sgid: body.Creds.Sgid, + CapPermitted: body.Creds.CapPermitted, + CapEffective: body.Creds.CapEffective, + }, + CTty: types.TtyDev{ + Major: body.CTTY.Major, + Minor: body.CTTY.Minor, + }, + Cwd: body.Cwd, + Argv: deepcopy.Copy(body.Argv).([]string), + Env: deepcopy.Copy(body.Env).(map[string]string), + Filename: body.Filename, + } + if err := p.db.InsertExec(pe); err != nil { + p.logger.Errorf("insert exec: %w", err) + continue + } + case ebpfevents.EventTypeProcessExit: + body, ok := ev.Body.(*ebpfevents.ProcessExit) + if !ok { + logger.Errorf("unexpected event body") + continue + } + pe := types.ProcessExitEvent{ + Pids: types.PidInfo{ + Tid: body.Pids.Tid, + Tgid: body.Pids.Tgid, + Ppid: body.Pids.Ppid, + Pgid: body.Pids.Pgid, + Sid: body.Pids.Sid, + StartTimeNs: body.Pids.StartTimeNs, + }, + ExitCode: body.ExitCode, + } + if err := p.db.InsertExit(pe); err != nil { + p.logger.Errorf("insert exit: %w", err) + continue } - continue } } }(*p.logger)