Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): add config s3.stream.allocator.policy #900

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka

import com.automq.s3shell.sdk.auth.{CredentialsProviderHolder, EnvVariableCredentialsProvider}
import com.automq.s3shell.sdk.model.S3Url
import com.automq.stream.s3.{ByteBufAlloc, ByteBufAllocPolicy}
import joptsimple.OptionParser
import kafka.s3shell.util.S3ShellPropUtil
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
Expand Down Expand Up @@ -107,6 +108,10 @@ object Kafka extends Logging {

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
// AutoMQ for Kafka inject start
// set allocator's policy as early as possible
ByteBufAlloc.setPolicy(Enum.valueOf(classOf[ByteBufAllocPolicy], config.s3StreamAllocatorPolicy))
// AutoMQ for Kafka inject end
if (config.requiresZookeeper) {
new KafkaServer(
config,
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.server

import com.automq.stream.s3.ByteBufAllocPolicy
import kafka.autobalancer.config.AutoBalancerControllerConfig

import java.util
Expand Down Expand Up @@ -704,6 +705,7 @@ object KafkaConfig {
val S3ObjectBlockSizeProp = "s3.object.block.size"
val S3ObjectPartSizeProp = "s3.object.part.size"
val S3BlockCacheSizeProp = "s3.block.cache.size"
val S3StreamAllocatorPolicyProp = "s3.stream.allocator.policy"
val S3StreamObjectCompactionIntervalMinutesProp = "s3.stream.object.compaction.interval.minutes"
val S3StreamObjectCompactionMaxSizeBytesProp = "s3.stream.object.compaction.max.size.bytes"
val S3ControllerRequestRetryMaxCountProp = "s3.controller.request.retry.max.count"
Expand Down Expand Up @@ -750,6 +752,7 @@ object KafkaConfig {
val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
val S3BlockCacheSizeDoc = "The S3 block cache size in MiB."
val S3StreamAllocatorPolicyDoc = "The S3 stream memory allocator policy, supported value: " + ByteBufAllocPolicy.values().mkString(", ")
val S3StreamObjectCompactionIntervalMinutesDoc = "The S3 stream object compaction task interval in minutes."
val S3StreamObjectCompactionMaxSizeBytesDoc = "The S3 stream object compaction max size in bytes."
val S3ControllerRequestRetryMaxCountDoc = "The S3 controller request retry max count."
Expand Down Expand Up @@ -1593,6 +1596,7 @@ object KafkaConfig {
.define(S3ObjectBlockSizeProp, INT, 1048576, MEDIUM, S3ObjectBlockSizeDoc)
.define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
.define(S3BlockCacheSizeProp, LONG, 1073741824L, MEDIUM, S3BlockCacheSizeDoc)
.define(S3StreamAllocatorPolicyProp, STRING, ByteBufAllocPolicy.UNPOOLED_HEAP.name, MEDIUM, S3StreamAllocatorPolicyDoc)
.define(S3StreamObjectCompactionIntervalMinutesProp, INT, 30, MEDIUM, S3StreamObjectCompactionIntervalMinutesDoc)
.define(S3StreamObjectCompactionMaxSizeBytesProp, LONG, 1073741824L, MEDIUM, S3StreamObjectCompactionMaxSizeBytesDoc)
.define(S3ControllerRequestRetryMaxCountProp, INT, Integer.MAX_VALUE, MEDIUM, S3ControllerRequestRetryMaxCountDoc)
Expand Down Expand Up @@ -2171,6 +2175,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
val s3BlockCacheSize = getLong(KafkaConfig.S3BlockCacheSizeProp)
val s3StreamAllocatorPolicy = getString(KafkaConfig.S3StreamAllocatorPolicyProp)
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp)
val s3StreamObjectCompactionMaxSizeBytes = getLong(KafkaConfig.S3StreamObjectCompactionMaxSizeBytesProp)
val s3ControllerRequestRetryMaxCount = getInt(KafkaConfig.S3ControllerRequestRetryMaxCountProp)
Expand Down
Loading