diff --git a/collector/processor/decoupleprocessor/processor_test.go b/collector/processor/decoupleprocessor/processor_test.go index 41e857e112..4e8a5f714d 100644 --- a/collector/processor/decoupleprocessor/processor_test.go +++ b/collector/processor/decoupleprocessor/processor_test.go @@ -2,10 +2,12 @@ package decoupleprocessor import ( "context" + "errors" "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/processor/processortest" + "sync" "testing" "time" ) @@ -21,6 +23,9 @@ func (m *MockLifecycleNotifier) AddListener(l lambdalifecycle.Listener) { type MockConsumer struct { info client.Info dataReceived chan any + lock sync.Mutex + gotData bool + data any } func (m *MockConsumer) consume(ctx context.Context, data any) error { @@ -29,6 +34,32 @@ func (m *MockConsumer) consume(ctx context.Context, data any) error { return nil } +func (m *MockConsumer) receiveDataAfter(d time.Duration) { + go func() { + time.Sleep(d) + m.gotData = false + select { + case data := <-m.dataReceived: + m.lock.Lock() + m.data = data + m.gotData = true + m.lock.Unlock() + } + }() +} + +func (m *MockConsumer) getReceivedData() (any, error) { + m.lock.Lock() + gotData := m.gotData + data := m.data + m.lock.Unlock() + if gotData { + return data, nil + } + + return nil, errors.New("no data received") +} + func newMockConsumer() *MockConsumer { m := MockConsumer{ info: client.Info{}, @@ -120,14 +151,12 @@ func TestLifecycle(t *testing.T) { expectedData := "data" dp.queueData(client.NewContext(context.Background(), client.Info{}), expectedData) start := time.Now() - var data any - go func() { - time.Sleep(200 * time.Millisecond) - data = <-consumer.dataReceived - }() + consumer.receiveDataAfter(200 * time.Millisecond) dp.FunctionFinished() finish := time.Now() require.WithinRange(t, finish, start.Add(200*time.Millisecond), start.Add(300*time.Millisecond)) + data, err := consumer.getReceivedData() + require.NoError(t, err) require.Equal(t, expectedData, data) dp.EnvironmentShutdown() @@ -146,14 +175,12 @@ func TestLifecycle(t *testing.T) { expectedData := "data" dp.queueData(client.NewContext(context.Background(), client.Info{}), expectedData) start := time.Now() - var data any - go func() { - time.Sleep(200 * time.Millisecond) - data = <-consumer.dataReceived - }() + consumer.receiveDataAfter(200 * time.Millisecond) require.NoError(t, dp.shutdown(context.Background())) finish := time.Now() require.WithinRange(t, finish, start.Add(200*time.Millisecond), start.Add(300*time.Millisecond)) + data, err := consumer.getReceivedData() + require.NoError(t, err) require.Equal(t, expectedData, data) }) }