diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 6f3e0d63525..84333392028 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -44,6 +44,8 @@ type LineReader struct { inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer + + skippedByteCount int // number of bytes skipped, when the line is too long } // New creates a new reader object @@ -106,8 +108,9 @@ func (r *LineReader) Next() ([]byte, int, error) { } // return and reset consumed bytes count - sz := r.byteCount + sz := r.byteCount + r.skippedByteCount r.byteCount = 0 + r.skippedByteCount = 0 return bytes, sz, nil } @@ -148,8 +151,10 @@ func (r *LineReader) advance() error { if r.maxBytes != 0 { // If newLine is found, drop the lines longer than maxBytes for idx != -1 && idx > r.maxBytes { - logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) - err = r.inBuffer.Advance(idx + len(r.nl)) + skipped := idx + len(r.nl) + r.skippedByteCount += skipped + logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped) + err = r.inBuffer.Advance(skipped) r.inBuffer.Reset() r.inOffset = 0 idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) @@ -158,6 +163,7 @@ func (r *LineReader) advance() error { // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine if idx == -1 && r.inBuffer.Len() > r.maxBytes { skipped, err := r.skipUntilNewLine(buf) + r.skippedByteCount += skipped if err != nil { logp.Err("Error skipping until new line, err: %v", err) return err @@ -216,7 +222,7 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { if idx != -1 { r.inBuffer.Append(buf[idx+len(r.nl) : n]) - skipped += idx + skipped += idx + len(r.nl) } else { skipped += n }