Skip to content

Commit

Permalink
fixed test, replace semaphore with mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 27, 2023
1 parent 9382975 commit 0006533
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
12 changes: 6 additions & 6 deletions internal/pkg/bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ func TestCancelCtxChildBulker(t *testing.T) {
func TestCancelCtxChildBulkerReplaced(t *testing.T) {
bulker := NewBulker(nil, nil)

ctx, _ := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = testlog.SetLogger(t).WithContext(ctx)

logger := testlog.SetLogger(t)
Expand Down Expand Up @@ -345,13 +346,12 @@ func TestCancelCtxChildBulkerReplaced(t *testing.T) {
go func() {
defer wg.Done()

_, err := childBulker.APIKeyAuth(ctx, apikey.APIKey{})
err := childBulker.APIKeyUpdate(ctx, "", "", make([]byte, 0))

t.Log(err)
// TODO not context canceled error
// if !errors.Is(err, context.Canceled) {
// t.Error("Expected context cancel err: ", err)
// }
if !errors.Is(err, context.Canceled) {
t.Error("Expected context cancel err: ", err)
}
}()

wg.Wait()
Expand Down
9 changes: 3 additions & 6 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Bulker struct {
remoteOutputConfigMap map[string]map[string]interface{}
bulkerMap map[string]Bulk
cancelFn context.CancelFunc
remoteOutputLimit *semaphore.Weighted
remoteOutputMutex sync.RWMutex

Check failure on line 91 in internal/pkg/bulk/engine.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed (goimports)
}

const (
Expand Down Expand Up @@ -119,7 +119,6 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
remoteOutputConfigMap: make(map[string]map[string]interface{}),
// remote ES bulkers
bulkerMap: make(map[string]Bulk),
remoteOutputLimit: semaphore.NewWeighted(1),
}
}

Expand All @@ -137,10 +136,8 @@ func (b *Bulker) CancelFn() context.CancelFunc {

func (b *Bulker) updateBulkerMap(ctx context.Context, outputName string, newBulker *Bulker) error {

Check failure on line 137 in internal/pkg/bulk/engine.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`(*Bulker).updateBulkerMap` - `ctx` is unused (unparam)
// concurrency control of updating map
if err := b.remoteOutputLimit.Acquire(ctx, 1); err != nil {
return err
}
defer b.remoteOutputLimit.Release(1)
b.remoteOutputMutex.Lock()
defer b.remoteOutputMutex.Unlock()

b.bulkerMap[outputName] = newBulker
return nil
Expand Down

0 comments on commit 0006533

Please sign in to comment.