Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a Bounded Memory RocksDb config handler #371

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/RandomGenerator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Security.Cryptography;

namespace Streamiz.Kafka.Net
Expand All @@ -22,5 +23,13 @@ public static int GetInt32(int partitionCount)
return RandomNumberGenerator.GetInt32(0, partitionCount);
#endif
}

/// <summary>
/// Builds a random alphanumeric string (uppercase A-Z and digits 0-9).
/// The length is drawn via <see cref="GetInt32"/>, so it lies in
/// [0, <paramref name="maxLength"/>) — the upper bound is exclusive and the
/// result may be empty.
/// </summary>
/// <param name="maxLength">Exclusive upper bound for the generated length.</param>
/// <returns>A randomly generated uppercase alphanumeric string.</returns>
public static string GetRandomString(int maxLength = 100)
{
    const string alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    // Draw the length first, then fill each slot with an independently
    // chosen character — same call sequence as before, no LINQ pipeline.
    var buffer = new char[GetInt32(maxLength)];
    for (int i = 0; i < buffer.Length; i++)
        buffer[i] = alphabet[GetInt32(alphabet.Length)];
    return new string(buffer);
}
}
}
114 changes: 114 additions & 0 deletions core/State/RocksDb/BoundMemoryRocksDbConfigHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using RocksDbSharp;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Table;

namespace Streamiz.Kafka.Net.State.RocksDb
{
/// <summary>
/// Fluent configuration handler that bounds the total unmanaged memory used by all
/// RocksDB state stores: half of the budget goes to a shared LRU block cache, the
/// other half to a write buffer manager (or the whole budget to a single mutualized
/// cache). Wire <see cref="Handle"/> into <c>StreamConfig.RocksDbConfigHandler</c>.
/// </summary>
public class BoundMemoryRocksDbConfigHandler
{
    private bool configured;
    private Compaction compactionStyle = Compaction.Universal;
    private Compression compressionType = Compression.No;
    private int maxNumberOfThreads;
    private bool useJemallocAllocator;

    // Native handles allocated once in LimitTotalMemory and reused for every store.
    private IntPtr writeBufferManager;
    private IntPtr blockCachePtr;

    /// <summary>
    /// Total unmanaged memory budget, in bytes, set by <see cref="LimitTotalMemory(CacheSize)"/>.
    /// </summary>
    public ulong CacheSizeCapacity { get; private set; }

    /// <summary>
    /// Use the jemalloc "no dump" allocator for the LRU caches (requires a RocksDB
    /// build with jemalloc support).
    /// </summary>
    public BoundMemoryRocksDbConfigHandler UseJemalloc()
    {
        useJemallocAllocator = true;
        return this;
    }

    /// <summary>Set the compaction style applied to every store (default: Universal).</summary>
    public BoundMemoryRocksDbConfigHandler SetCompactionStyle(Compaction compaction)
    {
        compactionStyle = compaction;
        return this;
    }

    /// <summary>Set the compression type applied to every store (default: No).</summary>
    public BoundMemoryRocksDbConfigHandler SetCompressionType(Compression compression)
    {
        compressionType = compression;
        return this;
    }

    /// <summary>
    /// Bound the total unmanaged memory (block cache + memtables) to the given size.
    /// </summary>
    // FIX: was Convert.ToUInt32, which throws OverflowException for any budget
    // above 4 GB even though the private overload accepts a ulong.
    public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize)
        => LimitTotalMemory(Convert.ToUInt64(maximumCacheSize.CacheSizeBytes));

    /// <summary>
    /// Bound the total unmanaged memory; when <paramref name="mutualizeCache"/> is true
    /// the write buffer manager charges its memory against the block cache itself.
    /// </summary>
    public BoundMemoryRocksDbConfigHandler LimitTotalMemory(CacheSize maximumCacheSize, bool mutualizeCache)
        => LimitTotalMemory(Convert.ToUInt64(maximumCacheSize.CacheSizeBytes), mutualizeCache);

    /// <summary>Set the RocksDB background thread pool size (a minimum of 2 is enforced in <see cref="Handle"/>).</summary>
    public BoundMemoryRocksDbConfigHandler ConfigureNumThreads(int numberOfThreads)
    {
        maxNumberOfThreads = numberOfThreads;
        return this;
    }

