Skip to content

Commit

Permalink
Add status reporting for Journald input
Browse files Browse the repository at this point in the history
This commit adds the status reporting for the Journald input. It also
adds a debug log to the `UpdateStatus` function from `v2.Context`.
  • Loading branch information
belimawr committed Jan 28, 2025
1 parent 8406c86 commit 9c98d03
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]

*Auditbeat*

Expand Down
62 changes: 53 additions & 9 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
conf "github.com/elastic/elastic-agent-libs/config"
Expand All @@ -40,10 +41,11 @@ import (
)

type inputTestingEnvironment struct {
t *testing.T
workingDir string
stateStore *testInputStore
pipeline *mockPipelineConnector
t *testing.T
workingDir string
stateStore *testInputStore
pipeline *mockPipelineConnector
statusReporter *mockStatusReporter

pluginInitOnce sync.Once
plugin v2.Plugin
Expand All @@ -54,10 +56,11 @@ type inputTestingEnvironment struct {

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
statusReporter: &mockStatusReporter{},
}
}

Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
}
}()

inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx}
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, StatusReporter: e.statusReporter}
if err := inp.Run(inputCtx, e.pipeline); err != nil {
e.t.Errorf("input 'Run' method returned an error: %s", err)
}
Expand Down Expand Up @@ -251,3 +254,44 @@ func blockingACKer(starter context.Context) beat.EventListener {
}
})
}

type statusUpdate struct {
state status.Status
msg string
}

type mockStatusReporter struct {
mutex sync.RWMutex
updates []statusUpdate
}

func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) {
m.mutex.Lock()
m.updates = append(m.updates, statusUpdate{status, msg})
m.mutex.Unlock()
}

func (m *mockStatusReporter) GetUpdates() []statusUpdate {
m.mutex.RLock()
defer m.mutex.RUnlock()
return append([]statusUpdate{}, m.updates...)
}

func (env *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {

Check failure on line 280 in filebeat/input/journald/environment_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ST1016: methods on the same type should have the same receiver name (seen 1x "env", 4x "e") (stylecheck)
t := env.t
t.Helper()
got := env.statusReporter.GetUpdates()
if len(got) != len(expected) {
t.Fatalf("expecting %d updates, got %d", len(expected), len(got))
}

for i := range expected {
g, e := got[i], expected[i]
if g != e {
t.Errorf(
"expecting [%d] status update to be {state:%s, msg:%s}, got {state:%s, msg:%s}",
i, e.state.String(), e.msg, g.state.String(), g.msg,
)
}
}
}
16 changes: 13 additions & 3 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/parser"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -154,6 +155,8 @@ func (inp *journald) Run(
logger := ctx.Logger.
With("path", src.Name()).
With("input_id", inp.ID)

ctx.UpdateStatus(status.Starting, "Starting")
currentCheckpoint := initCheckpoint(logger, cursor)

mode := inp.Seek
Expand All @@ -173,7 +176,9 @@ func (inp *journald) Run(
journalctl.Factory,
)
if err != nil {
return fmt.Errorf("could not start journal reader: %w", err)
wrappedErr := fmt.Errorf("could not start journal reader: %w", err)
ctx.UpdateStatus(status.Failed, wrappedErr.Error())
return wrappedErr
}

defer reader.Close()
Expand All @@ -186,6 +191,7 @@ func (inp *journald) Run(
saveRemoteHostname: inp.SaveRemoteHostname,
})

ctx.UpdateStatus(status.Running, "Running")
for {
entry, err := parser.Next()
if err != nil {
Expand All @@ -197,14 +203,18 @@ func (inp *journald) Run(
case errors.Is(err, journalctl.ErrRestarting):
continue
default:
logger.Errorf("could not read event: %s", err)
msg := fmt.Sprintf("could not read event: %s", err)
ctx.UpdateStatus(status.Failed, msg)
logger.Error(msg)
return err
}
}

event := entry.ToEvent()
if err := publisher.Publish(event, event.Private); err != nil {
logger.Errorf("could not publish event: %s", err)
msg := fmt.Sprintf("could not publish event: %s", err)
ctx.UpdateStatus(status.Failed, msg)
logger.Errorf(msg)
return err
}
}
Expand Down
28 changes: 28 additions & 0 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -330,6 +331,33 @@ func TestReaderAdapterCanHandleNonStringFields(t *testing.T) {
}
}

func TestInputCanReportStatus(t *testing.T) {
out := decompress(t, filepath.Join("testdata", "multiple-boots.journal.gz"))

env := newInputTestingEnvironment(t)
cfg := mapstr.M{
"paths": []string{out},
}
inp := env.mustCreateInput(cfg)

ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)

env.startInput(ctx, inp)
env.waitUntilEventCount(6)

env.RequireStatuses([]statusUpdate{
{
state: status.Starting,
msg: "Starting",
},
{
state: status.Running,
msg: "Running",
},
})
}

func decompress(t *testing.T, namegz string) string {
t.Helper()

Expand Down
1 change: 1 addition & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Context struct {

func (c Context) UpdateStatus(status status.Status, msg string) {
if c.StatusReporter != nil {
c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg)
c.StatusReporter.UpdateStatus(status, msg)
}
}
Expand Down

0 comments on commit 9c98d03

Please sign in to comment.