diff --git a/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
index 55869333bd..62d7fa3571 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java
@@ -121,4 +121,49 @@ public void close() {
}
}
+ // AutoMQ for Kafka inject start
+ /**
+ * Different from {@link GrowableBufferSupplier}, this buffer supplier caches multiple buffers.
+ * So it is suitable for scenarios where multiple buffers are needed. For example:
+ *
+ * {@code
+ * BufferSupplier supplier = new GrowableMultiBufferSupplier();
+ *
+ * ByteBuffer buffer1 = supplier.get(1024);
+ * ByteBuffer buffer2 = supplier.get(2048);
+ *
+ * supplier.release(buffer1);
+ * supplier.release(buffer2);
+ *
+ * supplier.close();
+ * }
+ *
+ */
+ public static class GrowableMultiBufferSupplier extends BufferSupplier {
+ private final Deque buffers = new ArrayDeque<>(1);
+
+ @Override
+ public ByteBuffer get(int minCapacity) {
+ if (!buffers.isEmpty()) {
+ ByteBuffer buffer = buffers.pollFirst();
+ if (buffer.capacity() >= minCapacity) {
+ return buffer;
+ }
+ }
+ return ByteBuffer.allocate(minCapacity);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ buffer.clear();
+ buffers.addFirst(buffer);
+ }
+
+ @Override
+ public void close() {
+ buffers.clear();
+ }
+ }
+ // AutoMQ for Kafka inject end
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/GrowableMultiBufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/utils/GrowableMultiBufferSupplierTest.java
new file mode 100644
index 0000000000..b758fb29d5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/GrowableMultiBufferSupplierTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+class GrowableMultiBufferSupplierTest {
+
+ private BufferSupplier.GrowableMultiBufferSupplier bufferSupplier;
+
+ @BeforeEach
+ void setUp() {
+ bufferSupplier = new BufferSupplier.GrowableMultiBufferSupplier();
+ }
+
+ @Test
+ void testGetWhenNoBuffersAvailable() {
+ ByteBuffer result = bufferSupplier.get(10);
+ assertEquals(10, result.capacity());
+ }
+
+ @Test
+ void testGetWithSufficientCapacity() {
+ ByteBuffer buffer = bufferSupplier.get(10);
+ bufferSupplier.release(buffer);
+
+ ByteBuffer result = bufferSupplier.get(5);
+ assertSame(buffer, result);
+ assertEquals(0, result.position());
+ assertEquals(10, result.capacity());
+ }
+
+ @Test
+ void testGetWithInsufficientCapacity() {
+ ByteBuffer buffer = bufferSupplier.get(10);
+ bufferSupplier.release(buffer);
+
+ ByteBuffer result = bufferSupplier.get(15);
+ assertNotSame(buffer, result);
+ assertEquals(15, result.capacity());
+ }
+
+ @Test
+ void testGetAndReleaseMultipleBuffers() {
+ ByteBuffer buffer1 = bufferSupplier.get(10);
+ ByteBuffer buffer2 = bufferSupplier.get(15);
+ bufferSupplier.release(buffer1);
+ bufferSupplier.release(buffer2);
+
+ ByteBuffer result2 = bufferSupplier.get(5);
+ ByteBuffer result1 = bufferSupplier.get(10);
+ assertSame(buffer1, result1);
+ assertSame(buffer2, result2);
+ assertEquals(0, result1.position());
+ assertEquals(0, result2.position());
+ }
+
+ @Test
+ void testRelease() {
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put((byte) 1);
+ bufferSupplier.release(buffer);
+
+ ByteBuffer result = bufferSupplier.get(5);
+ assertSame(buffer, result);
+ assertEquals(0, result.position());
+ }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 10910c3db7..c02d8f1636 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -347,7 +347,10 @@ private Optional> latestSnapshot() {
return log.latestSnapshot().map(reader ->
RecordsSnapshotReader.of(reader,
serde,
- BufferSupplier.create(),
+ // AutoMQ for Kafka inject start
+ // BufferSupplier.create(),
+ new BufferSupplier.GrowableMultiBufferSupplier(),
+ // AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
true /* Validate batch CRC*/
)
@@ -386,7 +389,10 @@ public void initialize(
Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)),
log,
serde,
- BufferSupplier.create(),
+ // AutoMQ for Kafka inject start
+ // BufferSupplier.create(),
+ new BufferSupplier.GrowableMultiBufferSupplier(),
+ // AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
logContext
);
@@ -2700,7 +2706,10 @@ private void fireHandleCommit(long baseOffset, Records records) {
baseOffset,
records,
serde,
- BufferSupplier.create(),
+ // AutoMQ for Kafka inject start
+ // BufferSupplier.create(),
+ new BufferSupplier.GrowableMultiBufferSupplier(),
+ // AutoMQ for Kafka inject end
MAX_BATCH_SIZE_BYTES,
this,
true /* Validate batch CRC*/