Skip to content

Commit

Permalink
Fix race detector error
Browse files Browse the repository at this point in the history
  • Loading branch information
adcharre committed Nov 1, 2023
1 parent fa15461 commit 5bf3d08
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions collector/processor/decoupleprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package decoupleprocessor

import (
"context"
"errors"
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/processor/processortest"
"sync"
"testing"
"time"
)
Expand All @@ -21,6 +23,9 @@ func (m *MockLifecycleNotifier) AddListener(l lambdalifecycle.Listener) {
type MockConsumer struct {
info client.Info
dataReceived chan any
lock sync.Mutex
gotData bool
data any
}

func (m *MockConsumer) consume(ctx context.Context, data any) error {
Expand All @@ -29,6 +34,32 @@ func (m *MockConsumer) consume(ctx context.Context, data any) error {
return nil
}

func (m *MockConsumer) receiveDataAfter(d time.Duration) {
go func() {
time.Sleep(d)
m.gotData = false
select {
case data := <-m.dataReceived:
m.lock.Lock()
m.data = data
m.gotData = true
m.lock.Unlock()
}
}()
}

func (m *MockConsumer) getReceivedData() (any, error) {
m.lock.Lock()
gotData := m.gotData
data := m.data
m.lock.Unlock()
if gotData {
return data, nil
}

return nil, errors.New("no data received")
}

func newMockConsumer() *MockConsumer {
m := MockConsumer{
info: client.Info{},
Expand Down Expand Up @@ -120,14 +151,12 @@ func TestLifecycle(t *testing.T) {
expectedData := "data"
dp.queueData(client.NewContext(context.Background(), client.Info{}), expectedData)
start := time.Now()
var data any
go func() {
time.Sleep(200 * time.Millisecond)
data = <-consumer.dataReceived
}()
consumer.receiveDataAfter(200 * time.Millisecond)
dp.FunctionFinished()
finish := time.Now()
require.WithinRange(t, finish, start.Add(200*time.Millisecond), start.Add(300*time.Millisecond))
data, err := consumer.getReceivedData()
require.NoError(t, err)
require.Equal(t, expectedData, data)

dp.EnvironmentShutdown()
Expand All @@ -146,14 +175,12 @@ func TestLifecycle(t *testing.T) {
expectedData := "data"
dp.queueData(client.NewContext(context.Background(), client.Info{}), expectedData)
start := time.Now()
var data any
go func() {
time.Sleep(200 * time.Millisecond)
data = <-consumer.dataReceived
}()
consumer.receiveDataAfter(200 * time.Millisecond)
require.NoError(t, dp.shutdown(context.Background()))
finish := time.Now()
require.WithinRange(t, finish, start.Add(200*time.Millisecond), start.Add(300*time.Millisecond))
data, err := consumer.getReceivedData()
require.NoError(t, err)
require.Equal(t, expectedData, data)
})
}

0 comments on commit 5bf3d08

Please sign in to comment.