Skip to content

Commit

Permalink
added semaphore for updating bulkerMap
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 21, 2023
1 parent 10021a8 commit 4c99949
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
13 changes: 10 additions & 3 deletions internal/pkg/bulk/bulk_remote_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package bulk

import (
"context"
"testing"

testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
Expand Down Expand Up @@ -88,6 +89,8 @@ func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {
}

func Test_CreateAndGetBulkerNew(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputMap := make(map[string]map[string]interface{})
Expand All @@ -96,13 +99,15 @@ func Test_CreateAndGetBulkerNew(t *testing.T) {
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token1",
}
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token1", outputMap)
newBulker, hasChanged, err := bulker.CreateAndGetBulker(ctx, log, "remote1", "token1", outputMap)
assert.NotNil(t, newBulker)
assert.Equal(t, false, hasChanged)
assert.Nil(t, err)
}

func Test_CreateAndGetBulkerExisting(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
Expand All @@ -115,13 +120,15 @@ func Test_CreateAndGetBulkerExisting(t *testing.T) {
}
bulker.remoteOutputConfigMap["remote1"] = cfg
outputMap["remote1"] = cfg
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token1", outputMap)
newBulker, hasChanged, err := bulker.CreateAndGetBulker(ctx, log, "remote1", "token1", outputMap)
assert.Equal(t, outputBulker, newBulker)
assert.Equal(t, false, hasChanged)
assert.Nil(t, err)
}

func Test_CreateAndGetBulkerChanged(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
Expand All @@ -139,7 +146,7 @@ func Test_CreateAndGetBulkerChanged(t *testing.T) {
}
cancelFnCalled := false
outputBulker.cancelFn = func() { cancelFnCalled = true }
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token2", outputMap)
newBulker, hasChanged, err := bulker.CreateAndGetBulker(ctx, log, "remote1", "token2", outputMap)
assert.NotEqual(t, outputBulker, newBulker)
assert.Equal(t, true, hasChanged)
assert.Nil(t, err)
Expand Down
25 changes: 21 additions & 4 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Bulk interface {
// Accessor used to talk to elastic search direcly bypassing bulk engine
Client() *elasticsearch.Client

CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error)
CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error)
GetBulker(outputName string) Bulk
GetBulkerMap() map[string]Bulk
GetRemoteOutputErrorMap() map[string]string
Expand All @@ -91,6 +91,7 @@ type Bulker struct {
bulkerMap map[string]Bulk
remoteOutputErrorMap map[string]string
cancelFn context.CancelFunc
remoteOutputLimit *semaphore.Weighted
}

const (
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
// remote ES bulkers
bulkerMap: make(map[string]Bulk),
remoteOutputErrorMap: make(map[string]string),
remoteOutputLimit: semaphore.NewWeighted(1),
}
}

Expand All @@ -130,6 +132,7 @@ func (b *Bulker) GetRemoteOutputErrorMap() map[string]string {
}

func (b *Bulker) SetRemoteOutputError(name string, status string) {
// TODO concurrency control of updating map
b.remoteOutputErrorMap[name] = status
}

Expand All @@ -145,11 +148,22 @@ func (b *Bulker) CancelFn() context.CancelFunc {
return b.cancelFn
}

func (b *Bulker) updateBulkerMap(ctx context.Context, outputName string, newBulker *Bulker) error {
// concurrency control of updating map
if err := b.remoteOutputLimit.Acquire(ctx, 1); err != nil {
return err
}
defer b.remoteOutputLimit.Release(1)

b.bulkerMap[outputName] = newBulker
return nil
}

// for remote ES output, create a new bulker in bulkerMap if does not exist
// if bulker exists for output, check if config changed
// if not changed, return the existing bulker
// if changed, stop the existing bulker and create a new one
func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName])
bulker := b.bulkerMap[outputName]
if bulker != nil && !hasConfigChanged {
Expand All @@ -170,7 +184,11 @@ func (b *Bulker) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serv
// starting a new bulker to create/update API keys for remote ES output
newBulker := NewBulker(es, b.tracer)
newBulker.cancelFn = bulkCancel
b.bulkerMap[outputName] = newBulker

err = b.updateBulkerMap(ctx, outputName, newBulker)
if err != nil {
return nil, hasConfigChanged, err
}

errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -247,7 +265,6 @@ func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name

hasChanged := false

// TODO remoteOutputConfigMap empty when FS restarts - won't detect changes
// when output config first added, not reporting change
if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) {
zlog.Info().Str("name", name).Msg("remote output configuration has changed")
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B
}
case OutputTypeRemoteElasticsearch:
zlog.Debug().Msg("preparing remote elasticsearch output")
newBulker, hasConfigChanged, err := bulker.CreateAndGetBulker(zlog, p.Name, p.ServiceToken, outputMap)
newBulker, hasConfigChanged, err := bulker.CreateAndGetBulker(ctx, zlog, p.Name, p.ServiceToken, outputMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -265,7 +265,7 @@ func (p *Output) prepareElasticsearch(
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES")
bulker.SetRemoteOutputError(p.Name, err.Error())
} else {
} else if bulker.GetRemoteOutputErrorMap()[p.Name] != "" {
bulker.SetRemoteOutputError(p.Name, "")
}

Expand All @@ -274,7 +274,7 @@ func (p *Output) prepareElasticsearch(
// remove the service token from the agent policy sent to the agent
delete(outputMap[p.Name], FieldOutputServiceToken)
return nil
} else if p.Type == OutputTypeRemoteElasticsearch {
} else if p.Type == OutputTypeRemoteElasticsearch && bulker.GetRemoteOutputErrorMap()[p.Name] != "" {
bulker.SetRemoteOutputError(p.Name, "")
}
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/testing/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (m *MockBulk) GetBulkerMap() map[string]bulk.Bulk {
return args.Get(0).(map[string]bulk.Bulk)
}

func (m *MockBulk) CreateAndGetBulker(zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (bulk.Bulk, bool, error) {
args := m.Called(zlog, outputName, serviceToken, outputMap)
func (m *MockBulk) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (bulk.Bulk, bool, error) {
args := m.Called(ctx, zlog, outputName, serviceToken, outputMap)
return args.Get(0).(bulk.Bulk), args.Get(1).(bool), nil
}

Expand Down

0 comments on commit 4c99949

Please sign in to comment.