Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use a new GrowableMultiBufferSupplier to avoid memory waste #1800

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,49 @@ public void close() {
}
}

// AutoMQ for Kafka inject start
/**
* Different from {@link GrowableBufferSupplier}, this buffer supplier caches multiple buffers.
* So it is suitable for scenarios where multiple buffers are needed. For example:
* <pre>
* {@code
* BufferSupplier supplier = new GrowableMultiBufferSupplier();
*
* ByteBuffer buffer1 = supplier.get(1024);
* ByteBuffer buffer2 = supplier.get(2048);
*
* supplier.release(buffer1);
* supplier.release(buffer2);
*
* supplier.close();
* }
* </pre>
*/
public static class GrowableMultiBufferSupplier extends BufferSupplier {
private final Deque<ByteBuffer> buffers = new ArrayDeque<>(1);

@Override
public ByteBuffer get(int minCapacity) {
if (!buffers.isEmpty()) {
ByteBuffer buffer = buffers.pollFirst();
if (buffer.capacity() >= minCapacity) {
return buffer;
}
}
return ByteBuffer.allocate(minCapacity);
}

@Override
public void release(ByteBuffer buffer) {
buffer.clear();
buffers.addFirst(buffer);
}

@Override
public void close() {
buffers.clear();
}
}
// AutoMQ for Kafka inject end

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.common.utils;

import java.nio.ByteBuffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;

class GrowableMultiBufferSupplierTest {

private BufferSupplier.GrowableMultiBufferSupplier bufferSupplier;

@BeforeEach
void setUp() {
bufferSupplier = new BufferSupplier.GrowableMultiBufferSupplier();
}

@Test
void testGetWhenNoBuffersAvailable() {
ByteBuffer result = bufferSupplier.get(10);
assertEquals(10, result.capacity());
}

@Test
void testGetWithSufficientCapacity() {
ByteBuffer buffer = bufferSupplier.get(10);
bufferSupplier.release(buffer);

ByteBuffer result = bufferSupplier.get(5);
assertSame(buffer, result);
assertEquals(0, result.position());
assertEquals(10, result.capacity());
}

@Test
void testGetWithInsufficientCapacity() {
ByteBuffer buffer = bufferSupplier.get(10);
bufferSupplier.release(buffer);

ByteBuffer result = bufferSupplier.get(15);
assertNotSame(buffer, result);
assertEquals(15, result.capacity());
}

@Test
void testGetAndReleaseMultipleBuffers() {
ByteBuffer buffer1 = bufferSupplier.get(10);
ByteBuffer buffer2 = bufferSupplier.get(15);
bufferSupplier.release(buffer1);
bufferSupplier.release(buffer2);

ByteBuffer result2 = bufferSupplier.get(5);
ByteBuffer result1 = bufferSupplier.get(10);
assertSame(buffer1, result1);
assertSame(buffer2, result2);
assertEquals(0, result1.position());
assertEquals(0, result2.position());
}

@Test
void testRelease() {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 1);
bufferSupplier.release(buffer);

ByteBuffer result = bufferSupplier.get(5);
assertSame(buffer, result);
assertEquals(0, result.position());
}
}
15 changes: 12 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ private Optional<SnapshotReader<T>> latestSnapshot() {
return log.latestSnapshot().map(reader ->
RecordsSnapshotReader.of(reader,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
true /* Validate batch CRC*/
)
Expand Down Expand Up @@ -386,7 +389,10 @@ public void initialize(
Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)),
log,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
logContext
);
Expand Down Expand Up @@ -2700,7 +2706,10 @@ private void fireHandleCommit(long baseOffset, Records records) {
baseOffset,
records,
serde,
BufferSupplier.create(),
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.GrowableMultiBufferSupplier(),
// AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
this,
true /* Validate batch CRC*/
Expand Down
Loading