diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index dab77fa5659..0ef4408b613 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { events = append(events, event) } } + original.FreeEntries() b := &ttlBatch{ done: original.Done, diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go index 769ccc37c35..4c5207acbb0 100644 --- a/libbeat/publisher/pipeline/ttl_batch_test.go +++ b/libbeat/publisher/pipeline/ttl_batch_test.go @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) { require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback") } +func TestNewBatchFreesEvents(t *testing.T) { + queueBatch := &mockQueueBatch{} + _ = newBatch(nil, queueBatch, 0) + assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch") +} + type mockQueueBatch struct { freeEntriesCalled int } @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry { return fmt.Sprintf("event %v", i) } +func (b *mockQueueBatch) FreeEntries() { + b.freeEntriesCalled++ +} + type mockRetryer struct { batches []*ttlBatch } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 20e6648d927..a0e5e944df3 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry { return batch.frames[i].event } +func (batch *diskQueueBatch) FreeEntries() { +} + func (batch *diskQueueBatch) Done() { batch.queue.acks.addFrames(batch.frames) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b617bae6110..3e3e47e502c 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -398,6 +398,15 @@ func (b *batch) Entry(i int) queue.Entry { return b.rawEntry(i).event } +func (b *batch) FreeEntries() { + // This signals that the event data has been copied out of the batch, and is + // safe to free from the queue buffer, so set all the event pointers to nil. + for i := 0; i < b.count; i++ { + index := (b.start + i) % len(b.queue.buf) + b.queue.buf[index].event = nil + } +} + func (b *batch) Done() { b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 9cd209bbd51..168c923e598 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) { assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue)) }) } + +func TestBatchFreeEntries(t *testing.T) { + const queueSize = 10 + const batchSize = 5 + // 1. Add 10 events to the queue, request two batches with 5 events each + // 2. Make sure the queue buffer has 10 non-nil events + // 3. Call FreeEntries on the second batch + // 4. Make sure only events 6-10 are nil + // 5. Call FreeEntries on the first batch + // 6. Make sure all events are nil + testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil) + producer := testQueue.Producer(queue.ProducerConfig{}) + for i := 0; i < queueSize; i++ { + _, ok := producer.Publish(i) + require.True(t, ok, "Queue publish must succeed") + } + batch1, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request") + batch2, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request") + // Slight concurrency subtlety: we check events are non-nil after the queue + // reads, since if we do it before we have no way to be sure the insert + // has been completed. + for i := 0; i < queueSize; i++ { + require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil") + } + batch2.FreeEntries() + for i := 0; i < batchSize; i++ { + require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i) + require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i) + } + batch1.FreeEntries() + for i := 0; i < queueSize; i++ { + require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches") + } +} diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 075d7ad66a4..983a835a069 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -112,6 +112,10 @@ type Batch interface { Count() int Entry(i int) Entry Done() + // Release internal references to the contained events if supported + // (the disk queue does not currently implement this). + // Entry() should not be used after this call. + FreeEntries() } // Outputs can provide an EncoderFactory to enable early encoding, in which