Skip to content

Commit

Permalink
Add test for handling processing errors while publishing events (#37491)
Browse files Browse the repository at this point in the history
There were some instances of this code producing infinite loops.
Now there is a test case for the processing pipeline covering this.
  • Loading branch information
rdner authored Jan 10, 2024
1 parent 32c7343 commit 635b286
Showing 1 changed file with 153 additions and 16 deletions.
169 changes: 153 additions & 16 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package pipeline

import (
"context"
"errors"
"io"
"sync"
"testing"
"time"
Expand All @@ -28,33 +30,33 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestClient(t *testing.T) {
makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
conf.Namespace{},
outputs.Group{},
settings,
)
if err != nil {
panic(err)
}
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu
func makePipeline(t *testing.T, settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
conf.Namespace{},
outputs.Group{},
settings,
)
require.NoError(t, err)
// Inject a test queue so the outputController doesn't create one
p.outputController.queue = qu

return p
}
return p
}

func TestClient(t *testing.T) {
t.Run("client close", func(t *testing.T) {
// Note: no asserts. If closing fails we have a deadlock, because Publish
// would block forever
Expand Down Expand Up @@ -90,7 +92,7 @@ func TestClient(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

pipeline := makePipeline(Settings{}, makeTestQueue())
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

var ctx context.Context
Expand Down Expand Up @@ -119,6 +121,105 @@ func TestClient(t *testing.T) {
})
}
})

t.Run("no infinite loop when processing fails", func(t *testing.T) {
logp.TestingSetup()
l := logp.L()

// a small in-memory queue with a very short flush interval
q := memqueue.NewQueue(l, nil, memqueue.Settings{
Events: 5,
FlushMinEvents: 1,
FlushTimeout: time.Millisecond,
}, 5)

// model a processor that we're going to make produce errors after
p := &testProcessor{}
ps := testProcessorSupporter{Processor: p}

// now we create a pipeline that makes sure that all
// events are acked while shutting down
pipeline := makePipeline(t, Settings{
WaitClose: 100 * time.Millisecond,
WaitCloseMode: WaitOnPipelineClose,
Processors: ps,
}, q)
client, err := pipeline.Connect()
require.NoError(t, err)
defer client.Close()

// consuming all the published events
var received []beat.Event
done := make(chan struct{})
go func() {
for {
batch, err := q.Get(2)
if errors.Is(err, io.EOF) {
break
}
assert.NoError(t, err)
if batch == nil {
continue
}
for i := 0; i < batch.Count(); i++ {
e := batch.Entry(i).(publisher.Event)
received = append(received, e.Content)
}
batch.Done()
}
close(done)
}()

sent := []beat.Event{
{
Fields: mapstr.M{"number": 1},
},
{
Fields: mapstr.M{"number": 2},
},
{
Fields: mapstr.M{"number": 3},
},
{
Fields: mapstr.M{"number": 4},
},
}

expected := []beat.Event{
{
Fields: mapstr.M{"number": 1, "test": "value"},
},
{
Fields: mapstr.M{"number": 2, "test": "value"},
},
// {
// // this event must be excluded due to the processor error
// Fields: mapstr.M{"number": 3},
// },
{
Fields: mapstr.M{"number": 4, "test": "value"},
},
}

client.PublishAll(sent[:2]) // first 2

// this causes our processor to malfunction and produce errors for all events
p.ErrorSwitch()

client.PublishAll(sent[2:3]) // number 3

// back to normal
p.ErrorSwitch()

client.PublishAll(sent[3:]) // number 4

client.Close()
pipeline.Close()

// waiting for all events to be consumed from the queue
<-done
require.Equal(t, expected, received)
})
}

func TestClientWaitClose(t *testing.T) {
Expand Down Expand Up @@ -258,3 +359,39 @@ func TestMonitoring(t *testing.T) {
assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
assert.Equal(t, int64(numClients), telemetrySnapshot.Ints["output.clients"])
}

// testProcessor is a minimal beat.Processor used by pipeline tests. While its
// failure flag is off it annotates every event with a "test" field; while the
// flag is on it returns an error for every event it is given.
type testProcessor struct{ error bool }

// String reports the processor name for debug output.
func (p *testProcessor) String() string {
	return "testProcessor"
}

// Run either fails with a test error (when the failure flag is set) or puts
// the field "test": "value" into the event and returns it unchanged otherwise.
func (p *testProcessor) Run(in *beat.Event) (*beat.Event, error) {
	if p.error {
		return nil, errors.New("test error")
	}
	_, err := in.Fields.Put("test", "value")
	return in, err
}

// ErrorSwitch toggles the failure flag, flipping the processor between its
// normal and erroring behavior.
func (p *testProcessor) ErrorSwitch() {
	p.error = !p.error
}

// testProcessorSupporter is a processing.Supporter stub that always hands out
// the single wrapped Processor, regardless of the requested configuration.
type testProcessorSupporter struct {
	beat.Processor
}

// Create ignores the config and drop flag and returns the wrapped processor.
func (p testProcessorSupporter) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
	return p.Processor, nil
}

// Processors returns the wrapped processor's name for debug purposes.
func (p testProcessorSupporter) Processors() []string {
	names := []string{p.Processor.String()}
	return names
}

// Close shuts down the wrapped processor.
func (p testProcessorSupporter) Close() error {
	return processors.Close(p.Processor)
}

0 comments on commit 635b286

Please sign in to comment.