diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java index 63be46327..dfe72bede 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java @@ -28,8 +28,11 @@ public class ByteBufAlloc { public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT")); + public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED")); + public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED")); private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class); + private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT; private static final Map USAGE_STATS = new ConcurrentHashMap<>(); private static long lastMetricLogTime = System.currentTimeMillis(); private static final Map ALLOC_TYPE = new HashMap<>(); @@ -47,15 +50,6 @@ public class ByteBufAlloc { public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; public static ByteBufAllocMetric byteBufAllocMetric = null; - /** - * The policy used to allocate memory. - */ - private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.UNPOOLED_HEAP; - /** - * The allocator used to allocate memory. It should be updated when {@link #policy} is updated. - */ - private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy); - static { registerAllocType(DEFAULT, "default"); registerAllocType(ENCODE_RECORD, "write_record"); @@ -71,17 +65,8 @@ public class ByteBufAlloc { } - /** - * Set the policy used to allocate memory. - */ - public static void setPolicy(ByteBufAllocPolicy policy) { - LOGGER.info("Set alloc policy to {}", policy); - ByteBufAlloc.policy = policy; - ByteBufAlloc.allocator = getAllocatorByPolicy(policy); - } - public static CompositeByteBuf compositeByteBuffer() { - return allocator.compositeDirectBuffer(Integer.MAX_VALUE); + return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE); } public static ByteBuf byteBuffer(int initCapacity) { @@ -105,9 +90,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric); } - return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity)); + return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } else { - return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity); + return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity); } } catch (OutOfMemoryError e) { if (MEMORY_USAGE_DETECT) { @@ -129,13 +114,6 @@ public static void registerAllocType(int type, String name) { ALLOC_TYPE.put(type, name); } - private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) { - if (policy.isPooled()) { - return PooledByteBufAllocator.DEFAULT; - } - return UnpooledByteBufAllocator.DEFAULT; - } - public static class ByteBufAllocMetric { private final long usedMemory; private final long allocatedMemory; @@ -145,8 +123,8 @@ public ByteBufAllocMetric() { USAGE_STATS.forEach((k, v) -> { detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue()); }); - ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric(); - this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory(); + ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric(); + this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory(); this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum(); } @@ -169,9 +147,9 @@ public String toString() { sb.append(entry.getKey()).append("=").append(entry.getValue()).append(","); } sb.append(", pooled="); - sb.append(policy.isPooled()); + sb.append(!ALLOCATOR_USAGE_UNPOOLED); sb.append(", direct="); - sb.append(policy.isDirect()); + sb.append(!BUFFER_USAGE_HEAPED); sb.append("}"); return sb.toString(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java deleted file mode 100644 index 11c4c84df..000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package com.automq.stream.s3; - -public enum ByteBufAllocPolicy { - - /** - * Allocate memory from the heap without pooling. - */ - UNPOOLED_HEAP(false, false), - - /** - * Use pooled direct memory. - */ - POOLED_DIRECT(true, true); - - /** - * Whether the buffer should be pooled or not. - */ - private final boolean pooled; - - /** - * Whether the buffer should be direct or not. - */ - private final boolean direct; - - ByteBufAllocPolicy(boolean pooled, boolean direct) { - this.pooled = pooled; - this.direct = direct; - } - - public boolean isPooled() { - return pooled; - } - - public boolean isDirect() { - return direct; - } -} diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java index e28b30071..0bbea6925 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionTestBase.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import org.mockito.Mockito; -import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; @@ -63,7 +62,6 @@ public class CompactionTestBase { protected S3Operator s3Operator; public void setUp() throws Exception { - ByteBufAlloc.setPolicy(POOLED_DIRECT); streamManager = Mockito.mock(MemoryMetadataManager.class); when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture( List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED), diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java index 9ce121a61..c4e68ac1e 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUploaderTest.java @@ -45,7 +45,6 @@ public class CompactionUploaderTest extends CompactionTestBase { @BeforeEach public void setUp() throws Exception { - super.setUp(); s3Operator = new MemoryS3Operator(); objectManager = new MemoryMetadataManager(); config = mock(Config.class); diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java index 85e727b9a..abd9396d8 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionUtilTest.java @@ -21,7 +21,6 @@ import com.automq.stream.s3.objects.ObjectStreamRange; import java.util.List; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -32,11 +31,6 @@ @Tag("S3Unit") public class CompactionUtilTest extends CompactionTestBase { - @BeforeEach - public void setUp() throws Exception { - super.setUp(); - } - @Test public void testMergeStreamDataBlocks() { List streamDataBlocks = List.of(