From eb1d9adb8c5483123bd576a287278348ced4c729 Mon Sep 17 00:00:00 2001
From: LGouellec <sylvain.legouellec@hotmail.fr>
Date: Mon, 23 Sep 2024 11:51:06 -0700
Subject: [PATCH 1/3] Add a BoundMemoryRocksDbConfigHandler

---
 .../BoundMemoryRocksDbConfigHandler.cs        | 92 +++++++++++++++++++
 core/State/RocksDb/RocksDbKeyValueStore.cs    |  2 +-
 core/State/RocksDb/RocksDbOptions.cs          | 13 +++
 3 files changed, 106 insertions(+), 1 deletion(-)
 create mode 100644 core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs

diff --git a/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
new file mode 100644
index 00000000..43dd10e3
--- /dev/null
+++ b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
@@ -0,0 +1,92 @@
+using System;
+using RocksDbSharp;
+using Streamiz.Kafka.Net.Errors;
+using Streamiz.Kafka.Net.Table;
+
+namespace Streamiz.Kafka.Net.State
+{
+    public class BoundMemoryRocksDbConfigHandler
+    {
+        private bool configured;
+        private Compaction compactionStyle;
+        private Compression compressionType;
+        private int maxNumberOfThreads;
+        
+        private IntPtr writeBufferManager;
+        private IntPtr blockCachePtr;
+
+        public BoundMemoryRocksDbConfigHandler SetCompactionStyle(Compaction compaction)
+        {
+            compactionStyle = compaction;
+            return this;
+        }
+
+        public BoundMemoryRocksDbConfigHandler SetCompressionType(Compression compression)
+        {
+            compressionType = compression;
+            return this;
+        }
+
+        public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize)
+            => LimitTotalMemory(Convert.ToUInt32(maximumCacheSize.CacheSizeBytes));
+        
+        public BoundMemoryRocksDbConfigHandler ConfigureNumThreads(int numberOfThreads)
+        {
+            maxNumberOfThreads = numberOfThreads;
+            return this;
+        }
+        
+        private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMemory)
+        {
+            if (configured)
+                throw new IllegalStateException(
+                    "BoundMemoryRocksDbConfigHandler is already configured ! To avoid multiple block cache and writer buffer manager allocation, an inner exception is throw. It was due to a bug or misconfiguration in your side");
+            
+            configured = true;
+            ulong blockCacheSize = totalUnManagedMemory / 2;
+            ulong totalMemtableMemory = totalUnManagedMemory / 2;
+            
+            // block cache allocator
+            Native.Instance.rocksdb_jemalloc_nodump_allocator_create(out IntPtr jemallocPtr);
+            IntPtr LRUCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
+            Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUCacheOptionsPtr, new UIntPtr(blockCacheSize));
+            Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUCacheOptionsPtr, -1);
+            Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUCacheOptionsPtr, jemallocPtr);
+            
+            blockCachePtr = Native.Instance.rocksdb_cache_create_lru_opts(LRUCacheOptionsPtr);
+            
+            // wbm allocator
+            IntPtr LRUWriteCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
+            Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUWriteCacheOptionsPtr, new UIntPtr(totalMemtableMemory));
+            Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUWriteCacheOptionsPtr, -1);
+            Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUWriteCacheOptionsPtr, jemallocPtr);
+
+            writeBufferManager = Native.Instance.rocksdb_write_buffer_manager_create_with_cache(
+                new UIntPtr(totalMemtableMemory),
+                LRUWriteCacheOptionsPtr,
+                false);
+            return this;
+        }
+        
+        
+        public void Handle(string storeName, RocksDbOptions options)
+        {
+            options.SetCompactionStyle(compactionStyle);
+            options.SetCompression(compressionType);
+            
+            var tableConfig = new BlockBasedTableOptions();
+            tableConfig.SetBlockCache(blockCachePtr); // use the same block cache for each state store
+            tableConfig.SetBlockSize(4096L); // 4Kb
+            tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
+            Native.Instance.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(
+                tableConfig.Handle, true);
+            Native.Instance.rocksdb_block_based_options_set_pin_top_level_index_and_filter(tableConfig.Handle, true);
+            options.SetBlockBasedTableFactory(tableConfig);
+            
+            options.SetWriteBufferManager(writeBufferManager);
+
+            options.SetStatsDumpPeriodSec(0);
+            options.IncreaseParallelism(Math.Max(maxNumberOfThreads, 2));
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/State/RocksDb/RocksDbKeyValueStore.cs b/core/State/RocksDb/RocksDbKeyValueStore.cs
index 96a56ad7..b15551c0 100644
--- a/core/State/RocksDb/RocksDbKeyValueStore.cs
+++ b/core/State/RocksDb/RocksDbKeyValueStore.cs
@@ -341,7 +341,7 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
             BlockBasedTableOptions tableConfig = new BlockBasedTableOptions();
 
             RocksDbOptions rocksDbOptions = new RocksDbOptions(dbOptions, columnFamilyOptions);
-
+            
             tableConfig.SetBlockCache(RocksDbSharp.Cache.CreateLru(BLOCK_CACHE_SIZE));
             tableConfig.SetBlockSize(BLOCK_SIZE);
             tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
diff --git a/core/State/RocksDb/RocksDbOptions.cs b/core/State/RocksDb/RocksDbOptions.cs
index 5d98510d..2ca0627a 100644
--- a/core/State/RocksDb/RocksDbOptions.cs
+++ b/core/State/RocksDb/RocksDbOptions.cs
@@ -1303,5 +1303,18 @@ public RocksDbOptions SetWriteBufferSize(ulong value)
         }
         
         #endregion
