From ad2da2b10a0140c66f3e3a7204ef9388f20f1f2c Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Tue, 12 Sep 2023 17:01:07 -0500 Subject: [PATCH] add equiv check to filebeat as shipper test (#36536) * add equiv check to filebeat as shipper test Closes elastic/elastic-agent-shipper#290 * update go.mod & NOTICE.txt * fix unchecked error --- NOTICE.txt | 60 ++++++------- filebeat/channel/runner.go | 4 +- go.mod | 2 +- libbeat/beat/pipeline.go | 3 + x-pack/filebeat/input/shipper/input.go | 13 +-- .../tests/integration/shipper_test.go | 88 ++++++++++++++++++- 6 files changed, 128 insertions(+), 42 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 5f075e426e1..2d20996227d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -22291,6 +22291,36 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Dependency : github.com/sergi/go-diff +Version: v1.3.1 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/sergi/go-diff@v1.3.1/LICENSE: + +Copyright (c) 2012-2016 The go-diff Authors. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. + + + -------------------------------------------------------------------------------- Dependency : github.com/shirou/gopsutil/v3 Version: v3.22.10 @@ -47464,36 +47494,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- -Dependency : github.com/sergi/go-diff -Version: v1.3.1 -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/sergi/go-diff@v1.3.1/LICENSE: - -Copyright (c) 2012-2016 The go-diff Authors. All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining a -copy of this software and associated documentation files (the "Software"), -to deal in the Software without restriction, including without limitation -the rights to use, copy, modify, merge, publish, distribute, sublicense, -and/or sell copies of the Software, and to permit persons to whom the -Software is furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included -in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. - - - -------------------------------------------------------------------------------- Dependency : github.com/shirou/gopsutil Version: v3.21.11+incompatible diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 359a140dc7d..3bd745d5aff 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -155,7 +155,9 @@ func newCommonConfigEditor( setOptional(meta, "pipeline", config.Pipeline) setOptional(fields, "fileset.name", config.Fileset) setOptional(fields, "service.type", serviceType) - setOptional(fields, "input.type", config.Type) + if !clientCfg.Processing.DisableType { + setOptional(fields, "input.type", config.Type) + } if config.Module != "" { event := mapstr.M{"module": config.Module} if config.Fileset != "" { diff --git a/go.mod b/go.mod index fecbf95b1ed..cfbc0ddeb93 100644 --- a/go.mod +++ b/go.mod @@ -217,6 +217,7 @@ require ( github.com/otiai10/copy v1.12.0 github.com/pierrec/lz4/v4 v4.1.16 github.com/pkg/xattr v0.4.9 + github.com/sergi/go-diff v1.3.1 github.com/shirou/gopsutil/v3 v3.22.10 go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.4 go.elastic.co/apm/module/apmhttp/v2 v2.4.4 @@ -340,7 +341,6 @@ require ( github.com/rootless-containers/rootlesskit v1.1.0 // indirect github.com/sanathkr/go-yaml v0.0.0-20170819195128-ed9d249f429b // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/sergi/go-diff v1.3.1 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 7b957099f6c..8e8b285042c 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -128,6 +128,9 @@ type ProcessingConfig struct { // is applied to events. If nil the Beat's default behavior prevails. EventNormalization *bool + // Disables the addition of input.type + DisableType bool + // Private contains additional information to be passed to the processing // pipeline builder. Private interface{} diff --git a/x-pack/filebeat/input/shipper/input.go b/x-pack/filebeat/input/shipper/input.go index 213e14be0eb..5cece851d9c 100644 --- a/x-pack/filebeat/input/shipper/input.go +++ b/x-pack/filebeat/input/shipper/input.go @@ -52,7 +52,6 @@ type InputManager struct { // NewInputManager creates a new shipper input manager func NewInputManager(log *logp.Logger) *InputManager { - log.Infof("creating new InputManager") return &InputManager{ log: log.Named("shipper-beat"), @@ -68,7 +67,6 @@ func (im *InputManager) Init(_ unison.Group, _ v2.Mode) error { // Create creates the input from a given config // in an attempt to speed things up, this will create the processors from the config before we have access to the pipeline to create the clients func (im *InputManager) Create(cfg *config.C) (v2.Input, error) { - config := Instance{} if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("error unpacking config: %w", err) @@ -135,7 +133,7 @@ func (in *shipperInput) Test(ctx v2.TestContext) error { // Stop the shipper func (in *shipperInput) Stop() { in.log.Infof("shipper shutting down") - //stop individual clients + // stop individual clients for streamID, stream := range in.streams { err := stream.client.Close() if err != nil { @@ -171,8 +169,11 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err PublishMode: beat.GuaranteedSend, EventListener: acker.TrackingCounter(in.acker.Track), Processing: beat.ProcessingConfig{ - Processor: streamProc.processors, + Processor: streamProc.processors, + DisableHost: true, + DisableType: true, }, + CloseRef: inputContext.Cancelation, }) if err != nil { @@ -184,7 +185,7 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err in.streams[streamID] = newStreamData } - //setup gRPC + // setup gRPC err := in.setupgRPC(pipeline) if err != nil { return fmt.Errorf("error starting shipper gRPC server: %w", err) @@ -264,7 +265,7 @@ func (in *shipperInput) setupgRPC(pipeline beat.Pipeline) error { } func (in *shipperInput) sendEvent(event *messages.Event) (uint64, error) { - //look for matching processor config + // look for matching processor config stream, ok := in.streams[event.Source.StreamId] if !ok { return 0, fmt.Errorf("could not find data stream associated with ID '%s'", event.Source.StreamId) diff --git a/x-pack/filebeat/tests/integration/shipper_test.go b/x-pack/filebeat/tests/integration/shipper_test.go index 35f76b73585..9b650c525b8 100644 --- a/x-pack/filebeat/tests/integration/shipper_test.go +++ b/x-pack/filebeat/tests/integration/shipper_test.go @@ -9,15 +9,18 @@ package integration import ( "context" "crypto/rand" + "encoding/json" "fmt" "os" "path/filepath" "testing" "time" + "github.com/sergi/go-diff/diffmatchpatch" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" "github.com/elastic/go-elasticsearch/v8/typedapi/types" @@ -75,13 +78,29 @@ setup.kibana: username: %s password: %s logging.level: debug + queue.mem: events: 100 flush.min_events: 0 +processors: +- add_fields: + target: data_stream + fields: + type: logs + namespace: generic + dataset: generic +- add_fields: + target: host + fields: + name: %s +- add_fields: + target: agent + fields: + type: metricbeat ` // check that file can be ingested normally and found in elasticsearch filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test") - filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword)) + filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg)) filebeat.Start() filebeat.WaitForLogs("Publish event: ", 10*time.Second) filebeat.WaitForLogs("PublishEvents: ", 10*time.Second) @@ -162,10 +181,18 @@ processors: type: logs namespace: generic dataset: generic +- add_fields: + target: host + fields: + name: %s +- add_fields: + target: agent + fields: + type: metricbeat ` // start filebeat with shipper output, make doc is ingested into elasticsearch fb2shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test") - fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword)) + fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg)) fb2shipper.Start() fb2shipper.WaitForLogs("Publish event: ", 10*time.Second) fb2shipper.WaitForLogs("events to protobuf", 10*time.Second) @@ -185,6 +212,59 @@ processors: require.NoError(t, err, "error doing search request: %s", err) return res.Hits.Total.Value == 2 }, 30*time.Second, 250*time.Millisecond, "never found 2 documents") - // ToDo add comparison of docs to make sure they are the same - // for example right now input.type is being overwritten with shipper + + res, err := es.Search(). + Index(".ds-filebeat-*"). + Request(&search.Request{ + Query: &types.Query{ + Match: map[string]types.MatchQuery{ + "message": { + Query: uniqMsg, + Operator: &operator.And, + }, + }, + }, + }).Do(context.Background()) + require.NoError(t, err, "error doing search request: %s", err) + require.Equal(t, int64(2), res.Hits.Total.Value) + diff, err := diffDocs(res.Hits.Hits[0].Source_, + res.Hits.Hits[1].Source_) + require.NoError(t, err, "error diffing docs") + if len(diff) != 0 { + t.Fatalf("docs differ:\n:%s\n", diff) + } +} + +func diffDocs(doc1 json.RawMessage, doc2 json.RawMessage) (string, error) { + fieldsToDrop := []string{ + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "elastic_agent.id", + } + var d1 map[string]interface{} + var d2 map[string]interface{} + + if err := json.Unmarshal(doc1, &d1); err != nil { + return "", err + } + + if err := json.Unmarshal(doc2, &d2); err != nil { + return "", err + } + f1 := mapstr.M(d1).Flatten() + f2 := mapstr.M(d2).Flatten() + + for _, key := range fieldsToDrop { + _ = f1.Delete(key) + _ = f2.Delete(key) + } + + dmp := diffmatchpatch.New() + diffs := dmp.DiffMain(f1.StringToPrint(), f2.StringToPrint(), false) + + if len(diffs) != 1 { + return dmp.DiffPrettyText(diffs), nil + } + return "", nil }