Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 2, 2023
1 parent a52b32b commit 41f4ced
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
11 changes: 9 additions & 2 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,18 @@ func (b *Bulker) Tracer() *apm.Tracer {
func (b *Bulker) CheckRemoteOutputChanged(name string, newCfg map[string]interface{}) {
curCfg := b.remoteOutputConfigMap[name]

// ignore output sent to agents where type is set to elasticsearch
if newCfg["type"] == "elasticsearch" {
return
}
b.remoteOutputConfigMap[name] = newCfg
if curCfg == nil {
return
}
if !reflect.DeepEqual(curCfg, newCfg) {
log.Info().Str("name", name).Msg("remote output configuration has changed")
log.Info().Str("name", name).Any("curCfg", curCfg).Any("newCfg", newCfg).Msg("remote output configuration has changed")
b.remoteOutputCh <- true
}
b.remoteOutputConfigMap[name] = curCfg
}

func (b *Bulker) RemoteOutputCh() chan bool {
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B
}
case OutputTypeRemoteElasticsearch:
zlog.Debug().Msg("preparing remote elasticsearch output")
newBulker, err := p.createAndGetBulker(ctx, bulker, outputMap)
newBulker, err := p.createAndGetBulker(bulker, outputMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,10 +272,10 @@ func (p *Output) prepareElasticsearch(
return nil
}

func (p *Output) createAndGetBulker(ctx context.Context, mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) {
func (p *Output) createAndGetBulker(mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) {
mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name])
bulker := bulkerMap[p.Name]
if bulker != nil {
mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name])
return bulker, nil
}
bulkCtx, bulkCancel := context.WithCancel(context.Background())
Expand All @@ -296,7 +296,7 @@ func (p *Output) createAndGetBulker(ctx context.Context, mainBulker bulk.Bulk, o

errCh <- runFunc()
}()
go func() (err error) {
go func() {
select {
case err = <-errCh:
case <-bulkCtx.Done():
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) {
return err
}

select {
case outputChanged := <-bulker.RemoteOutputCh():
go func() {
outputChanged := <-bulker.RemoteOutputCh()
f.outputCh <- outputChanged
log.Info().Msg("Remote output configuration update")
}
}()

return g.Wait()
}
Expand Down

0 comments on commit 41f4ced

Please sign in to comment.