Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simplified integration framework #42450

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions filebeat/testing/integration/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"context"
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/testing/integration"
)

// EnsureCompiled ensures that Filebeat is compiled and ready
// to run.
func EnsureCompiled(ctx context.Context, t *testing.T) (path string) {
return integration.EnsureCompiled(ctx, t, "filebeat")
}

// Test describes all operations for testing Filebeat
//
// Due to interface composition all Filebeat-specific functions
// must be used first in the call-chain.
type Test interface {
integration.BeatTest
// ExpectEOF sets an expectation that Filebeat will read the given
// files to EOF.
ExpectEOF(...string) Test
}

// TestOptions describes all available options for the test.
type TestOptions struct {
// Config for the Beat written in YAML
Config string
// Args sets additional arguments to pass when running the binary.
Args []string
}

// NewTest creates a new integration test for Filebeat.
func NewTest(t *testing.T, opts TestOptions) Test {
return &test{
BeatTest: integration.NewBeatTest(t, integration.BeatTestOptions{
Beatname: "filebeat",
Config: opts.Config,
Args: opts.Args,
}),
}
}

type test struct {
integration.BeatTest
}

// ExpectEOF implements the Test interface.
func (fbt *test) ExpectEOF(files ...string) Test {
// Ensuring we completely ingest every file
for _, filename := range files {
line := fmt.Sprintf("End of file reached: %s; Backoff now.", filename)
fbt.ExpectOutput(line)
}

return fbt
}
138 changes: 138 additions & 0 deletions filebeat/testing/integration/log_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"

uuid "github.com/gofrs/uuid/v5"
)

// LogGenerator used for generating log files
type LogGenerator interface {
// GenerateLine generates a single line for a log file.
// Expected no new line character at the end.
GenerateLine(filename string, index int) string
// FileExtension sets the extension of the file where lines are written.
FileExtension() string
}

// NewPlainTextGenerator creates is a simple plain text generator.
//
// It's using the given message prefix following by the filename
// and the line number, e.g. `filename:128`
func NewPlainTextGenerator(prefix string) LogGenerator {
return plainTextGenerator{
prefix: prefix,
}
}

type plainTextGenerator struct {
prefix string
}

func (g plainTextGenerator) GenerateLine(filename string, index int) string {
return fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index)
}

func (g plainTextGenerator) FileExtension() string {
return ".log"
}

// JSONLineGenerator creates a JSON log line generator.
// Forms a JSON object with a message
// prefixed by the given prefix and followed by the filename
// and the line number, e.g. `filename:128`
func NewJSONGenerator(prefix string) LogGenerator {
return jsonGenerator{
prefix: prefix,
}
}

type jsonGenerator struct {
prefix string
}

func (g jsonGenerator) GenerateLine(filename string, index int) string {
message := fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index)

line := struct{ Message string }{Message: message}
bytes, _ := json.Marshal(line)
return string(bytes)
}

func (g jsonGenerator) FileExtension() string {
return ".ndjson"
}

// GenerateLogFiles generate given amount of files with given
// amount of lines in them.
//
// Returns the path value to put in the Filebeat configuration and
// filenames for all created files.
func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (path string, filenames []string) {
t.Helper()
t.Logf("generating %d log files with %d lines each...", files, lines)
logsPath := filepath.Join(t.TempDir(), "logs")
err := os.MkdirAll(logsPath, 0777)
if err != nil {
t.Fatalf("failed to create a directory for logs %q: %s", logsPath, err)
return "", nil
}

filenames = make([]string, 0, files)
for i := 0; i < files; i++ {
id, err := uuid.NewV4()
if err != nil {
t.Fatalf("failed to generate a unique filename: %s", err)
return "", nil
}
filename := filepath.Join(logsPath, id.String()+generator.FileExtension())
filenames = append(filenames, filename)
GenerateLogFile(t, filename, lines, generator)
}

t.Logf("finished generating %d log files with %d lines each", files, lines)

return filepath.Join(logsPath, "*"+generator.FileExtension()), filenames
}

// GenerateLogFile generates a single log file with the given full
// filename, amount of lines using the given generator
// to create each line.
func GenerateLogFile(t *testing.T, filename string, lines int, generator LogGenerator) {
t.Helper()
file, err := os.Create(filename)
if err != nil {
t.Fatalf("failed to create a log file: %q", filename)
return
}
defer file.Close()
for i := 1; i <= lines; i++ {
line := generator.GenerateLine(filename, i) + "\n"
_, err := file.WriteString(line)
if err != nil {
t.Fatalf("cannot write a generated log line to %s", filename)
return
}
}
}
139 changes: 139 additions & 0 deletions filebeat/testing/integration/sample_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"context"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/testing/integration"
)

func TestFilebeat(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
EnsureCompiled(ctx, t)

messagePrefix := "sample test message"
fileCount := 5
lineCount := 128

reportOptions := integration.ReportOptions{
PrintLinesOnFail: 10,
}

t.Run("Filebeat starts and ingests files", func(t *testing.T) {
configTemplate := `
filebeat.inputs:
- type: filestream
id: "test-filestream"
paths:
- %s
# we want to check that all messages are ingested
# without using an external service, this is an easy way
output.console:
enabled: true
`
// we can generate any amount of expectations
// they are light-weight
expectIngestedFiles := func(test Test, files []string) {
// ensuring we ingest every line from every file
for _, filename := range files {
for i := 1; i <= lineCount; i++ {
line := fmt.Sprintf("%s %s:%d", messagePrefix, filepath.Base(filename), i)
test.ExpectOutput(line)
}
}
}

t.Run("plain text files", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

generator := NewPlainTextGenerator(messagePrefix)
path, files := GenerateLogFiles(t, fileCount, lineCount, generator)
config := fmt.Sprintf(configTemplate, path)
test := NewTest(t, TestOptions{
Config: config,
})

expectIngestedFiles(test, files)

test.
// we expect to read all generated files to EOF
ExpectEOF(files...).
WithReportOptions(reportOptions).
// we should observe the start message of the Beat
ExpectStart().
// check that the first and the last line of the file get ingested
Start(ctx).
// wait until all the expectations are met
// or we hit the timeout set by the context
Wait()
})

t.Run("JSON files", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

generator := NewJSONGenerator(messagePrefix)
path, files := GenerateLogFiles(t, fileCount, lineCount, generator)
config := fmt.Sprintf(configTemplate, path)
test := NewTest(t, TestOptions{
Config: config,
})

expectIngestedFiles(test, files)

test.
ExpectEOF(files...).
WithReportOptions(reportOptions).
ExpectStart().
Start(ctx).
Wait()
})
})

t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// path items are required, this config is invalid
config := `
filebeat.inputs:
- type: filestream
id: "test-filestream"
output.console:
enabled: true
`
test := NewTest(t, TestOptions{
Config: config,
})

test.
WithReportOptions(reportOptions).
ExpectStart().
ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured").
ExpectStop(1).
Start(ctx).
Wait()
})
}
Loading
Loading