From 5f682594ba95759113ec673ced554c842edd57d3 Mon Sep 17 00:00:00 2001
From: lyzs90 <leongyzs@gmail.com>
Date: Mon, 6 Jan 2025 00:01:57 +0800
Subject: [PATCH] feat(config): add s3 config validation

---
 .../main/scala/kafka/server/KafkaConfig.scala   | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1860a597a2..ffb029906d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -761,6 +761,23 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
       }
     }
 
+    // configuration sanity checks
+    val memoryType = if (s3StreamAllocatorPolicy.isDirect) "Direct buffer" else "Heap buffer"
+    val memoryLimit = if (s3StreamAllocatorPolicy.isDirect) {
+      PlatformDependent.maxDirectMemory()
+    } else {
+      Runtime.getRuntime.maxMemory()
+    }
+    if (s3BlockCacheSize > memoryLimit) {
+      throw new ConfigException(s"${AutoMQConfig.S3_BLOCK_CACHE_SIZE_CONFIG} of ${s3BlockCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
+    }
+    if (s3WALCacheSize > memoryLimit) {
+      throw new ConfigException(s"${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
+    }
+    if (s3WALUploadThreshold > s3WALCacheSize) {
+      throw new ConfigException(s"${AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG} of ${s3WALUploadThreshold} exceeds ${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize}")
+    }
+
     (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold)
   }