Skip to content

Commit

Permalink
Restore memory queue's internal event cleanup after a batch is vended
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Oct 21, 2024
1 parent cc4f951 commit 5434282
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 0 deletions.
1 change: 1 addition & 0 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
events = append(events, event)
}
}
original.FreeEntries()

b := &ttlBatch{
done: original.Done,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/publisher/pipeline/ttl_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback")
}

func TestNewBatchFreesEvents(t *testing.T) {
queueBatch := &mockQueueBatch{}
_ = newBatch(nil, queueBatch, 0)
assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
}

type mockQueueBatch struct {
freeEntriesCalled int
}
Expand All @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry {
return fmt.Sprintf("event %v", i)
}

func (b *mockQueueBatch) FreeEntries() {
b.freeEntriesCalled++
}

type mockRetryer struct {
batches []*ttlBatch
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry {
return batch.frames[i].event
}

func (batch *diskQueueBatch) FreeEntries() {
}

func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
9 changes: 9 additions & 0 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ func (b *batch) Entry(i int) queue.Entry {
return b.rawEntry(i).event
}

func (b *batch) FreeEntries() {
// This signals that the event data has been copied out of the batch, and is
// safe to free from the queue buffer, so set all the event pointers to nil.
for i := 0; i < b.count; i++ {
index := (b.start + i) % len(b.queue.buf)
b.queue.buf[index].event = nil
}
}

func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
38 changes: 38 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) {
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
})
}

func TestBatchFreeEntries(t *testing.T) {
const queueSize = 10
const batchSize = 5
// 1. Add 10 events to the queue, request two batches with 5 events each
// 2. Make sure the queue buffer has 10 non-nil events
// 3. Call FreeEntries on the second batch
// 4. Make sure only events 6-10 are nil
// 5. Call FreeEntries on the first batch
// 6. Make sure all events are nil
testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil)
producer := testQueue.Producer(queue.ProducerConfig{})
for i := 0; i < queueSize; i++ {
_, ok := producer.Publish(i)
require.True(t, ok, "Queue publish must succeed")
}
batch1, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
batch2, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
// Slight concurrency subtlety: we check events are non-nil after the queue
// reads, since if we do it before we have no way to be sure the insert
// has been completed.
for i := 0; i < queueSize; i++ {
require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
}
batch2.FreeEntries()
for i := 0; i < batchSize; i++ {
require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
}
batch1.FreeEntries()
for i := 0; i < queueSize; i++ {
require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches")
}
}
4 changes: 4 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type Batch interface {
Count() int
Entry(i int) Entry
Done()
// Release internal references to the contained events if supported
// (the disk queue does not currently implement this).
// Entry() should not be used after this call.
FreeEntries()
}

// Outputs can provide an EncoderFactory to enable early encoding, in which
Expand Down

0 comments on commit 5434282

Please sign in to comment.