Skip to content

Commit

Permalink
fix(core): Revert "perf: use netty buffer to pool memory (#1788)" (#1797
Browse files Browse the repository at this point in the history
)

Revert "perf: use netty buffer to pool memory (#1788)"

This reverts commit 5317eff.
  • Loading branch information
ShadowySpirits authored Aug 14, 2024
1 parent 4f4a93c commit dd6b5c1
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 86 deletions.
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1507,9 +1507,6 @@ project(':clients') {
implementation libs.snappy
implementation libs.slf4jApi
implementation libs.opentelemetryProto
// AutoMQ for Kafka inject start
implementation libs.nettyBuffer
// AutoMQ for Kafka inject end

// libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
shadowed libs.zstd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

package org.apache.kafka.common.utils;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -124,36 +121,4 @@ public void close() {
}
}

// AutoMQ for Kafka inject start
public static class NettyBufferSupplier extends BufferSupplier {
// We currently use a single block size, so optimise for that case
// visible for testing
final Map<ByteBuffer, ByteBuf> bufferMap = new IdentityHashMap<>(1);

@Override
public ByteBuffer get(int capacity) {
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(capacity);
ByteBuffer byteBuffer = byteBuf.nioBuffer(0, capacity);
bufferMap.put(byteBuffer, byteBuf);
return byteBuffer;
}

@Override
public void release(ByteBuffer buffer) {
ByteBuf byteBuf = bufferMap.remove(buffer);
if (byteBuf != null) {
byteBuf.release();
}
}

@Override
public void close() {
for (ByteBuf byteBuf : bufferMap.values()) {
byteBuf.release();
}
bufferMap.clear();
}
}
// AutoMQ for Kafka inject end

}

This file was deleted.

15 changes: 3 additions & 12 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,7 @@ private Optional<SnapshotReader<T>> latestSnapshot() {
return log.latestSnapshot().map(reader ->
RecordsSnapshotReader.of(reader,
serde,
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.NettyBufferSupplier(),
// AutoMQ for Kafka inject end
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
true /* Validate batch CRC*/
)
Expand Down Expand Up @@ -389,10 +386,7 @@ public void initialize(
Optional.of(VoterSet.fromInetSocketAddresses(listenerName, voterAddresses)),
log,
serde,
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.NettyBufferSupplier(),
// AutoMQ for Kafka inject end
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext
);
Expand Down Expand Up @@ -2706,10 +2700,7 @@ private void fireHandleCommit(long baseOffset, Records records) {
baseOffset,
records,
serde,
// AutoMQ for Kafka inject start
// BufferSupplier.create(),
new BufferSupplier.NettyBufferSupplier(),
// AutoMQ for Kafka inject end
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
this,
true /* Validate batch CRC*/
Expand Down

0 comments on commit dd6b5c1

Please sign in to comment.