diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 68f6939190..3ee1db2826 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -86,12 +86,6 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex listener: listener, } - go func() { - if err := lm.processEvents(ctx); err != nil { - lm.logger.Warn("Failed to process events", zap.Error(err)) - } - }() - factories, _ := lambdacomponents.Components(res.ExtensionID) lm.collector = collector.NewCollector(logger, factories, version) @@ -107,12 +101,17 @@ func (lm *manager) Run(ctx context.Context) error { return err } + lm.wg.Add(1) + go func() { + if err := lm.processEvents(ctx); err != nil { + lm.logger.Warn("Failed to process events", zap.Error(err)) + } + }() lm.wg.Wait() return nil } func (lm *manager) processEvents(ctx context.Context) error { - lm.wg.Add(1) defer lm.wg.Done() for { diff --git a/collector/internal/lifecycle/manager_test.go b/collector/internal/lifecycle/manager_test.go index 410ff37c5b..e121779552 100644 --- a/collector/internal/lifecycle/manager_test.go +++ b/collector/internal/lifecycle/manager_test.go @@ -44,6 +44,18 @@ func (c *MockCollector) Stop() error { func TestRun(t *testing.T) { logger := zaptest.NewLogger(t) ctx := context.Background() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + _, err := w.Write([]byte(`{"time":"2006-01-02T15:04:05.000Z", "eventType":"SHUTDOWN", "record":{}}`)) + require.NoError(t, err) + _, err = io.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + })) + defer server.Close() + u, err := url.Parse(server.URL) + require.NoError(t, err) + // test with an error lm := manager{ collector: &MockCollector{err: fmt.Errorf("test start error")}, @@ -55,14 +67,16 @@ func TestRun(t *testing.T) { lm = manager{ collector: &MockCollector{}, logger: logger, - extensionClient: extensionapi.NewClient(logger, ""), + listener: telemetryapi.NewListener(logger), + extensionClient: extensionapi.NewClient(logger, u.Host), } require.NoError(t, lm.Run(ctx)) // test with waitgroup counter incremented lm = manager{ collector: &MockCollector{}, logger: logger, - extensionClient: extensionapi.NewClient(logger, ""), + listener: telemetryapi.NewListener(logger), + extensionClient: extensionapi.NewClient(logger, u.Host), } lm.wg.Add(1) go func() { @@ -130,6 +144,7 @@ func TestProcessEvents(t *testing.T) { listener: telemetryapi.NewListener(logger), extensionClient: extensionapi.NewClient(logger, u.Host), } + lm.wg.Add(1) if tc.err != nil { err = lm.processEvents(ctx) require.Error(t, err)