Skip to content

Commit

Permalink
add equiv check to filebeat as shipper test (#36536)
Browse files Browse the repository at this point in the history
* add equiv check to filebeat as shipper test

Closes elastic/elastic-agent-shipper#290

* update go.mod & NOTICE.txt

* fix unchecked error
  • Loading branch information
leehinman authored Sep 12, 2023
1 parent c10fc69 commit ad2da2b
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 42 deletions.
60 changes: 30 additions & 30 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22291,6 +22291,36 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/sergi/go-diff
Version: v1.3.1
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/sergi/[email protected]/LICENSE:

Copyright (c) 2012-2016 The go-diff Authors. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.



--------------------------------------------------------------------------------
Dependency : github.com/shirou/gopsutil/v3
Version: v3.22.10
Expand Down Expand Up @@ -47464,36 +47494,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/sergi/go-diff
Version: v1.3.1
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/sergi/[email protected]/LICENSE:

Copyright (c) 2012-2016 The go-diff Authors. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.



--------------------------------------------------------------------------------
Dependency : github.com/shirou/gopsutil
Version: v3.21.11+incompatible
Expand Down
4 changes: 3 additions & 1 deletion filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func newCommonConfigEditor(
setOptional(meta, "pipeline", config.Pipeline)
setOptional(fields, "fileset.name", config.Fileset)
setOptional(fields, "service.type", serviceType)
setOptional(fields, "input.type", config.Type)
if !clientCfg.Processing.DisableType {
setOptional(fields, "input.type", config.Type)
}
if config.Module != "" {
event := mapstr.M{"module": config.Module}
if config.Fileset != "" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ require (
github.com/otiai10/copy v1.12.0
github.com/pierrec/lz4/v4 v4.1.16
github.com/pkg/xattr v0.4.9
github.com/sergi/go-diff v1.3.1
github.com/shirou/gopsutil/v3 v3.22.10
go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.4
go.elastic.co/apm/module/apmhttp/v2 v2.4.4
Expand Down Expand Up @@ -340,7 +341,6 @@ require (
github.com/rootless-containers/rootlesskit v1.1.0 // indirect
github.com/sanathkr/go-yaml v0.0.0-20170819195128-ed9d249f429b // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type ProcessingConfig struct {
// is applied to events. If nil the Beat's default behavior prevails.
EventNormalization *bool

// Disables the addition of input.type
DisableType bool

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
Expand Down
13 changes: 7 additions & 6 deletions x-pack/filebeat/input/shipper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type InputManager struct {

// NewInputManager creates a new shipper input manager
func NewInputManager(log *logp.Logger) *InputManager {

log.Infof("creating new InputManager")
return &InputManager{
log: log.Named("shipper-beat"),
Expand All @@ -68,7 +67,6 @@ func (im *InputManager) Init(_ unison.Group, _ v2.Mode) error {
// Create creates the input from a given config
// in an attempt to speed things up, this will create the processors from the config before we have access to the pipeline to create the clients
func (im *InputManager) Create(cfg *config.C) (v2.Input, error) {

config := Instance{}
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("error unpacking config: %w", err)
Expand Down Expand Up @@ -135,7 +133,7 @@ func (in *shipperInput) Test(ctx v2.TestContext) error {
// Stop the shipper
func (in *shipperInput) Stop() {
in.log.Infof("shipper shutting down")
//stop individual clients
// stop individual clients
for streamID, stream := range in.streams {
err := stream.client.Close()
if err != nil {
Expand Down Expand Up @@ -171,8 +169,11 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
PublishMode: beat.GuaranteedSend,
EventListener: acker.TrackingCounter(in.acker.Track),
Processing: beat.ProcessingConfig{
Processor: streamProc.processors,
Processor: streamProc.processors,
DisableHost: true,
DisableType: true,
},

CloseRef: inputContext.Cancelation,
})
if err != nil {
Expand All @@ -184,7 +185,7 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
in.streams[streamID] = newStreamData
}

//setup gRPC
// setup gRPC
err := in.setupgRPC(pipeline)
if err != nil {
return fmt.Errorf("error starting shipper gRPC server: %w", err)
Expand Down Expand Up @@ -264,7 +265,7 @@ func (in *shipperInput) setupgRPC(pipeline beat.Pipeline) error {
}

func (in *shipperInput) sendEvent(event *messages.Event) (uint64, error) {
//look for matching processor config
// look for matching processor config
stream, ok := in.streams[event.Source.StreamId]
if !ok {
return 0, fmt.Errorf("could not find data stream associated with ID '%s'", event.Source.StreamId)
Expand Down
88 changes: 84 additions & 4 deletions x-pack/filebeat/tests/integration/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ package integration
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/sergi/go-diff/diffmatchpatch"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
Expand Down Expand Up @@ -75,13 +78,29 @@ setup.kibana:
username: %s
password: %s
logging.level: debug
queue.mem:
events: 100
flush.min_events: 0
processors:
- add_fields:
target: data_stream
fields:
type: logs
namespace: generic
dataset: generic
- add_fields:
target: host
fields:
name: %s
- add_fields:
target: agent
fields:
type: metricbeat
`
// check that file can be ingested normally and found in elasticsearch
filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test")
filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword))
filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg))
filebeat.Start()
filebeat.WaitForLogs("Publish event: ", 10*time.Second)
filebeat.WaitForLogs("PublishEvents: ", 10*time.Second)
Expand Down Expand Up @@ -162,10 +181,18 @@ processors:
type: logs
namespace: generic
dataset: generic
- add_fields:
target: host
fields:
name: %s
- add_fields:
target: agent
fields:
type: metricbeat
`
// start filebeat with shipper output, make doc is ingested into elasticsearch
fb2shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test")
fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword))
fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg))
fb2shipper.Start()
fb2shipper.WaitForLogs("Publish event: ", 10*time.Second)
fb2shipper.WaitForLogs("events to protobuf", 10*time.Second)
Expand All @@ -185,6 +212,59 @@ processors:
require.NoError(t, err, "error doing search request: %s", err)
return res.Hits.Total.Value == 2
}, 30*time.Second, 250*time.Millisecond, "never found 2 documents")
// ToDo add comparison of docs to make sure they are the same
// for example right now input.type is being overwritten with shipper

res, err := es.Search().
Index(".ds-filebeat-*").
Request(&search.Request{
Query: &types.Query{
Match: map[string]types.MatchQuery{
"message": {
Query: uniqMsg,
Operator: &operator.And,
},
},
},
}).Do(context.Background())
require.NoError(t, err, "error doing search request: %s", err)
require.Equal(t, int64(2), res.Hits.Total.Value)
diff, err := diffDocs(res.Hits.Hits[0].Source_,
res.Hits.Hits[1].Source_)
require.NoError(t, err, "error diffing docs")
if len(diff) != 0 {
t.Fatalf("docs differ:\n:%s\n", diff)
}
}

func diffDocs(doc1 json.RawMessage, doc2 json.RawMessage) (string, error) {
fieldsToDrop := []string{
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"elastic_agent.id",
}
var d1 map[string]interface{}
var d2 map[string]interface{}

if err := json.Unmarshal(doc1, &d1); err != nil {
return "", err
}

if err := json.Unmarshal(doc2, &d2); err != nil {
return "", err
}
f1 := mapstr.M(d1).Flatten()
f2 := mapstr.M(d2).Flatten()

for _, key := range fieldsToDrop {
_ = f1.Delete(key)
_ = f2.Delete(key)
}

dmp := diffmatchpatch.New()
diffs := dmp.DiffMain(f1.StringToPrint(), f2.StringToPrint(), false)

if len(diffs) != 1 {
return dmp.DiffPrettyText(diffs), nil
}
return "", nil
}

0 comments on commit ad2da2b

Please sign in to comment.