+        
+        #region Custom
+        
+        /// <summary>
+        /// Set the write buffer manager
+        /// </summary>
+        /// <param name="wbm">Pointer to the writer buffer manager</param>
+        public void SetWriteBufferManager(IntPtr wbm)
+        {
+            Native.Instance.rocksdb_options_set_write_buffer_manager(dbOptions.Handle, wbm);
+        }
+        
+        #endregion
     }
 }
\ No newline at end of file

From d1e67dca5a1237c3f960380741526bd124269d94 Mon Sep 17 00:00:00 2001
From: LGouellec <sylvain.legouellec@hotmail.fr>
Date: Mon, 23 Sep 2024 15:49:36 -0700
Subject: [PATCH 2/3] Add unit test for BoundRocksDbMemory

---
 core/RandomGenerator.cs                       |  9 ++
 .../BoundMemoryRocksDbConfigHandler.cs        | 38 +++++---
 core/State/RocksDb/RocksDbKeyValueStore.cs    |  7 +-
 samples/sample-stream/Program.cs              | 23 ++++-
 .../BoundMemoryRocksDbConfigHandlerTests.cs   | 92 +++++++++++++++++++
 5 files changed, 150 insertions(+), 19 deletions(-)
 create mode 100644 test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs

diff --git a/core/RandomGenerator.cs b/core/RandomGenerator.cs
index cb642a8e..0980f106 100644
--- a/core/RandomGenerator.cs
+++ b/core/RandomGenerator.cs
@@ -1,4 +1,5 @@
 using System;
+using System.Linq;
 using System.Security.Cryptography;
 
 namespace Streamiz.Kafka.Net
@@ -22,5 +23,13 @@ public static int GetInt32(int partitionCount)
             return RandomNumberGenerator.GetInt32(0, partitionCount);
 #endif
         }
+
+        public static string GetRandomString(int maxLength = 100)
+        {
+            var rdLength = GetInt32(maxLength);
+            const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+            return new string(Enumerable.Repeat(chars, rdLength)
+                .Select(s => s[GetInt32(s.Length)]).ToArray());
+        }
     }
 }
diff --git a/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
index 43dd10e3..d6ec07a9 100644
--- a/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
+++ b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
@@ -3,18 +3,25 @@
 using Streamiz.Kafka.Net.Errors;
 using Streamiz.Kafka.Net.Table;
 
