Merge pull request #371 from LGouellec/enhancement/bound-rocksdb-config
Provide a Bounded Memory RocksDb config handler
LGouellec authored Sep 25, 2024
2 parents 67cf486 + f5acf70 commit 68741e7
Showing 6 changed files with 270 additions and 9 deletions.
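
For quick reference, here is a minimal usage sketch assembled from the sample changes in this PR (the handler, its fluent configuration methods, and the StreamConfig.RocksDbConfigHandler hook all appear in the diff below; the application id and broker address are illustrative):

    using RocksDbSharp;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;
    using Streamiz.Kafka.Net.State.RocksDb;
    using Streamiz.Kafka.Net.Table;

    // Bound the unmanaged memory (block cache + memtables) used by all RocksDB state stores
    var rocksDbHandler = new BoundMemoryRocksDbConfigHandler()
        .ConfigureNumThreads(2)
        .SetCompactionStyle(Compaction.Universal)
        .SetCompressionType(Compression.Lz4)
        .LimitTotalMemory(CacheSize.OfMb(40));

    var config = new StreamConfig<StringSerDes, StringSerDes>
    {
        ApplicationId = "bounded-rocksdb-app",       // illustrative
        BootstrapServers = "localhost:9092",         // illustrative
        // Every RocksDB store created by the topology is configured through this handler
        RocksDbConfigHandler = rocksDbHandler.Handle
    };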
9 changes: 9 additions & 0 deletions core/RandomGenerator.cs
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Security.Cryptography;

namespace Streamiz.Kafka.Net
@@ -22,5 +23,13 @@ public static int GetInt32(int partitionCount)
return RandomNumberGenerator.GetInt32(0, partitionCount);
#endif
}

public static string GetRandomString(int maxLength = 100)
{
var rdLength = GetInt32(maxLength);
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
return new string(Enumerable.Repeat(chars, rdLength)
.Select(s => s[GetInt32(s.Length)]).ToArray());
}
}
}
114 changes: 114 additions & 0 deletions core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
@@ -0,0 +1,114 @@
using System;
using RocksDbSharp;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Table;

namespace Streamiz.Kafka.Net.State.RocksDb
{
public class BoundMemoryRocksDbConfigHandler
{
private bool configured;
private Compaction compactionStyle = Compaction.Universal;
private Compression compressionType = Compression.No;
private int maxNumberOfThreads;
private bool useJemallocAllocator;

private IntPtr writeBufferManager;
private IntPtr blockCachePtr;

public ulong CacheSizeCapacity { get; private set; }

public BoundMemoryRocksDbConfigHandler UseJemalloc()
{
useJemallocAllocator = true;
return this;
}

public BoundMemoryRocksDbConfigHandler SetCompactionStyle(Compaction compaction)
{
compactionStyle = compaction;
return this;
}

public BoundMemoryRocksDbConfigHandler SetCompressionType(Compression compression)
{
compressionType = compression;
return this;
}

public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize)
=> LimitTotalMemory(Convert.ToUInt32(maximumCacheSize.CacheSizeBytes));

public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize, bool mutualizeCache)
=> LimitTotalMemory(Convert.ToUInt32(maximumCacheSize.CacheSizeBytes), mutualizeCache);

public BoundMemoryRocksDbConfigHandler ConfigureNumThreads(int numberOfThreads)
{
maxNumberOfThreads = numberOfThreads;
return this;
}

private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMemory, bool mutualizeCache = false)
{
if (configured)
throw new IllegalStateException(
"BoundMemoryRocksDbConfigHandler is already configured ! To avoid multiple block cache and writer buffer manager allocation, an inner exception is throw. It was due to a bug or misconfiguration in your side");

CacheSizeCapacity = totalUnManagedMemory;
configured = true;
ulong blockCacheSize = mutualizeCache ? totalUnManagedMemory : totalUnManagedMemory / 2;
ulong totalMemtableMemory = totalUnManagedMemory / 2;

// block cache allocator
IntPtr LRUCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUCacheOptionsPtr, new UIntPtr(blockCacheSize));
Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUCacheOptionsPtr, -1);
if (useJemallocAllocator)
{
IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUCacheOptionsPtr, jemallocPtr);
}

blockCachePtr = Native.Instance.rocksdb_cache_create_lru_opts(LRUCacheOptionsPtr);

// wbm allocator
IntPtr LRUWriteCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
Native.Instance.rocksdb_lru_cache_options_set_capacity(LRUWriteCacheOptionsPtr, new UIntPtr(totalMemtableMemory));
Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(LRUWriteCacheOptionsPtr, -1);
if (useJemallocAllocator)
{
IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(LRUWriteCacheOptionsPtr, jemallocPtr);
}
var cacheWBM = mutualizeCache ? blockCachePtr : Native.Instance.rocksdb_cache_create_lru_opts(LRUWriteCacheOptionsPtr);

writeBufferManager = Native.Instance.rocksdb_write_buffer_manager_create_with_cache(
new UIntPtr(totalMemtableMemory),
cacheWBM,
false);
return this;
}


public void Handle(string storeName, RocksDbOptions options)
{
options.SetCompactionStyle(compactionStyle);
options.SetCompression(compressionType);

var tableConfig = new BlockBasedTableOptions();
tableConfig.SetBlockCache(blockCachePtr); // use the same block cache for each state store
tableConfig.SetBlockSize(4096L); // 4 KB
tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
tableConfig.SetCacheIndexAndFilterBlocks(true);
Native.Instance.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(
tableConfig.Handle, true);
Native.Instance.rocksdb_block_based_options_set_pin_top_level_index_and_filter(tableConfig.Handle, true);
options.SetBlockBasedTableFactory(tableConfig);

options.SetWriteBufferManager(writeBufferManager);

options.SetStatsDumpPeriodSec(0);
options.IncreaseParallelism(Math.Max(maxNumberOfThreads, 2));
}
}
}
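
To summarize what LimitTotalMemory does above: the configured budget is split evenly between the shared LRU block cache and the write buffer manager, unless mutualizeCache is set, in which case the block cache keeps the full budget and memtable usage is charged against that same cache. A small sketch of the arithmetic, assuming the 40 MB budget used in the sample further down:

    // Split performed by LimitTotalMemory (see handler above); 40 MB budget assumed for illustration
    ulong total          = 40UL * 1024 * 1024;  // LimitTotalMemory(CacheSize.OfMb(40))
    ulong blockCacheSize = total / 2;           // 20 MB LRU block cache shared by every state store
    ulong memtableBudget = total / 2;           // 20 MB tracked by the write buffer manager
    // With mutualizeCache = true: blockCacheSize = total (40 MB) and the write buffer manager
    // charges its 20 MB memtable budget against that single shared cache.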
9 changes: 5 additions & 4 deletions core/State/RocksDb/RocksDbKeyValueStore.cs
@@ -341,17 +341,14 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
BlockBasedTableOptions tableConfig = new BlockBasedTableOptions();

RocksDbOptions rocksDbOptions = new RocksDbOptions(dbOptions, columnFamilyOptions);

tableConfig.SetBlockCache(RocksDbSharp.Cache.CreateLru(BLOCK_CACHE_SIZE));
tableConfig.SetBlockSize(BLOCK_SIZE);
tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());

rocksDbOptions.SetOptimizeFiltersForHits(1);
rocksDbOptions.SetBlockBasedTableFactory(tableConfig);
rocksDbOptions.SetCompression(COMPRESSION_TYPE);
rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
rocksDbOptions.SetCompactionStyle(COMPACTION_STYLE);
rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
rocksDbOptions.SetCreateIfMissing(true);
rocksDbOptions.SetErrorIfExists(false);
rocksDbOptions.SetInfoLogLevel(InfoLogLevel.Error);
@@ -367,6 +364,10 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
// TODO : wrap writeOptions in rocksDbOptions too
writeOptions.DisableWal(1);

rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
rocksDbOptions.SetBlockBasedTableFactory(tableConfig);

context.Configuration.RocksDbConfigHandler?.Invoke(Name, rocksDbOptions);
rocksDbOptions.SetMinWriteBufferNumberToMerge(2);

13 changes: 13 additions & 0 deletions core/State/RocksDb/RocksDbOptions.cs
@@ -1303,5 +1303,18 @@ public RocksDbOptions SetWriteBufferSize(ulong value)
}

#endregion

#region Custom

/// <summary>
/// Set the write buffer manager
/// </summary>
/// <param name="wbm">Pointer to the writer buffer manager</param>
public void SetWriteBufferManager(IntPtr wbm)
{
Native.Instance.rocksdb_options_set_write_buffer_manager(dbOptions.Handle, wbm);
}

#endregion
}
}
23 changes: 18 additions & 5 deletions samples/sample-stream/Program.cs
@@ -3,11 +3,12 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using RocksDbSharp;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.RocksDb;
using Streamiz.Kafka.Net.Stream;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Table;

namespace sample_stream
@@ -16,15 +17,22 @@ public static class Program
{
public static async Task Main(string[] args)
{
var rocksDbHandler = new BoundMemoryRocksDbConfigHandler()
.ConfigureNumThreads(2)
.SetCompactionStyle(Compaction.Universal)
.SetCompressionType(Compression.Lz4)
.LimitTotalMemory(CacheSize.OfMb(40));

var config = new StreamConfig<StringSerDes, StringSerDes>{
ApplicationId = $"test-app",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
Logger = LoggerFactory.Create((b) =>
Logger = LoggerFactory.Create(b =>
{
b.AddConsole();
b.SetMinimumLevel(LogLevel.Information);
})
}),
RocksDbConfigHandler = rocksDbHandler.Handle
};

config.MetricsRecording = MetricsRecordingLevel.DEBUG;
@@ -56,11 +64,16 @@ private static Topology BuildTopology()
new StringSerDes(),
new StringSerDes());*/

builder.Stream<string, string>("input")
/*builder.Stream<string, string>("input")
.DropDuplicate((key, value1, value2) => value1.Equals(value2),
TimeSpan.FromMinutes(1))
.To(
"output");//, (s, s1, arg3, arg4) => new Partition(0));
"output");*/

builder.Stream<string, string>("users")
.GroupByKey()
.Count()
.ToStream("count_users");

return builder.Build();
}
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.IO;
using Confluent.Kafka;
using NUnit.Framework;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.State.RocksDb;
using Streamiz.Kafka.Net.Table;
using Moq;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.SerDes;

namespace Streamiz.Kafka.Net.Tests.Stores;

public class BoundMemoryRocksDbConfigHandlerTests
{
private void MemoryBoundedOptionsImpl(BoundMemoryRocksDbConfigHandler handler)
{
Bytes Key(string k)
{
StringSerDes s = new StringSerDes();
return new Bytes(s.Serialize(k, SerializationContext.Empty));
}

byte[] Value(Int32 number)
{
Int32SerDes i = new Int32SerDes();
return i.Serialize(number, SerializationContext.Empty);
}

bool stop = false;

var config = new StreamConfig();
config.RocksDbConfigHandler = handler.Handle;

var stateManager = new Mock<IStateManager>();
stateManager.Setup(s =>
s.Register(It.IsAny<IStateStore>(), It.IsAny<Action<ConsumeResult<byte[], byte[]>>>()));

var context = new Mock<ProcessorContext>();
context.Setup(c => c.Id)
.Returns(new TaskId { Id = 0, Partition = 0 });
context.Setup(c => c.StateDir)
.Returns("./bound-test/");
context.Setup(c => c.Configuration)
.Returns(config);
context.Setup(c => c.States)
.Returns(() => stateManager.Object);

ulong totalLogged = 0L;

RocksDbKeyValueStore store = new RocksDbKeyValueStore("store");
store.Init(context.Object, store);

List<Bytes> allKeys = new List<Bytes>();
while (!stop)
{
var key = RandomGenerator.GetRandomString();
var value = RandomGenerator.GetInt32(Int32.MaxValue);

var rawKey = Key(key);
store.Put(Key(key), Value(value));
allKeys.Add(rawKey);
totalLogged += Convert.ToUInt32(key.Length + sizeof(Int32));
stop = totalLogged > 2 * handler.CacheSizeCapacity; // stop once roughly 2x the configured cache capacity has been written
}

long memtableSizeAfterWrite = Convert.ToInt32(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
long blockCacheSizeBeforeRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.block-cache-usage"));

store.Flush();

foreach (var key in allKeys)
store.Get(key);

long memtableSizeAfterRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
long blockCacheSizeAfterRead = Convert.ToInt32(store.Db.GetProperty("rocksdb.block-cache-usage"));

store.Close();
Directory.Delete(context.Object.StateDir, true);

Assert.IsTrue(Convert.ToUInt32(memtableSizeAfterWrite + blockCacheSizeBeforeRead) < handler.CacheSizeCapacity);
Assert.IsTrue(Convert.ToUInt32(memtableSizeAfterRead + blockCacheSizeAfterRead) < handler.CacheSizeCapacity);
}

[Test]
[NonParallelizable]
public void CheckWithMemoryBoundedOptions()
{
CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
handler
.LimitTotalMemory(totalUnmanagedMemory);

MemoryBoundedOptionsImpl(handler);
}

[Test]
[NonParallelizable]
public void MutualizeCacheMemoryBoundedOptions()
{
CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
handler
.LimitTotalMemory(totalUnmanagedMemory,true);

MemoryBoundedOptionsImpl(handler);
}
}
