From e77cfa1c59f44dccddf9937be94220f4e45fdb0b Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 27 Jan 2025 19:55:48 +0100 Subject: [PATCH 1/7] Add simplified integration framework That can be used for running Beats binaries in integration tests. This framework has a simplified API comparing to the existing one and uses a more efficient way to search for logs in the output of the command. --- filebeat/testing/integration/integration.go | 77 ++++ filebeat/testing/integration/log_generator.go | 138 +++++++ filebeat/testing/integration/sample_test.go | 139 ++++++++ libbeat/testing/integration/integration.go | 336 ++++++++++++++++++ libbeat/testing/integration/output_watcher.go | 187 ++++++++++ .../integration/output_watcher_test.go | 191 ++++++++++ libbeat/testing/integration/run_beat.go | 269 ++++++++++++++ libbeat/testing/integration/sample_test.go | 93 +++++ 8 files changed, 1430 insertions(+) create mode 100644 filebeat/testing/integration/integration.go create mode 100644 filebeat/testing/integration/log_generator.go create mode 100644 filebeat/testing/integration/sample_test.go create mode 100644 libbeat/testing/integration/integration.go create mode 100644 libbeat/testing/integration/output_watcher.go create mode 100644 libbeat/testing/integration/output_watcher_test.go create mode 100644 libbeat/testing/integration/run_beat.go create mode 100644 libbeat/testing/integration/sample_test.go diff --git a/filebeat/testing/integration/integration.go b/filebeat/testing/integration/integration.go new file mode 100644 index 000000000000..90f1b6f95285 --- /dev/null +++ b/filebeat/testing/integration/integration.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/testing/integration" +) + +// EnsureCompiled ensures that Filebeat is compiled and ready +// to run. +func EnsureCompiled(ctx context.Context, t *testing.T) (path string) { + return integration.EnsureCompiled(ctx, t, "filebeat") +} + +// Test describes all operations for testing Filebeat +// +// Due to interface composition all Filebeat-specific functions +// must be used first in the call-chain. +type Test interface { + integration.BeatTest + // ExpectEOF sets an expectation that Filebeat will read the given + // files to EOF. + ExpectEOF(...string) Test +} + +// TestOptions describes all available options for the test. +type TestOptions struct { + // Config for the Beat written in YAML + Config string + // Args sets additional arguments to pass when running the binary. + Args []string +} + +// NewTest creates a new integration test for Filebeat. +func NewTest(t *testing.T, opts TestOptions) Test { + return &test{ + BeatTest: integration.NewBeatTest(t, integration.BeatTestOptions{ + Beatname: "filebeat", + Config: opts.Config, + Args: opts.Args, + }), + } +} + +type test struct { + integration.BeatTest +} + +// ExpectEOF implements the Test interface. 
+func (fbt *test) ExpectEOF(files ...string) Test { + // Ensuring we completely ingest every file + for _, filename := range files { + line := fmt.Sprintf("End of file reached: %s; Backoff now.", filename) + fbt.ExpectOutput(line) + } + + return fbt +} diff --git a/filebeat/testing/integration/log_generator.go b/filebeat/testing/integration/log_generator.go new file mode 100644 index 000000000000..d4961c76302b --- /dev/null +++ b/filebeat/testing/integration/log_generator.go @@ -0,0 +1,138 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + uuid "github.com/gofrs/uuid/v5" +) + +// LogGenerator used for generating log files +type LogGenerator interface { + // GenerateLine generates a single line for a log file. + // Expected no new line character at the end. + GenerateLine(filename string, index int) string + // FileExtension sets the extension of the file where lines are written. + FileExtension() string +} + +// NewPlainTextGenerator creates is a simple plain text generator. +// +// It's using the given message prefix following by the filename +// and the line number, e.g. 
`filename:128` +func NewPlainTextGenerator(prefix string) LogGenerator { + return plainTextGenerator{ + prefix: prefix, + } +} + +type plainTextGenerator struct { + prefix string +} + +func (g plainTextGenerator) GenerateLine(filename string, index int) string { + return fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index) +} + +func (g plainTextGenerator) FileExtension() string { + return ".log" +} + +// JSONLineGenerator creates a JSON log line generator. +// Forms a JSON object with a message +// prefixed by the given prefix and followed by the filename +// and the line number, e.g. `filename:128` +func NewJSONGenerator(prefix string) LogGenerator { + return jsonGenerator{ + prefix: prefix, + } +} + +type jsonGenerator struct { + prefix string +} + +func (g jsonGenerator) GenerateLine(filename string, index int) string { + message := fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index) + + line := struct{ Message string }{Message: message} + bytes, _ := json.Marshal(line) + return string(bytes) +} + +func (g jsonGenerator) FileExtension() string { + return ".ndjson" +} + +// GenerateLogFiles generate given amount of files with given +// amount of lines in them. +// +// Returns the path value to put in the Filebeat configuration and +// filenames for all created files. 
+func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (path string, filenames []string) { + t.Helper() + t.Logf("generating %d log files with %d lines each...", files, lines) + logsPath := filepath.Join(t.TempDir(), "logs") + err := os.MkdirAll(logsPath, 0777) + if err != nil { + t.Fatalf("failed to create a directory for logs %q: %s", logsPath, err) + return "", nil + } + + filenames = make([]string, 0, files) + for i := 0; i < files; i++ { + id, err := uuid.NewV4() + if err != nil { + t.Fatalf("failed to generate a unique filename: %s", err) + return "", nil + } + filename := filepath.Join(logsPath, id.String()+generator.FileExtension()) + filenames = append(filenames, filename) + GenerateLogFile(t, filename, lines, generator) + } + + t.Logf("finished generating %d log files with %d lines each", files, lines) + + return filepath.Join(logsPath, "*"+generator.FileExtension()), filenames +} + +// GenerateLogFile generates a single log file with the given full +// filename, amount of lines using the given generator +// to create each line. +func GenerateLogFile(t *testing.T, filename string, lines int, generator LogGenerator) { + t.Helper() + file, err := os.Create(filename) + if err != nil { + t.Fatalf("failed to create a log file: %q", filename) + return + } + defer file.Close() + for i := 1; i <= lines; i++ { + line := generator.GenerateLine(filename, i) + "\n" + _, err := file.WriteString(line) + if err != nil { + t.Fatalf("cannot write a generated log line to %s", filename) + return + } + } +} diff --git a/filebeat/testing/integration/sample_test.go b/filebeat/testing/integration/sample_test.go new file mode 100644 index 000000000000..01330be7c40e --- /dev/null +++ b/filebeat/testing/integration/sample_test.go @@ -0,0 +1,139 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/testing/integration" +) + +func TestFilebeat(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + EnsureCompiled(ctx, t) + + messagePrefix := "sample test message" + fileCount := 5 + lineCount := 128 + + reportOptions := integration.ReportOptions{ + PrintLinesOnFail: 10, + } + + t.Run("Filebeat starts and ingests files", func(t *testing.T) { + configTemplate := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" + paths: + - %s +# we want to check that all messages are ingested +# without using an external service, this is an easy way +output.console: + enabled: true +` + // we can generate any amount of expectations + // they are light-weight + expectIngestedFiles := func(test Test, files []string) { + // ensuring we ingest every line from every file + for _, filename := range files { + for i := 1; i <= lineCount; i++ { + line := fmt.Sprintf("%s %s:%d", messagePrefix, filepath.Base(filename), i) + test.ExpectOutput(line) + } + } + } + + t.Run("plain text files", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + generator := NewPlainTextGenerator(messagePrefix) + path, files := GenerateLogFiles(t, fileCount, 
lineCount, generator) + config := fmt.Sprintf(configTemplate, path) + test := NewTest(t, TestOptions{ + Config: config, + }) + + expectIngestedFiles(test, files) + + test. + // we expect to read all generated files to EOF + ExpectEOF(files...). + WithReportOptions(reportOptions). + // we should observe the start message of the Beat + ExpectStart(). + // check that the first and the last line of the file get ingested + Start(ctx). + // wait until all the expectations are met + // or we hit the timeout set by the context + Wait() + }) + + t.Run("JSON files", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + generator := NewJSONGenerator(messagePrefix) + path, files := GenerateLogFiles(t, fileCount, lineCount, generator) + config := fmt.Sprintf(configTemplate, path) + test := NewTest(t, TestOptions{ + Config: config, + }) + + expectIngestedFiles(test, files) + + test. + ExpectEOF(files...). + WithReportOptions(reportOptions). + ExpectStart(). + Start(ctx). + Wait() + }) + }) + + t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // path items are required, this config is invalid + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" +output.console: + enabled: true +` + test := NewTest(t, TestOptions{ + Config: config, + }) + + test. + WithReportOptions(reportOptions). + ExpectStart(). + ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). + ExpectStop(1). + Start(ctx). + Wait() + }) +} diff --git a/libbeat/testing/integration/integration.go b/libbeat/testing/integration/integration.go new file mode 100644 index 000000000000..95be1f7ba268 --- /dev/null +++ b/libbeat/testing/integration/integration.go @@ -0,0 +1,336 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os/exec" + "regexp" + "sync" + "testing" +) + +const ( + expectErrMsg = "cannot set expectations once the test started" +) + +// BeatTest describes all operations involved +// in integration testing of a Beat +type BeatTest interface { + // Start the integration test + // + // The test runs until all the expectations are met (unless `ExpectStop` is used) or context was canceled or the Beat exits on its own. + Start(context.Context) BeatTest + + // Wait until the test is over. + // + // `PrintOutput` might be helpful for debugging after calling this function. + Wait() + + // ExpectStart sets an expectation that the Beat will report that it started. + ExpectStart() BeatTest + + // ExpectStop sets an expectation that the Beat will exit by itself. + // The process exit code will be checked against the given value. + // + // User controls the timeout by passing the context in `Start`. + // + // All the output expectations would still work as usual, however, + // satisfying all expectations would not stop the Beat. + ExpectStop(exitCode int) BeatTest + + // ExpectOutput registers an output watch for the given substrings. 
+ // + // Every future output line produced by the Beat will be checked + // if it contains one of the given strings. + // + // If given multiple strings, they get checked in order: + // The first substring must be found first, then second, etc. + // + // For `AND` behavior use this function multiple times. + // + // This function should be used before `Start` because it's + // inspecting only the new output lines. + ExpectOutput(...string) BeatTest + + // ExpectOutputRegex registers an output watch for the given regular expression.. + // + // Every future output line produced by the Beat will be matched + // against the given regular expression. + // + // If given multiple expressions, they get checked in order. + // The first expression must match first, then second, etc. + // + // For `AND` behavior use this function multiple times. + // + // This function should be used before `Start` because it's + // inspecting only new outputs. + ExpectOutputRegex(...*regexp.Regexp) BeatTest + + // PrintOutput prints last `limit` lines of the output + // + // It might be handy for inspecting the output in case of a failure. + // Use `limit=-1` to print the entire output (strongly discouraged). + // + // JSON lines of the output are formatted. + PrintOutput(lineCount int) + + // PrintExpectations prints all currently set expectations + PrintExpectations() + + // WithReportOptions sets the reporting options for the test. + WithReportOptions(ReportOptions) BeatTest +} + +// ReportOptions describes all reporting options +type ReportOptions struct { + // PrintExpectationsBeforeStart if set to `true`, all the defined + // expectations will be printed before the test starts. + // + // Use it only if you have a manageable amount of expectations that + // would be readable in the output. + PrintExpectationsBeforeStart bool + + // PrintLinesOnFail defines how many lines of the Beat output + // the test should print in case of failure (default 0). 
+ // + // It uses `PrintOutput`, see its documentation for details. + PrintLinesOnFail int +} + +// BeatTestOptions describes all options to run the test +type BeatTestOptions = RunBeatOptions + +// NewBeatTest creates a new integration test for a Beat. +func NewBeatTest(t *testing.T, opts BeatTestOptions) BeatTest { + test := &beatTest{ + t: t, + opts: opts, + } + + return test +} + +type beatTest struct { + t *testing.T + opts BeatTestOptions + reportOpts ReportOptions + expectations []OutputWatcher + expectedExitCode *int + beat *RunningBeat + mtx sync.Mutex +} + +// Start implements the BeatTest interface. +func (b *beatTest) Start(ctx context.Context) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + if b.beat != nil { + b.t.Fatal("test cannot be startd multiple times") + return b + } + watcher := NewOverallWatcher(b.expectations) + b.t.Logf("running %s integration test...", b.opts.Beatname) + if b.reportOpts.PrintExpectationsBeforeStart { + b.printExpectations() + } + b.beat = RunBeat(ctx, b.t, b.opts, watcher) + + return b +} + +// Wait implements the BeatTest interface. 
+func (b *beatTest) Wait() { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat == nil { + b.t.Fatal("test must start first before calling wait on it") + return + } + + err := b.beat.Wait() + exitErr := &exec.ExitError{} + if !errors.As(err, &exitErr) { + b.t.Fatalf("unexpected error when stopping %s: %s", b.opts.Beatname, err) + return + } + + exitCode := 0 + if err != nil { + exitCode = exitErr.ExitCode() + } + b.t.Logf("%s stopped, exit code %d", b.opts.Beatname, exitCode) + + if b.expectedExitCode != nil && exitCode != *b.expectedExitCode { + b.t.Cleanup(func() { + b.t.Logf("expected exit code %d, actual %d", b.expectedExitCode, exitCode) + }) + + b.t.Fail() + } + + if b.beat.watcher != nil { + b.t.Cleanup(func() { + b.t.Logf("\n\nExpectations are not met:\n\n%s\n\n", b.beat.watcher.String()) + if b.reportOpts.PrintLinesOnFail != 0 { + b.PrintOutput(b.reportOpts.PrintLinesOnFail) + } + }) + b.t.Fail() + } +} + +// ExpectOutput implements the BeatTest interface. +func (b *beatTest) ExpectOutput(lines ...string) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + if len(lines) == 0 { + return b + } + + if len(lines) == 1 { + l := escapeJSONCharacters(lines[0]) + b.expectations = append(b.expectations, NewStringWatcher(l)) + return b + } + + watchers := make([]OutputWatcher, 0, len(lines)) + for _, l := range lines { + escaped := escapeJSONCharacters(l) + b.t.Log(escaped) + watchers = append(watchers, NewStringWatcher(escaped)) + } + b.expectations = append(b.expectations, NewInOrderWatcher(watchers)) + return b +} + +// ExpectOutputRegex implements the BeatTest interface. 
+func (b *beatTest) ExpectOutputRegex(exprs ...*regexp.Regexp) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + if len(exprs) == 0 { + return b + } + + if len(exprs) == 1 { + b.expectations = append(b.expectations, NewRegexpWatcher(exprs[0])) + return b + } + + watchers := make([]OutputWatcher, 0, len(exprs)) + for _, e := range exprs { + watchers = append(watchers, NewRegexpWatcher(e)) + } + b.expectations = append(b.expectations, NewInOrderWatcher(watchers)) + + return b +} + +// ExpectStart implements the BeatTest interface. +func (b *beatTest) ExpectStart() BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + expectedLine := fmt.Sprintf("%s start running.", b.opts.Beatname) + b.expectations = append(b.expectations, NewStringWatcher(expectedLine)) + return b +} + +// ExpectStop implements the BeatTest interface. +func (b *beatTest) ExpectStop(exitCode int) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + b.opts.KeepRunning = true + b.expectedExitCode = &exitCode + return b +} + +// PrintOutput implements the BeatTest interface. +func (b *beatTest) PrintOutput(lineCount int) { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat == nil { + return + } + + b.t.Logf("\n\nLast %d lines of the output:\n\n%s\n\n", lineCount, b.beat.CollectOutput(lineCount)) +} + +// WithReportOptions implements the BeatTest interface. +func (b *beatTest) WithReportOptions(opts ReportOptions) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + b.reportOpts = opts + return b +} + +// PrintExpectations implements the BeatTest interface. 
+func (b *beatTest) PrintExpectations() { + b.mtx.Lock() + defer b.mtx.Unlock() + b.printExpectations() +} + +// lock-free, so it can be used inside a lock +func (b *beatTest) printExpectations() { + overall := NewOverallWatcher(b.expectations) + b.t.Logf("set expectations:\n%s", overall) + if b.expectedExitCode != nil { + b.t.Logf("\nprocess is expected to exit with code %d\n\n", *b.expectedExitCode) + } else { + b.t.Log("\nprocess is expected to be killed once expectations are met\n\n") + } +} + +// we know that we're going to inpect the JSON output from the Beat +// so we must take care of the escaped characters, +// e.g. backslashes in paths on Windows. +func escapeJSONCharacters(s string) string { + bytes, _ := json.Marshal(s) + // trimming quote marks + return string(bytes[1 : len(bytes)-1]) +} diff --git a/libbeat/testing/integration/output_watcher.go b/libbeat/testing/integration/output_watcher.go new file mode 100644 index 000000000000..3a69b60c618f --- /dev/null +++ b/libbeat/testing/integration/output_watcher.go @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "fmt" + "regexp" + "strings" +) + +// OutputWatcher describes operations for watching output. 
type OutputWatcher interface {
	// Inspect the line of the output and adjust the state accordingly.
	Inspect(string)
	// Observed is `true` if every expected output has been observed.
	Observed() bool
	// String is the string representation of the current state.
	// Describes what output is still expected.
	String() string
}

