From 1be057590bc9ad13ab507297c3b0fff1a444325d Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 16 Nov 2023 11:17:58 +0100 Subject: [PATCH] review comments --- internal/pkg/api/handleAck_test.go | 1 + internal/pkg/bulk/bulk_remote_output_test.go | 5 ++-- internal/pkg/bulk/engine.go | 27 ++++++++++++-------- internal/pkg/policy/policy_output.go | 17 ++++++------ internal/pkg/testing/bulk.go | 5 ---- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index d6f5f0d01..1cd0b9019 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -608,6 +608,7 @@ func TestInvalidateAPIKeysRemoteOutput(t *testing.T) { ack.invalidateAPIKeys(context.Background(), logger, out.ToRetireAPIKeyIds, "") bulker.AssertExpectations(t) + remoteBulker.AssertExpectations(t) } } diff --git a/internal/pkg/bulk/bulk_remote_output_test.go b/internal/pkg/bulk/bulk_remote_output_test.go index 488735215..5b5dab693 100644 --- a/internal/pkg/bulk/bulk_remote_output_test.go +++ b/internal/pkg/bulk/bulk_remote_output_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_CheckRemoteOutputChanged(t *testing.T) { +func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) { testcases := []struct { name string cfg map[string]interface{} @@ -80,8 +80,9 @@ func Test_CheckRemoteOutputChanged(t *testing.T) { log := testlog.SetLogger(t) bulker := NewBulker(nil, nil) bulker.remoteOutputConfigMap["remote1"] = tc.cfg - hasChanged := bulker.CheckRemoteOutputChanged(log, "remote1", tc.newCfg) + hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg) assert.Equal(t, tc.changed, hasChanged) + assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"]) }) } } diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 13d163464..3efa08388 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -68,8 +68,6 @@ type Bulk interface { // Accessor used to talk to elastic search direcly bypassing bulk engine Client() *elasticsearch.Client - CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool - CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) GetBulker(outputName string) Bulk GetBulkerMap() map[string]Bulk @@ -121,8 +119,9 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)), tracer: tracer, remoteOutputConfigMap: make(map[string]map[string]interface{}), - bulkerMap: make(map[string]Bulk), - remoteOutputErrorMap: make(map[string]string), + // remote ES bulkers + bulkerMap: make(map[string]Bulk), + remoteOutputErrorMap: make(map[string]string), } } @@ -146,8 +145,12 @@ func (b *Bulker) CancelFn() context.CancelFunc { return b.cancelFn } +// for remote ES output, create a new bulker in bulkerMap if does not exist +// if bulker exists for output, check if config changed +// if not changed, return the existing bulker +// if changed, stop the existing bulker and create a new one func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) { - hasConfigChanged := b.CheckRemoteOutputChanged(zlog, outputName, outputMap[outputName]) + hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName]) bulker := b.bulkerMap[outputName] if bulker != nil && !hasConfigChanged { return bulker, false, nil @@ -181,6 +184,7 @@ func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serv go func() { select { case err = <-errCh: + zlog.Error().Err(err).Str("outputName", outputName).Msg("Bulker error") case <-bulkCtx.Done(): zlog.Debug().Str("outputName", outputName).Msg("Bulk context done") err = bulkCtx.Err() @@ -237,12 +241,8 @@ func (b *Bulker) Client() *elasticsearch.Client { return client } -func (b *Bulker) Tracer() *apm.Tracer { - return b.tracer -} - // check if remote output cfg changed -func (b *Bulker) CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { +func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { curCfg := b.remoteOutputConfigMap[name] hasChanged := false @@ -413,6 +413,13 @@ func (b *Bulker) Run(ctx context.Context) error { } + // cancelling context of each remote bulker when Run exits + defer func() { + for _, bulker := range b.bulkerMap { + bulker.CancelFn()() + } + }() + return err } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 4cb014116..cf29367ff 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -113,23 +113,24 @@ func (p *Output) prepareElasticsearch( // retire api key of removed remote output var toRetireAPIKeys *model.ToRetireAPIKeyIdsItems - var removedOutputKey string + var removedOutputName string // find the first output that is removed - supposing one output can be removed at a time - for agentOutputKey, agentOutput := range agent.Outputs { + for agentOutputName, agentOutput := range agent.Outputs { found := false for outputMapKey := range outputMap { - if agentOutputKey == outputMapKey { + if agentOutputName == outputMapKey { found = true + break } } if !found { - zlog.Info().Str("APIKeyID", agentOutput.APIKeyID).Str("output", agentOutputKey).Msg("Output removed, will retire API key") + zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str("outputName", agentOutputName).Msg("Output removed, will retire API key") toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{ ID: agentOutput.APIKeyID, RetiredAt: time.Now().UTC().Format(time.RFC3339), - Output: agentOutputKey, + Output: agentOutputName, } - removedOutputKey = agentOutputKey + removedOutputName = agentOutputName break } } @@ -156,7 +157,7 @@ func (p *Output) prepareElasticsearch( body, err = json.Marshal(map[string]interface{}{ "script": map[string]interface{}{ "lang": "painless", - "source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputKey), + "source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputName), }, }) if err != nil { @@ -262,7 +263,7 @@ func (p *Output) prepareElasticsearch( // reporting output error status to self monitor and not returning the error to keep fleet-server running if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch { if err != nil { - zerolog.Ctx(ctx).Warn().Msg("Could not create API key in remote ES") + zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES") bulker.SetRemoteOutputError(p.Name, err.Error()) } else { bulker.SetRemoteOutputError(p.Name, "") diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go index 0dc5e0543..8291dc2df 100644 --- a/internal/pkg/testing/bulk.go +++ b/internal/pkg/testing/bulk.go @@ -95,11 +95,6 @@ func (m *MockBulk) CreateAndGetBulker(zlog zerolog.Logger, outputName string, se return args.Get(0).(bulk.Bulk), args.Get(1).(bool), nil } -func (m *MockBulk) CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { - args := m.Called() - return args.Get(0).(bool) -} - func (m *MockBulk) GetRemoteOutputErrorMap() map[string]string { args := m.Called() return args.Get(0).(map[string]string)