Skip to content

Commit

Permalink
Smaller improvements to async queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
CptMoore committed Dec 26, 2024
1 parent 08afb07 commit 50e1c1e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 53 deletions.
74 changes: 36 additions & 38 deletions ModTek/Features/Logging/LightWeightBlockingQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,94 +10,90 @@ namespace ModTek.Features.Logging;
// RingBuffer with nullable reference types=38ns
// ConcurrentQueue+custom size tracking+addingComplete=40ns
// BlockingCollection=170ns
// Use-Cases:
// 1. no items -> consumer: wait and don't use cpu
// 2. almost no items -> producer + consumer: low latency
// 3. full -> producer: low latency; consumer: high throughput (solution is not optimized for this case)
internal class LightWeightBlockingQueue
{
internal void Shutdown() => _shutdown = true;
private volatile bool _shutdown; // some way to break the waiting

// 100k leads to about ~30MB, pre-allocation reports less due to absent strings
private const int MaxRingBufferSize = 100_000;
private const int MaxQueueSize = MaxRingBufferSize - 1; // Start=End need to be distinguishable
// 65k leads to about ~20MB, pre-allocation reports less due to absent strings
// see https://en.wikipedia.org/wiki/Modulo#Performance_issues
private const int MaxRingBufferSize = 1 << 16; // power of 2 required by FastModuloMaskForBitwiseAnd
private const int FastModuloMaskForBitwiseAnd = MaxRingBufferSize - 1;
private const int MaxQueueSize = MaxRingBufferSize - 1; // Start and End need to be distinguishable
// ring buffer is used by Disruptor(.NET), seems to work well for them
// typed based exchanges are 56ns (fixed as of .NET 7) hence why we use object based ones
private readonly MTLoggerMessageDto[] _ringBuffer = new MTLoggerMessageDto[MaxRingBufferSize];
// end - start = size // all indexes are "excluding" basically, 0 means 0 not yet written (or read)
// TODO how to avoid douple indexes?
private volatile int _nextWritingIndex; // sync in between writers
private volatile int _nextReadIndex; // sync between readers -> writers
// end - start = size
private volatile int _nextWriteIndex;
private volatile int _nextReadIndex;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int Next(int index)
{
return (index + 1) % MaxRingBufferSize;
return (index + 1) & FastModuloMaskForBitwiseAnd;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int Size(int startIndex, int endIndex)
{
return (endIndex - startIndex + MaxRingBufferSize) % MaxRingBufferSize;
return (endIndex - startIndex) & FastModuloMaskForBitwiseAnd;
}

// the following trick is faster but less human consumption friendly
// my guess is that modulo is already optimized and only the addition for the Size calc is what makes a difference
// see https://en.wikipedia.org/wiki/Modulo#Performance_issues
// Bitwise AND is faster than modulo, just requires size to be power of 2
// only gained like 1-2ns though (meaning it is within measurement error...)
// private const int MaxRingBufferSize = 1 << 16; // power of 2 required by FastModuloMaskForBitwiseAnd
// private const int FastModuloMaskForBitwiseAnd = MaxRingBufferSize - 1;
// [MethodImpl(MethodImplOptions.AggressiveInlining)]
// private static int Next(int index)
// {
// return (index + 1) & FastModuloMaskForBitwiseAnd;
// }
// [MethodImpl(MethodImplOptions.AggressiveInlining)]
// private static int Size(int startIndex, int endIndex)
// {
// return (endIndex - startIndex) & FastModuloMaskForBitwiseAnd;
// }

internal ref MTLoggerMessageDto AcquireCommittedOrWait()
{
var spinWait = new SpinWait();
while (true)
{
var index = _nextReadIndex;
if (Size(index, _nextWritingIndex) > 0)
if (Size(index, _nextWriteIndex) > 0)
{
ref var item = ref _ringBuffer[index];
// makes sure no overtake on the ring happens
if (item.CommittedToQueue)
{
if (Interlocked.CompareExchange(ref _nextReadIndex, Next(index), index) == index)
{
return ref item;
}
}
}

spinWait.SpinOnce(); // reader should yield and sleep if nothing comes in after some time
else
{
// this branch happens 292 times for 157187 dispatches (0.19%)
// for now not worth it to optimize
}

if (_shutdown)
spinWait.Reset(); // fast retry if something was found earlier
}
else
{
// this can still drop logs, very unlikely but possible
Thread.Sleep(1);
if (Size(_nextReadIndex, _nextWritingIndex) == 0)
if (_shutdown)
{
throw new ObjectDisposedException("Shutting down logging thread using an exception, this is harmless");
// this can still drop logs, very unlikely but possible
Thread.Sleep(1);
if (Size(_nextReadIndex, _nextWriteIndex) == 0)
{
throw new ShutdownException();
}
}
}
spinWait.SpinOnce(); // reader should yield and sleep if nothing comes in after some time
}
}

internal ref MTLoggerMessageDto AcquireUncommitedOrWait()
{
while (true)
{
var index = _nextWritingIndex;
var index = _nextWriteIndex;
if (Size(_nextReadIndex, index) < MaxQueueSize)
{
ref var item = ref _ringBuffer[index];
if (!item.CommittedToQueue)
{
if (Interlocked.CompareExchange(ref _nextWritingIndex, Next(index), index) == index)
if (Interlocked.CompareExchange(ref _nextWriteIndex, Next(index), index) == index)
{
return ref item;
}
Expand All @@ -107,4 +103,6 @@ internal ref MTLoggerMessageDto AcquireUncommitedOrWait()
Thread.SpinWait(4); // main thread should always try to dispatch asap, never wait that much!
}
}

internal class ShutdownException : Exception;
}
37 changes: 22 additions & 15 deletions ModTek/Features/Logging/MTLoggerAsyncQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,32 @@ Asynchronous logging offloaded {offloadedTime} from the main thread.

private void LoggingLoop()
{
while (true)
try
{
ref var message = ref _queue.AcquireCommittedOrWait();

s_loggingStopwatch.Start();
try
{
_processor.Process(ref message);
}
catch (Exception e)
{
LoggingFeature.WriteExceptionToFatalLog(e);
}
finally
while (true)
{
message.Reset();
s_loggingStopwatch.Stop();
ref var message = ref _queue.AcquireCommittedOrWait();

s_loggingStopwatch.Start();
try
{
_processor.Process(ref message);
}
catch (Exception e)
{
LoggingFeature.WriteExceptionToFatalLog(e);
}
finally
{
message.Reset();
s_loggingStopwatch.Stop();
}
}
}
catch (LightWeightBlockingQueue.ShutdownException)
{
// ignored
}
}

internal ref MTLoggerMessageDto AcquireUncommitedOrWait()
Expand Down

0 comments on commit 50e1c1e

Please sign in to comment.