From a534c380f2b95209373ad47a2ddb242209742b5b Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Tue, 27 Feb 2024 16:46:43 +0800 Subject: [PATCH] feat(s3stream): add memory usage detect switch (#948) Signed-off-by: Robin Han --- .../automq/stream/s3/DirectByteBufAlloc.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java index 91f999952..cdf51d469 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java @@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory; public class DirectByteBufAlloc { + public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT")); + private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class); private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; private static final Map USAGE_STATS = new ConcurrentHashMap<>(); @@ -67,24 +69,32 @@ public static ByteBuf byteBuffer(int initCapacity) { public static ByteBuf byteBuffer(int initCapacity, int type) { try { - LongAdder usage = USAGE_STATS.compute(type, (k, v) -> { - if (v == null) { - v = new LongAdder(); + if (MEMORY_USAGE_DETECT) { + LongAdder usage = USAGE_STATS.compute(type, (k, v) -> { + if (v == null) { + v = new LongAdder(); + } + v.add(initCapacity); + return v; + }); + long now = System.currentTimeMillis(); + if (now - lastMetricLogTime > 60000) { + // it's ok to be not thread safe + lastMetricLogTime = now; + DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); + LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric); } - v.add(initCapacity); - return v; - }); - long now = System.currentTimeMillis(); - if (now - lastMetricLogTime > 60000) { - // it's ok to be not thread safe - lastMetricLogTime = now; - DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric); + return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); + } else { + return ALLOC.directBuffer(initCapacity); } - return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } catch (OutOfMemoryError e) { - DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e); + if (MEMORY_USAGE_DETECT) { + DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); + LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e); + } else { + LOGGER.error("alloc direct buffer OOM", e); + } System.err.println("alloc direct buffer OOM"); Runtime.getRuntime().halt(1); throw e;