Skip to content

Commit

Permalink
bugfix: keep track of bytes read when max_bytes exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
liuwenping committed Jun 4, 2022
1 parent f951ca6 commit 4b0952a
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer

skippedByteCount int // number of bytes skipped, when the line is too long
}

// New creates a new reader object
Expand Down Expand Up @@ -106,8 +108,9 @@ func (r *LineReader) Next() ([]byte, int, error) {
}

// return and reset consumed bytes count
sz := r.byteCount
sz := r.byteCount + r.skippedByteCount
r.byteCount = 0
r.skippedByteCount = 0
return bytes, sz, nil
}

Expand Down Expand Up @@ -148,8 +151,10 @@ func (r *LineReader) advance() error {
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
skipped := idx + len(r.nl)
r.skippedByteCount += skipped
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
err = r.inBuffer.Advance(skipped)
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
Expand All @@ -158,6 +163,7 @@ func (r *LineReader) advance() error {
// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
r.skippedByteCount += skipped
if err != nil {
logp.Err("Error skipping until new line, err: %v", err)
return err
Expand Down Expand Up @@ -216,7 +222,7 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
skipped += idx
skipped += idx + len(r.nl)
} else {
skipped += n
}
Expand Down

0 comments on commit 4b0952a

Please sign in to comment.