From 6cf8778bcab52d2cbf8c49794e4337ac2903b7e3 Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Wed, 6 Sep 2023 19:26:19 -0500 Subject: [PATCH] add filebeat integration test for shipper input/output (#36349) * add filebeat integration test for shipper input/output --- libbeat/tests/integration/framework.go | 36 ++-- .../tests/integration/shipper_test.go | 190 ++++++++++++++++++ 2 files changed, 213 insertions(+), 13 deletions(-) create mode 100644 x-pack/filebeat/tests/integration/shipper_test.go diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 4c2e0935bca..046c578d7cd 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -138,6 +138,9 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(tempDir, beatName)) files, err := filepath.Glob(glob) + if err != nil { + t.Logf("glob error with: %s: %s", glob, err) + } for _, f := range files { contents, err := readLastNBytes(f, maxlen) if err != nil { @@ -210,10 +213,10 @@ func (b *BeatProc) Start(args ...string) { func (b *BeatProc) startBeat() { b.cmdMutex.Lock() defer b.cmdMutex.Unlock() - b.stdout.Seek(0, 0) - b.stdout.Truncate(0) - b.stderr.Seek(0, 0) - b.stderr.Truncate(0) + _, _ = b.stdout.Seek(0, 0) + _ = b.stdout.Truncate(0) + _, _ = b.stderr.Seek(0, 0) + _ = b.stderr.Truncate(0) var procAttr os.ProcAttr procAttr.Files = []*os.File{os.Stdin, b.stdout, b.stderr} process, err := os.StartProcess(b.fullPath, b.Args, &procAttr) @@ -254,7 +257,7 @@ func (b *BeatProc) Stop() { func (b *BeatProc) LogMatch(match string) bool { re := regexp.MustCompile(match) logFile := b.openLogFile() - _, err := logFile.Seek(b.logFileOffset, os.SEEK_SET) + _, err := logFile.Seek(b.logFileOffset, io.SeekStart) if err != nil { b.t.Fatalf("could not set offset for '%s': %s", logFile.Name(), err) } @@ -294,7 +297,7 @@ func (b *BeatProc) LogMatch(match string) bool { func (b *BeatProc) LogContains(s string) bool { t := b.t logFile := b.openLogFile() - _, err := logFile.Seek(b.logFileOffset, os.SEEK_SET) + _, err := logFile.Seek(b.logFileOffset, io.SeekStart) if err != nil { t.Fatalf("could not set offset for '%s': %s", logFile.Name(), err) } @@ -402,14 +405,17 @@ func (b *BeatProc) openLogFile() *os.File { // If the tests are run with -v, the temporary directory will // be logged. func createTempDir(t *testing.T) string { - tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/", - fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()))) + rootDir, err := filepath.Abs("../../build/integration-tests") if err != nil { - t.Fatal(err) + t.Fatalf("failed to determine absolute path for temp dir: %s", err) } - - if err := os.MkdirAll(tempDir, 0o766); err != nil { - t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error()) + err = os.MkdirAll(rootDir, 0o750) + if err != nil { + t.Fatalf("error making test dir: %s: %s", rootDir, err) + } + tempDir, err := os.MkdirTemp(rootDir, strings.ReplaceAll(t.Name(), "/", "-")) + if err != nil { + t.Fatalf("failed to make temp directory: %s", err) } cleanup := func() { @@ -450,6 +456,7 @@ func EnsureESIsRunning(t *testing.T) { // containers required for integration tests t.Fatalf("cannot execute HTTP request to ES: '%s', check to make sure ES is running (mage compose:Up)", err) } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode) } @@ -570,7 +577,10 @@ func GetKibana(t *testing.T) (url.URL, *url.Userinfo) { func HttpDo(t *testing.T, method string, targetURL url.URL) (statusCode int, body []byte, err error) { t.Helper() client := &http.Client{} - req, err := http.NewRequest(method, targetURL.String(), nil) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + defer cancel() + req, err := http.NewRequestWithContext(ctx, method, targetURL.String(), nil) if err != nil { return 0, nil, fmt.Errorf("error making request, method: %s, url: %s, error: %w", method, targetURL.String(), err) } diff --git a/x-pack/filebeat/tests/integration/shipper_test.go b/x-pack/filebeat/tests/integration/shipper_test.go new file mode 100644 index 00000000000..35f76b73585 --- /dev/null +++ b/x-pack/filebeat/tests/integration/shipper_test.go @@ -0,0 +1,190 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !windows + +package integration + +import ( + "context" + "crypto/rand" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" + "github.com/elastic/go-elasticsearch/v8/typedapi/types" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator" +) + +func TestShipperInputOutput(t *testing.T) { + integration.EnsureESIsRunning(t) + esURL := integration.GetESURL(t, "http") + esPassword, ok := esURL.User.Password() + require.True(t, ok, "ES didn't have a password") + kURL, kUserInfo := integration.GetKibana(t) + kPassword, ok := kUserInfo.Password() + require.True(t, ok, "Kibana didn't have a password") + + gRpcPath := filepath.Join(os.TempDir(), "grpc") + + // create file to ingest with a unique message + inputFilePath := filepath.Join(t.TempDir(), "test.log") + inputFile, err := os.Create(inputFilePath) + require.NoError(t, err, "error creating input test file") + uniqVal := make([]byte, 16) + _, err = rand.Read(uniqVal) + uniqMsg := fmt.Sprintf("%X", uniqVal) + require.NoError(t, err, "error getting a unique random value") + _, err = inputFile.Write([]byte(uniqMsg)) + require.NoError(t, err, "error writing input test file") + _, err = inputFile.Write([]byte("\n")) + require.NoError(t, err, "error writing new line") + err = inputFile.Close() + require.NoError(t, err, "error closing input test file") + + // Elasticsearch client + esCfg := elasticsearch.Config{ + Addresses: []string{esURL.String()}, + Username: esURL.User.Username(), + Password: esPassword, + } + es, err := elasticsearch.NewTypedClient(esCfg) + require.NoError(t, err, "error creating new es client") + + cfg := `filebeat.inputs: +- type: filestream + id: my-filestream-id + paths: + - %s +output.elasticsearch: + hosts: + - %s + username: %s + password: %s + allow_older_versions: true +setup.kibana: + hosts: %s + username: %s + password: %s +logging.level: debug +queue.mem: + events: 100 + flush.min_events: 0 +` + // check that file can be ingested normally and found in elasticsearch + filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test") + filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword)) + filebeat.Start() + filebeat.WaitForLogs("Publish event: ", 10*time.Second) + filebeat.WaitForLogs("PublishEvents: ", 10*time.Second) + // It can take a few seconds for a doc to show up in a search + require.Eventually(t, func() bool { + res, err := es.Search(). + Index(".ds-filebeat-*"). + Request(&search.Request{ + Query: &types.Query{ + Match: map[string]types.MatchQuery{ + "message": { + Query: uniqMsg, + Operator: &operator.And, + }, + }, + }, + }).Do(context.Background()) + require.NoError(t, err, "error doing search request: %s", err) + return res.Hits.Total.Value == 1 + }, 30*time.Second, 250*time.Millisecond, "never found document") + + shipperCfg := `filebeat.inputs: +- type: shipper + server: unix://%s + id: my-shipper-id + data_stream: + data_set: generic + type: log + namespace: generic + streams: + - id: stream-id +output.elasticsearch: + hosts: + - %s + username: %s + password: %s + allow_older_versions: true +setup.kibana: + hosts: %s + username: %s + password: %s +logging.level: debug +queue.mem: + events: 100 + flush.min_events: 0 +` + // start a shipper filebeat, wait until gRPC service starts + shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test") + shipper.WriteConfigFile(fmt.Sprintf(shipperCfg, gRpcPath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword)) + shipper.Start() + shipper.WaitForLogs("done setting up gRPC server", 30*time.Second) + + fb2shipperCfg := `filebeat.inputs: +- type: filestream + id: my-filestream-id + paths: + - %s +output.shipper: + server: unix://%s +setup.kibana: + hosts: %s + username: %s + password: %s +logging.level: debug +queue.mem: + events: 100 + flush.min_events: 0 +processors: +- script: + lang: javascript + source: > + function process(event) { + event.Put("@metadata.stream_id", "stream-id"); + } +- add_fields: + target: data_stream + fields: + type: logs + namespace: generic + dataset: generic +` + // start filebeat with shipper output, make doc is ingested into elasticsearch + fb2shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test") + fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword)) + fb2shipper.Start() + fb2shipper.WaitForLogs("Publish event: ", 10*time.Second) + fb2shipper.WaitForLogs("events to protobuf", 10*time.Second) + require.Eventually(t, func() bool { + res, err := es.Search(). + Index(".ds-filebeat-*"). + Request(&search.Request{ + Query: &types.Query{ + Match: map[string]types.MatchQuery{ + "message": { + Query: uniqMsg, + Operator: &operator.And, + }, + }, + }, + }).Do(context.Background()) + require.NoError(t, err, "error doing search request: %s", err) + return res.Hits.Total.Value == 2 + }, 30*time.Second, 250*time.Millisecond, "never found 2 documents") + // ToDo add comparison of docs to make sure they are the same + // for example right now input.type is being overwritten with shipper +}