-namespace Streamiz.Kafka.Net.State
+namespace Streamiz.Kafka.Net.State.RocksDb
 {
     public class BoundMemoryRocksDbConfigHandler
     {
         private bool configured;
-        private Compaction compactionStyle;
-        private Compression compressionType;
+        private Compaction compactionStyle = Compaction.Universal;
+        private Compression compressionType = Compression.No;
         private int maxNumberOfThreads;
+        private bool useJemallocAllocator;
         
         private IntPtr writeBufferManager;
         private IntPtr blockCachePtr;
 
+        public BoundMemoryRocksDbConfigHandler UseJemalloc()
+        {
+            useJemallocAllocator = true;
+            return this;
+        }
+        
         public BoundMemoryRocksDbConfigHandler SetCompactionStyle(Compaction compaction)
         {
             compactionStyle = compaction;
@@ -43,27 +50,35 @@ private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMem
                     "BoundMemoryRocksDbConfigHandler is already configured ! To avoid multiple block cache and writer buffer manager allocation, an inner exception is throw. It was due to a bug or misconfiguration in your side");
             
             configured = true;
-            ulong blockCacheSize = totalUnManagedMemory / 2;
+            ulong blockCacheSize = totalUnManagedMemory;
             ulong totalMemtableMemory = totalUnManagedMemory / 2;
             
             // block cache allocator
-            Native.Instance.rocksdb_jemalloc_nodump_allocator_create(out IntPtr jemallocPtr);
             IntPtr LRUCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
             Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUCacheOptionsPtr, new UIntPtr(blockCacheSize));
             Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUCacheOptionsPtr, -1);
-            Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUCacheOptionsPtr, jemallocPtr);
-            
+            if (useJemallocAllocator)
+            {
+                IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
+                Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUCacheOptionsPtr, jemallocPtr);
+            }
+
             blockCachePtr = Native.Instance.rocksdb_cache_create_lru_opts(LRUCacheOptionsPtr);
             
             // wbm allocator
             IntPtr LRUWriteCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
             Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUWriteCacheOptionsPtr, new UIntPtr(totalMemtableMemory));
             Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUWriteCacheOptionsPtr, -1);
-            Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUWriteCacheOptionsPtr, jemallocPtr);
-
+            if (useJemallocAllocator)
+            {
+                IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
+                Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUWriteCacheOptionsPtr, jemallocPtr);
+            }
+            var cacheWBM = Native.Instance.rocksdb_cache_create_lru_opts(LRUWriteCacheOptionsPtr);
+            
             writeBufferManager = Native.Instance.rocksdb_write_buffer_manager_create_with_cache(
                 new UIntPtr(totalMemtableMemory),
-                LRUWriteCacheOptionsPtr,
+                cacheWBM,
                 false);
             return this;
         }
@@ -78,13 +93,14 @@ public void Handle(string storeName, RocksDbOptions options)
             tableConfig.SetBlockCache(blockCachePtr); // use the same block cache for each state store
             tableConfig.SetBlockSize(4096L); // 4Kb
             tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
+            tableConfig.SetCacheIndexAndFilterBlocks(true);
             Native.Instance.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(
                 tableConfig.Handle, true);
             Native.Instance.rocksdb_block_based_options_set_pin_top_level_index_and_filter(tableConfig.Handle, true);
             options.SetBlockBasedTableFactory(tableConfig);
             
             options.SetWriteBufferManager(writeBufferManager);
-
+            
             options.SetStatsDumpPeriodSec(0);
             options.IncreaseParallelism(Math.Max(maxNumberOfThreads, 2));
         }
diff --git a/core/State/RocksDb/RocksDbKeyValueStore.cs b/core/State/RocksDb/RocksDbKeyValueStore.cs
index b15551c0..5ee75f4e 100644
--- a/core/State/RocksDb/RocksDbKeyValueStore.cs
+++ b/core/State/RocksDb/RocksDbKeyValueStore.cs
@@ -347,11 +347,8 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
             tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
 
             rocksDbOptions.SetOptimizeFiltersForHits(1);
-            rocksDbOptions.SetBlockBasedTableFactory(tableConfig);
             rocksDbOptions.SetCompression(COMPRESSION_TYPE);
-            rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
             rocksDbOptions.SetCompactionStyle(COMPACTION_STYLE);
-            rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
             rocksDbOptions.SetCreateIfMissing(true);
             rocksDbOptions.SetErrorIfExists(false);
             rocksDbOptions.SetInfoLogLevel(InfoLogLevel.Error);
