diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 3f308c8c7fd..f9804bb16f3 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -94,7 +94,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) e.t.Helper() e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -106,7 +106,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) { e.grp = unison.TaskGroup{} manager := e.getManager() - manager.Init(&e.grp) + _ = manager.Init(&e.grp) c := conf.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -127,9 +127,9 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) e.wg.Add(1) go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { defer wg.Done() - defer grp.Stop() + defer func() { _ = grp.Stop() }() inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"} - inp.Run(inputCtx, e.pipeline) + _ = inp.Run(inputCtx, e.pipeline) }(&e.wg, &e.grp) } @@ -357,14 +357,16 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e var entry registryEntry err := inputStore.Get(key, &entry) if err != nil { - keys := []string{} - inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { + var keys []string + _ = inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) { keys = append(keys, key) return false, nil }) e.t.Logf("keys in store: %v", keys) - return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err) + return registryEntry{}, + fmt.Errorf("error when getting expected key '%s' from store: %w", + key, err) } return entry, nil @@ -393,7 +395,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) { if sum == count { return true } - fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d", + fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n", count, sum) return false @@ -492,9 +494,7 @@ func (e *inputTestingEnvironment) getOutputMessages() []string { func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) { events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -517,9 +517,7 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) { } events := make([]beat.Event, 0) for _, c := range e.pipeline.clients { - for _, evt := range c.GetEvents() { - events = append(events, evt) - } + events = append(events, c.GetEvents()...) } selectedEvent := events[nr] @@ -581,9 +579,7 @@ func (c *mockClient) PublishAll(events []beat.Event) { } c.ackHandler.ACKEvents(len(events)) - for _, event := range events { - c.published = append(c.published, event) - } + c.published = append(c.published, events...) } func (c *mockClient) waitUntilPublishingHasStarted() { @@ -655,17 +651,6 @@ func (pc *mockPipelineConnector) cancelAllClients() { } } -func (pc *mockPipelineConnector) cancelClient(i int) { - pc.mtx.Lock() - defer pc.mtx.Unlock() - - if len(pc.clients) < i+1 { - return - } - - pc.clients[i].canceler() -} - func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener { if !blocking { return config.EventListener