diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1860a597a2..ffb029906d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -761,6 +761,23 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } + // configuration sanity checks + val memoryType = if (s3StreamAllocatorPolicy.isDirect) "Direct buffer" else "Heap buffer" + val memoryLimit = if (s3StreamAllocatorPolicy.isDirect) { + PlatformDependent.maxDirectMemory() + } else { + Runtime.getRuntime.maxMemory() + } + if (s3BlockCacheSize > memoryLimit) { + throw new ConfigException(s"${AutoMQConfig.S3_BLOCK_CACHE_SIZE_CONFIG} of ${s3BlockCacheSize} exceeds ${memoryType} limit of ${memoryLimit}") + } + if (s3WALCacheSize > memoryLimit) { + throw new ConfigException(s"${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize} exceeds ${memoryType} limit of ${memoryLimit}") + } + if (s3WALUploadThreshold > s3WALCacheSize) { + throw new ConfigException(s"${AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG} of ${s3WALUploadThreshold} exceeds ${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize}") + } + (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) }