Analyzer should periodically checkpoint progress when processing large log files (#231)

* We want to incrementally checkpoint log processing when working through
really long files.
* Otherwise, on a really long file, we could go a long time without recording
any progress, and an interruption would force us to start over (see the loop sketch below).
* As part of this processing loop we need to periodically check whether the
application is shutting down and, if so, stop processing log entries.
jlewi authored Sep 10, 2024
1 parent 9d43ef9 commit 9d44e44
Showing 3 changed files with 91 additions and 45 deletions.
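
In outline, the reworked processing loop in analyzer.go reads a bounded chunk, persists the entries, checkpoints, and only then checks for shutdown. A condensed sketch of the control flow in the diff below (per-line persistence elided):

for {
	// Read at most maxLines lines starting from the last checkpointed offset.
	lines, offset, err = readLinesFromOffset(ctx, path, offset, maxLines)
	if err != nil {
		return err
	}
	if len(lines) == 0 {
		return nil // reached end of file
	}

	// ... unmarshal each line, persist it to the raw-logs store, collect traceIDs ...

	// Checkpoint progress for this chunk.
	a.combineAndCheckpoint(ctx, path, offset, traceIDs)

	// Stop between chunks if the application is shutting down.
	if a.queue.ShuttingDown() {
		return nil
	}
}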
114 changes: 75 additions & 39 deletions app/pkg/analyze/analyzer.go
@@ -219,57 +219,94 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
log.V(logs.Debug).Info("Logfile already processed", "path", path)
return nil
}
lines, offset, err := readLinesFromOffset(ctx, path, offset)

if err != nil {
return err
}
if len(lines) == 0 {
return nil
}

traceIDs := make(map[string]bool)
maxLines := 200

// TODO(jeremy): We use pkgPath to filter out log entries from the Analyzer package.
// We could use the pattern illustrated by the fnames package: define the package path as a constant
// and then add a unit test which uses reflection to verify the constant is correct.
pkgPath := getFullPackagePath()
for _, line := range lines {
entry := &api.LogEntry{}
if err := json.Unmarshal([]byte(line), entry); err != nil {
log.Error(err, "Error decoding log entry", "path", path, "line", line)
continue
}

if strings.HasSuffix(entry.Function(), "agent.(*Agent).LogEvents") {
a.processLogEvent(ctx, entry)
continue
}
for {
// Keep reading lines from the file until we reach the end.
// We process the log entries in chunks of maxLines; after each chunk we checkpoint progress.
// This ensures we make durable progress even when backfilling very large files.
var err error
var lines []string
// N.B. If we wrote lines, offset, err := here, the := would declare a new offset scoped to this
// block, shadowing the outer variable, so each call to readLinesFromOffset would see a stale offset.
lines, offset, err = readLinesFromOffset(ctx, path, offset, maxLines)

// Ignore log entries without traces
if entry.TraceID() == "" {
continue
if err != nil {
return err
}

// Drop all log entries that come from the Analyzer package itself. This shouldn't be necessary,
// but it's a precaution to guard against someone accidentally adding a log message with the field "traceId"
// to log a message about processing that trace. If we included such messages as part of the trace
// we could trigger an infinite loop.
if strings.HasPrefix(entry.Function(), pkgPath) {
log.Error(errors.New("Ignoring log entry from Analyzer package"), "Ignoring log entry from Analyzer package", "entry", entry)
if len(lines) == 0 {
return nil
}

if err := a.rawLogsDB.ReadModifyWrite(entry.TraceID(), func(entries *logspb.LogEntries) error {
if entries.Lines == nil {
entries.Lines = make([]string, 0, 1)
traceIDs := make(map[string]bool)

// We process the chunk line by line, keeping track of every traceID mentioned in those lines, and
// then run combineAndCheckpoint for all of those traceIDs. Lines are also persisted in a KV store
// keyed by traceID, so if a later iteration brings a new line for a given traceID and we need to
// reprocess the trace, we can fetch all the line entries for that trace.
for _, line := range lines {
entry := &api.LogEntry{}
if err := json.Unmarshal([]byte(line), entry); err != nil {
log.Error(err, "Error decoding log entry", "path", path, "line", line)
continue
}
entries.Lines = append(entries.Lines, line)
return nil
}); err != nil {
// If there is a problem writing to the DB we should probably surface it rather than just keep going.
return err

if strings.HasSuffix(entry.Function(), "agent.(*Agent).LogEvents") {
a.processLogEvent(ctx, entry)
continue
}

// Ignore log entries without traces
if entry.TraceID() == "" {
continue
}

// Drop all log entries that come from the Analyzer package itself. This shouldn't be necessary,
// but it's a precaution to guard against someone accidentally adding a log message with the field "traceId"
// to log a message about processing that trace. If we included such messages as part of the trace
// we could trigger an infinite loop.
if strings.HasPrefix(entry.Function(), pkgPath) {
log.Error(errors.New("Ignoring log entry from Analyzer package"), "Ignoring log entry from Analyzer package", "entry", entry)
}

if err := a.rawLogsDB.ReadModifyWrite(entry.TraceID(), func(entries *logspb.LogEntries) error {
if entries.Lines == nil {
entries.Lines = make([]string, 0, 1)
}
entries.Lines = append(entries.Lines, line)
return nil
}); err != nil {
// If there is a problem writing to the DB we log it and keep going; arguably we should surface the error instead of continuing.
log.Error(err, "Failed to write log entry to DB", "entry", entry)
continue
}

traceIDs[entry.TraceID()] = true
}

traceIDs[entry.TraceID()] = true
// Now run a combineAndCheckpoint
a.combineAndCheckpoint(ctx, path, offset, traceIDs)

// If we are shutting down we don't want to keep processing the file.
// Because we only check for shutdown here, between chunks, we block shutdown for at least as long
// as it takes to process maxLines lines. If maxLines is large that could be a while.
if a.queue.ShuttingDown() {
log.Info("Halting processing of log file because Analyzer is shutting down", "path", path)
return nil
}
}
}

// combineAndCheckpoint runs a combine operation for all the traceIDs listed in the map.
// Progress is then checkpointed.
func (a *Analyzer) combineAndCheckpoint(ctx context.Context, path string, offset int64, traceIDs map[string]bool) {
log := logs.FromContext(ctx)
// Combine the entries for each trace that we saw.
// N.B. We could potentially make this more efficient by checking if the log message is the final message
// in a trace. This would avoid potentially doing a combine for a trace on each log message.
@@ -280,7 +317,6 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
}
// Update the offset
a.setLogFileOffset(path, offset)
return nil
}

func (a *Analyzer) GetWatermark() *logspb.LogsWaterMark {
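The shadowing note inside the loop above points at a common Go pitfall: := declares new variables scoped to the current block instead of assigning to the outer ones. A minimal standalone illustration (not code from this repository):

package main

import "fmt"

// next simulates a reader that returns an advanced offset.
func next(offset int64) ([]string, int64, error) {
	return []string{"line"}, offset + 10, nil
}

func main() {
	var offset int64
	for i := 0; i < 3; i++ {
		// BUG: := declares fresh lines/offset/err variables scoped to this block,
		// shadowing the outer offset, so next always sees offset == 0.
		lines, offset, err := next(offset)
		fmt.Println(i, lines, offset, err) // inner offset is 10 on every iteration
	}
	fmt.Println("outer offset:", offset) // still 0; the fix is to predeclare and assign with =
}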
12 changes: 11 additions & 1 deletion app/pkg/analyze/reader.go
@@ -9,9 +9,15 @@ import (
"github.com/pkg/errors"
)

const (
readAllLines = -1
)

// readLinesFromOffset reads lines from a file starting at the given offset.
// It will read until the end of the file.
func readLinesFromOffset(ctx context.Context, path string, startOffset int64) ([]string, int64, error) {
//
// maxLines specifies the maximum number of lines to read. If maxLines is <=0 then all lines are read.
func readLinesFromOffset(ctx context.Context, path string, startOffset int64, maxLines int) ([]string, int64, error) {
f, err := os.Open(path)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to open file %s", path)
@@ -38,6 +44,10 @@ func readLinesFromOffset(ctx context.Context, path string, startOffset int64) ([
line := scanner.Text()
lines = append(lines, line)
offset += int64(len(line) + 1) // +1 for newline

if maxLines > 0 && len(lines) >= maxLines {
break
}
}

if err := scanner.Err(); err != nil {
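With the new parameter, a caller can drain a file in bounded chunks rather than reading it all at once. A sketch of that usage (the path and chunk size here are illustrative, not from the repository):

ctx := context.Background()
var offset int64
for {
	// Read up to 200 lines per call; pass readAllLines (-1) to read everything at once.
	lines, newOffset, err := readLinesFromOffset(ctx, "/var/log/app.jsonl", offset, 200)
	if err != nil {
		return err
	}
	if len(lines) == 0 {
		break // reached end of file
	}
	offset = newOffset
	// ... process lines and persist offset as a checkpoint ...
}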
10 changes: 5 additions & 5 deletions app/pkg/analyze/reader_test.go
@@ -22,7 +22,7 @@ func Test_readFromOffset(t *testing.T) {
}

// Read the data from the file
lines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), 0)
lines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), 0, readAllLines)
if err != nil {
t.Fatal(err)
}
@@ -40,7 +40,7 @@
}

// Read the data from the file and see that we properly carry on reading.
newLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
newLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
if err != nil {
t.Fatal(err)
}
@@ -57,7 +57,7 @@
t.Fatalf("failed to write to file: %v", err)
}

partialLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
partialLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
if err != nil {
t.Fatalf("failed to read from file: %v", err)
}
@@ -76,7 +76,7 @@
t.Fatalf("failed to write to file: %v", err)
}

lastLine, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
lastLine, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
if err != nil {
t.Fatal(err)
}
@@ -112,7 +112,7 @@ func Test_readReallyLongLines(t *testing.T) {
t.Fatal(err)
}

lines, _, err := readLinesFromOffset(context.Background(), filePath, 0)
lines, _, err := readLinesFromOffset(context.Background(), filePath, 0, readAllLines)
if err != nil {
t.Fatal(err)
}
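The commit does not include a test for the bounded path itself; a hypothetical test in the same style (assuming context, os, path/filepath, and testing are imported) might look like:

// Write five lines, read with maxLines=2, and expect chunks of 2, 2, and 1 lines.
func Test_readWithMaxLines(t *testing.T) {
	filePath := filepath.Join(t.TempDir(), "log.txt")
	if err := os.WriteFile(filePath, []byte("a\nb\nc\nd\ne\n"), 0o600); err != nil {
		t.Fatal(err)
	}

	var offset int64
	for _, want := range []int{2, 2, 1} {
		lines, newOffset, err := readLinesFromOffset(context.Background(), filePath, offset, 2)
		if err != nil {
			t.Fatal(err)
		}
		if len(lines) != want {
			t.Fatalf("got %d lines; want %d", len(lines), want)
		}
		offset = newOffset
	}
}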
