libbeat: add Offset to libbeat/reader.Message (#39873)
This commit introduces the Offset property to libbeat/reader.Message, which stores the total number of bytes read and discarded before generating the message. The Offset field allows inputs to accurately determine how much data has been read up to the message, calculated as Message.Bytes + Message.Offset.

With this new Offset field, the filestream input correctly updates its state to account for data read but discarded by the include_message parser.
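
As an illustration of the intended accounting, here is a minimal sketch of an input's read loop using the new field (r, state, and publish are hypothetical stand-ins, not Beats APIs):

	for {
		msg, err := r.Next()
		if err != nil {
			break
		}
		// msg.Bytes covers the message itself; msg.Offset covers bytes read
		// but discarded (e.g. by the include_message parser) before it.
		state.Offset += int64(msg.Bytes) + int64(msg.Offset)
		publish(msg) // hypothetical publish step
	}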

(cherry picked from commit 535a174)

# Conflicts:
#	filebeat/input/filestream/environment_test.go
AndersonQ authored and mergify[bot] committed Jun 21, 2024
1 parent 99a6e8c commit de334eb
Showing 7 changed files with 107 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -99,6 +99,8 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Skip flakey metrics test on windows in filebeat httpjson input. {issue}39676[39676] {pull}39678[39678]
- Fix flakey test on Windows 2022 in packetbeat/route. {issue}39698[39698] {pull}39822[39822]
- Fix bug in minimum length for request trace logging. {pull}39834[39834]
- Close connections properly in Filebeat's HTTPJSON input. {pull}39790[39790]
- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653]

==== Added

28 changes: 28 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -79,6 +79,34 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496]
- [threatintel] MISP pagination fixes {pull}37898[37898]
- Fix file handle leak when handling errors in filestream {pull}37973[37973]
- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094]
- Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116]
- Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125]
- Fix duplicated addition of regexp extension in CEL input. {pull}38181[38181]
- Fix the incorrect values generated by the uri_parts processor. {pull}38216[38216]
- Fix HTTPJSON handling of empty object bodies in POST requests. {issue}33961[33961] {pull}38290[38290]
- Fix PEM key validation for CEL and HTTPJSON inputs. {pull}38405[38405]
- Fix filebeat gcs input panic {pull}38407[38407]
- Rename `activity_guid` to `activity_id` in ETW input events to suit other Windows inputs. {pull}38530[38530]
- Add missing provider registration and fix published entity for Active Directory entityanalytics provider. {pull}38645[38645]
- Fix handling of un-parsed JSON in O365 module. {issue}37800[37800] {pull}38709[38709]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
- Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953]
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985]
- entity-analytics input: Improve structured logging. {pull}38990[38990]
- Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962]
- Updated Websocket input title to align with existing inputs {pull}39006[39006]
- Restore netflow input on Windows {pull}39024[39024]
- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861]
- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131]
- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]
- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653]

*Heartbeat*

66 changes: 31 additions & 35 deletions filebeat/input/filestream/environment_test.go
@@ -33,7 +33,6 @@ import (
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
@@ -95,7 +94,11 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
<<<<<<< HEAD
manager.Init(&e.grp, v2.ModeRun)
=======
_ = manager.Init(&e.grp)
>>>>>>> 535a17418d (libbeat: add Offset to libbeat/reader.Message (#39873))
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
@@ -107,7 +110,11 @@ func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) {
func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) {
e.grp = unison.TaskGroup{}
manager := e.getManager()
<<<<<<< HEAD
manager.Init(&e.grp, v2.ModeRun)
=======
_ = manager.Init(&e.grp)
>>>>>>> 535a17418d (libbeat: add Offset to libbeat/reader.Message (#39873))
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -128,9 +135,9 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
e.wg.Add(1)
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
-defer grp.Stop()
-inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
-inp.Run(inputCtx, e.pipeline)
+defer func() { _ = grp.Stop() }()
+inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
+_ = inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
}

@@ -358,14 +365,16 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e
var entry registryEntry
err := inputStore.Get(key, &entry)
if err != nil {
-keys := []string{}
-inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) {
+var keys []string
+_ = inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) {
keys = append(keys, key)
return false, nil
})
e.t.Logf("keys in store: %v", keys)

-return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err)
+return registryEntry{},
+fmt.Errorf("error when getting expected key '%s' from store: %w",
+key, err)
}

return entry, nil
@@ -385,16 +394,20 @@ func getIDFromPath(filepath, inputID string, fi os.FileInfo) string {

// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
-for {
-sum := len(e.pipeline.GetAllEvents())
+msg := &strings.Builder{}
+require.Eventuallyf(e.t, func() bool {
+msg.Reset()
+
+events := e.pipeline.GetAllEvents()
+sum := len(events)
if sum == count {
-return
+return true
}
if count < sum {
e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum)
+return true
}
-time.Sleep(10 * time.Millisecond)
-}
+fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n",
+count, sum)
+
+return false
+}, 2*time.Minute, 10*time.Millisecond, "%s", msg)
}

// waitUntilEventCountCtx calls waitUntilEventCount, but fails if ctx is cancelled.
@@ -489,9 +502,7 @@ func (e *inputTestingEnvironment) getOutputMessages() []string {
func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) {
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
-for _, evt := range c.GetEvents() {
-events = append(events, evt)
-}
+events = append(events, c.GetEvents()...)
}

selectedEvent := events[nr]
@@ -514,9 +525,7 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
}
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
-for _, evt := range c.GetEvents() {
-events = append(events, evt)
-}
+events = append(events, c.GetEvents()...)
}

selectedEvent := events[nr]
@@ -578,9 +587,7 @@ func (c *mockClient) PublishAll(events []beat.Event) {
}
c.ackHandler.ACKEvents(len(events))

-for _, event := range events {
-c.published = append(c.published, event)
-}
+c.published = append(c.published, events...)
}

func (c *mockClient) waitUntilPublishingHasStarted() {
@@ -652,17 +659,6 @@ func (pc *mockPipelineConnector) cancelAllClients() {
}
}

-func (pc *mockPipelineConnector) cancelClient(i int) {
-pc.mtx.Lock()
-defer pc.mtx.Unlock()
-
-if len(pc.clients) < i+1 {
-return
-}
-
-pc.clients[i].canceler()
-}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener {
if !blocking {
return config.EventListener
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input.go
@@ -369,7 +369,7 @@ func (inp *filestream) readFromSource(
return nil
}

-s.Offset += int64(message.Bytes)
+s.Offset += int64(message.Bytes) + int64(message.Offset)

metrics.MessagesRead.Inc()
if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
33 changes: 33 additions & 0 deletions filebeat/input/filestream/parsers_integration_test.go
@@ -59,6 +59,39 @@ func TestParsersAgentLogs(t *testing.T) {
env.waitUntilInputStops()
}

func TestParsersIncludeMessage(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
readLine := "include this"
inp := env.mustCreateInput(map[string]interface{}{
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "100ms",
"parsers": []map[string]interface{}{
{
"include_message": map[string]interface{}{
"patterns": "^" + readLine + "$",
},
},
},
})

logs := []byte("do not include this line\r\n" + readLine + "\r\n")
env.mustWriteToFile(testlogName, logs)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, "fake-ID", len(logs))

env.requireEventContents(0, "message", readLine)

cancelInput()
env.waitUntilInputStops()
}
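
A quick worked check of the asserted offset (illustrative only): the first line is dropped by the include_message parser, so its bytes are reported on the published message as Message.Offset, while the matching line's bytes are Message.Bytes. The input advances its state by the sum:

	// discarded line -> Message.Offset (bytes read but dropped)
	// matching line  -> Message.Bytes  (bytes of the published message)
	// state advance  =  Offset + Bytes = len(logs), as requireOffsetInRegistry asserts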

// test_docker_logs_filtering from test_json.py
func TestParsersDockerLogsFiltering(t *testing.T) {
env := newInputTestingEnvironment(t)
13 changes: 11 additions & 2 deletions libbeat/reader/filter/filter.go
@@ -55,15 +55,24 @@ func NewParser(r reader.Reader, c *Config) *FilterParser {
}
}

-func (p *FilterParser) Next() (reader.Message, error) {
+func (p *FilterParser) Next() (message reader.Message, err error) {
+// discardedOffset accounts for the bytes of discarded messages. The inputs
+// need to correctly track the file offset, therefore if only the matching
+// message size is returned, the offset cannot be correctly updated.
+var discardedOffset int
+defer func() {
+message.Offset = discardedOffset
+}()
+
for p.ctx.Err() == nil {
-message, err := p.r.Next()
+message, err = p.r.Next()
if err != nil {
return message, err
}
if p.matchAny(string(message.Content)) {
return message, err
}
+discardedOffset += message.Bytes
p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content))
}
return reader.Message{}, io.EOF
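
The named result plus the deferred assignment is what guarantees Offset is populated on every return path of Next. The same pattern in isolation, as a minimal sketch (Message, read, and matches are hypothetical helpers):

	func next() (msg Message, err error) {
		var discarded int
		// The deferred closure runs on every return path, so even early
		// returns carry the count of discarded bytes.
		defer func() { msg.Offset = discarded }()
		for {
			if msg, err = read(); err != nil {
				return msg, err
			}
			if matches(msg) {
				return msg, nil
			}
			discarded += msg.Bytes // account for the dropped message
		}
	}
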
1 change: 1 addition & 0 deletions libbeat/reader/message.go
@@ -30,6 +30,7 @@ type Message struct {
Ts time.Time // timestamp the content was read
Content []byte // actual content read
Bytes int // total number of bytes read to generate the message
Offset int // total number of bytes read and discarded prior to generating the message
Fields mapstr.M // optional fields that can be added by reader
Meta mapstr.M // deprecated
Private interface{}
