From f4535a4c84c071541a82295cbc6350dfc13de2bd Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 11 Sep 2023 11:22:12 +0200 Subject: [PATCH] Support flattened data_stream.* fields under Elastic-Agent (#36516) When an input configuration comes from Elastic-Agent the data_stream.* fields sometimes are flattened (e.g: data_stream.dataset: mystuff), this was not supported by Beats. This commit enables support for it. --- CHANGELOG.next.asciidoc | 1 + x-pack/libbeat/management/generate.go | 79 +++++++++++- x-pack/libbeat/management/generate_test.go | 84 +++++++++++++ x-pack/libbeat/management/managerV2_test.go | 130 ++++++++++++++++++++ 4 files changed, 293 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 60053a5c393..a0af35209b5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Support build of projects outside of beats directory {pull}36126[36126] - Add default cgroup regex for add_process_metadata processor {pull}36484[36484] {issue}32961[32961] - Fix environment capture by `add_process_metadata` processor. {issue}36469[36469] {pull}36471[36471] +- Support fattened `data_stream` object when running under Elastic-Agent {pr}36516[36516] *Auditbeat* diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 59537e06686..3bdb1f29c6a 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -5,8 +5,11 @@ package management import ( + "errors" "fmt" + "google.golang.org/protobuf/types/known/structpb" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -82,10 +85,78 @@ func handleSimpleConfig(raw *proto.UnitExpectedConfig) (map[string]any, error) { return m, nil } +// dataStreamAndSource is a generic way to represent proto mesages +// that contain a source field and a datastream field. +type dataStreamAndSource interface { + GetDataStream() *proto.DataStream + GetSource() *structpb.Struct +} + +// deDotDataStream reads any datastream value from the dotted notation +// (data_stream.*) and returns it as a *proto.DataStream. If raw already +// contains a DataStream but no fields are duplicated, then the values are merged. +func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { + ds := raw.GetDataStream() + if ds == nil { + ds = &proto.DataStream{} + } + + tmp := struct { + DataStream struct { + Dataset string `config:"dataset" yaml:"dataset"` + Type string `config:"type" yaml:"type"` + Namespace string `config:"namespace" yaml:"namespace"` + } `config:"data_stream" yaml:"data_stream"` + }{} + + cfg, err := conf.NewConfigFrom(raw.GetSource().AsMap()) + if err != nil { + return nil, fmt.Errorf("cannot generate config from source field: %w", err) + } + + if err := cfg.Unpack(&tmp); err != nil { + return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) + } + + if ds.Dataset != "" && tmp.DataStream.Dataset != "" { + return nil, errors.New("duplicated key 'datastream.dataset'") + } + + if ds.Type != "" && tmp.DataStream.Type != "" { + return nil, errors.New("duplicated key 'datastream.type'") + } + + if ds.Namespace != "" && tmp.DataStream.Namespace != "" { + return nil, errors.New("duplicated key 'datastream.namespace'") + } + + ret := &proto.DataStream{ + Dataset: merge(tmp.DataStream.Dataset, ds.Dataset), + Type: merge(tmp.DataStream.Type, ds.Type), + Namespace: merge(tmp.DataStream.Namespace, ds.Namespace), + } + + return ret, nil +} + +// merge returns b if a is an empty string +func merge(a, b string) string { + if a == "" { + return b + } + return a +} + // CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values // that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/ // This also performs the basic task of inserting module-level add_field processors into the inputs/modules. func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { + ds, err := deDotDataStream(raw) + if err != nil { + return nil, fmt.Errorf("could not read 'data_stream': %w", err) + } + raw.DataStream = ds + // If there are no streams, we fall into the 'simple input config' case, // this means the key configuration values are on the root level instead of // an element in the `streams` array. @@ -106,8 +177,14 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp inputs := make([]map[string]interface{}, len(raw.GetStreams())) for iter, stream := range raw.GetStreams() { + ds, err := deDotDataStream(stream) + if err != nil { + return nil, fmt.Errorf("could not read 'data_stream' from stream ID '%s': %w", + stream.GetId(), err) + } + stream.DataStream = ds streamSource := raw.GetStreams()[iter].GetSource().AsMap() - streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) + streamSource, err = createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) if err != nil { return nil, fmt.Errorf("error creating stream rules: %w", err) } diff --git a/x-pack/libbeat/management/generate_test.go b/x-pack/libbeat/management/generate_test.go index fb7f88ff775..9c0c7df72a1 100644 --- a/x-pack/libbeat/management/generate_test.go +++ b/x-pack/libbeat/management/generate_test.go @@ -7,8 +7,10 @@ package management import ( "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -230,3 +232,85 @@ func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo * require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) return cfgMap } + +func TestDeDotDataStream(t *testing.T) { + testCases := map[string]struct { + source map[string]any + dataStream *proto.DataStream + wantError bool + expectedDataStream *proto.DataStream + }{ + "all data is flattened": { + source: map[string]any{ + "data_stream.dataset": "my dataset", + "data_stream.namespace": "my namespace", + "data_stream.type": "my type", + }, + expectedDataStream: &proto.DataStream{ + Dataset: "my dataset", + Namespace: "my namespace", + Type: "my type", + }, + }, + "no data is flattened": { + dataStream: &proto.DataStream{ + Dataset: "my dataset", + Namespace: "my namespace", + Type: "my type", + }, + expectedDataStream: &proto.DataStream{ + Dataset: "my dataset", + Namespace: "my namespace", + Type: "my type", + }, + }, + "mix of flattened and data_stream": { + dataStream: &proto.DataStream{ + Dataset: "my dataset", + Type: "my type", + }, + source: map[string]any{ + "data_stream.namespace": "my namespace", + }, + expectedDataStream: &proto.DataStream{ + Dataset: "my dataset", + Namespace: "my namespace", + Type: "my type", + }, + }, + "duplicated keys generate error": { + dataStream: &proto.DataStream{ + Dataset: "my dataset", + }, + source: map[string]any{ + "data_stream.dataset": "another dataset", + }, + wantError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + raw := &proto.UnitExpectedConfig{ + Source: requireNewStruct(t, tc.source), + DataStream: tc.dataStream, + } + + final, err := deDotDataStream(raw) + if tc.wantError { + if err == nil { + t.Error("expecting an error") + } + return + } + if err != nil { + t.Fatalf("deDotDataStream returned an error: %s", err) + } + + if !cmp.Equal(final, tc.expectedDataStream, protocmp.Transform()) { + t.Errorf("expecting a different value: --got/++want\n'%s'", + cmp.Diff(final, tc.expectedDataStream, protocmp.Transform())) + } + }) + } +} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 9fe238605b4..217e54c8fe3 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -511,6 +511,136 @@ func TestErrorPerUnit(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "desired state, was not reached") } +func TestFlattenedDataStreams(t *testing.T) { + stateReached := atomic.Bool{} + + expectedDataset := "my-dataset" + expectedNamespace := "my-namespace" + expectedType := "my-type" + expectedIndex := fmt.Sprintf("%s-%s-%s", + expectedType, expectedDataset, expectedNamespace) + + r := reload.NewRegistry() + + output := &mockOutput{ + ReloadFn: func(config *reload.ConfigWithMeta) error { + return nil + }, + } + r.MustRegisterOutput(output) + + inputs := &mockReloadable{ + ReloadFn: func(configs []*reload.ConfigWithMeta) error { + for _, input := range configs { + tmp := struct { + Index string `config:"index" yaml:"index"` + }{} + + if err := input.Config.Unpack(&tmp); err != nil { + t.Fatalf("error unpacking config: %s", err) + } + + if tmp.Index != expectedIndex { + t.Fatalf("expecting index %q, got %q", expectedIndex, tmp.Index) + } + + stateReached.Store(true) + } + return nil + }, + } + r.MustRegisterInput(inputs) + + outputUnit := proto.UnitExpected{ + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + State: proto.State_HEALTHY, + ConfigStateIdx: 1, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "mock", + Name: "mock", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "Is": "this", + "required?": "Yes!", + }), + }, + } + + inputUnit1 := proto.UnitExpected{ + Id: "input-unit1", + Type: proto.UnitType_INPUT, + State: proto.State_HEALTHY, + ConfigStateIdx: 1, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "input-unit-config-id", + Type: "filestream", + Name: "foo", + Source: requireNewStruct(t, map[string]any{ + "data_stream.dataset": expectedDataset, + "data_stream.namespace": expectedNamespace, + "data_stream.type": expectedType, + }), + Streams: []*proto.Stream{ + { + Id: "filestream-id", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "input-unit1", + }), + }, + }, + }, + } + units := []*proto.UnitExpected{ + &outputUnit, + &inputUnit1, + } + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + // Nothing to do here, just keep sending the same units. + return &proto.CheckinExpected{ + Units: units, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + if err := server.Start(); err != nil { + t.Fatalf("could not start mock Elastic-Agent server: %s", err) + } + defer server.Stop() + + client := client.NewV2( + fmt.Sprintf(":%d", server.Port), + "", + client.VersionInfo{}, + grpc.WithTransportCredentials(insecure.NewCredentials())) + + m, err := NewV2AgentManagerWithClient( + &Config{ + Enabled: true, + }, + r, + client, + ) + if err != nil { + t.Fatalf("could not instantiate ManagerV2: %s", err) + } + + if err := m.Start(); err != nil { + t.Fatalf("could not start ManagerV2: %s", err) + } + defer m.Stop() + + require.Eventually(t, func() bool { + return stateReached.Load() + }, 10*time.Second, 100*time.Millisecond, + "did not find expected 'index' field on input final config") +} + type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta