diff --git a/build.gradle b/build.gradle index 2f02c34142..6248e4025e 100644 --- a/build.gradle +++ b/build.gradle @@ -1507,9 +1507,6 @@ project(':clients') { implementation libs.snappy implementation libs.slf4jApi implementation libs.opentelemetryProto - // AutoMQ for Kafka inject start - implementation libs.nettyBuffer - // AutoMQ for Kafka inject end // libraries which should be added as runtime dependencies in generated pom.xml should be defined here: shadowed libs.zstd diff --git a/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java index ad9ad001a7..55869333bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java @@ -17,13 +17,10 @@ package org.apache.kafka.common.utils; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.Map; /** @@ -124,36 +121,4 @@ public void close() { } } - // AutoMQ for Kafka inject start - public static class NettyBufferSupplier extends BufferSupplier { - // We currently use a single block size, so optimise for that case - // visible for testing - final Map bufferMap = new IdentityHashMap<>(1); - - @Override - public ByteBuffer get(int capacity) { - ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(capacity); - ByteBuffer byteBuffer = byteBuf.nioBuffer(0, capacity); - bufferMap.put(byteBuffer, byteBuf); - return byteBuffer; - } - - @Override - public void release(ByteBuffer buffer) { - ByteBuf byteBuf = bufferMap.remove(buffer); - if (byteBuf != null) { - byteBuf.release(); - } - } - - @Override - public void close() { - for (ByteBuf byteBuf : bufferMap.values()) { - byteBuf.release(); - } - bufferMap.clear(); - } - } - // AutoMQ for Kafka inject end - } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/utils/BufferSupplierTest.java deleted file mode 100644 index 352cbbdb8f..0000000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/BufferSupplierTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2024, AutoMQ HK Limited. - * - * The use of this file is governed by the Business Source License, - * as detailed in the file "/LICENSE.S3Stream" included in this repository. - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package org.apache.kafka.common.utils; - -import java.nio.ByteBuffer; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -class BufferSupplierTest { - - @Test - public void testNettyBuffer() { - BufferSupplier.NettyBufferSupplier supplier = new BufferSupplier.NettyBufferSupplier(); - - ByteBuffer buffer = supplier.get(1024); - assertEquals(0, buffer.position()); - assertEquals(1024, buffer.capacity()); - assertEquals(1024, buffer.limit()); - assertEquals(1, supplier.bufferMap.size()); - - // make sure modifying the buffer doesn't affect the supplier - buffer.putInt(1); - supplier.release(buffer); - assertEquals(0, supplier.bufferMap.size()); - } -} diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 8989b2a142..10910c3db7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -347,10 +347,7 @@ private Optional> latestSnapshot() { return log.latestSnapshot().map(reader -> RecordsSnapshotReader.of(reader, serde, - // AutoMQ for Kafka inject start - // BufferSupplier.create(), - new BufferSupplier.NettyBufferSupplier(), - // AutoMQ for Kafka inject end + BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, true /* Validate batch CRC*/ ) @@ -389,10 +386,7 @@ public void initialize( Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)), log, serde, - // AutoMQ for Kafka inject start - // BufferSupplier.create(), - new BufferSupplier.NettyBufferSupplier(), - // AutoMQ for Kafka inject end + BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, logContext ); @@ -2706,10 +2700,7 @@ private void fireHandleCommit(long baseOffset, Records records) { baseOffset, records, serde, - // AutoMQ for Kafka inject start - // BufferSupplier.create(), - new BufferSupplier.NettyBufferSupplier(), - // AutoMQ for Kafka inject end + BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, this, true /* Validate batch CRC*/