@@ -367,6 +364,10 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
             // TODO : wrap writeOptions in rocksDbOptions too
             writeOptions.DisableWal(1);
 
+            rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
+            rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+            rocksDbOptions.SetBlockBasedTableFactory(tableConfig);
+            
             context.Configuration.RocksDbConfigHandler?.Invoke(Name, rocksDbOptions);
             rocksDbOptions.SetMinWriteBufferNumberToMerge(2);
             
diff --git a/samples/sample-stream/Program.cs b/samples/sample-stream/Program.cs
index 6cf5d9a3..ea94829e 100644
--- a/samples/sample-stream/Program.cs
+++ b/samples/sample-stream/Program.cs
@@ -3,11 +3,12 @@
 using System.Threading.Tasks;
 using Confluent.Kafka;
 using Microsoft.Extensions.Logging;
+using RocksDbSharp;
 using Streamiz.Kafka.Net.Metrics;
 using Streamiz.Kafka.Net.Metrics.Prometheus;
 using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.RocksDb;
 using Streamiz.Kafka.Net.Stream;
-using Microsoft.Extensions.Logging;
 using Streamiz.Kafka.Net.Table;
 
 namespace sample_stream
@@ -16,15 +17,22 @@ public static class Program
     {
         public static async Task Main(string[] args)
         {
+            var rocksDbHandler = new BoundMemoryRocksDbConfigHandler()
+                .ConfigureNumThreads(2)
+                .SetCompactionStyle(Compaction.Universal)
+                .SetCompressionType(Compression.Lz4)
+                .LimitTotalMemory(CacheSize.OfMb(40));
+            
            var config = new StreamConfig<StringSerDes, StringSerDes>{
                 ApplicationId = $"test-app",
                 BootstrapServers = "localhost:9092",
                 AutoOffsetReset = AutoOffsetReset.Earliest,
-                Logger = LoggerFactory.Create((b) =>
+                Logger = LoggerFactory.Create(b =>
                 {
                     b.AddConsole();
                     b.SetMinimumLevel(LogLevel.Information);
-                })
+                }),
+                RocksDbConfigHandler = rocksDbHandler.Handle
             };
            
             config.MetricsRecording = MetricsRecordingLevel.DEBUG;
@@ -56,11 +64,16 @@ private static Topology BuildTopology()
                     new StringSerDes(),
                     new StringSerDes());*/
 
-            builder.Stream<string, string>("input")
+            /*builder.Stream<string, string>("input")
                 .DropDuplicate((key, value1, value2) => value1.Equals(value2),
                     TimeSpan.FromMinutes(1))
                 .To(
-                    "output");//, (s, s1, arg3, arg4) => new Partition(0));
+                    "output");*/
+
+            builder.Stream<string, string>("users")
+                .GroupByKey()
+                .Count()
+                .ToStream("count_users");
             
             return builder.Build();
         }
diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs
new file mode 100644
index 00000000..24f70ce8
--- /dev/null
+++ b/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs
@@ -0,0 +1,92 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using Confluent.Kafka;
+using NUnit.Framework;
+using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.State.RocksDb;
+using Streamiz.Kafka.Net.Table;
+using Moq;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.Processors.Internal;
+using Streamiz.Kafka.Net.SerDes;
+
+namespace Streamiz.Kafka.Net.Tests.Stores;
+
+public class BoundMemoryRocksDbConfigHandlerTests
+{
+    [Test]
+    public void CheckWithMemoryBoundedOptions()
+    {
+        Bytes Key(string k)
+        {
+            StringSerDes s = new StringSerDes();
+            return new Bytes(s.Serialize(k, SerializationContext.Empty));
+        }
+
+        byte[] Value(Int32 number)
+        {
+            Int32SerDes i = new Int32SerDes();
+            return i.Serialize(number, SerializationContext.Empty);
+        }
+        
+        bool stop = false;
+        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
+        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
+        handler
+            .LimitTotalMemory(totalUnmanagedMemory);
+
+        var config = new StreamConfig();
+        config.RocksDbConfigHandler = handler.Handle;
+
+        var stateManager = new Mock<IStateManager>();
+        stateManager.Setup(s => 
+            s.Register(It.IsAny<IStateStore>(), It.IsAny<Action<ConsumeResult<byte[], byte[]>>>()));
+        
+        var context = new Mock<ProcessorContext>();
+        context.Setup(c => c.Id)
+            .Returns(new TaskId { Id = 0, Partition = 0 });
+        context.Setup(c => c.StateDir)
+            .Returns("./bound-test/");
+        context.Setup(c => c.Configuration)
+            .Returns(config);
+        context.Setup(c => c.States)
+            .Returns(() => stateManager.Object);
+
+        long totalLogged = 0L;
+        
+        RocksDbKeyValueStore store = new RocksDbKeyValueStore("store");
+        store.Init(context.Object, store);
+
+        List<Bytes> allKeys = new List<Bytes>();
+        while (!stop)
+        {
+            var key = RandomGenerator.GetRandomString();
+            var value = RandomGenerator.GetInt32(Int32.MaxValue);
+
+            var rawKey = Key(key);
+            store.Put(Key(key), Value(value));
+            allKeys.Add(rawKey);
+            totalLogged += key.Length + sizeof(Int32);
+            stop = totalLogged > 2 * totalUnmanagedMemory.CacheSizeBytes; // stop if 2 * totalUnmanagedMemory has pushed
+        }
+
+        long memtableSizeAfterWrite = Convert.ToInt32(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
+        long blockCacheSizeBeforeRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.block-cache-usage"));
+        
+        store.Flush();
+        
+        foreach (var key in allKeys)
+            store.Get(key);
+        
+        long memtableSizeAfterRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
+        long blockCacheSizeAfterRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.block-cache-usage"));
+        
+        store.Close();
+        Directory.Delete(context.Object.StateDir, true);
+        
+        Assert.IsTrue(memtableSizeAfterWrite + blockCacheSizeBeforeRead < totalUnmanagedMemory.CacheSizeBytes);
+        Assert.IsTrue(memtableSizeAfterRead + blockCacheSizeAfterRead < totalUnmanagedMemory.CacheSizeBytes);
+    }
+}
\ No newline at end of file

From f5acf70a014c8dfbf06572cae3b9dc8647376f89 Mon Sep 17 00:00:00 2001
From: LGouellec <sylvain.legouellec@hotmail.fr>
Date: Mon, 23 Sep 2024 15:57:59 -0700
Subject: [PATCH 3/3] Add mutualize cache capacity option

---
 .../BoundMemoryRocksDbConfigHandler.cs        | 12 ++++--
 .../BoundMemoryRocksDbConfigHandlerTests.cs   | 41 ++++++++++++++-----
 2 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
index d6ec07a9..e89be67e 100644
--- a/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
+++ b/core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
@@ -15,6 +15,8 @@ public class BoundMemoryRocksDbConfigHandler
         
         private IntPtr writeBufferManager;
         private IntPtr blockCachePtr;
+        
+        public ulong CacheSizeCapacity { get; private set; }
 
         public BoundMemoryRocksDbConfigHandler UseJemalloc()
         {
@@ -37,20 +39,24 @@ public BoundMemoryRocksDbConfigHandler SetCompressionType(Compression compressio
         public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize)
             => LimitTotalMemory(Convert.ToUInt32(maximumCacheSize.CacheSizeBytes));
         
+        public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize, bool mutualizeCache)
+            => LimitTotalMemory(Convert.ToUInt32(maximumCacheSize.CacheSizeBytes), mutualizeCache);
+        
         public BoundMemoryRocksDbConfigHandler ConfigureNumThreads(int numberOfThreads)
         {
             maxNumberOfThreads = numberOfThreads;
             return this;
         }
         
-        private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMemory)
+        private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMemory, bool mutualizeCache = false)
         {
             if (configured)
                 throw new IllegalStateException(
                     "BoundMemoryRocksDbConfigHandler is already configured ! To avoid multiple block cache and writer buffer manager allocation, an inner exception is throw. It was due to a bug or misconfiguration in your side");
             
+            CacheSizeCapacity = totalUnManagedMemory;
             configured = true;
-            ulong blockCacheSize = totalUnManagedMemory;
+            ulong blockCacheSize = mutualizeCache ? totalUnManagedMemory : totalUnManagedMemory / 2;
             ulong totalMemtableMemory = totalUnManagedMemory / 2;
             
             // block cache allocator
@@ -74,7 +80,7 @@ private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMem
                 IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
                 Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUWriteCacheOptionsPtr, jemallocPtr);
             }