    /// <summary>
    /// Create one LRU cache of the given capacity, optionally backed by the
    /// jemalloc no-dump allocator. Shared by the block cache and (when not
    /// mutualized) the write buffer manager cache.
    /// </summary>
    private IntPtr CreateLruCache(ulong capacity)
    {
        IntPtr lruCacheOptionsPtr = Native.Instance.rocksdb_lru_cache_options_create();
        Native.Instance.rocksdb_lru_cache_options_set_capacity(lruCacheOptionsPtr, new UIntPtr(capacity));
        // -1 lets RocksDB pick a suitable shard count.
        Native.Instance.rocksdb_lru_cache_options_set_num_shard_bits(lruCacheOptionsPtr, -1);
        if (useJemallocAllocator)
        {
            IntPtr jemallocPtr = Native.Instance.rocksdb_jemalloc_nodump_allocator_create();
            Native.Instance.rocksdb_lru_cache_options_set_memory_allocator(lruCacheOptionsPtr, jemallocPtr);
        }
        return Native.Instance.rocksdb_cache_create_lru_opts(lruCacheOptionsPtr);
    }

    private BoundMemoryRocksDbConfigHandler LimitTotalMemory(ulong totalUnManagedMemory, bool mutualizeCache = false)
    {
        // Guard against double configuration: each call allocates native caches and a
        // write buffer manager, so configuring twice would leak the first allocation.
        if (configured)
            throw new IllegalStateException(
                "BoundMemoryRocksDbConfigHandler is already configured! To avoid allocating multiple block caches and write buffer managers, an exception is thrown. This is due to a bug or misconfiguration on your side");

        CacheSizeCapacity = totalUnManagedMemory;
        configured = true;
        ulong blockCacheSize = mutualizeCache ? totalUnManagedMemory : totalUnManagedMemory / 2;
        ulong totalMemtableMemory = totalUnManagedMemory / 2;

        blockCachePtr = CreateLruCache(blockCacheSize);

        // When mutualized, memtable memory is charged against the block cache;
        // otherwise a dedicated cache is created for the write buffer manager.
        // FIX: previously a second LRU-options struct (and jemalloc allocator) was
        // created even in the mutualized case and then discarded unused.
        var cacheWBM = mutualizeCache ? blockCachePtr : CreateLruCache(totalMemtableMemory);

        writeBufferManager = Native.Instance.rocksdb_write_buffer_manager_create_with_cache(
            new UIntPtr(totalMemtableMemory),
            cacheWBM,
            false); // do not stall writes when the buffer limit is reached
        return this;
    }

    /// <summary>
    /// Apply the bounded-memory configuration to one store's options. Intended to be
    /// assigned to <c>StreamConfig.RocksDbConfigHandler</c>; called once per state store.
    /// </summary>
    /// <param name="storeName">Name of the state store being configured (unused; required by the delegate signature).</param>
    /// <param name="options">The RocksDB options instance to mutate.</param>
    public void Handle(string storeName, RocksDbOptions options)
    {
        options.SetCompactionStyle(compactionStyle);
        options.SetCompression(compressionType);

        var tableConfig = new BlockBasedTableOptions();
        tableConfig.SetBlockCache(blockCachePtr); // use the same block cache for each state store
        tableConfig.SetBlockSize(4096L); // 4Kb
        tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());
        // Count index/filter blocks against the cache so they stay inside the budget,
        // and pin them with high priority so hot metadata isn't evicted by data blocks.
        tableConfig.SetCacheIndexAndFilterBlocks(true);
        Native.Instance.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(
            tableConfig.Handle, true);
        Native.Instance.rocksdb_block_based_options_set_pin_top_level_index_and_filter(tableConfig.Handle, true);
        options.SetBlockBasedTableFactory(tableConfig);

        options.SetWriteBufferManager(writeBufferManager);

        options.SetStatsDumpPeriodSec(0); // disable periodic stats dumping
        options.IncreaseParallelism(Math.Max(maxNumberOfThreads, 2));
    }
}
}
9 changes: 5 additions & 4 deletions core/State/RocksDb/RocksDbKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,14 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
BlockBasedTableOptions tableConfig = new BlockBasedTableOptions();

