From 535a17418d5a59583ba6ea5ea7d13e0f693ed468 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Fri, 21 Jun 2024 17:37:50 +0200 Subject: [PATCH] libbeat: add Offset to libbeat/reader.Message (#39873) This commit introduces the Offset property to libbeat/reader.Message, which stores the total number of bytes read and discarded before generating the message. The Offset field allows inputs to accurately determine how much data has been read up to the message, calculated as Message.Bytes + Message.Offset. With this new Offset field, the filestream input correctly updates its state to account for data read but discarded by the include_message parser. --- CHANGELOG-developer.next.asciidoc | 1 + CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/environment_test.go | 62 ++++++++----------- filebeat/input/filestream/input.go | 2 +- .../filestream/parsers_integration_test.go | 33 ++++++++++ libbeat/reader/filter/filter.go | 13 +++- libbeat/reader/message.go | 1 + 7 files changed, 73 insertions(+), 40 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index ca96782c557..92a42ea40fe 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -101,6 +101,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Fix flakey test on Windows 2022 in packetbeat/route. {issue}39698[39698] {pull}39822[39822] - Fix bug in minimum length for request trace logging. {pull}39834[39834] - Close connections properly in Filbeat's HTTPJSON input. {pull}39790[39790] +- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653] ==== Added diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ec692d6c9a2..64d6b513040 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] - Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] - Fix request trace filename handling in http_endpoint input. {pull}39410[39410] +- Fix filestream not correctly tracking the offset of a file when using the `include_message` parsser. {pull}39873[39873] {issue}39653[39653] *Heartbeat* diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 91454d7e179..f9804bb16f3 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -33,7 +33,6 @@ import ( "github.com/stretchr/testify/require" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" - input "github.com/elastic/beats/v7/filebeat/input/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" @@ -95,7 +94,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) e.t.Helper() e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -107,7 +106,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) { e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -128,9 +127,9 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) e.wg.Add(1) go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { defer wg.Done() - defer grp.Stop() - inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} - inp.Run(inputCtx, e.pipeline) + defer func() { _ = grp.Stop() }() + inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} + _ = inp.Run(inputCtx, e.pipeline) }(&e.wg, &e.grp) } @@ -358,14 +357,16 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e var entry registryEntry err := inputStore.Get(key, &entry) if err != nil { - keys := []string{} - inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { + var keys []string + _ = inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { keys = append(keys, key) return false, nil }) e.t.Logf("keys in store: %v", keys) - return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err) + return registryEntry{}, + fmt.Errorf("error when getting expected key '%s' from store: %w", + key, err) } return entry, nil @@ -385,16 +386,20 @@ func getIDFromPath(filepath, inputID string, fi os.FileInfo) string { // waitUntilEventCount waits until total count events arrive to the client. func (e *inputTestingEnvironment) waitUntilEventCount(count int) { - for { - sum := len(e.pipeline.GetAllEvents()) + msg := &strings.Builder{} + require.Eventuallyf(e.t, func() bool { + msg.Reset() + + events := e.pipeline.GetAllEvents() + sum := len(events) if sum == count { - return - } - if count < sum { - e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum) + return true } - time.Sleep(10 * time.Millisecond) - } + fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n", + count, sum) + + return false + }, 2*time.Minute, 10*time.Millisecond, "%s", msg) } // waitUntilEventCountCtx calls waitUntilEventCount, but fails if ctx is cancelled. @@ -489,9 +494,7 @@ func (e *inputTestingEnvironment) getOutputMessages() []string { func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) { events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -514,9 +517,7 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) { } events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -578,9 +579,7 @@ func (c *mockClient) PublishAll(events []beat.Event) { } c.ackHandler.ACKEvents(len(events)) - for _, event := range events { - c.published = append(c.published, event) - } + c.published = append(c.published, events...) } func (c *mockClient) waitUntilPublishingHasStarted() { @@ -652,17 +651,6 @@ func (pc *mockPipelineConnector) cancelAllClients() { } } -func (pc *mockPipelineConnector) cancelClient(i int) { - pc.mtx.Lock() - defer pc.mtx.Unlock() - - if len(pc.clients) < i+1 { - return - } - - pc.clients[i].canceler() -} - func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener { if !blocking { return config.EventListener diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 7da25654a25..ea761ec177f 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -372,7 +372,7 @@ func (inp *filestream) readFromSource( return nil } - s.Offset += int64(message.Bytes) + s.Offset += int64(message.Bytes) + int64(message.Offset) metrics.MessagesRead.Inc() if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) { diff --git a/filebeat/input/filestream/parsers_integration_test.go b/filebeat/input/filestream/parsers_integration_test.go index 5de8a57e123..619d39f0512 100644 --- a/filebeat/input/filestream/parsers_integration_test.go +++ b/filebeat/input/filestream/parsers_integration_test.go @@ -59,6 +59,39 @@ func TestParsersAgentLogs(t *testing.T) { env.waitUntilInputStops() } +func TestParsersIncludeMessage(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + readLine := "include this" + inp := env.mustCreateInput(map[string]interface{}{ + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "100ms", + "parsers": []map[string]interface{}{ + { + "include_message": map[string]interface{}{ + "patterns": "^" + readLine + "$", + }, + }, + }, + }) + + logs := []byte("do no include this line\r\n" + readLine + "\r\n") + env.mustWriteToFile(testlogName, logs) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, "fake-ID", len(logs)) + + env.requireEventContents(0, "message", readLine) + + cancelInput() + env.waitUntilInputStops() +} + // test_docker_logs_filtering from test_json.py func TestParsersDockerLogsFiltering(t *testing.T) { env := newInputTestingEnvironment(t) diff --git a/libbeat/reader/filter/filter.go b/libbeat/reader/filter/filter.go index 61894981be9..2ed83968c2d 100644 --- a/libbeat/reader/filter/filter.go +++ b/libbeat/reader/filter/filter.go @@ -55,15 +55,24 @@ func NewParser(r reader.Reader, c *Config) *FilterParser { } } -func (p *FilterParser) Next() (reader.Message, error) { +func (p *FilterParser) Next() (message reader.Message, err error) { + // discardedOffset accounts for the bytes of discarded messages. The inputs + // need to correctly track the file offset, therefore if only the matching + // message size is returned, the offset cannot be correctly updated. + var discardedOffset int + defer func() { + message.Offset = discardedOffset + }() + for p.ctx.Err() == nil { - message, err := p.r.Next() + message, err = p.r.Next() if err != nil { return message, err } if p.matchAny(string(message.Content)) { return message, err } + discardedOffset += message.Bytes p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content)) } return reader.Message{}, io.EOF diff --git a/libbeat/reader/message.go b/libbeat/reader/message.go index 4450e4f7c9a..7c4d6935751 100644 --- a/libbeat/reader/message.go +++ b/libbeat/reader/message.go @@ -30,6 +30,7 @@ type Message struct { Ts time.Time // timestamp the content was read Content []byte // actual content read Bytes int // total number of bytes read to generate the message + Offset int // total number of bytes read and discarded prior to generate the message Fields mapstr.M // optional fields that can be added by reader Meta mapstr.M // deprecated Private interface{}