// NewStringWatcher creates a new output watcher that watches for a
// substring.
//
// The given string must be a substring of an output line
// to be marked as observed.
func NewStringWatcher(str string) OutputWatcher {
	return &stringWatcher{
		expecting: &str,
	}
}

// stringWatcher is observed once an inspected line contains
// the expected substring.
type stringWatcher struct {
	// expecting is the substring to look for,
	// it's set to nil once the substring has been observed.
	expecting *string
}

// Inspect implements the OutputWatcher interface.
func (w *stringWatcher) Inspect(line string) {
	if w.Observed() {
		return
	}
	if strings.Contains(line, *w.expecting) {
		w.expecting = nil
	}
}

// Observed implements the OutputWatcher interface.
func (w *stringWatcher) Observed() bool {
	return w.expecting == nil
}

// String implements the OutputWatcher interface.
// Returns an empty string once the substring has been observed.
func (w *stringWatcher) String() string {
	if w.Observed() {
		return ""
	}
	return fmt.Sprintf("to have a substring %q", *w.expecting)
}

// NewRegexpWatcher creates a new output watcher that watches for an
// output line to match the given regular expression.
func NewRegexpWatcher(expr *regexp.Regexp) OutputWatcher {
	return &regexpWatcher{
		expecting: expr,
	}
}

