From 4d2f7137e9f76e7a617bb08800fe80858b3e4a97 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 12 Jun 2024 14:28:16 +0200 Subject: [PATCH 1/9] wip --- libbeat/reader/filter/filter.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/libbeat/reader/filter/filter.go b/libbeat/reader/filter/filter.go index 61894981be9..0cc6fdf09e3 100644 --- a/libbeat/reader/filter/filter.go +++ b/libbeat/reader/filter/filter.go @@ -55,15 +55,22 @@ func NewParser(r reader.Reader, c *Config) *FilterParser { } } -func (p *FilterParser) Next() (reader.Message, error) { +func (p *FilterParser) Next() (message reader.Message, err error) { + // discardedOffset accounts for the bytes of discarded messages + var discardedOffset int + defer func() { + message.Bytes += discardedOffset + }() + for p.ctx.Err() == nil { - message, err := p.r.Next() + message, err = p.r.Next() if err != nil { return message, err } if p.matchAny(string(message.Content)) { return message, err } + discardedOffset += message.Bytes p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content)) } return reader.Message{}, io.EOF From a5db930b43177957b7b8f4441f26ef373e43a05a Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 12 Jun 2024 16:46:04 +0200 Subject: [PATCH 2/9] add Offset to message Offset contains the data read before the message itself --- libbeat/reader/filter/filter.go | 6 ++++-- libbeat/reader/message.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/libbeat/reader/filter/filter.go b/libbeat/reader/filter/filter.go index 0cc6fdf09e3..2ed83968c2d 100644 --- a/libbeat/reader/filter/filter.go +++ b/libbeat/reader/filter/filter.go @@ -56,10 +56,12 @@ func NewParser(r reader.Reader, c *Config) *FilterParser { } func (p *FilterParser) Next() (message reader.Message, err error) { - // discardedOffset accounts for the bytes of discarded messages + // discardedOffset accounts 
for the bytes of discarded messages. The inputs + // need to correctly track the file offset, therefore if only the matching + // message size is returned, the offset cannot be correctly updated. var discardedOffset int defer func() { - message.Bytes += discardedOffset + message.Offset = discardedOffset }() for p.ctx.Err() == nil { diff --git a/libbeat/reader/message.go b/libbeat/reader/message.go index 4450e4f7c9a..7c4d6935751 100644 --- a/libbeat/reader/message.go +++ b/libbeat/reader/message.go @@ -30,6 +30,7 @@ type Message struct { Ts time.Time // timestamp the content was read Content []byte // actual content read Bytes int // total number of bytes read to generate the message + Offset int // total number of bytes read and discarded prior to generate the message Fields mapstr.M // optional fields that can be added by reader Meta mapstr.M // deprecated Private interface{} From 4b94cc4a03d6f5d9bec4d08d5ba2935a27f7aa59 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 12 Jun 2024 16:47:03 +0200 Subject: [PATCH 3/9] account for the offset of discarded messages --- filebeat/input/filestream/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 7da25654a25..ea761ec177f 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -372,7 +372,7 @@ func (inp *filestream) readFromSource( return nil } - s.Offset += int64(message.Bytes) + s.Offset += int64(message.Bytes) + int64(message.Offset) metrics.MessagesRead.Inc() if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) { From c388c96c04ba08f8c94ce5aef8a050c4246c1999 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 12 Jun 2024 16:47:27 +0200 Subject: [PATCH 4/9] add test for include_message parser --- .../filestream/parsers_integration_test.go | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git 
a/filebeat/input/filestream/parsers_integration_test.go b/filebeat/input/filestream/parsers_integration_test.go index 5de8a57e123..619d39f0512 100644 --- a/filebeat/input/filestream/parsers_integration_test.go +++ b/filebeat/input/filestream/parsers_integration_test.go @@ -59,6 +59,39 @@ func TestParsersAgentLogs(t *testing.T) { env.waitUntilInputStops() } +func TestParsersIncludeMessage(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + readLine := "include this" + inp := env.mustCreateInput(map[string]interface{}{ + "id": "fake-ID", + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "100ms", + "parsers": []map[string]interface{}{ + { + "include_message": map[string]interface{}{ + "patterns": "^" + readLine + "$", + }, + }, + }, + }) + + logs := []byte("do no include this line\r\n" + readLine + "\r\n") + env.mustWriteToFile(testlogName, logs) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, "fake-ID", len(logs)) + + env.requireEventContents(0, "message", readLine) + + cancelInput() + env.waitUntilInputStops() +} + // test_docker_logs_filtering from test_json.py func TestParsersDockerLogsFiltering(t *testing.T) { env := newInputTestingEnvironment(t) From 0725d669f6e90157a514614f644c8c0649806603 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 12 Jun 2024 16:48:12 +0200 Subject: [PATCH 5/9] avoid tests hanging forever --- filebeat/input/filestream/environment_test.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 91454d7e179..e36a467453f 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -385,16 +385,20 @@ func getIDFromPath(filepath, inputID string, fi os.FileInfo) string 
{ // waitUntilEventCount waits until total count events arrive to the client. func (e *inputTestingEnvironment) waitUntilEventCount(count int) { - for { - sum := len(e.pipeline.GetAllEvents()) + msg := &strings.Builder{} + require.Eventuallyf(e.t, func() bool { + msg.Reset() + + events := e.pipeline.GetAllEvents() + sum := len(events) if sum == count { - return + return true } - if count < sum { - e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum) - } - time.Sleep(10 * time.Millisecond) - } + fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d", + count, sum) + + return false + }, 2*time.Minute, 10*time.Millisecond, "%s", msg) } // waitUntilEventCountCtx calls waitUntilEventCount, but fails if ctx is cancelled. From eef16ea60d7c05b7400f341dfb0430ea95ab2ec1 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 17 Jun 2024 18:01:36 +0200 Subject: [PATCH 6/9] fix double import --- filebeat/input/filestream/environment_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index e36a467453f..3f308c8c7fd 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -33,7 +33,6 @@ import ( "github.com/stretchr/testify/require" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" - input "github.com/elastic/beats/v7/filebeat/input/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" @@ -129,7 +128,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { defer wg.Done() defer grp.Stop() - inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} + inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} inp.Run(inputCtx, 
e.pipeline) }(&e.wg, &e.grp) } From 16fa60b61849336f5cd8f5abe30ddf5e073a3969 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 17 Jun 2024 18:28:18 +0200 Subject: [PATCH 7/9] add changelog --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e5cd3497e1..2ff5fdd167d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561] - Fix parsing of RFC 3164 process IDs in syslog processor. {issue}38947[38947] {pull}38982[38982] - Rename the field "apache2.module.error" to "apache.module.error" in Apache error visualization. {issue}39480[39480] {pull}39481[39481] +- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653] *Auditbeat* @@ -142,6 +143,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] - Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] - Fix request trace filename handling in http_endpoint input. {pull}39410[39410] +- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. 
{pull}39873[39873] {issue}39653[39653] *Heartbeat* From e8b9edd490303c5aec1ad81a5fd9cfff1bd51367 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 19 Jun 2024 11:01:21 +0200 Subject: [PATCH 8/9] fix changelog --- CHANGELOG-developer.next.asciidoc | 1 + CHANGELOG.next.asciidoc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 29c9ce99f49..f9fa95e61d5 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -101,6 +101,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Fix flakey test on Windows 2022 in packetbeat/route. {issue}39698[39698] {pull}39822[39822] - Fix bug in minimum length for request trace logging. {pull}39834[39834] - Close connections properly in Filbeat's HTTPJSON input. {pull}39790[39790] +- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653] ==== Added diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ff08d03115e..288f39fbcec 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,7 +94,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561] - Fix parsing of RFC 3164 process IDs in syslog processor. {issue}38947[38947] {pull}38982[38982] - Rename the field "apache2.module.error" to "apache.module.error" in Apache error visualization. {issue}39480[39480] {pull}39481[39481] -- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. 
This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653] *Auditbeat* From fed6f10e7046eac1b716fd660684eee5d054a92e Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 19 Jun 2024 11:01:29 +0200 Subject: [PATCH 9/9] fix linter issues --- filebeat/input/filestream/environment_test.go | 41 ++++++------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 3f308c8c7fd..f9804bb16f3 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -94,7 +94,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) e.t.Helper() e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -106,7 +106,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) { e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -127,9 +127,9 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) e.wg.Add(1) go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { defer wg.Done() - defer grp.Stop() + defer func() { _ = grp.Stop() }() inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} - inp.Run(inputCtx, e.pipeline) + _ = inp.Run(inputCtx, e.pipeline) }(&e.wg, &e.grp) } @@ -357,14 +357,16 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e var entry registryEntry err := inputStore.Get(key, &entry) if err != nil { - keys := 
[]string{} - inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { + var keys []string + _ = inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { keys = append(keys, key) return false, nil }) e.t.Logf("keys in store: %v", keys) - return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err) + return registryEntry{}, + fmt.Errorf("error when getting expected key '%s' from store: %w", + key, err) } return entry, nil @@ -393,7 +395,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) { if sum == count { return true } - fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d", + fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n", count, sum) return false @@ -492,9 +494,7 @@ func (e *inputTestingEnvironment) getOutputMessages() []string { func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) { events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -517,9 +517,7 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) { } events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -581,9 +579,7 @@ func (c *mockClient) PublishAll(events []beat.Event) { } c.ackHandler.ACKEvents(len(events)) - for _, event := range events { - c.published = append(c.published, event) - } + c.published = append(c.published, events...) 
} func (c *mockClient) waitUntilPublishingHasStarted() { @@ -655,17 +651,6 @@ func (pc *mockPipelineConnector) cancelAllClients() { } } -func (pc *mockPipelineConnector) cancelClient(i int) { - pc.mtx.Lock() - defer pc.mtx.Unlock() - - if len(pc.clients) < i+1 { - return - } - - pc.clients[i].canceler() -} - func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener { if !blocking { return config.EventListener