-            var cacheWBM = Native.Instance.rocksdb_cache_create_lru_opts(LRUWriteCacheOptionsPtr);
+            var cacheWBM = mutualizeCache ? blockCachePtr : Native.Instance.rocksdb_cache_create_lru_opts(LRUWriteCacheOptionsPtr);
             
             writeBufferManager = Native.Instance.rocksdb_write_buffer_manager_create_with_cache(
                 new UIntPtr(totalMemtableMemory),
diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs
index 24f70ce8..d66b56d6 100644
--- a/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs
+++ b/test/Streamiz.Kafka.Net.Tests/Stores/BoundMemoryRocksDbConfigHandlerTests.cs
@@ -16,8 +16,7 @@ namespace Streamiz.Kafka.Net.Tests.Stores;
 
 public class BoundMemoryRocksDbConfigHandlerTests
 {
-    [Test]
-    public void CheckWithMemoryBoundedOptions()
+    private void MemoryBoundedOptionsImpl(BoundMemoryRocksDbConfigHandler handler)
     {
         Bytes Key(string k)
         {
@@ -32,10 +31,6 @@ byte[] Value(Int32 number)
         }
         
         bool stop = false;
-        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
-        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
-        handler
-            .LimitTotalMemory(totalUnmanagedMemory);
 
         var config = new StreamConfig();
         config.RocksDbConfigHandler = handler.Handle;
@@ -54,7 +49,7 @@ byte[] Value(Int32 number)
         context.Setup(c => c.States)
             .Returns(() => stateManager.Object);
 
-        long totalLogged = 0L;
+        ulong totalLogged = 0L;
         
         RocksDbKeyValueStore store = new RocksDbKeyValueStore("store");
         store.Init(context.Object, store);
@@ -68,8 +63,8 @@ byte[] Value(Int32 number)
             var rawKey = Key(key);
             store.Put(Key(key), Value(value));
             allKeys.Add(rawKey);
-            totalLogged += key.Length + sizeof(Int32);
-            stop = totalLogged > 2 * totalUnmanagedMemory.CacheSizeBytes; // stop if 2 * totalUnmanagedMemory has pushed
+            totalLogged += Convert.ToUInt32(key.Length + sizeof(Int32));
+            stop = totalLogged > 2 * handler.CacheSizeCapacity; // stop if 2 * totalUnmanagedMemory has pushed
         }
 
         long memtableSizeAfterWrite = Convert.ToInt32(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
@@ -86,7 +81,31 @@ byte[] Value(Int32 number)
         store.Close();
         Directory.Delete(context.Object.StateDir, true);
         
-        Assert.IsTrue(memtableSizeAfterWrite + blockCacheSizeBeforeRead < totalUnmanagedMemory.CacheSizeBytes);
-        Assert.IsTrue(memtableSizeAfterRead + blockCacheSizeAfterRead < totalUnmanagedMemory.CacheSizeBytes);
+        Assert.IsTrue(Convert.ToUInt32(memtableSizeAfterWrite + blockCacheSizeBeforeRead) < handler.CacheSizeCapacity);
+        Assert.IsTrue(Convert.ToUInt32(memtableSizeAfterRead + blockCacheSizeAfterRead) < handler.CacheSizeCapacity);
+    }
+    
+    [Test]
+    [NonParallelizable]
+    public void CheckWithMemoryBoundedOptions()
+    {
+        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
+        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
+        handler
+            .LimitTotalMemory(totalUnmanagedMemory);
+        
+        MemoryBoundedOptionsImpl(handler);
+    }
+    
+    [Test]
+    [NonParallelizable]
+    public void MutualizeCacheMemoryBoundedOptions()
+    {
+        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
+        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
+        handler
+            .LimitTotalMemory(totalUnmanagedMemory,true);
+        
+        MemoryBoundedOptionsImpl(handler);
     }
 }
\ No newline at end of file