Skip to content

Commit

Permalink
fix linter issues
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Jun 19, 2024
1 parent e8b9edd commit fed6f10
Showing 1 changed file with 13 additions and 28 deletions.
41 changes: 13 additions & 28 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp)
_ = manager.Init(&e.grp)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -106,7 +106,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp)
_ = manager.Init(&e.grp)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -127,9 +127,9 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
e.wg.Add(1)
go func(wg *sync.WaitGroup, grp *unison.TaskGroup) {
defer wg.Done()
defer grp.Stop()
defer func() { _ = grp.Stop() }()
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, ID: "fake-ID"}
inp.Run(inputCtx, e.pipeline)
_ = inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
}

Expand Down Expand Up @@ -357,14 +357,16 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e
var entry registryEntry
err := inputStore.Get(key, &entry)
if err != nil {
keys := []string{}
inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) {
var keys []string
_ = inputStore.Each(func(key string, _ statestore.ValueDecoder) (bool, error) {
keys = append(keys, key)
return false, nil
})
e.t.Logf("keys in store: %v", keys)

return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err)
return registryEntry{},
fmt.Errorf("error when getting expected key '%s' from store: %w",
key, err)
}

return entry, nil
Expand Down Expand Up @@ -393,7 +395,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
if sum == count {
return true
}
fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d",
fmt.Fprintf(msg, "unexpected number of events; expected: %d, actual: %d\n",
count, sum)

return false
Expand Down Expand Up @@ -492,9 +494,7 @@ func (e *inputTestingEnvironment) getOutputMessages() []string {
func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) {
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
events = append(events, evt)
}
events = append(events, c.GetEvents()...)
}

selectedEvent := events[nr]
Expand All @@ -517,9 +517,7 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
}
events := make([]beat.Event, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
events = append(events, evt)
}
events = append(events, c.GetEvents()...)
}

selectedEvent := events[nr]
Expand Down Expand Up @@ -581,9 +579,7 @@ func (c *mockClient) PublishAll(events []beat.Event) {
}
c.ackHandler.ACKEvents(len(events))

for _, event := range events {
c.published = append(c.published, event)
}
c.published = append(c.published, events...)
}

func (c *mockClient) waitUntilPublishingHasStarted() {
Expand Down Expand Up @@ -655,17 +651,6 @@ func (pc *mockPipelineConnector) cancelAllClients() {
}
}

func (pc *mockPipelineConnector) cancelClient(i int) {
pc.mtx.Lock()
defer pc.mtx.Unlock()

if len(pc.clients) < i+1 {
return
}

pc.clients[i].canceler()
}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener {
if !blocking {
return config.EventListener
Expand Down

0 comments on commit fed6f10

Please sign in to comment.