feat(s3stream): policies of allocating memory (#938)
* feat(s3stream): policies of allocating memory

Signed-off-by: Ning Yu <[email protected]>

* feat(config): add config `s3.stream.allocator.policy`

Signed-off-by: Ning Yu <[email protected]>

* feat(config): add "s3.stream.allocator.policy"

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Mar 15, 2024
1 parent 0540f83 commit 699dae7
Showing 12 changed files with 134 additions and 17 deletions.
3 changes: 3 additions & 0 deletions config/kraft/broker.properties
@@ -171,6 +171,9 @@ s3.wal.path=/tmp/kraft-broker-logs/s3wal
# and catch up read
# s3.network.baseline.bandwidth=104857600

# The S3 stream memory allocator policy. Supported values: UNPOOLED_HEAP, POOLED_HEAP, POOLED_DIRECT; default: POOLED_HEAP
# s3.stream.allocator.policy=POOLED_HEAP

############################# Settings for telemetry #############################
# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
# s3.telemetry.metrics.exporter.type=otlp
5 changes: 4 additions & 1 deletion config/kraft/server.properties
@@ -174,6 +174,9 @@ s3.wal.path=/tmp/kraft-combined-logs/s3wal
# and catch up read
# s3.network.baseline.bandwidth=104857600

# The S3 stream memory allocator policy. Supported values: UNPOOLED_HEAP, POOLED_HEAP, POOLED_DIRECT; default: POOLED_HEAP
# s3.stream.allocator.policy=POOLED_HEAP

############################# Settings for telemetry #############################
# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
# s3.telemetry.metrics.exporter.type=otlp
@@ -197,4 +200,4 @@ metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
#autobalancer.controller.exclude.topics=topic-a,topic-b,topic-c

# The broker ids to be excluded from balancing
#autobalancer.controller.exclude.broker.ids=0,1,2
#autobalancer.controller.exclude.broker.ids=0,1,2
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/Kafka.scala
@@ -21,6 +21,7 @@ import com.automq.s3shell.sdk.auth.{CredentialsProviderHolder, EnvVariableCreden
import com.automq.s3shell.sdk.model.S3Url

import java.util.Properties
import com.automq.stream.s3.{ByteBufAlloc, ByteBufAllocPolicy}
import joptsimple.OptionParser
import kafka.s3shell.util.S3ShellPropUtil
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
@@ -107,6 +108,10 @@ object Kafka extends Logging {

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
// AutoMQ for Kafka inject start
// set allocator's policy as early as possible
ByteBufAlloc.setPolicy(Enum.valueOf(classOf[ByteBufAllocPolicy], config.s3StreamAllocatorPolicy))
// AutoMQ for Kafka inject end
if (config.requiresZookeeper) {
new KafkaServer(
config,
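The injection above applies the configured policy before the broker constructs any S3 stream component. A minimal Java sketch of the same bootstrap step, assuming the s3stream classes from this commit are on the classpath (the helper class and the `policyName` parameter are hypothetical, standing in for the value of `s3.stream.allocator.policy`):

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.ByteBufAllocPolicy;

// Hypothetical helper, not part of the commit: applies the configured allocator
// policy as early as possible, mirroring the injection in Kafka.scala above.
public class AllocatorBootstrap {
    public static void applyPolicy(String policyName) {
        // Must run before any buffer is handed out, because ByteBufAlloc caches
        // the allocator chosen for the active policy.
        ByteBufAlloc.setPolicy(Enum.valueOf(ByteBufAllocPolicy.class, policyName));
    }

    public static void main(String[] args) {
        applyPolicy("POOLED_DIRECT"); // e.g. the value read from s3.stream.allocator.policy
    }
}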
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@

package kafka.server

import com.automq.stream.s3.ByteBufAllocPolicy
import kafka.autobalancer.config.AutoBalancerControllerConfig

import java.{lang, util}
@@ -472,6 +473,7 @@ object KafkaConfig {
val S3ObjectBlockSizeProp = "s3.object.block.size"
val S3ObjectPartSizeProp = "s3.object.part.size"
val S3BlockCacheSizeProp = "s3.block.cache.size"
val S3StreamAllocatorPolicyProp = "s3.stream.allocator.policy"
val S3StreamObjectCompactionIntervalMinutesProp = "s3.stream.object.compaction.interval.minutes"
val S3StreamObjectCompactionMaxSizeBytesProp = "s3.stream.object.compaction.max.size.bytes"
val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count"
@@ -519,6 +521,7 @@ object KafkaConfig {
val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
val S3BlockCacheSizeDoc = "The S3 block cache size in MiB."
val S3StreamAllocatorPolicyDoc = "The S3 stream memory allocator policy, supported values: " + ByteBufAllocPolicy.values().mkString(", ")
val S3StreamObjectCompactionIntervalMinutesDoc = "The S3 stream object compaction task interval in minutes."
val S3StreamObjectCompactionMaxSizeBytesDoc = "The S3 stream object compaction max size in bytes."
val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count."
@@ -1437,6 +1440,7 @@ object KafkaConfig {
.define(S3ObjectBlockSizeProp, INT, 1048576, MEDIUM, S3ObjectBlockSizeDoc)
.define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
.define(S3BlockCacheSizeProp, LONG, 1073741824L, MEDIUM, S3BlockCacheSizeDoc)
.define(S3StreamAllocatorPolicyProp, STRING, ByteBufAllocPolicy.POOLED_HEAP.name, MEDIUM, S3StreamAllocatorPolicyDoc)
.define(S3StreamObjectCompactionIntervalMinutesProp, INT, 30, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
.define(S3StreamObjectCompactionMaxSizeBytesProp, LONG, 1073741824L, MEDIUM, S3StreamObjectCompactionMaxSizeBytesDoc)
.define(S3ControllerRequestRetryMaxCountProp, INT, Integer.MAX_VALUE, MEDIUM, S3ControllerRequestRetryMaxCountDoc)
@@ -2090,6 +2094,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
val s3BlockCacheSize = getLong(KafkaConfig.S3BlockCacheSizeProp)
val s3StreamAllocatorPolicy = getString(KafkaConfig.S3StreamAllocatorPolicyProp)
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp)
val s3StreamObjectCompactionMaxSizeBytes = getLong(KafkaConfig.S3StreamObjectCompactionMaxSizeBytesProp)
val s3ControllerRequestRetryMaxCount = getInt(KafkaConfig.S3ControllerRequestRetryMaxCountProp)
3 changes: 3 additions & 0 deletions kshell-sdk/src/main/resources/template/broker.properties
@@ -164,6 +164,9 @@ s3.wal.path=/tmp/kraft-broker-logs/s3wal
# and catch up read
# s3.network.baseline.bandwidth=104857600

# The S3 stream memory allocator policy. Supported values: UNPOOLED_HEAP, POOLED_HEAP, POOLED_DIRECT; default: POOLED_HEAP
# s3.stream.allocator.policy=POOLED_HEAP

############################# Settings for telemetry #############################
# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
# s3.telemetry.metrics.exporter.type=otlp
4 changes: 3 additions & 1 deletion kshell-sdk/src/main/resources/template/server.properties
@@ -167,6 +167,9 @@ s3.wal.path=/tmp/kraft-combined-logs/s3wal
# and catch up read
# s3.network.baseline.bandwidth=104857600

# The S3 stream memory allocator policy. Supported values: UNPOOLED_HEAP, POOLED_HEAP, POOLED_DIRECT; default: POOLED_HEAP
# s3.stream.allocator.policy=POOLED_HEAP

############################# Settings for telemetry #############################
# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
# s3.telemetry.metrics.exporter.type=otlp
@@ -197,4 +200,3 @@ metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter

# The broker ids to be excluded from balancing
#autobalancer.controller.exclude.broker.ids=0,1,2

46 changes: 36 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
@@ -28,11 +28,8 @@

public class ByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));
public static final boolean ALLOCATOR_USAGE_UNPOOLED = Boolean.parseBoolean(System.getenv("AUTOMQ_ALLOCATOR_USAGE_UNPOOLED"));
public static final boolean BUFFER_USAGE_HEAPED = Boolean.parseBoolean(System.getenv("AUTOMQ_BUFFER_USAGE_HEAPED"));

private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufAlloc.class);
private static final AbstractByteBufAllocator ALLOC = ALLOCATOR_USAGE_UNPOOLED ? UnpooledByteBufAllocator.DEFAULT : PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();
@@ -50,6 +47,15 @@ public class ByteBufAlloc {
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
* The policy used to allocate memory.
*/
private static ByteBufAllocPolicy policy = ByteBufAllocPolicy.POOLED_HEAP;
/**
* The allocator used to allocate memory. It should be updated when {@link #policy} is updated.
*/
private static AbstractByteBufAllocator allocator = getAllocatorByPolicy(policy);

static {
registerAllocType(DEFAULT, "default");
registerAllocType(ENCODE_RECORD, "write_record");
@@ -65,8 +71,21 @@ public class ByteBufAlloc {

}

/**
* Set the policy used to allocate memory.
*/
public static void setPolicy(ByteBufAllocPolicy policy) {
LOGGER.info("Set alloc policy to {}", policy);
ByteBufAlloc.policy = policy;
ByteBufAlloc.allocator = getAllocatorByPolicy(policy);
}

public static ByteBufAllocPolicy getPolicy() {
return policy;
}

public static CompositeByteBuf compositeByteBuffer() {
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
return allocator.compositeDirectBuffer(Integer.MAX_VALUE);
}

public static ByteBuf byteBuffer(int initCapacity) {
@@ -90,9 +109,9 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric);
}
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
return new WrappedByteBuf(policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
return policy.isDirect() ? allocator.directBuffer(initCapacity) : allocator.heapBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
@@ -114,6 +133,13 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

private static AbstractByteBufAllocator getAllocatorByPolicy(ByteBufAllocPolicy policy) {
if (policy.isPooled()) {
return PooledByteBufAllocator.DEFAULT;
}
return UnpooledByteBufAllocator.DEFAULT;
}

public static class ByteBufAllocMetric {
private final long usedMemory;
private final long allocatedMemory;
@@ -123,8 +149,8 @@ public ByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) ALLOC).metric();
this.usedMemory = BUFFER_USAGE_HEAPED ? metric.usedHeapMemory() : metric.usedDirectMemory();
ByteBufAllocatorMetric metric = ((ByteBufAllocatorMetricProvider) allocator).metric();
this.usedMemory = policy.isDirect() ? metric.usedDirectMemory() : metric.usedHeapMemory();
this.allocatedMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

@@ -147,9 +173,9 @@ public String toString() {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append(", pooled=");
sb.append(!ALLOCATOR_USAGE_UNPOOLED);
sb.append(policy.isPooled());
sb.append(", direct=");
sb.append(!BUFFER_USAGE_HEAPED);
sb.append(policy.isDirect());
sb.append("}");
return sb.toString();
}
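With this change, `ByteBufAlloc` chooses its allocator from the active policy rather than from the old environment variables. A minimal usage sketch, assuming the s3stream classes above and Netty are on the classpath (not part of the commit):

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.ByteBufAllocPolicy;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;

public class ByteBufAllocExample {
    public static void main(String[] args) {
        // Choose the policy up front; later calls to byteBuffer() use the matching allocator.
        ByteBufAlloc.setPolicy(ByteBufAllocPolicy.POOLED_HEAP);

        ByteBuf buf = ByteBufAlloc.byteBuffer(4096);
        try {
            buf.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println("readable bytes: " + buf.readableBytes());
        } finally {
            // Pooled buffers must be released so the memory returns to the pool.
            buf.release();
        }
    }
}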
53 changes: 53 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAllocPolicy.java
@@ -0,0 +1,53 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

public enum ByteBufAllocPolicy {

/**
* Allocate memory from the heap without pooling.
*/
UNPOOLED_HEAP(false, false),

/**
* Allocate memory from the heap with pooling.
*/
POOLED_HEAP(true, false),

/**
* Use pooled direct memory.
*/
POOLED_DIRECT(true, true);

/**
* Whether the buffer should be pooled or not.
*/
private final boolean pooled;

/**
* Whether the buffer should be direct or not.
*/
private final boolean direct;

ByteBufAllocPolicy(boolean pooled, boolean direct) {
this.pooled = pooled;
this.direct = direct;
}

public boolean isPooled() {
return pooled;
}

public boolean isDirect() {
return direct;
}
}
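A small sanity-check sketch (hypothetical, not part of the commit) that prints each policy together with its `pooled` and `direct` flags:

import com.automq.stream.s3.ByteBufAllocPolicy;

public class PolicyFlags {
    public static void main(String[] args) {
        // Expected output:
        //   UNPOOLED_HEAP: pooled=false, direct=false
        //   POOLED_HEAP: pooled=true, direct=false
        //   POOLED_DIRECT: pooled=true, direct=true
        for (ByteBufAllocPolicy policy : ByteBufAllocPolicy.values()) {
            System.out.printf("%s: pooled=%b, direct=%b%n", policy, policy.isPooled(), policy.isDirect());
        }
    }
}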
@@ -10,6 +10,7 @@
*/
package com.automq.stream.s3.compact;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3ObjectLogger;
import com.automq.stream.s3.StreamDataBlock;
@@ -715,11 +716,14 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio

streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);

List<CompactedObject> compactedObjects = compactionPlan.compactedObjects();
for (CompactedObject compactedObject : compactedObjects) {
for (StreamDataBlock block : compactedObject.streamDataBlocks()) {
if (block.getDataCf().join().refCnt() > 0) {
logger.error("Block {} is not released after compaction, compact type: {}", block, compactedObject.type());
if (ByteBufAlloc.getPolicy().isPooled()) {
// Check if all blocks are released after each iteration
List<CompactedObject> compactedObjects = compactionPlan.compactedObjects();
for (CompactedObject compactedObject : compactedObjects) {
for (StreamDataBlock block : compactedObject.streamDataBlocks()) {
if (block.getDataCf().join().refCnt() > 0) {
logger.error("Block {} is not released after compaction, compact type: {}", block, compactedObject.type());
}
}
}
}
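The new guard runs the reference-count check only under pooled policies, where an unreleased block never returns to the pool. A standalone sketch of the same leak check (hypothetical code, not part of the commit):

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.ByteBufAllocPolicy;
import io.netty.buffer.ByteBuf;

public class LeakCheckSketch {
    public static void main(String[] args) {
        ByteBufAlloc.setPolicy(ByteBufAllocPolicy.POOLED_DIRECT);

        ByteBuf block = ByteBufAlloc.byteBuffer(1024);
        block.writeZero(1024); // stand-in for data written during compaction
        block.release();

        // After compaction, every data block should have been released; with a
        // pooled policy a positive refCnt() here would indicate a leaked buffer.
        if (ByteBufAlloc.getPolicy().isPooled() && block.refCnt() > 0) {
            System.err.println("Block is not released after compaction: " + block);
        }
    }
}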
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
@@ -62,6 +63,7 @@ public class CompactionTestBase {
protected S3Operator s3Operator;

public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
streamManager = Mockito.mock(MemoryMetadataManager.class);
when(streamManager.getStreams(Mockito.anyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
@@ -11,6 +11,7 @@

package com.automq.stream.s3.compact;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.Config;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
@@ -32,6 +33,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,6 +47,7 @@ public class CompactionUploaderTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
s3Operator = new MemoryS3Operator();
objectManager = new MemoryMetadataManager();
config = mock(Config.class);
@@ -11,6 +11,7 @@

package com.automq.stream.s3.compact;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.compact.objects.CompactedObject;
@@ -21,16 +22,23 @@
import com.automq.stream.s3.objects.ObjectStreamRange;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static com.automq.stream.s3.ByteBufAllocPolicy.POOLED_DIRECT;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Timeout(30)
@Tag("S3Unit")
public class CompactionUtilTest extends CompactionTestBase {

@BeforeEach
public void setUp() throws Exception {
ByteBufAlloc.setPolicy(POOLED_DIRECT);
}

@Test
public void testMergeStreamDataBlocks() {
List<StreamDataBlock> streamDataBlocks = List.of(
