diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 3c27562..b8d16ef 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -14,5 +14,16 @@ jobs: - name: Execute tests run: | sudo apt update - sudo apt install -y make gcc curl libsystemd-dev + sudo apt install -y make rsyslog + sudo cp contrib/rsyslog/config/rsyslog.conf /etc/rsyslog.conf + sudo rm -rf /etc/rsyslog.d + sudo mkdir /etc/rsyslog.d + sudo cp -r contrib/rsyslog/config/rsyslog.d/* /etc/rsyslog.d/ + sudo mkdir -p /var/log/audito-maldito + sudo mkdir -p /app-audit + sudo mkfifo /app-audit/app-events-output-test.log + sudo mkfifo /app-audit/sshd-pipe + sudo mkfifo /app-audit/audit-pipe + sudo systemctl restart rsyslog sudo make integration-test + \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a7af4d8..49377b5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,9 +1,13 @@ -name: Release + +name: Create and publish a api image on: push: tags: - v** +env: + REGISTRY: ghcr.io/metal-toolbox + API_IMAGE_NAME: audito-maldito/audito-maldito jobs: auto-release: @@ -20,11 +24,46 @@ jobs: with: generate_release_notes: true - container-main: - uses: metal-toolbox/container-push/.github/workflows/container-push.yml@main - with: - name: audito-maldito - tag: ${{ github.ref_name }} - registry_org: ${{ github.repository }} - dockerfile_path: Dockerfile - platforms: linux/amd64,linux/arm64 + build-and-push-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Log in to the Container registry + uses: docker/login-action@40891eba8c2bcd1309b07ba8b11232f313e86779 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Get current date + id: date + run: echo "::set-output name=date::$(date -u +'%Y-%m-%dT%H:%M:%SZ')" + + - name: Extract metadata (tags, labels) for Docker + id: meta-api + uses: docker/metadata-action@517f8b0c3b2daa800eac32a9a71024c8126d46a7 + with: + images: ${{ env.REGISTRY }}/${{ env.API_IMAGE_NAME }} + + - name: Build rsyslog and push Docker image + uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + with: + context: "./contrib/rsyslog" + push: true + file: ./contrib/rsyslog/Dockerfile.ubuntu + tags: ${{ env.REGISTRY }}/${{ env.API_IMAGE_NAME }}:${{ github.ref_name }}-rsyslog + labels: ${{ steps.meta-api.outputs.labels }} + + - name: Build and push Docker image + uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + with: + push: true + file: Dockerfile + tags: ${{ env.REGISTRY }}/${{ env.API_IMAGE_NAME }}:${{ github.ref_name }} + labels: ${{ steps.meta-api.outputs.labels }} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 7c5d3ed..890fd4b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,4 @@ -FROM registry.fedoraproject.org/fedora-minimal:38 AS builder - -RUN microdnf install -y systemd-devel golang git && microdnf clean all +FROM golang:1.20 as builder WORKDIR /go/src/audito-maldito @@ -8,17 +6,20 @@ WORKDIR /go/src/audito-maldito COPY go.mod go.sum ./ RUN go mod download && go mod verify -COPY . . +COPY cmd ./cmd +COPY ingesters ./ingesters +COPY internal ./internal +COPY processors ./processors +COPY main.go . RUN go build -o audito-maldito # Not using distroless nor scratch because we need the systemd shared libraries -FROM registry.fedoraproject.org/fedora-minimal:38 - +FROM ubuntu:22.04 # NOTE(jaosorior): Yes, we need to be the root user for this case. # We need access to the journal's privileged log entries and the audit log in the future. USER 0 COPY --from=builder /go/src/audito-maldito/audito-maldito /usr/bin/audito-maldito -ENTRYPOINT [ "/usr/bin/audito-maldito" ] +ENTRYPOINT [ "/usr/bin/audito-maldito" ] \ No newline at end of file diff --git a/cmd/cmd.go b/cmd/cmd.go index 243cbfb..daf3cc3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -2,25 +2,17 @@ package cmd import ( "context" - "flag" "net/http" "os" "strconv" "time" - "github.com/metal-toolbox/auditevent" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" - "github.com/metal-toolbox/audito-maldito/ingesters/journald" - "github.com/metal-toolbox/audito-maldito/internal/common" "github.com/metal-toolbox/audito-maldito/internal/health" "github.com/metal-toolbox/audito-maldito/internal/metrics" - "github.com/metal-toolbox/audito-maldito/internal/util" - "github.com/metal-toolbox/audito-maldito/processors/sshd" - "github.com/metal-toolbox/audito-maldito/processors/varlogsecure" ) const usage = `audito-maldito @@ -56,119 +48,6 @@ type metricsConfig struct { auditLogWriteTimeSecondThreshold int } -type appConfig struct { - bootID string - auditlogpath string - auditLogDirPath string - metricsConfig metricsConfig - logLevel zapcore.Level -} - -func parseFlags(osArgs []string) (*appConfig, error) { - flagSet := flag.NewFlagSet(osArgs[0], flag.ContinueOnError) - - config := &appConfig{ - logLevel: zapcore.InfoLevel, - } - - // This is just needed for testing purposes. If it's empty we'll use the current boot ID - flagSet.StringVar(&config.bootID, "boot-id", "", "Optional Linux boot ID to use when reading from the journal") - flagSet.StringVar(&config.auditlogpath, "audit-log-path", "/app-audit/audit.log", "Path to the audit log file") - flagSet.StringVar(&config.auditLogDirPath, "audit-dir-path", "/var/log/audit", - "Path to the Linux audit log directory") - flagSet.Var(&config.logLevel, "log-level", "Set the log level according to zapcore.Level") - flagSet.BoolVar(&config.metricsConfig.enableMetrics, "metrics", false, "Enable Prometheus HTTP /metrics server") - flagSet.BoolVar(&config.metricsConfig.enableHealthz, "healthz", false, "Enable HTTP health endpoints server") - flagSet.BoolVar(&config.metricsConfig.enableAuditMetrics, "audit-metrics", false, "Enable Prometheus audit metrics") - flagSet.DurationVar(&config.metricsConfig.httpServerReadTimeout, "http-server-read-timeout", - DefaultHTTPServerReadTimeout, "HTTP server read timeout") - flagSet.DurationVar(&config.metricsConfig.httpServerReadHeaderTimeout, "http-server-read-header-timeout", - DefaultHTTPServerReadHeaderTimeout, "HTTP server read header timeout") - flagSet.DurationVar( - &config.metricsConfig.auditMetricsSecondsInterval, - "audit-seconds-interval", - DefaultAuditCheckInterval, - "Interval in seconds to collect audit metrics") - flagSet.IntVar( - &config.metricsConfig.auditLogWriteTimeSecondThreshold, - "audit-log-last-modify-seconds-threshold", - DefaultAuditModifyTimeThreshold, - "seconds since last write to audit.log before alerting") - - flagSet.Usage = func() { - os.Stderr.WriteString(usage) - flagSet.PrintDefaults() - os.Exit(1) - } - - if err := flagSet.Parse(osArgs[1:]); err != nil { - return nil, err - } - - return config, nil -} - -func runProcessorsForSSHLogins( - ctx context.Context, - logins chan<- common.RemoteUserLogin, - eg *errgroup.Group, - distro util.DistroType, - mid string, - nodename string, - bootID string, - lastReadJournalTS uint64, - eventWriter *auditevent.EventWriter, - h *health.Health, - pprov *metrics.PrometheusMetricsProvider, -) { - sshdProcessor := sshd.NewSshdProcessor(ctx, logins, nodename, mid, eventWriter, pprov) - - //nolint:exhaustive // In this case it's actually simpler to just default to journald - switch distro { - case util.DistroRocky: - h.AddReadiness(varlogsecure.VarLogSecureComponentName) - - // TODO: handle last read timestamp - eg.Go(func() error { - vls := varlogsecure.VarLogSecure{ - L: logger, - Logins: logins, - NodeName: nodename, - MachineID: mid, - AuWriter: eventWriter, - Health: h, - Metrics: pprov, - SshdProcessor: sshdProcessor, - } - - err := vls.Read(ctx) - logger.Infof("varlogsecure worker exited (%v)", err) - return err - }) - default: - h.AddReadiness(journald.JournaldReaderComponentName) - - eg.Go(func() error { - jp := journald.Processor{ - BootID: bootID, - MachineID: mid, - NodeName: nodename, - Distro: distro, - EventW: eventWriter, - Logins: logins, - CurrentTS: lastReadJournalTS, - Health: h, - Metrics: pprov, - SshdProcessor: sshdProcessor, - } - - err := jp.Read(ctx) - logger.Infof("journald worker exited (%v)", err) - return err - }) - } -} - // handleMetricsAndHealth starts a HTTP server on port 2112 to serve metrics // and health endpoints. // @@ -208,44 +87,20 @@ func handleMetricsAndHealth(ctx context.Context, mc metricsConfig, eg *errgroup. } } -// lastReadJournalTimeStamp returns the last-read journal entry's timestamp -// or a sensible default if the timestamp cannot be loaded. -func lastReadJournalTimeStamp() uint64 { - lastRead, err := common.GetLastRead() - switch { - case err != nil: - lastRead = uint64(time.Now().UnixMicro()) - - logger.Warnf("failed to read last read timestamp for journal - "+ - "reading from current time (reason: '%s')", err.Error()) - case lastRead == 0: - lastRead = uint64(time.Now().UnixMicro()) - - logger.Info("last read timestamp for journal is zero - " + - "reading from current time") - default: - logger.Infof("last read timestamp for journal is: '%d'", lastRead) - } - - return lastRead -} - func handleAuditLogMetrics( ctx context.Context, + mc metricsConfig, eg *errgroup.Group, pprov *metrics.PrometheusMetricsProvider, - auditMetricsSecondsInterval time.Duration, - auditLogWriteTimeSecondThreshold int, - enableAuditMetrics bool, ) { - if !enableAuditMetrics { + if !mc.enableAuditMetrics { return } auditLogFilePath := "/var/log/audit/audit.log" eg.Go(func() error { - ticker := time.NewTicker(auditMetricsSecondsInterval) + ticker := time.NewTicker(mc.auditMetricsSecondsInterval) defer ticker.Stop() for { @@ -257,10 +112,10 @@ func handleAuditLogMetrics( continue } - if time.Since(s.ModTime()).Seconds() > float64(auditLogWriteTimeSecondThreshold) { - pprov.SetAuditLogCheck(0, strconv.Itoa(auditLogWriteTimeSecondThreshold)) + if time.Since(s.ModTime()).Seconds() > float64(mc.auditLogWriteTimeSecondThreshold) { + pprov.SetAuditLogCheck(0, strconv.Itoa(mc.auditLogWriteTimeSecondThreshold)) } else { - pprov.SetAuditLogCheck(1, strconv.Itoa(auditLogWriteTimeSecondThreshold)) + pprov.SetAuditLogCheck(1, strconv.Itoa(mc.auditLogWriteTimeSecondThreshold)) } pprov.SetAuditLogModifyTime(float64(s.ModTime().Unix())) diff --git a/cmd/journald.go b/cmd/journald.go deleted file mode 100644 index 30c3ecc..0000000 --- a/cmd/journald.go +++ /dev/null @@ -1,145 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "time" - - "github.com/go-logr/zapr" - "github.com/metal-toolbox/auditevent" - "github.com/metal-toolbox/auditevent/helpers" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - - "github.com/metal-toolbox/audito-maldito/ingesters/journald" - "github.com/metal-toolbox/audito-maldito/internal/common" - "github.com/metal-toolbox/audito-maldito/internal/health" - "github.com/metal-toolbox/audito-maldito/internal/metrics" - "github.com/metal-toolbox/audito-maldito/internal/util" - "github.com/metal-toolbox/audito-maldito/processors/auditd" - "github.com/metal-toolbox/audito-maldito/processors/auditd/dirreader" - "github.com/metal-toolbox/audito-maldito/processors/sshd" -) - -func RunJournald(ctx context.Context, osArgs []string, h *health.Health, optLoggerConfig *zap.Config) error { - appCfg, err := parseFlags(osArgs) - if err != nil { - return fmt.Errorf("failed to parse flags: %w", err) - } - - if optLoggerConfig == nil { - cfg := zap.NewProductionConfig() - optLoggerConfig = &cfg - } - - optLoggerConfig.Level = zap.NewAtomicLevelAt(appCfg.logLevel) - - l, err := optLoggerConfig.Build() - if err != nil { - return err - } - - defer func() { - _ = l.Sync() //nolint - }() - - logger = l.Sugar() - - auditd.SetLogger(logger) - journald.SetLogger(logger) - sshd.SetLogger(logger) - - distro, err := util.Distro() - if err != nil { - return fmt.Errorf("failed to get os distro type: %w", err) - } - - mid, miderr := common.GetMachineID() - if miderr != nil { - return fmt.Errorf("failed to get machine id: %w", miderr) - } - - nodename, nodenameerr := common.GetNodeName() - if nodenameerr != nil { - return fmt.Errorf("failed to get node name: %w", nodenameerr) - } - - if err := common.EnsureFlushDirectory(); err != nil { - return fmt.Errorf("failed to ensure flush directory: %w", err) - } - - eg, groupCtx := errgroup.WithContext(ctx) - - auf, auditfileerr := helpers.OpenAuditLogFileUntilSuccessWithContext( - groupCtx, appCfg.auditlogpath, zapr.NewLogger(l)) - if auditfileerr != nil { - return fmt.Errorf("failed to open audit log file: %w", auditfileerr) - } - - logger.Infoln("starting workers...") - - handleMetricsAndHealth(groupCtx, appCfg.metricsConfig, eg, h) - - logDirReader, err := dirreader.StartLogDirReader(groupCtx, appCfg.auditLogDirPath) - if err != nil { - return fmt.Errorf("failed to create linux audit dir reader for '%s' - %w", - appCfg.auditLogDirPath, err) - } - - h.AddReadiness(dirreader.DirReaderComponentName) - go func() { - <-logDirReader.InitFilesDone() - h.OnReady(dirreader.DirReaderComponentName) - }() - - eg.Go(func() error { - err := logDirReader.Wait() - logger.Infof("linux audit log dir reader worker exited (%v)", err) - return err - }) - - lastReadJournalTS := lastReadJournalTimeStamp() - eventWriter := auditevent.NewDefaultAuditEventWriter(auf) - logins := make(chan common.RemoteUserLogin) - pprov := metrics.NewPrometheusMetricsProvider() - - handleAuditLogMetrics( - groupCtx, - eg, - pprov, - appCfg.metricsConfig.auditMetricsSecondsInterval, - appCfg.metricsConfig.auditLogWriteTimeSecondThreshold, - appCfg.metricsConfig.enableAuditMetrics, - ) - runProcessorsForSSHLogins(groupCtx, logins, eg, distro, - mid, nodename, appCfg.bootID, lastReadJournalTS, eventWriter, h, pprov) - - h.AddReadiness(auditd.AuditdProcessorComponentName) - eg.Go(func() error { - ap := auditd.Auditd{ - After: time.UnixMicro(int64(lastReadJournalTS)), - Audits: logDirReader.Lines(), - Logins: logins, - EventW: eventWriter, - Health: h, - } - - err := ap.Read(groupCtx) - logger.Infof("linux audit worker exited (%v)", err) - return err - }) - - if err := eg.Wait(); err != nil { - // We cannot treat errors containing context.Canceled - // as non-errors because the errgroup.Group uses its - // own context, which is canceled if one of the Go - // routines returns a non-nil error. Thus, treating - // context.Canceled as a graceful shutdown may hide - // an error returned by one of the Go routines. - return fmt.Errorf("workers finished with error: %w", err) - } - - logger.Infoln("all workers finished without error") - - return nil -} diff --git a/cmd/namedpipe.go b/cmd/namedpipe.go index 028cd5d..69bab95 100644 --- a/cmd/namedpipe.go +++ b/cmd/namedpipe.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "os" "github.com/go-logr/zapr" "github.com/metal-toolbox/auditevent" @@ -28,7 +29,6 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog var appEventsOutput string var auditdLogFilePath string var sshdLogFilePath string - var enableMetrics bool var metricsConfig metricsConfig logLevel := zapcore.InfoLevel @@ -38,11 +38,23 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog flagSet.Var(&logLevel, "log-level", "Set the log level according to zapcore.Level") flagSet.BoolVar(&metricsConfig.enableMetrics, "metrics", false, "Enable Prometheus HTTP /metrics server") flagSet.BoolVar(&metricsConfig.enableHealthz, "healthz", false, "Enable HTTP health endpoints server") + flagSet.BoolVar(&metricsConfig.enableAuditMetrics, "audit-metrics", false, "Enable Prometheus audit metrics") + flagSet.DurationVar(&metricsConfig.httpServerReadTimeout, "http-server-read-timeout", DefaultHTTPServerReadTimeout, "HTTP server read timeout") flagSet.DurationVar(&metricsConfig.httpServerReadHeaderTimeout, "http-server-read-header-timeout", DefaultHTTPServerReadHeaderTimeout, "HTTP server read header timeout") - flagSet.BoolVar(&enableMetrics, "metrics", false, "Enable Prometheus HTTP /metrics server") + flagSet.DurationVar( + &metricsConfig.auditMetricsSecondsInterval, + "audit-seconds-interval", + DefaultAuditCheckInterval, + "Interval in seconds to collect audit metrics") + flagSet.IntVar( + &metricsConfig.auditLogWriteTimeSecondThreshold, + "audit-log-last-modify-seconds-threshold", + DefaultAuditModifyTimeThreshold, + "seconds since last write to audit.log before alerting") + flagSet.StringVar( &appEventsOutput, "app-events-output", @@ -51,7 +63,7 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog flagSet.StringVar( &sshdLogFilePath, "sshd-log-file-path", - "/app/audit/sshd-pipe", + "/app-audit/sshd-pipe", "Path to the sshd log file") flagSet.StringVar( &auditdLogFilePath, @@ -59,6 +71,11 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog "/app-audit/audit-pipe", "Path to the audit log file") + flagSet.Usage = func() { + os.Stderr.WriteString(usage) + flagSet.PrintDefaults() + os.Exit(1) + } err := flagSet.Parse(osArgs[1:]) if err != nil { return err @@ -115,6 +132,7 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog logger.Infoln("starting workers...") handleMetricsAndHealth(groupCtx, metricsConfig, eg, h) + handleAuditLogMetrics(groupCtx, metricsConfig, eg, pprov) h.AddReadiness(namedpipe.NamedPipeProcessorComponentName) eg.Go(func() error { @@ -140,10 +158,8 @@ func RunNamedPipe(ctx context.Context, osArgs []string, h *health.Health, optLog h.AddReadiness(namedpipe.NamedPipeProcessorComponentName) eg.Go(func() error { - alp := auditlog.AuditLogIngester{ - FilePath: auditdLogFilePath, - AuditLogChan: auditLogChan, - } + np := namedpipe.NewNamedPipeIngester(logger, h) + alp := auditlog.NewAuditLogIngester(auditdLogFilePath, auditLogChan, np) err := alp.Ingest(groupCtx) if logger.Level().Enabled(zap.DebugLevel) { diff --git a/contrib/rsyslog/Dockerfile.ubuntu b/contrib/rsyslog/Dockerfile.ubuntu index a097e85..87856e6 100644 --- a/contrib/rsyslog/Dockerfile.ubuntu +++ b/contrib/rsyslog/Dockerfile.ubuntu @@ -4,4 +4,4 @@ RUN apt install rsyslog -y RUN rm -r /etc/rsyslog.d && mkdir /etc/rsyslog.d COPY config/rsyslog.d/* /etc/rsyslog.d/ COPY config/rsyslog.conf /etc/rsyslog.conf -ENTRYPOINT ["rsyslogd" ,"-n"] \ No newline at end of file +ENTRYPOINT ["rsyslogd" ,"-n", "-d"] \ No newline at end of file diff --git a/go.mod b/go.mod index f5f3e91..782a131 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.19 require ( github.com/cenkalti/backoff/v4 v4.2.1 - github.com/coreos/go-systemd/v22 v22.5.0 github.com/elastic/go-libaudit/v2 v2.3.2 github.com/fsnotify/fsnotify v1.6.0 github.com/go-logr/zapr v1.2.4 diff --git a/ingesters/journald/doc.go b/ingesters/journald/doc.go deleted file mode 100644 index 9ce94fe..0000000 --- a/ingesters/journald/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package journald provides functionality for working with systemd's -// journal logging system. -package journald diff --git a/ingesters/journald/flush.go b/ingesters/journald/flush.go deleted file mode 100644 index e745e38..0000000 --- a/ingesters/journald/flush.go +++ /dev/null @@ -1,36 +0,0 @@ -package journald - -import ( - "fmt" - "os" - "time" - - "github.com/metal-toolbox/audito-maldito/internal/common" -) - -var defaultSleep = 1 * time.Second - -const ( - onlyUserReadable = 0o600 -) - -// writes the last read timestamp to a file -// Note we don't fail if we can't write the file nor read the directory -// as we intend to go through the defer statements and exit. -// If this fails, we will just start reading from the beginning of the journal. -func flushLastRead(lastReadTimestamp uint64) { - logger.Infof("flushing last read timestamp %d", lastReadTimestamp) - - if err := common.EnsureFlushDirectory(); err != nil { - logger.Errorf("failed to ensure flush directory: %v", err) - return - } - - // The WriteFile function ensures the file will only contain - // *exactly* what we write to it by either creating a new file, - // or by truncating an existing file. - err := os.WriteFile(common.TimeFlushPath, []byte(fmt.Sprintf("%d", lastReadTimestamp)), onlyUserReadable) - if err != nil { - logger.Errorf("failed to write flush file: %s", err) - } -} diff --git a/ingesters/journald/journalif.go b/ingesters/journald/journalif.go deleted file mode 100644 index 80eb01f..0000000 --- a/ingesters/journald/journalif.go +++ /dev/null @@ -1,56 +0,0 @@ -package journald - -import "time" - -type JournalEntry interface { - GetMessage() (string, bool) - - // GetTimeStamp returns the wallclock time in microseconds - // (since epoch) that the entry occurred at. - // - // From the systemd documentation for "__REALTIME_TIMESTAMP": - // - // "The wallclock time (CLOCK_REALTIME) at the point in time - // the entry was received by the journal, in microseconds - // since the epoch UTC, formatted as a decimal string." - // - // Refer to the following documentation for more information: - // https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html - GetTimeStamp() uint64 - - GetPID() string -} - -type JournalReader interface { - // Next advances the read pointer into the journal by one entry. - // - // According to systemd, the returned integer can be: - // - Negative number (error code, in which case the corresponding - // error object is non-nil) - // - 0 - If Next is still pointing to the same file - // - 1 - If Next is pointing to a new file - // - // Refer to the following source file for details: - // https://github.com/systemd/systemd/blob/main/src/libsystemd/sd-journal/sd-journal.c#L815-L867 - Next() (uint64, error) - - // GetEntry returns a full representation of the journal entry - // referenced by the last completed Next/Previous function call, - // with all key-value pairs of data as well as address fields - // (cursor, realtime timestamp and monotonic timestamp). - // - // To call GetEntry, you must first have called one of the - // Next/Previous functions. - GetEntry() (JournalEntry, error) - - // Wait will synchronously wait until the journal gets changed. - // The maximum time this call sleeps may be controlled with the - // timeout parameter. - // - // If sdjournal.IndefiniteWait is passed as the timeout parameter, - // Wait will wait indefinitely for a journal change. - Wait(time.Duration) int - - // Close closes a journal opened with NewJournal. - Close() error -} diff --git a/ingesters/journald/journalreader.go b/ingesters/journald/journalreader.go deleted file mode 100644 index d38ac4c..0000000 --- a/ingesters/journald/journalreader.go +++ /dev/null @@ -1,146 +0,0 @@ -//go:build linux -// +build linux - -package journald - -import ( - "errors" - "fmt" - "time" - - "github.com/coreos/go-systemd/v22/sdjournal" - - "github.com/metal-toolbox/audito-maldito/internal/util" -) - -type journalEntryImpl struct { - entry *sdjournal.JournalEntry -} - -type journalReaderImpl struct { - journal *sdjournal.Journal -} - -func newJournalReader(bootID string, distro util.DistroType, optSeekToTS uint64) (JournalReader, error) { - j, err := sdjournal.NewJournal() - if err != nil { - return nil, fmt.Errorf("failed to open journal: %w", err) - } - - if j == nil { - return nil, errors.New("journal is nil") - } - - if bootID == "" { - bootID, err = j.GetBootID() - if err != nil { - _ = j.Close() - return nil, fmt.Errorf("failed to get boot id from journal: %w", err) - } - } - - // Initialize/restart the journal reader. - j.FlushMatches() - - matchSSH, err := getDistroSpecificMatch(distro) - if err != nil { - _ = j.Close() - return nil, fmt.Errorf("failed to get journal ssh match: %w", err) - } - - if err := j.AddMatch(matchSSH.String()); err != nil { - _ = j.Close() - return nil, fmt.Errorf("failed to add ssh match: %w", err) - } - - logger.Infof("distro: '%s' | boot id: '%s'", distro, bootID) - - // NOTE(jaosorior): We only care about the current boot - matchBootID := sdjournal.Match{ - Field: sdjournal.SD_JOURNAL_FIELD_BOOT_ID, - Value: bootID, - } - - if err := j.AddMatch(matchBootID.String()); err != nil { - _ = j.Close() - return nil, fmt.Errorf("failed to add boot id match: %w", err) - } - - if optSeekToTS > 0 { - next := optSeekToTS + 1 - - logger.Infof("seeking journal to realtime usec '%d'...", next) - - if err := j.SeekRealtimeUsec(next); err != nil { - logger.Errorf("failed to seek journal to '%d' - "+ - "attempting to continue anyway (err: '%s')", next, err) - } - } - - return &journalReaderImpl{ - journal: j, - }, nil -} - -func (jr *journalReaderImpl) Next() (uint64, error) { - return jr.journal.Next() -} - -func (jr *journalReaderImpl) GetEntry() (JournalEntry, error) { - entry, err := jr.journal.GetEntry() - if err != nil { - return nil, err - } - - return &journalEntryImpl{ - entry: entry, - }, nil -} - -func (jr *journalReaderImpl) Wait(d time.Duration) int { - return jr.journal.Wait(d) -} - -func (jr *journalReaderImpl) Close() error { - return jr.journal.Close() -} - -func (je *journalEntryImpl) GetTimeStamp() uint64 { - return je.entry.RealtimeTimestamp -} - -func (je *journalEntryImpl) GetMessage() (string, bool) { - msg, ok := je.entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE] - return msg, ok -} - -func (je *journalEntryImpl) GetPID() string { - pid, ok := je.entry.Fields[sdjournal.SD_JOURNAL_FIELD_PID] - if !ok { - return "" - } - - return pid -} - -func getDistroSpecificMatch(distro util.DistroType) (sdjournal.Match, error) { - switch distro { - case util.DistroFlatcar: - return sdjournal.Match{ - Field: sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SLICE, - Value: "system-sshd.slice", - }, nil - case util.DistroUbuntu: - return sdjournal.Match{ - Field: sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT, - Value: "ssh.service", - }, nil - case util.DistroUnknown: - return sdjournal.Match{}, errors.New("unknown os distro (literally)") - case util.DistroRocky: - // Should never hit this case. Here for linter. - return sdjournal.Match{}, fmt.Errorf("unsupported os distro: '%s'", distro) - default: - return sdjournal.Match{}, fmt.Errorf("unsupported os distro: '%s'", distro) - } -} diff --git a/ingesters/journald/journalreader_unsupported.go b/ingesters/journald/journalreader_unsupported.go deleted file mode 100644 index f19e1ac..0000000 --- a/ingesters/journald/journalreader_unsupported.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build !linux -// +build !linux - -package journald - -import ( - "errors" - - "github.com/metal-toolbox/audito-maldito/internal/util" -) - -func newJournalReader(string, util.DistroType, uint64) (JournalReader, error) { - return nil, errors.New("unsupported platform") -} diff --git a/ingesters/journald/reader.go b/ingesters/journald/reader.go deleted file mode 100644 index aa95325..0000000 --- a/ingesters/journald/reader.go +++ /dev/null @@ -1,185 +0,0 @@ -// Package journald contains the functions for -// audito maldito to interact with journald -package journald - -import ( - "context" - "errors" - "fmt" - "io" - "time" - - "github.com/metal-toolbox/auditevent" - "go.uber.org/zap" - - "github.com/metal-toolbox/audito-maldito/internal/common" - "github.com/metal-toolbox/audito-maldito/internal/health" - "github.com/metal-toolbox/audito-maldito/internal/metrics" - "github.com/metal-toolbox/audito-maldito/internal/util" - "github.com/metal-toolbox/audito-maldito/processors/sshd" -) - -const ( - // JournaldReaderComponentName is the name of the component - // that reads from journald. This is used in the health check. - JournaldReaderComponentName = "journald-reader" -) - -// ErrNonFatal is returned when the error is not fatal -// and processing may continue. -var ( - ErrNonFatal = errors.New("non-fatal error") - logger *zap.SugaredLogger -) - -func SetLogger(l *zap.SugaredLogger) { - logger = l -} - -type Processor struct { - BootID string - MachineID string - NodeName string - Distro util.DistroType - EventW *auditevent.EventWriter - Logins chan<- common.RemoteUserLogin - CurrentTS uint64 // Microseconds since unix epoch. - Health *health.Health - Metrics *metrics.PrometheusMetricsProvider - jr JournalReader - SshdProcessor sshd.SshdProcessor -} - -func (jp *Processor) getJournalReader() JournalReader { - return jp.jr -} - -// Read reads the journal and sends the events to the EventWriter. -func (jp *Processor) Read(ctx context.Context) error { - var err error - - jp.jr, err = newJournalReader(jp.BootID, jp.Distro, jp.CurrentTS) - if err != nil { - return err - } - defer func() { - jr := jp.getJournalReader() - if jr != nil { - jr.Close() - } - }() - - defer func() { - // Using an anonymous function here allows us to save the - // current value of CurrentTS. Using "defer(flushLastRead(...))" - // results in the deferred function receiving an out-of-date - // copy of the value. - // - // This can be simplified by making flushLastRead a method - // on Processor... but the tradeoff between exposing all the - // struct's fields to such a simple function is making me - // second guess that. - flushLastRead(jp.CurrentTS) - }() - - jp.Health.OnReady(JournaldReaderComponentName) - - for { - select { - case <-ctx.Done(): - logger.Infof("exiting because context is done: %v", ctx.Err()) - return nil - default: - if err := jp.readEntry(ctx); err != nil { - if errors.Is(err, ErrNonFatal) { - continue - } - return err - } - } - } -} - -func (jp *Processor) readEntry(ctx context.Context) error { - j := jp.getJournalReader() - isNewFile, nextErr := j.Next() - if nextErr != nil { - if errors.Is(nextErr, io.EOF) { - if r := j.Wait(defaultSleep); r < 0 { - flushLastRead(jp.CurrentTS) - - logger.Infof("wait failed after calling next, reinitializing (error-code: %d)", r) - time.Sleep(defaultSleep) - - if err := jp.resetJournal(); err != nil { - return fmt.Errorf("failed to reset journal after next failed: %w", err) - } - } - - return nil - } - - if closeErr := j.Close(); closeErr != nil { - logger.Errorf("failed to close journal: %v", closeErr) - } - - return fmt.Errorf("failed to read next journal entry: %w", nextErr) - } - - if isNewFile == 0 { - if r := j.Wait(defaultSleep); r < 0 { - jp.Metrics.IncErrors(metrics.ErrorTypeJournaldWait) - - flushLastRead(jp.CurrentTS) - - logger.Errorf("wait failed after checking for new journal file, "+ - "reinitializing. error-code: %d", r) - time.Sleep(defaultSleep) - - if err := jp.resetJournal(); err != nil { - return fmt.Errorf("failed to reset journal after wait failed: %w", err) - } - } - - return nil - } - - entry, geErr := j.GetEntry() - if geErr != nil { - logger.Errorf("error getting entry: %v", geErr) - return ErrNonFatal - } - - entryMsg, hasMessage := entry.GetMessage() - if !hasMessage { - logger.Error("got entry with no message") - return ErrNonFatal - } - - usec := entry.GetTimeStamp() - jp.CurrentTS = usec - - err := jp.SshdProcessor.ProcessSshdLogEntry(ctx, sshd.SshdLogEntry{ - Message: entryMsg, - PID: entry.GetPID(), - }) - if err != nil { - return fmt.Errorf("failed to process journal entry '%s': %w", entryMsg, err) - } - - return nil -} - -func (jp *Processor) resetJournal() error { - if err := jp.jr.Close(); err != nil { - logger.Errorf("failed to close journal: %v", err) - } - - var err error - jp.jr, err = newJournalReader(jp.BootID, jp.Distro, jp.CurrentTS) - if err != nil { - return fmt.Errorf("failed to reset journal: %w", err) - } - - return nil -} diff --git a/ingesters/namedpipe/namedpipeingester.go b/ingesters/namedpipe/namedpipeingester.go index c99960c..889e7cf 100644 --- a/ingesters/namedpipe/namedpipeingester.go +++ b/ingesters/namedpipe/namedpipeingester.go @@ -40,6 +40,7 @@ func (n *NamedPipeIngester) Ingest( var err error ready := make(chan struct{}) + n.Health.OnReady(NamedPipeProcessorComponentName) // os.OpenFile blocks. Put in go routine so we can gracefully exit. go func() { file, err = os.OpenFile(filePath, os.O_RDONLY, os.ModeNamedPipe) @@ -56,10 +57,14 @@ func (n *NamedPipeIngester) Ingest( return err } + go func() { + <-ctx.Done() + file.Close() + }() + n.Logger.Infof("Successfully opened %s", filePath) defer file.Close() - n.Health.OnReady(NamedPipeProcessorComponentName) r := bufio.NewReader(file) for { diff --git a/internal/integration_tests/ubuntu_test.go b/internal/integration_tests/ubuntu_test.go index 21e22fd..a0ed8c9 100644 --- a/internal/integration_tests/ubuntu_test.go +++ b/internal/integration_tests/ubuntu_test.go @@ -49,8 +49,6 @@ func TestSSHCertLoginAndExecStuff_Ubuntu(t *testing.T) { ctx, cancelFn := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancelFn() - ourPrivateKeyPath := setupUbuntuComputer(t, ctx) - // Required by audito-maldito. t.Setenv("NODE_NAME", "integration-test") @@ -71,23 +69,31 @@ func TestSSHCertLoginAndExecStuff_Ubuntu(t *testing.T) { checkPipelineErrs, onEventFn := newShellPipelineChecker(ctx, expectedShellPipeline) - readEventsErrs := createPipeAndReadEvents(t, ctx, "/app-audit/audit.log", onEventFn) + appEventsOutputFilePath := "/app-audit/app-events-output-test.log" + readEventsErrs := createPipeAndReadEvents(t, ctx, appEventsOutputFilePath, onEventFn) appHealth := health.NewHealth() - tmoutctx, tmoutctxFn := context.WithTimeout(ctx, time.Minute) + tmoutctx, tmoutctxFn := context.WithTimeout(ctx, time.Minute*5) defer tmoutctxFn() appErrs := make(chan error, 1) go func() { - appErrs <- cmd.RunJournald(ctx, []string{"audito-maldito"}, appHealth, zapLoggerConfig()) + appErrs <- cmd.RunNamedPipe(tmoutctx, []string{"audito-maldito", "--app-events-output", appEventsOutputFilePath}, appHealth, zapLoggerConfig()) }() + // let audito-maldito start + time.Sleep(30 * time.Second) + ourPrivateKeyPath := setupUbuntuComputer(t, tmoutctx) + err := <-appHealth.WaitForReady(tmoutctx) if err != nil { t.Fatalf("failed to wait for app to become ready - %s", err) } + // Required by audito-maldito. + t.Setenv("NODE_NAME", "integration-test") + err = execSSHPipeline(ctx, ourPrivateKeyPath, expectedShellPipeline) if err != nil { t.Fatalf("failed to execute ssh pipeline - %s", err) diff --git a/main.go b/main.go index 902e54e..30e4d28 100644 --- a/main.go +++ b/main.go @@ -24,14 +24,5 @@ func main() { func mainWithError() error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - - args := os.Args[1:] - - for _, p := range args { - if p == "--namedpipe" { - return cmd.RunNamedPipe(ctx, os.Args, health.NewHealth(), nil) - } - } - os.Stdout.WriteString("defaulting to journald command") - return cmd.RunJournald(ctx, os.Args, health.NewHealth(), nil) + return cmd.RunNamedPipe(ctx, os.Args, health.NewHealth(), nil) }