RocksDbOptions rocksDbOptions = new RocksDbOptions(dbOptions, columnFamilyOptions);

tableConfig.SetBlockCache(RocksDbSharp.Cache.CreateLru(BLOCK_CACHE_SIZE));
tableConfig.SetBlockSize(BLOCK_SIZE);
tableConfig.SetFilterPolicy(BloomFilterPolicy.Create());

rocksDbOptions.SetOptimizeFiltersForHits(1);
rocksDbOptions.SetBlockBasedTableFactory(tableConfig);
rocksDbOptions.SetCompression(COMPRESSION_TYPE);
rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
rocksDbOptions.SetCompactionStyle(COMPACTION_STYLE);
rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
rocksDbOptions.SetCreateIfMissing(true);
rocksDbOptions.SetErrorIfExists(false);
rocksDbOptions.SetInfoLogLevel(InfoLogLevel.Error);
Expand All @@ -367,6 +364,10 @@ public IEnumerable<KeyValuePair<Bytes, byte[]>> ReverseAll()
// TODO : wrap writeOptions in rocksDbOptions too
writeOptions.DisableWal(1);

rocksDbOptions.SetWriteBufferSize(WRITE_BUFFER_SIZE);
rocksDbOptions.SetMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
rocksDbOptions.SetBlockBasedTableFactory(tableConfig);

context.Configuration.RocksDbConfigHandler?.Invoke(Name, rocksDbOptions);
rocksDbOptions.SetMinWriteBufferNumberToMerge(2);

Expand Down
13 changes: 13 additions & 0 deletions core/State/RocksDb/RocksDbOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,5 +1303,18 @@ public RocksDbOptions SetWriteBufferSize(ulong value)
}

#endregion

#region Custom

/// <summary>
/// Attach an externally created RocksDB write buffer manager to these options,
/// delegating to the native <c>rocksdb_options_set_write_buffer_manager</c> call.
/// </summary>
/// <param name="wbm">Native handle (pointer) to the write buffer manager.</param>
public void SetWriteBufferManager(IntPtr wbm)
    => Native.Instance.rocksdb_options_set_write_buffer_manager(dbOptions.Handle, wbm);

#endregion
}
}
23 changes: 18 additions & 5 deletions samples/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using RocksDbSharp;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.RocksDb;
using Streamiz.Kafka.Net.Stream;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Table;

namespace sample_stream
Expand All @@ -16,15 +17,22 @@ public static class Program
{
public static async Task Main(string[] args)
{
var rocksDbHandler = new BoundMemoryRocksDbConfigHandler()
.ConfigureNumThreads(2)
.SetCompactionStyle(Compaction.Universal)
.SetCompressionType(Compression.Lz4)
.LimitTotalMemory(CacheSize.OfMb(40));

var config = new StreamConfig<StringSerDes, StringSerDes>{
ApplicationId = $"test-app",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
Logger = LoggerFactory.Create((b) =>
Logger = LoggerFactory.Create(b =>
{
b.AddConsole();
b.SetMinimumLevel(LogLevel.Information);
})
}),
RocksDbConfigHandler = rocksDbHandler.Handle
};

config.MetricsRecording = MetricsRecordingLevel.DEBUG;
Expand Down Expand Up @@ -56,11 +64,16 @@ private static Topology BuildTopology()
new StringSerDes(),
new StringSerDes());*/

builder.Stream<string, string>("input")
/*builder.Stream<string, string>("input")
.DropDuplicate((key, value1, value2) => value1.Equals(value2),
TimeSpan.FromMinutes(1))
.To(
"output");//, (s, s1, arg3, arg4) => new Partition(0));
"output");*/

builder.Stream<string, string>("users")
.GroupByKey()
.Count()
.ToStream("count_users");

return builder.Build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.IO;
using Confluent.Kafka;
using NUnit.Framework;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.State.RocksDb;
using Streamiz.Kafka.Net.Table;
using Moq;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.SerDes;

namespace Streamiz.Kafka.Net.Tests.Stores;

