From 41f4ced5e635fbcb8b1d8cd6e4e451a5b8f97553 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 2 Nov 2023 15:42:56 +0100 Subject: [PATCH] fixes --- internal/pkg/bulk/engine.go | 11 +++++++++-- internal/pkg/policy/policy_output.go | 8 ++++---- internal/pkg/server/fleet.go | 6 +++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 3aa5cf71d..c6f7e80fb 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -135,11 +135,18 @@ func (b *Bulker) Tracer() *apm.Tracer { func (b *Bulker) CheckRemoteOutputChanged(name string, newCfg map[string]interface{}) { curCfg := b.remoteOutputConfigMap[name] + // ignore output sent to agents where type is set to elasticsearch + if newCfg["type"] == "elasticsearch" { + return + } + b.remoteOutputConfigMap[name] = newCfg + if curCfg == nil { + return + } if !reflect.DeepEqual(curCfg, newCfg) { - log.Info().Str("name", name).Msg("remote output configuration has changed") + log.Info().Str("name", name).Any("curCfg", curCfg).Any("newCfg", newCfg).Msg("remote output configuration has changed") b.remoteOutputCh <- true } - b.remoteOutputConfigMap[name] = curCfg } func (b *Bulker) RemoteOutputCh() chan bool { diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index d66257d41..35bf009a4 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -66,7 +66,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B } case OutputTypeRemoteElasticsearch: zlog.Debug().Msg("preparing remote elasticsearch output") - newBulker, err := p.createAndGetBulker(ctx, bulker, outputMap) + newBulker, err := p.createAndGetBulker(bulker, outputMap) if err != nil { return err } @@ -272,10 +272,10 @@ func (p *Output) prepareElasticsearch( return nil } -func (p *Output) createAndGetBulker(ctx context.Context, mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) { +func (p *Output) createAndGetBulker(mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) { + mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name]) bulker := bulkerMap[p.Name] if bulker != nil { - mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name]) return bulker, nil } bulkCtx, bulkCancel := context.WithCancel(context.Background()) @@ -296,7 +296,7 @@ func (p *Output) createAndGetBulker(ctx context.Context, mainBulker bulk.Bulk, o errCh <- runFunc() }() - go func() (err error) { + go func() { select { case err = <-errCh: case <-bulkCtx.Done(): diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 8a4af86b6..dc13af93d 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -421,11 +421,11 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) { return err } - select { - case outputChanged := <-bulker.RemoteOutputCh(): + go func() { + outputChanged := <-bulker.RemoteOutputCh() f.outputCh <- outputChanged log.Info().Msg("Remote output configuration update") - } + }() return g.Wait() }