Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 16, 2023
1 parent 8797823 commit 1be0575
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 25 deletions.
1 change: 1 addition & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ func TestInvalidateAPIKeysRemoteOutput(t *testing.T) {
ack.invalidateAPIKeys(context.Background(), logger, out.ToRetireAPIKeyIds, "")

bulker.AssertExpectations(t)
remoteBulker.AssertExpectations(t)
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/bulk/bulk_remote_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_CheckRemoteOutputChanged(t *testing.T) {
func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {
testcases := []struct {
name string
cfg map[string]interface{}
Expand Down Expand Up @@ -80,8 +80,9 @@ func Test_CheckRemoteOutputChanged(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
bulker.remoteOutputConfigMap["remote1"] = tc.cfg
hasChanged := bulker.CheckRemoteOutputChanged(log, "remote1", tc.newCfg)
hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg)
assert.Equal(t, tc.changed, hasChanged)
assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"])
})
}
}
Expand Down
27 changes: 17 additions & 10 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ type Bulk interface {
// Accessor used to talk to elastic search direcly bypassing bulk engine
Client() *elasticsearch.Client

CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool

CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error)
GetBulker(outputName string) Bulk
GetBulkerMap() map[string]Bulk
Expand Down Expand Up @@ -121,8 +119,9 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)),
tracer: tracer,
remoteOutputConfigMap: make(map[string]map[string]interface{}),
bulkerMap: make(map[string]Bulk),
remoteOutputErrorMap: make(map[string]string),
// remote ES bulkers
bulkerMap: make(map[string]Bulk),
remoteOutputErrorMap: make(map[string]string),
}
}

Expand All @@ -146,8 +145,12 @@ func (b *Bulker) CancelFn() context.CancelFunc {
return b.cancelFn
}

// for remote ES output, create a new bulker in bulkerMap if does not exist
// if bulker exists for output, check if config changed
// if not changed, return the existing bulker
// if changed, stop the existing bulker and create a new one
func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
hasConfigChanged := b.CheckRemoteOutputChanged(zlog, outputName, outputMap[outputName])
hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName])
bulker := b.bulkerMap[outputName]
if bulker != nil && !hasConfigChanged {
return bulker, false, nil
Expand Down Expand Up @@ -181,6 +184,7 @@ func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serv
go func() {
select {
case err = <-errCh:
zlog.Error().Err(err).Str("outputName", outputName).Msg("Bulker error")
case <-bulkCtx.Done():
zlog.Debug().Str("outputName", outputName).Msg("Bulk context done")
err = bulkCtx.Err()
Expand Down Expand Up @@ -237,12 +241,8 @@ func (b *Bulker) Client() *elasticsearch.Client {
return client
}

func (b *Bulker) Tracer() *apm.Tracer {
return b.tracer
}

// check if remote output cfg changed
func (b *Bulker) CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
curCfg := b.remoteOutputConfigMap[name]

hasChanged := false
Expand Down Expand Up @@ -413,6 +413,13 @@ func (b *Bulker) Run(ctx context.Context) error {

}

// cancelling context of each remote bulker when Run exits
defer func() {
for _, bulker := range b.bulkerMap {
bulker.CancelFn()()
}
}()

return err
}

Expand Down
17 changes: 9 additions & 8 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,24 @@ func (p *Output) prepareElasticsearch(

// retire api key of removed remote output
var toRetireAPIKeys *model.ToRetireAPIKeyIdsItems
var removedOutputKey string
var removedOutputName string
// find the first output that is removed - supposing one output can be removed at a time
for agentOutputKey, agentOutput := range agent.Outputs {
for agentOutputName, agentOutput := range agent.Outputs {
found := false
for outputMapKey := range outputMap {
if agentOutputKey == outputMapKey {
if agentOutputName == outputMapKey {
found = true
break
}
}
if !found {
zlog.Info().Str("APIKeyID", agentOutput.APIKeyID).Str("output", agentOutputKey).Msg("Output removed, will retire API key")
zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str("outputName", agentOutputName).Msg("Output removed, will retire API key")
toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{
ID: agentOutput.APIKeyID,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
Output: agentOutputKey,
Output: agentOutputName,
}
removedOutputKey = agentOutputKey
removedOutputName = agentOutputName
break
}
}
Expand All @@ -156,7 +157,7 @@ func (p *Output) prepareElasticsearch(
body, err = json.Marshal(map[string]interface{}{
"script": map[string]interface{}{
"lang": "painless",
"source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputKey),
"source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputName),
},
})
if err != nil {
Expand Down Expand Up @@ -262,7 +263,7 @@ func (p *Output) prepareElasticsearch(
// reporting output error status to self monitor and not returning the error to keep fleet-server running
if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch {
if err != nil {
zerolog.Ctx(ctx).Warn().Msg("Could not create API key in remote ES")
zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES")
bulker.SetRemoteOutputError(p.Name, err.Error())
} else {
bulker.SetRemoteOutputError(p.Name, "")
Expand Down
5 changes: 0 additions & 5 deletions internal/pkg/testing/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ func (m *MockBulk) CreateAndGetBulker(zlog zerolog.Logger, outputName string, se
return args.Get(0).(bulk.Bulk), args.Get(1).(bool), nil
}

func (m *MockBulk) CheckRemoteOutputChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
args := m.Called()
return args.Get(0).(bool)
}

func (m *MockBulk) GetRemoteOutputErrorMap() map[string]string {
args := m.Called()
return args.Get(0).(map[string]string)
Expand Down

0 comments on commit 1be0575

Please sign in to comment.