// regexpWatcher is observed once an inspected line matches
// the expected regular expression.
type regexpWatcher struct {
	// expecting is the expression to match,
	// it's set to nil once a matching line has been observed.
	expecting *regexp.Regexp
}

// Inspect implements the OutputWatcher interface.
func (w *regexpWatcher) Inspect(line string) {
	if w.Observed() {
		return
	}
	if w.expecting.MatchString(line) {
		w.expecting = nil
	}
}

// Observed implements the OutputWatcher interface.
func (w *regexpWatcher) Observed() bool {
	return w.expecting == nil
}

// String implements the OutputWatcher interface.
// Returns an empty string once a matching line has been observed.
func (w *regexpWatcher) String() string {
	if w.Observed() {
		return ""
	}
	return fmt.Sprintf("to match %s", w.expecting.String())
}

// NewInOrderWatcher creates a watcher that makes sure that the first
// watcher has `Observed() == true` then it moves on to the second,
// then third, etc.
//
// Reports overall state of all watchers on the list.
+func NewInOrderWatcher(watchers []OutputWatcher) OutputWatcher { + return &inOrderWatcher{ + watchers: watchers, + } +} + +type inOrderWatcher struct { + watchers []OutputWatcher +} + +func (w *inOrderWatcher) Inspect(line string) { + if w.Observed() { + return + } + w.watchers[0].Inspect(line) + if w.watchers[0].Observed() { + w.watchers = w.watchers[1:] + return + } +} + +func (w *inOrderWatcher) Observed() bool { + return len(w.watchers) == 0 +} + +func (w *inOrderWatcher) String() string { + if w.Observed() { + return "" + } + + expectations := make([]string, 0, len(w.watchers)) + for _, watcher := range w.watchers { + expectations = append(expectations, watcher.String()) + } + return strings.Join(expectations, " -> ") +} + +// NewOverallWatcher creates a watcher that reports an overall state +// of the list of other watchers. +// +// It's state marked as observed when all the nested watchers have +// `Observed() == true`. +func NewOverallWatcher(watchers []OutputWatcher) OutputWatcher { + return &metaWatcher{ + active: watchers, + } +} + +type metaWatcher struct { + active []OutputWatcher +} + +func (w *metaWatcher) Inspect(line string) { + var active []OutputWatcher + for _, watcher := range w.active { + watcher.Inspect(line) + if !watcher.Observed() { + active = append(active, watcher) + } + } + w.active = active +} + +func (w *metaWatcher) Observed() bool { + return len(w.active) == 0 +} + +func (w *metaWatcher) String() string { + if w.Observed() { + return "" + } + + expectations := make([]string, 0, len(w.active)) + for _, watcher := range w.active { + expectations = append(expectations, watcher.String()) + } + return " * " + strings.Join(expectations, "\n * ") +} diff --git a/libbeat/testing/integration/output_watcher_test.go b/libbeat/testing/integration/output_watcher_test.go new file mode 100644 index 000000000000..bc2d9c7664ed --- /dev/null +++ b/libbeat/testing/integration/output_watcher_test.go @@ -0,0 +1,191 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOutputWatcher(t *testing.T) { + t.Run("StringWatcher", func(t *testing.T) { + cases := []struct { + name string + str string + expectObserved bool + input string + }{ + { + name: "line has substring", + str: "log line", + expectObserved: true, + input: "some log line that would match", + }, + { + name: "line does not have substring", + str: "no line", + expectObserved: false, + input: "some log line that would match", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + w := NewStringWatcher(tc.str) + w.Inspect(tc.input) + require.Equal(t, tc.expectObserved, w.Observed(), "Observed() does not match") + }) + } + }) + + t.Run("RegexpWatcher", func(t *testing.T) { + cases := []struct { + name string + expr *regexp.Regexp + expectObserved bool + input string + }{ + { + name: "line partially matches", + expr: regexp.MustCompile("line(.*)match"), + expectObserved: true, + input: "some log line that would match", + }, + { + name: "line does not match", + expr: regexp.MustCompile("no(.*)line"), + expectObserved: false, + input: "some log line that would match", + }, + } + + for 
_, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + w := NewRegexpWatcher(tc.expr) + w.Inspect(tc.input) + require.Equal(t, tc.expectObserved, w.Observed(), "Observed() does not match") + }) + } + }) + + t.Run("InOrderWatcher", func(t *testing.T) { + cases := []struct { + name string + strs []string + expectObserved bool + input []string + }{ + { + name: "lines match in order", + strs: []string{"first match", "second match"}, + expectObserved: true, + input: []string{ + "not important line", + "this would trigger the first match", + "not important line", + "not important line", + "this would trigger the second match", + "not important line", + }, + }, + { + name: "lines don't match in order", + strs: []string{"first match", "second match"}, + expectObserved: false, + input: []string{ + "not important line", + "this would trigger the second match", + "not important line", + "not important line", + "this would trigger the first match", + "not important line", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + watchers := make([]OutputWatcher, 0, len(tc.strs)) + for _, s := range tc.strs { + watchers = append(watchers, NewStringWatcher(s)) + } + w := NewInOrderWatcher(watchers) + + for _, l := range tc.input { + w.Inspect(l) + } + + require.Equal(t, tc.expectObserved, w.Observed(), "Observed() does not match") + }) + } + }) + + t.Run("OverallWatcher", func(t *testing.T) { + cases := []struct { + name string + strs []string + expectObserved bool + input []string + }{ + { + name: "lines match in order", + strs: []string{"first match", "second match"}, + expectObserved: true, + input: []string{ + "not important line", + "this would trigger the first match", + "not important line", + "not important line", + "this would trigger the second match", + "not important line", + }, + }, + { + name: "lines don't match in order", + strs: []string{"first match", "no second match"}, + expectObserved: false, + input: []string{ + "not 
important line", + "this would trigger the first match", + "not important line", + "not important line", + "this would trigger the second match", + "not important line", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + watchers := make([]OutputWatcher, 0, len(tc.strs)) + for _, s := range tc.strs { + watchers = append(watchers, NewStringWatcher(s)) + } + w := NewOverallWatcher(watchers) + + for _, l := range tc.input { + w.Inspect(l) + } + + require.Equal(t, tc.expectObserved, w.Observed(), "Observed() does not match") + }) + } + }) +} diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go new file mode 100644 index 000000000000..751e8a8cca1c --- /dev/null +++ b/libbeat/testing/integration/run_beat.go @@ -0,0 +1,269 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "sync" + "testing" +) + +var ( + compiling sync.Mutex +) + +// RunningBeat describes the running Beat binary. 
+type RunningBeat struct { + c *exec.Cmd + outputRW sync.RWMutex + output []string + outputDone chan struct{} + watcher OutputWatcher + keepRunning bool +} + +// CollectOutput returns the last `limit` lines of the currently +// accumulated output. +// `limit=-1` returns the entire output from the beginning. +func (b *RunningBeat) CollectOutput(limit int) string { + b.outputRW.RLock() + defer b.outputRW.RUnlock() + if limit < 0 { + limit = len(b.output) + } + + builder := strings.Builder{} + output := b.output + if len(output) > limit { + output = output[len(output)-limit:] + } + + m := make(map[string]any) + for i, l := range output { + err := json.Unmarshal([]byte(l), &m) + if err != nil { + builder.WriteString(l) + } else { + pretty, _ := json.MarshalIndent(m, "", " ") + builder.Write(pretty) + } + if i < len(output)-1 { + builder.WriteByte('\n') + } + } + + return builder.String() +} + +// Wait until the Beat exists and all the output is processed +func (b *RunningBeat) Wait() error { + err := b.c.Wait() + <-b.outputDone + return err +} + +func (b *RunningBeat) writeOutputLine(line string) { + b.outputRW.Lock() + defer b.outputRW.Unlock() + + b.output = append(b.output, line) + + if b.watcher == nil { + return + } + + b.watcher.Inspect(line) + if b.watcher.Observed() { + if !b.keepRunning { + _ = b.c.Process.Kill() + } + b.watcher = nil + } +} + +// RunBeatOptions describes the options for running a Beat +type RunBeatOptions struct { + // Beatname, for example "filebeat". + Beatname string + // Config for the Beat written in YAML + Config string + // Args sets additional arguments to pass when running the binary. + Args []string + // KeepRunning if set to `true` observing all + // the expected output would not kill the process. + // + // In this case user controls the runtime through the context + // passed in `RunBeat`. + KeepRunning bool +} + +// Runs a Beat binary with the given config and args. 
+// Returns a `RunningBeat` that allow to collect the output and wait until the exit. +func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher OutputWatcher) *RunningBeat { + t.Logf("preparing to run %s...", opts.Beatname) + + binaryFilename := findBeatBinaryPath(t, opts.Beatname) + + // create a temporary Beat config + cfgPath := filepath.Join(t.TempDir(), fmt.Sprintf("%s.yml", opts.Beatname)) + homePath := filepath.Join(t.TempDir(), "home") + + err := os.WriteFile(cfgPath, []byte(opts.Config), 0777) + if err != nil { + t.Fatalf("failed to create a temporary config file: %s", err) + return nil + } + t.Logf("temporary config has been created at %s", cfgPath) + + // compute the args for execution + baseArgs := []string{ + // logging to stderr instead of log files + "-e", + "-c", cfgPath, + // we want all the logs + "-E", "logging.level=debug", + // so we can run multiple Beats at the same time + "--path.home", homePath, + } + execArgs := make([]string, 0, len(baseArgs)+len(opts.Args)) + execArgs = append(execArgs, baseArgs...) + execArgs = append(execArgs, opts.Args...) + + t.Logf("running %s %s", binaryFilename, strings.Join(execArgs, " ")) + c := exec.CommandContext(ctx, binaryFilename, execArgs...) 
+ + output, err := c.StdoutPipe() + if err != nil { + t.Fatalf("failed to create the stdout pipe: %s", err) + return nil + } + c.Stderr = c.Stdout + + b := &RunningBeat{ + c: c, + watcher: watcher, + keepRunning: opts.KeepRunning, + outputDone: make(chan struct{}), + } + + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + b.writeOutputLine(scanner.Text()) + } + if scanner.Err() != nil { + t.Logf("error while reading from stdout/stderr: %s", scanner.Err()) + } + close(b.outputDone) + }() + + err = c.Start() + if err != nil { + t.Fatalf("failed to start Filebeat command: %s", err) + return nil + } + + t.Logf("%s is running (pid: %d)", binaryFilename, c.Process.Pid) + + return b +} + +// EnsureCompiled ensures that the given Beat is compiled and ready +// to run. +func EnsureCompiled(ctx context.Context, t *testing.T, beatname string) (path string) { + compiling.Lock() + defer compiling.Unlock() + + t.Logf("ensuring the %s binary is available...", beatname) + + binaryFilename := findBeatBinaryPath(t, beatname) + _, err := os.Stat(binaryFilename) + if err == nil { + t.Logf("found existing %s binary at %s", beatname, binaryFilename) + return binaryFilename + } + + if !errors.Is(err, os.ErrNotExist) { + t.Fatalf("failed to check for compiled binary %s: %s", binaryFilename, err) + return "" + } + + mageCommand := "mage" + if runtime.GOOS == "windows" { + mageCommand += ".exe" + } + args := []string{"build"} + t.Logf("existing %s binary not found, building with \"%s %s\"... ", mageCommand, binaryFilename, strings.Join(args, " ")) + c := exec.CommandContext(ctx, mageCommand, args...) 
+ c.Dir = filepath.Dir(binaryFilename) + output, err := c.CombinedOutput() + if err != nil { + t.Fatalf("failed to build %s binary: %s\n%s", beatname, err, output) + return "" + } + + _, err = os.Stat(binaryFilename) + if err == nil { + t.Logf("%s binary has been successfully built ", binaryFilename) + return binaryFilename + } + if !errors.Is(err, os.ErrNotExist) { + t.Fatalf("building command for binary %s succeeded but the binary was not created: %s", binaryFilename, err) + return "" + } + + return "" +} + +func findBeatDir(t *testing.T, beatName string) string { + pwd, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get the working directory: %s", err) + return "" + } + t.Logf("searching for the %s directory, starting with %s...", beatName, pwd) + for pwd != "" { + stat, err := os.Stat(filepath.Join(pwd, beatName)) + if errors.Is(err, os.ErrNotExist) || !stat.IsDir() { + pwd = filepath.Dir(pwd) + continue + } + return filepath.Join(pwd, beatName) + } + t.Fatalf("could not find the %s base directory", beatName) + return "" +} + +func findBeatBinaryPath(t *testing.T, beatname string) string { + baseDir := findBeatDir(t, beatname) + t.Logf("found %s directory at %s", beatname, baseDir) + binary := filepath.Join(baseDir, beatname) + if runtime.GOOS == "windows" { + binary += ".exe" + } + return binary +} diff --git a/libbeat/testing/integration/sample_test.go b/libbeat/testing/integration/sample_test.go new file mode 100644 index 000000000000..4458ca782646 --- /dev/null +++ b/libbeat/testing/integration/sample_test.go @@ -0,0 +1,93 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "testing" + "time" +) + +func TestFilebeat(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + EnsureCompiled(ctx, t, "filebeat") + + reportOptions := ReportOptions{ + PrintExpectationsBeforeStart: true, + // last 10 output lines would suffice + PrintLinesOnFail: 10, + } + + t.Run("Filebeat starts", func(t *testing.T) { + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" + paths: + - /var/log/*.log +# we want to check that all messages are ingested +# without using an external service, this is an easy way +output.console: + enabled: true +` + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + test := NewBeatTest(t, BeatTestOptions{ + Beatname: "filebeat", + Config: config, + }) + + test. + WithReportOptions(reportOptions). + // we should observe the start message of the Beat + ExpectStart(). + // check that the first and the last line of the file get ingested + Start(ctx). 
+ // wait until all the expectations are met + // or we hit the timeout set by the context + Wait() + }) + + t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // path items are required, this config is invalid + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" +output.console: + enabled: true +` + test := NewBeatTest(t, BeatTestOptions{ + Beatname: "filebeat", + Config: config, + }) + + test. + WithReportOptions(reportOptions). + ExpectStart(). + ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). + ExpectStop(1). + Start(ctx). + Wait() + }) +} From 7ec435f6e2cbeb4356e33b9441afd30d98f599b7 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 10:05:37 +0100 Subject: [PATCH 2/7] Address review feedback: docs, code changes --- filebeat/testing/integration/log_generator.go | 2 - libbeat/testing/integration/README.md | 122 ++++++++++++++++++ libbeat/testing/integration/run_beat.go | 6 +- libbeat/testing/integration/sample_test.go | 93 ------------- 4 files changed, 125 insertions(+), 98 deletions(-) create mode 100644 libbeat/testing/integration/README.md delete mode 100644 libbeat/testing/integration/sample_test.go diff --git a/filebeat/testing/integration/log_generator.go b/filebeat/testing/integration/log_generator.go index d4961c76302b..b30476a7d39d 100644 --- a/filebeat/testing/integration/log_generator.go +++ b/filebeat/testing/integration/log_generator.go @@ -90,7 +90,6 @@ func (g jsonGenerator) FileExtension() string { // Returns the path value to put in the Filebeat configuration and // filenames for all created files. 
func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (path string, filenames []string) { - t.Helper() t.Logf("generating %d log files with %d lines each...", files, lines) logsPath := filepath.Join(t.TempDir(), "logs") err := os.MkdirAll(logsPath, 0777) @@ -120,7 +119,6 @@ func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (p // filename, amount of lines using the given generator // to create each line. func GenerateLogFile(t *testing.T, filename string, lines int, generator LogGenerator) { - t.Helper() file, err := os.Create(filename) if err != nil { t.Fatalf("failed to create a log file: %q", filename) diff --git a/libbeat/testing/integration/README.md b/libbeat/testing/integration/README.md new file mode 100644 index 000000000000..cfff21652ee9 --- /dev/null +++ b/libbeat/testing/integration/README.md @@ -0,0 +1,122 @@ +# Integration Framework for Beats + +This package contains a simple framework for integration testing of Beats. The main goal of the framework is to make it easy to test a Beat binary as close to our customer's usage as possible. No custom binaries, no inspecting internal state files, just pure output and asserting external behavior like if the Beat was a black box. + +## Current functionality + +### Basic Assertions + +* Assert an output line that contains a defined string +* Assert a list of output lines in a defined order that contain a given list of strings +* Assert an output line that matches a regular expression +* Assert a list of output lines in a defined order that match a given list of regular expressions +* Assert that the process started +* Assert that the process exited by itself with a certain exit code + +When building a Beat-specific wrapper around this framework, new assertions can be created based on the basic assertions listed above. 
+ +### Reporting + +* Print out all defined expectations of the test +* Print last `N` lines of the output + +### Config + +* Add additional arguments to the command to run the binary +* Pass a config file (e.g. `filebeat.yml`) + +## Quick start + +### Things to know before you start + +This framework: + +* does not use log files for inspecting/matching the expected logs. Instead it connects directly to stdout/stderr and matches all the output expectations in memory line by line as they arrive. Which makes it extremely efficient at expecting thousands of log lines (e.g. confirming each line of a file gets ingested). +* kills the process immediately once the defined expectations are met, no more polling with intervals. +* runs the binary that we ship to our customers instead of a [custom binary](https://github.com/elastic/beats/blob/12c36bdfa6fe088f3963bdf5e15780878c228eaf/dev-tools/mage/gotest.go#L399-L430) +* has a call-chain interface which is very compact +* supports testing cases when a Beat crashes with errors +* has very detailed output for debugging a test failure +* is generic and in theory can be used with any Beat +* can be extended and specialized for each Beat, see the [example with Filebeat](https://github.com/elastic/beats/tree/main/filebeat/testing/integration). 
+ +### Samples + +Sample test that validates that Filebeat started, read all the expected files to EOF and ingested all the lines from them: + +```go +func TestFilebeat(t *testing.T) { + messagePrefix := "sample text message" + fileCount := 5 + lineCount := 128 + configTemplate := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" + paths: + - %s +# we want to check that all messages are ingested +# without using an external service, this is an easy way +output.console: + enabled: true +` + // we can generate any amount of expectations + // they are light-weight + expectIngestedFiles := func(test Test, files []string) { + // ensuring we ingest every line from every file + for _, filename := range files { + for i := 1; i <= lineCount; i++ { + line := fmt.Sprintf("%s %s:%d", messagePrefix, filepath.Base(filename), i) + test.ExpectOutput(line) + } + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + generator := NewJSONGenerator(messagePrefix) + path, files := GenerateLogFiles(t, fileCount, lineCount, generator) + config := fmt.Sprintf(configTemplate, path) + test := NewTest(t, TestOptions{ + Config: config, + }) + + expectIngestedFiles(test, files) + + test. + ExpectEOF(files...). + ExpectStart(). + Start(ctx). + Wait() +} +``` + +Another sample test, this time we expect Beat to crash: + +```go +func TestFilebeat(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // path items are required, this config is invalid + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" +output.console: + enabled: true +` + test := NewBeatTest(t, BeatTestOptions{ + Beatname: "filebeat", + Config: config, + }) + + test. + ExpectStart(). + ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). + ExpectStop(1). + Start(ctx). 
+ Wait() +} +``` diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go index 751e8a8cca1c..a3814421f8c4 100644 --- a/libbeat/testing/integration/run_beat.go +++ b/libbeat/testing/integration/run_beat.go @@ -127,10 +127,10 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out t.Logf("preparing to run %s...", opts.Beatname) binaryFilename := findBeatBinaryPath(t, opts.Beatname) - + dir := t.TempDir() // create a temporary Beat config - cfgPath := filepath.Join(t.TempDir(), fmt.Sprintf("%s.yml", opts.Beatname)) - homePath := filepath.Join(t.TempDir(), "home") + cfgPath := filepath.Join(dir, fmt.Sprintf("%s.yml", opts.Beatname)) + homePath := filepath.Join(dir, "home") err := os.WriteFile(cfgPath, []byte(opts.Config), 0777) if err != nil { diff --git a/libbeat/testing/integration/sample_test.go b/libbeat/testing/integration/sample_test.go deleted file mode 100644 index 4458ca782646..000000000000 --- a/libbeat/testing/integration/sample_test.go +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package integration - -import ( - "context" - "testing" - "time" -) - -func TestFilebeat(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - EnsureCompiled(ctx, t, "filebeat") - - reportOptions := ReportOptions{ - PrintExpectationsBeforeStart: true, - // last 10 output lines would suffice - PrintLinesOnFail: 10, - } - - t.Run("Filebeat starts", func(t *testing.T) { - config := ` -filebeat.inputs: - - type: filestream - id: "test-filestream" - paths: - - /var/log/*.log -# we want to check that all messages are ingested -# without using an external service, this is an easy way -output.console: - enabled: true -` - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - test := NewBeatTest(t, BeatTestOptions{ - Beatname: "filebeat", - Config: config, - }) - - test. - WithReportOptions(reportOptions). - // we should observe the start message of the Beat - ExpectStart(). - // check that the first and the last line of the file get ingested - Start(ctx). - // wait until all the expectations are met - // or we hit the timeout set by the context - Wait() - }) - - t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // path items are required, this config is invalid - config := ` -filebeat.inputs: - - type: filestream - id: "test-filestream" -output.console: - enabled: true -` - test := NewBeatTest(t, BeatTestOptions{ - Beatname: "filebeat", - Config: config, - }) - - test. - WithReportOptions(reportOptions). - ExpectStart(). - ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). - ExpectStop(1). - Start(ctx). 
- Wait() - }) -} From bb39651b47e060a83ec10d488187b8a2cb867aaa Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 13:09:45 +0100 Subject: [PATCH 3/7] Rebuild outdated binaries --- libbeat/testing/integration/run_beat.go | 53 +++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go index a3814421f8c4..99a47097f6f3 100644 --- a/libbeat/testing/integration/run_beat.go +++ b/libbeat/testing/integration/run_beat.go @@ -20,9 +20,11 @@ package integration import ( "bufio" "context" + "crypto/sha256" "encoding/json" "errors" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -34,6 +36,9 @@ import ( var ( compiling sync.Mutex + // map of Beat names to binary hashes that `EnsureCompiled` function built + compiled = map[string]string{} + hash = sha256.New() ) // RunningBeat describes the running Beat binary. @@ -194,30 +199,45 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out // EnsureCompiled ensures that the given Beat is compiled and ready // to run. +// This functions allows to use binaries only built by this function. +// Externally created binaries will be removed and rebuilt. func EnsureCompiled(ctx context.Context, t *testing.T, beatname string) (path string) { compiling.Lock() defer compiling.Unlock() t.Logf("ensuring the %s binary is available...", beatname) - binaryFilename := findBeatBinaryPath(t, beatname) + // empty if the binary was not compiled before + expectedHash := compiled[beatname] + // we allow to use binaries only built by this function. 
+ // binaries from different origins are marked as outdated _, err := os.Stat(binaryFilename) - if err == nil { - t.Logf("found existing %s binary at %s", beatname, binaryFilename) - return binaryFilename - } - - if !errors.Is(err, os.ErrNotExist) { + if err != nil && !errors.Is(err, os.ErrNotExist) { t.Fatalf("failed to check for compiled binary %s: %s", binaryFilename, err) return "" } + if err == nil { + actualHash := hashBinary(t, binaryFilename) + if actualHash == expectedHash { + t.Logf("%s binary has been compiled before at %s, using...", beatname, binaryFilename) + return binaryFilename + } + t.Logf("found outdated %s binary at %s, removing...", beatname, binaryFilename) + err := os.Remove(binaryFilename) + if err != nil { + t.Fatalf("failed to remove outdated %s binary at %s: %s", beatname, binaryFilename, err) + return "" + } + } else { + t.Logf("%s binary was not found at %s", beatname, binaryFilename) + } mageCommand := "mage" if runtime.GOOS == "windows" { mageCommand += ".exe" } args := []string{"build"} - t.Logf("existing %s binary not found, building with \"%s %s\"... ", mageCommand, binaryFilename, strings.Join(args, " ")) + t.Logf("building %s binary with \"%s %s\"... ", binaryFilename, mageCommand, strings.Join(args, " ")) c := exec.CommandContext(ctx, mageCommand, args...) 
c.Dir = filepath.Dir(binaryFilename) output, err := c.CombinedOutput() @@ -229,6 +249,7 @@ func EnsureCompiled(ctx context.Context, t *testing.T, beatname string) (path st _, err = os.Stat(binaryFilename) if err == nil { t.Logf("%s binary has been successfully built ", binaryFilename) + compiled[beatname] = hashBinary(t, binaryFilename) return binaryFilename } if !errors.Is(err, os.ErrNotExist) { @@ -239,6 +260,22 @@ func EnsureCompiled(ctx context.Context, t *testing.T, beatname string) (path st return "" } +func hashBinary(t *testing.T, filename string) string { + f, err := os.Open(filename) + if err != nil { + t.Fatalf("failed to open %s: %s", filename, err) + return "" + } + defer f.Close() + hash.Reset() + if _, err := io.Copy(hash, f); err != nil { + t.Fatalf("failed to hash %s: %s", filename, err) + return "" + } + + return fmt.Sprintf("%x", hash.Sum(nil)) +} + func findBeatDir(t *testing.T, beatName string) string { pwd, err := os.Getwd() if err != nil { From a96d68eee42b974373685b5fb9c64cce5562782f Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 14:56:24 +0100 Subject: [PATCH 4/7] Fix occasional corrupted output --- libbeat/testing/integration/run_beat.go | 48 ++++++++++++++++++++----- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go index 99a47097f6f3..1575cf6c70ef 100644 --- a/libbeat/testing/integration/run_beat.go +++ b/libbeat/testing/integration/run_beat.go @@ -161,12 +161,19 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out t.Logf("running %s %s", binaryFilename, strings.Join(execArgs, " ")) c := exec.CommandContext(ctx, binaryFilename, execArgs...) 
- output, err := c.StdoutPipe() + // we must use 2 pipes since writes are not aligned by lines + // part of the stdout output can end up in the middle of the stderr line + stdout, err := c.StdoutPipe() + if err != nil { + t.Fatalf("failed to create the stdout pipe: %s", err) + return nil + } + + stderr, err := c.StderrPipe() if err != nil { t.Fatalf("failed to create the stdout pipe: %s", err) return nil } - c.Stderr = c.Stdout b := &RunningBeat{ c: c, @@ -175,16 +182,29 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out outputDone: make(chan struct{}), } + var wg sync.WaitGroup + // arbitrary buffer size + output := make(chan string, 128) + + wg.Add(2) go func() { - scanner := bufio.NewScanner(output) - for scanner.Scan() { - b.writeOutputLine(scanner.Text()) - } - if scanner.Err() != nil { - t.Logf("error while reading from stdout/stderr: %s", scanner.Err()) - } + processPipe(t, stdout, output) + wg.Done() + }() + go func() { + processPipe(t, stderr, output) + wg.Done() + }() + go func() { + wg.Wait() + close(output) close(b.outputDone) }() + go func() { + for line := range output { + b.writeOutputLine(line) + } + }() err = c.Start() if err != nil { @@ -197,6 +217,16 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out return b } +func processPipe(t *testing.T, r io.Reader, output chan<- string) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + output <- scanner.Text() + } + if scanner.Err() != nil { + t.Logf("error while reading from stdout/stderr: %s", scanner.Err()) + } +} + // EnsureCompiled ensures that the given Beat is compiled and ready // to run. // This functions allows to use binaries only built by this function. 
From e6fdff1f806c43c2c281d16e1235178a5a2ee9a3 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 14:58:43 +0100 Subject: [PATCH 5/7] Move the channel to the right place --- libbeat/testing/integration/run_beat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go index 1575cf6c70ef..00f7327ce9a6 100644 --- a/libbeat/testing/integration/run_beat.go +++ b/libbeat/testing/integration/run_beat.go @@ -198,12 +198,12 @@ func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher Out go func() { wg.Wait() close(output) - close(b.outputDone) }() go func() { for line := range output { b.writeOutputLine(line) } + close(b.outputDone) }() err = c.Start() From d956ba79a8fb11c314f3065b1e3c5750d681ecc6 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 16:49:57 +0100 Subject: [PATCH 6/7] Remove the extra log --- libbeat/testing/integration/integration.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/testing/integration/integration.go b/libbeat/testing/integration/integration.go index 95be1f7ba268..832dec0787b2 100644 --- a/libbeat/testing/integration/integration.go +++ b/libbeat/testing/integration/integration.go @@ -222,7 +222,6 @@ func (b *beatTest) ExpectOutput(lines ...string) BeatTest { watchers := make([]OutputWatcher, 0, len(lines)) for _, l := range lines { escaped := escapeJSONCharacters(l) - b.t.Log(escaped) watchers = append(watchers, NewStringWatcher(escaped)) } b.expectations = append(b.expectations, NewInOrderWatcher(watchers)) From 4e88b1fbe3a62e2e25fe464cfcec973a03985b4c Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 11 Feb 2025 17:02:04 +0100 Subject: [PATCH 7/7] Clarify the docs --- filebeat/testing/integration/log_generator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/filebeat/testing/integration/log_generator.go b/filebeat/testing/integration/log_generator.go 
index b30476a7d39d..424f4f3559a3 100644 --- a/filebeat/testing/integration/log_generator.go +++ b/filebeat/testing/integration/log_generator.go @@ -32,7 +32,8 @@ type LogGenerator interface { // GenerateLine generates a single line for a log file. // Expected no new line character at the end. GenerateLine(filename string, index int) string - // FileExtension sets the extension of the file where lines are written. + // FileExtension defines the extension of the new file where + // the generated lines are written. FileExtension() string }