diff --git a/app/pkg/analyze/analyzer.go b/app/pkg/analyze/analyzer.go
index a635bdcb..32a6e472 100644
--- a/app/pkg/analyze/analyzer.go
+++ b/app/pkg/analyze/analyzer.go
@@ -219,57 +219,94 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
 		log.V(logs.Debug).Info("Logfile already processed", "path", path)
 		return nil
 	}
-	lines, offset, err := readLinesFromOffset(ctx, path, offset)
-	if err != nil {
-		return err
-	}
-	if len(lines) == 0 {
-		return nil
-	}
-
-	traceIDs := make(map[string]bool)
+	maxLines := 200
+	// TODO(jeremy): We use pkgPath to filter out log entries from the Analyzer package.
+	// We could follow the pattern illustrated by the fnames package: define the package path as a constant
+	// and add a unit test that uses reflection to verify the constant is correct.
 	pkgPath := getFullPackagePath()
-	for _, line := range lines {
-		entry := &api.LogEntry{}
-		if err := json.Unmarshal([]byte(line), entry); err != nil {
-			log.Error(err, "Error decoding log entry", "path", path, "line", line)
-			continue
-		}
-		if strings.HasSuffix(entry.Function(), "agent.(*Agent).LogEvents") {
-			a.processLogEvent(ctx, entry)
-			continue
-		}
+	for {
+		// Keep reading lines from the file until we reach the end.
+		// We process the log entries in chunks of maxLines lines, checkpointing after each chunk.
+		// This ensures we make steady progress when backfilling.
+		var err error
+		var lines []string
+		// N.B. If we wrote lines, offset, err := here, the := would shadow offset, and each call to
+		// readLinesFromOffset would reuse the stale value rather than the updated one.
+		lines, offset, err = readLinesFromOffset(ctx, path, offset, maxLines)
-		// Ignore log entries without traces
-		if entry.TraceID() == "" {
-			continue
+		if err != nil {
+			return err
 		}
-
-		// Drop all log entries that come from the Analyzer package itself. This shouldn't be neccessary
-		// but its a precaution to guard against someone accidentally adding a log message with the the field "traceId"
-		// to log a message about processing that trace. If we include such messages as part of the trace
-		// we could trigger an infinite loop
-		if strings.HasPrefix(entry.Function(), pkgPath) {
-			log.Error(errors.New("Ignoring log entry from Analyzer package"), "Ignoring log entry from Analyzer package", "entry", entry)
+		if len(lines) == 0 {
+			return nil
 		}
-		if err := a.rawLogsDB.ReadModifyWrite(entry.TraceID(), func(entries *logspb.LogEntries) error {
-			if entries.Lines == nil {
-				entries.Lines = make([]string, 0, 1)
+		traceIDs := make(map[string]bool)
+
+		// We read the lines one by one and keep track of all the traceIDs mentioned in those lines. We
+		// then do a combineAndCheckpoint for all the traceIDs mentioned. Lines are also persisted in a KV store
+		// keyed by traceID. So if on the next iteration we get a new line for a given traceID and need to reprocess
+		// the trace, we can do that because we can fetch all the line entries for that trace.
+		for _, line := range lines {
+			entry := &api.LogEntry{}
+			if err := json.Unmarshal([]byte(line), entry); err != nil {
+				log.Error(err, "Error decoding log entry", "path", path, "line", line)
+				continue
 			}
-			entries.Lines = append(entries.Lines, line)
-			return nil
-		}); err != nil {
-			// If there is a problem writing to the DB we should probably surface it rather than just keep going.
-			return err
+
+			if strings.HasSuffix(entry.Function(), "agent.(*Agent).LogEvents") {
+				a.processLogEvent(ctx, entry)
+				continue
+			}
+
+			// Ignore log entries without traces
+			if entry.TraceID() == "" {
+				continue
+			}
+
+			// Drop all log entries that come from the Analyzer package itself. This shouldn't be necessary,
+			// but it's a precaution to guard against someone accidentally adding a log message with the field "traceId"
+			// to log a message about processing that trace. If we include such messages as part of the trace
+			// we could trigger an infinite loop.
+			if strings.HasPrefix(entry.Function(), pkgPath) {
+				log.Error(errors.New("Ignoring log entry from Analyzer package"), "Ignoring log entry from Analyzer package", "entry", entry)
+			}
+
+			if err := a.rawLogsDB.ReadModifyWrite(entry.TraceID(), func(entries *logspb.LogEntries) error {
+				if entries.Lines == nil {
+					entries.Lines = make([]string, 0, 1)
+				}
+				entries.Lines = append(entries.Lines, line)
+				return nil
+			}); err != nil {
+				// If there is a problem writing to the DB we should probably surface it rather than just keep going.
+				log.Error(err, "Failed to write log entry to DB", "entry", entry)
+				continue
+			}
+
+			traceIDs[entry.TraceID()] = true
 		}
-		traceIDs[entry.TraceID()] = true
+		// Now run combineAndCheckpoint for the traces seen in this chunk.
+		a.combineAndCheckpoint(ctx, path, offset, traceIDs)
+
+		// If we are shutting down we don't want to keep processing the file.
+		// By checking for shutdown only here, at the end of a chunk, we block shutdown for at least as long
+		// as it takes to process maxLines lines. If maxLines is large that could be a while.
+		if a.queue.ShuttingDown() {
+			log.Info("Halting processing of log file because Analyzer is shutting down", "path", path)
+			return nil
+		}
 	}
+}
 
+// combineAndCheckpoint runs a combine operation for all the traceIDs listed in the map.
+// Progress is then checkpointed.
+func (a *Analyzer) combineAndCheckpoint(ctx context.Context, path string, offset int64, traceIDs map[string]bool) {
+	log := logs.FromContext(ctx)
 	// Combine the entries for each trace that we saw.
 	// N.B. We could potentially make this more efficient by checking if the log message is the final message
 	// in a trace. This would avoid potentially doing a combine for a trace on each log message.
@@ -280,7 +317,6 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
 	}
 	// Update the offset
 	a.setLogFileOffset(path, offset)
-	return nil
 }
 
 func (a *Analyzer) GetWatermark() *logspb.LogsWaterMark {
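Note: the N.B. comment in the new loop is worth emphasizing, because the := pitfall it describes would silently turn the loop into an infinite re-read of the first chunk. Here is a minimal, self-contained sketch of the pitfall and the fix; next is a hypothetical stand-in for readLinesFromOffset:

```go
package main

import "fmt"

// next simulates a chunked reader: it returns the chunk found at offset and
// the offset where the following chunk starts.
func next(offset int64) (string, int64) {
	return fmt.Sprintf("chunk@%d", offset), offset + 10
}

func main() {
	var offset int64

	for i := 0; i < 3; i++ {
		// BUG: ':=' declares a NEW offset scoped to this block, shadowing the
		// outer one. The outer offset stays 0, so every iteration re-reads
		// the same chunk.
		chunk, offset := next(offset)
		fmt.Println(chunk, offset) // prints "chunk@0 10" three times
	}

	// The fix used in the patch: pre-declare the other results and assign
	// with '=' so the outer offset advances on every call.
	var chunk string
	for i := 0; i < 3; i++ {
		chunk, offset = next(offset)
		fmt.Println(chunk, offset) // chunk@0 10, chunk@10 20, chunk@20 30
	}
}
```

Go's := creates new variables for every name on the left that is not already declared in the current block, so pre-declaring lines and err and assigning with plain =, as the patch does, is what keeps offset moving forward.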
diff --git a/app/pkg/analyze/reader.go b/app/pkg/analyze/reader.go
index 6afe634e..5d4abb29 100644
--- a/app/pkg/analyze/reader.go
+++ b/app/pkg/analyze/reader.go
@@ -9,9 +9,15 @@ import (
 	"github.com/pkg/errors"
 )
+const (
+	readAllLines = -1
+)
+
 // readLinesFromOffset reads lines from a file starting at the given offset.
 // It will read until the end of the file.
-func readLinesFromOffset(ctx context.Context, path string, startOffset int64) ([]string, int64, error) {
+//
+// maxLines specifies the maximum number of lines to read. If maxLines is <= 0 then all lines are read.
+func readLinesFromOffset(ctx context.Context, path string, startOffset int64, maxLines int) ([]string, int64, error) {
 	f, err := os.Open(path)
 	if err != nil {
 		return nil, 0, errors.Wrapf(err, "failed to open file %s", path)
 	}
@@ -38,6 +44,10 @@ func readLinesFromOffset(ctx context.Context, path string, startOffset int64) ([
 		line := scanner.Text()
 		lines = append(lines, line)
 		offset += int64(len(line) + 1) // +1 for newline
+
+		if maxLines > 0 && len(lines) >= maxLines {
+			break
+		}
 	}
 
 	if err := scanner.Err(); err != nil {
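Note: putting the two halves of the patch together, the contract is: seek to a saved offset, scan at most maxLines complete lines, return the offset just past the last one, and let the caller checkpoint before reading on. A simplified, self-contained sketch of that contract follows; readChunk is a hypothetical stand-in for readLinesFromOffset, and, like the real reader, it assumes \n line endings when advancing the offset:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
)

// readChunk is a simplified readLinesFromOffset: seek to offset, scan up to
// maxLines complete lines, and report the offset just past the last one so
// the next call can resume there. maxLines <= 0 means read to EOF.
func readChunk(path string, offset int64, maxLines int) ([]string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, 0, err
	}
	defer f.Close()

	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, 0, err
	}

	var lines []string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		lines = append(lines, line)
		offset += int64(len(line) + 1) // +1 for the trailing newline
		if maxLines > 0 && len(lines) >= maxLines {
			break
		}
	}
	return lines, offset, scanner.Err()
}

func main() {
	// Drive the same loop shape as processLogFile: read a chunk, handle it,
	// record the offset, repeat until a read returns no lines.
	var offset int64
	for {
		lines, next, err := readChunk("app.log", offset, 200)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
		if len(lines) == 0 {
			return // caught up with the end of the file
		}
		fmt.Printf("processed %d lines\n", len(lines))
		offset = next // checkpoint: on restart, resume from here
	}
}
```

Because the offset is persisted only after a chunk's traces have been combined, a crash or shutdown mid-file costs at most one chunk of rework on the next pass.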
diff --git a/app/pkg/analyze/reader_test.go b/app/pkg/analyze/reader_test.go
index e6a0b48e..e6c6c9b1 100644
--- a/app/pkg/analyze/reader_test.go
+++ b/app/pkg/analyze/reader_test.go
@@ -22,7 +22,7 @@ func Test_readFromOffset(t *testing.T) {
 	}
 
 	// Read the data from the file
-	lines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), 0)
+	lines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), 0, readAllLines)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -40,7 +40,7 @@ func Test_readFromOffset(t *testing.T) {
 	}
 
 	// Read the data from the file and see that we properly carry on reading.
-	newLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
+	newLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -57,7 +57,7 @@ func Test_readFromOffset(t *testing.T) {
 		t.Fatalf("failed to write to file: %v", err)
 	}
 
-	partialLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
+	partialLines, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
 	if err != nil {
 		t.Fatalf("failed to read from file: %v", err)
 	}
@@ -76,7 +76,7 @@ func Test_readFromOffset(t *testing.T) {
 		t.Fatalf("failed to write to file: %v", err)
 	}
 
-	lastLine, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset)
+	lastLine, offset, err := readLinesFromOffset(context.Background(), logFile.Name(), offset, readAllLines)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -112,7 +112,7 @@ func Test_readReallyLongLines(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	lines, _, err := readLinesFromOffset(context.Background(), filePath, 0)
+	lines, _, err := readLinesFromOffset(context.Background(), filePath, 0, readAllLines)
 	if err != nil {
 		t.Fatal(err)
 	}
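Note: every updated call site in the tests passes readAllLines, so the new maxLines path is exercised only indirectly through the Analyzer. A hypothetical test along the following lines (the test name and 4-line fixture are invented for illustration, and the file's existing context, os, path/filepath, and testing imports are assumed) would cover the limit and the resume-from-offset behavior directly:

```go
func Test_readLinesFromOffset_maxLines(t *testing.T) {
	path := filepath.Join(t.TempDir(), "test.log")
	if err := os.WriteFile(path, []byte("a\nb\nc\nd\n"), 0o600); err != nil {
		t.Fatal(err)
	}

	// First chunk: capped at 3 lines even though 4 are available.
	lines, offset, err := readLinesFromOffset(context.Background(), path, 0, 3)
	if err != nil {
		t.Fatal(err)
	}
	if len(lines) != 3 {
		t.Fatalf("expected 3 lines; got %d", len(lines))
	}

	// Second chunk: resumes at the returned offset and drains the rest.
	lines, _, err = readLinesFromOffset(context.Background(), path, offset, 3)
	if err != nil {
		t.Fatal(err)
	}
	if len(lines) != 1 || lines[0] != "d" {
		t.Fatalf("expected [d]; got %v", lines)
	}
}
```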