Skip to content

Commit

Permalink
Fix telemetry api subscription 403 forbidden error (#1452)
Browse files Browse the repository at this point in the history
* Fixed telemetry api receiver subscription errors

* Fixed tests
  • Loading branch information
jerrytfleung authored Aug 16, 2024
1 parent 1af8cf6 commit cd8e21f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
13 changes: 6 additions & 7 deletions collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
listener: listener,
}

go func() {
if err := lm.processEvents(ctx); err != nil {
lm.logger.Warn("Failed to process events", zap.Error(err))
}
}()

factories, _ := lambdacomponents.Components(res.ExtensionID)
lm.collector = collector.NewCollector(logger, factories, version)

Expand All @@ -107,12 +101,17 @@ func (lm *manager) Run(ctx context.Context) error {
return err
}

lm.wg.Add(1)
go func() {
if err := lm.processEvents(ctx); err != nil {
lm.logger.Warn("Failed to process events", zap.Error(err))
}
}()
lm.wg.Wait()
return nil
}

func (lm *manager) processEvents(ctx context.Context) error {
lm.wg.Add(1)
defer lm.wg.Done()

for {
Expand Down
19 changes: 17 additions & 2 deletions collector/internal/lifecycle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ func (c *MockCollector) Stop() error {
func TestRun(t *testing.T) {
logger := zaptest.NewLogger(t)
ctx := context.Background()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, err := w.Write([]byte(`{"time":"2006-01-02T15:04:05.000Z", "eventType":"SHUTDOWN", "record":{}}`))
require.NoError(t, err)
_, err = io.ReadAll(r.Body)
require.NoError(t, err, "failed to read request body: %v", err)
}))
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)

// test with an error
lm := manager{
collector: &MockCollector{err: fmt.Errorf("test start error")},
Expand All @@ -55,14 +67,16 @@ func TestRun(t *testing.T) {
lm = manager{
collector: &MockCollector{},
logger: logger,
extensionClient: extensionapi.NewClient(logger, ""),
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
}
require.NoError(t, lm.Run(ctx))
// test with waitgroup counter incremented
lm = manager{
collector: &MockCollector{},
logger: logger,
extensionClient: extensionapi.NewClient(logger, ""),
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
}
lm.wg.Add(1)
go func() {
Expand Down Expand Up @@ -130,6 +144,7 @@ func TestProcessEvents(t *testing.T) {
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
}
lm.wg.Add(1)
if tc.err != nil {
err = lm.processEvents(ctx)
require.Error(t, err)
Expand Down

0 comments on commit cd8e21f

Please sign in to comment.