Skip to content

Commit

Permalink
add filebeat integration test for shipper input/output (#36349)
Browse files Browse the repository at this point in the history
* add filebeat integration test for shipper input/output
  • Loading branch information
leehinman authored Sep 7, 2023
1 parent 3d0cdb0 commit 6cf8778
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 13 deletions.
36 changes: 23 additions & 13 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc {

glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(tempDir, beatName))
files, err := filepath.Glob(glob)
if err != nil {
t.Logf("glob error with: %s: %s", glob, err)
}
for _, f := range files {
contents, err := readLastNBytes(f, maxlen)
if err != nil {
Expand Down Expand Up @@ -210,10 +213,10 @@ func (b *BeatProc) Start(args ...string) {
func (b *BeatProc) startBeat() {
b.cmdMutex.Lock()
defer b.cmdMutex.Unlock()
b.stdout.Seek(0, 0)
b.stdout.Truncate(0)
b.stderr.Seek(0, 0)
b.stderr.Truncate(0)
_, _ = b.stdout.Seek(0, 0)
_ = b.stdout.Truncate(0)
_, _ = b.stderr.Seek(0, 0)
_ = b.stderr.Truncate(0)
var procAttr os.ProcAttr
procAttr.Files = []*os.File{os.Stdin, b.stdout, b.stderr}
process, err := os.StartProcess(b.fullPath, b.Args, &procAttr)
Expand Down Expand Up @@ -254,7 +257,7 @@ func (b *BeatProc) Stop() {
func (b *BeatProc) LogMatch(match string) bool {
re := regexp.MustCompile(match)
logFile := b.openLogFile()
_, err := logFile.Seek(b.logFileOffset, os.SEEK_SET)
_, err := logFile.Seek(b.logFileOffset, io.SeekStart)
if err != nil {
b.t.Fatalf("could not set offset for '%s': %s", logFile.Name(), err)
}
Expand Down Expand Up @@ -294,7 +297,7 @@ func (b *BeatProc) LogMatch(match string) bool {
func (b *BeatProc) LogContains(s string) bool {
t := b.t
logFile := b.openLogFile()
_, err := logFile.Seek(b.logFileOffset, os.SEEK_SET)
_, err := logFile.Seek(b.logFileOffset, io.SeekStart)
if err != nil {
t.Fatalf("could not set offset for '%s': %s", logFile.Name(), err)
}
Expand Down Expand Up @@ -402,14 +405,17 @@ func (b *BeatProc) openLogFile() *os.File {
// If the tests are run with -v, the temporary directory will
// be logged.
func createTempDir(t *testing.T) string {
tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/",
fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix())))
rootDir, err := filepath.Abs("../../build/integration-tests")
if err != nil {
t.Fatal(err)
t.Fatalf("failed to determine absolute path for temp dir: %s", err)
}

if err := os.MkdirAll(tempDir, 0o766); err != nil {
t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error())
err = os.MkdirAll(rootDir, 0o750)
if err != nil {
t.Fatalf("error making test dir: %s: %s", rootDir, err)
}
tempDir, err := os.MkdirTemp(rootDir, strings.ReplaceAll(t.Name(), "/", "-"))
if err != nil {
t.Fatalf("failed to make temp directory: %s", err)
}

cleanup := func() {
Expand Down Expand Up @@ -450,6 +456,7 @@ func EnsureESIsRunning(t *testing.T) {
// containers required for integration tests
t.Fatalf("cannot execute HTTP request to ES: '%s', check to make sure ES is running (mage compose:Up)", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode)
}
Expand Down Expand Up @@ -570,7 +577,10 @@ func GetKibana(t *testing.T) (url.URL, *url.Userinfo) {
func HttpDo(t *testing.T, method string, targetURL url.URL) (statusCode int, body []byte, err error) {
t.Helper()
client := &http.Client{}
req, err := http.NewRequest(method, targetURL.String(), nil)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, targetURL.String(), nil)
if err != nil {
return 0, nil, fmt.Errorf("error making request, method: %s, url: %s, error: %w", method, targetURL.String(), err)
}
Expand Down
190 changes: 190 additions & 0 deletions x-pack/filebeat/tests/integration/shipper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration && !windows

package integration

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator"
)

func TestShipperInputOutput(t *testing.T) {
integration.EnsureESIsRunning(t)
esURL := integration.GetESURL(t, "http")
esPassword, ok := esURL.User.Password()
require.True(t, ok, "ES didn't have a password")
kURL, kUserInfo := integration.GetKibana(t)
kPassword, ok := kUserInfo.Password()
require.True(t, ok, "Kibana didn't have a password")

gRpcPath := filepath.Join(os.TempDir(), "grpc")

// create file to ingest with a unique message
inputFilePath := filepath.Join(t.TempDir(), "test.log")
inputFile, err := os.Create(inputFilePath)
require.NoError(t, err, "error creating input test file")
uniqVal := make([]byte, 16)
_, err = rand.Read(uniqVal)
uniqMsg := fmt.Sprintf("%X", uniqVal)
require.NoError(t, err, "error getting a unique random value")
_, err = inputFile.Write([]byte(uniqMsg))
require.NoError(t, err, "error writing input test file")
_, err = inputFile.Write([]byte("\n"))
require.NoError(t, err, "error writing new line")
err = inputFile.Close()
require.NoError(t, err, "error closing input test file")

// Elasticsearch client
esCfg := elasticsearch.Config{
Addresses: []string{esURL.String()},
Username: esURL.User.Username(),
Password: esPassword,
}
es, err := elasticsearch.NewTypedClient(esCfg)
require.NoError(t, err, "error creating new es client")

cfg := `filebeat.inputs:
- type: filestream
id: my-filestream-id
paths:
- %s
output.elasticsearch:
hosts:
- %s
username: %s
password: %s
allow_older_versions: true
setup.kibana:
hosts: %s
username: %s
password: %s
logging.level: debug
queue.mem:
events: 100
flush.min_events: 0
`
// check that file can be ingested normally and found in elasticsearch
filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test")
filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword))
filebeat.Start()
filebeat.WaitForLogs("Publish event: ", 10*time.Second)
filebeat.WaitForLogs("PublishEvents: ", 10*time.Second)
// It can take a few seconds for a doc to show up in a search
require.Eventually(t, func() bool {
res, err := es.Search().
Index(".ds-filebeat-*").
Request(&search.Request{
Query: &types.Query{
Match: map[string]types.MatchQuery{
"message": {
Query: uniqMsg,
Operator: &operator.And,
},
},
},
}).Do(context.Background())
require.NoError(t, err, "error doing search request: %s", err)
return res.Hits.Total.Value == 1
}, 30*time.Second, 250*time.Millisecond, "never found document")

shipperCfg := `filebeat.inputs:
- type: shipper
server: unix://%s
id: my-shipper-id
data_stream:
data_set: generic
type: log
namespace: generic
streams:
- id: stream-id
output.elasticsearch:
hosts:
- %s
username: %s
password: %s
allow_older_versions: true
setup.kibana:
hosts: %s
username: %s
password: %s
logging.level: debug
queue.mem:
events: 100
flush.min_events: 0
`
// start a shipper filebeat, wait until gRPC service starts
shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test")
shipper.WriteConfigFile(fmt.Sprintf(shipperCfg, gRpcPath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword))
shipper.Start()
shipper.WaitForLogs("done setting up gRPC server", 30*time.Second)

fb2shipperCfg := `filebeat.inputs:
- type: filestream
id: my-filestream-id
paths:
- %s
output.shipper:
server: unix://%s
setup.kibana:
hosts: %s
username: %s
password: %s
logging.level: debug
queue.mem:
events: 100
flush.min_events: 0
processors:
- script:
lang: javascript
source: >
function process(event) {
event.Put("@metadata.stream_id", "stream-id");
}
- add_fields:
target: data_stream
fields:
type: logs
namespace: generic
dataset: generic
`
// start filebeat with shipper output, make doc is ingested into elasticsearch
fb2shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test")
fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword))
fb2shipper.Start()
fb2shipper.WaitForLogs("Publish event: ", 10*time.Second)
fb2shipper.WaitForLogs("events to protobuf", 10*time.Second)
require.Eventually(t, func() bool {
res, err := es.Search().
Index(".ds-filebeat-*").
Request(&search.Request{
Query: &types.Query{
Match: map[string]types.MatchQuery{
"message": {
Query: uniqMsg,
Operator: &operator.And,
},
},
},
}).Do(context.Background())
require.NoError(t, err, "error doing search request: %s", err)
return res.Hits.Total.Value == 2
}, 30*time.Second, 250*time.Millisecond, "never found 2 documents")
// ToDo add comparison of docs to make sure they are the same
// for example right now input.type is being overwritten with shipper
}

0 comments on commit 6cf8778

Please sign in to comment.