public class BoundMemoryRocksDbConfigHandlerTests
{
    /// <summary>
    /// Fills a RocksDB store with random entries until twice the handler's configured
    /// memory budget has been written, reads everything back, and asserts that
    /// memtable + block-cache usage stays below the budget at both checkpoints.
    /// </summary>
    private void MemoryBoundedOptionsImpl(BoundMemoryRocksDbConfigHandler handler)
    {
        // Local serialization helpers mirroring the store's raw byte format.
        Bytes Key(string k)
        {
            StringSerDes s = new StringSerDes();
            return new Bytes(s.Serialize(k, SerializationContext.Empty));
        }

        byte[] Value(Int32 number)
        {
            Int32SerDes i = new Int32SerDes();
            return i.Serialize(number, SerializationContext.Empty);
        }

        bool stop = false;

        var config = new StreamConfig();
        config.RocksDbConfigHandler = handler.Handle;

        var stateManager = new Mock<IStateManager>();
        stateManager.Setup(s =>
            s.Register(It.IsAny<IStateStore>(), It.IsAny<Action<ConsumeResult<byte[], byte[]>>>()));

        var context = new Mock<ProcessorContext>();
        context.Setup(c => c.Id)
            .Returns(new TaskId { Id = 0, Partition = 0 });
        context.Setup(c => c.StateDir)
            .Returns("./bound-test/");
        context.Setup(c => c.Configuration)
            .Returns(config);
        context.Setup(c => c.States)
            .Returns(() => stateManager.Object);

        ulong totalLogged = 0L;

        RocksDbKeyValueStore store = new RocksDbKeyValueStore("store");
        store.Init(context.Object, store);

        List<Bytes> allKeys = new List<Bytes>();
        while (!stop)
        {
            var key = RandomGenerator.GetRandomString();
            var value = RandomGenerator.GetInt32(Int32.MaxValue);

            // FIX: serialize the key once and reuse it for both Put and bookkeeping
            // (previously Key(key) was invoked twice per iteration).
            var rawKey = Key(key);
            store.Put(rawKey, Value(value));
            allKeys.Add(rawKey);
            totalLogged += Convert.ToUInt64(key.Length + sizeof(Int32));
            stop = totalLogged > 2 * handler.CacheSizeCapacity; // stop if 2 * totalUnmanagedMemory has pushed
        }

        // FIX: parse the RocksDB properties with ToInt64 — these counters are 64-bit
        // and Convert.ToInt32 would throw OverflowException past int.MaxValue.
        long memtableSizeAfterWrite = Convert.ToInt64(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
        long blockCacheSizeBeforeRead = Convert.ToInt64(store.Db.GetProperty("rocksdb.block-cache-usage"));

        store.Flush();

        // Touch every key so data blocks flow through the (bounded) block cache.
        foreach (var key in allKeys)
            store.Get(key);

        long memtableSizeAfterRead = Convert.ToInt64(store.Db.GetProperty("rocksdb.cur-size-all-mem-tables"));
        long blockCacheSizeAfterRead = Convert.ToInt64(store.Db.GetProperty("rocksdb.block-cache-usage"));

        store.Close();
        Directory.Delete(context.Object.StateDir, true);

        // FIX: compare via ToUInt64 — the sums are longs and ToUInt32 would overflow
        // for budgets above 4 GB.
        Assert.IsTrue(Convert.ToUInt64(memtableSizeAfterWrite + blockCacheSizeBeforeRead) < handler.CacheSizeCapacity);
        Assert.IsTrue(Convert.ToUInt64(memtableSizeAfterRead + blockCacheSizeAfterRead) < handler.CacheSizeCapacity);
    }

    /// <summary>Budget split between a block cache and a separate write-buffer cache.</summary>
    [Test]
    [NonParallelizable]
    public void CheckWithMemoryBoundedOptions()
    {
        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
        handler
            .LimitTotalMemory(totalUnmanagedMemory);

        MemoryBoundedOptionsImpl(handler);
    }

    /// <summary>Write buffer manager charges its memory against the shared block cache.</summary>
    [Test]
    [NonParallelizable]
    public void MutualizeCacheMemoryBoundedOptions()
    {
        CacheSize totalUnmanagedMemory = CacheSize.OfMb(10);
        BoundMemoryRocksDbConfigHandler handler = new BoundMemoryRocksDbConfigHandler();
        handler
            .LimitTotalMemory(totalUnmanagedMemory, true);

        MemoryBoundedOptionsImpl(handler);
    }
}
Loading