From 50e1c1ec5e830a224f406dcaa45c88682e610716 Mon Sep 17 00:00:00 2001 From: CptMoore <39010654+CptMoore@users.noreply.github.com> Date: Fri, 27 Dec 2024 00:02:10 +0100 Subject: [PATCH] Smaller improvements to async queue. --- .../Logging/LightWeightBlockingQueue.cs | 74 +++++++++---------- ModTek/Features/Logging/MTLoggerAsyncQueue.cs | 37 ++++++---- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/ModTek/Features/Logging/LightWeightBlockingQueue.cs b/ModTek/Features/Logging/LightWeightBlockingQueue.cs index a3915f7b..bfdcb16a 100644 --- a/ModTek/Features/Logging/LightWeightBlockingQueue.cs +++ b/ModTek/Features/Logging/LightWeightBlockingQueue.cs @@ -10,60 +10,48 @@ namespace ModTek.Features.Logging; // RingBuffer with nullable reference types=38ns // ConcurrentQueue+custom size tracking+addingComplete=40ns // BlockingCollection=170ns +// Use-Cases: +// 1. no items -> consumer: wait and don't use cpu +// 2. almost no items -> producer + consumer: low latency +// 3. full -> producer: low latency; consumer: high throughput (solution is not optimized for this case) internal class LightWeightBlockingQueue { internal void Shutdown() => _shutdown = true; private volatile bool _shutdown; // some way to break the waiting - // 100k leads to about ~30MB, pre-allocation reports less due to absent strings - private const int MaxRingBufferSize = 100_000; - private const int MaxQueueSize = MaxRingBufferSize - 1; // Start=End need to be distinguishable + // 65k leads to about ~20MB, pre-allocation reports less due to absent strings + // see https://en.wikipedia.org/wiki/Modulo#Performance_issues + private const int MaxRingBufferSize = 1 << 16; // power of 2 required by FastModuloMaskForBitwiseAnd + private const int FastModuloMaskForBitwiseAnd = MaxRingBufferSize - 1; + private const int MaxQueueSize = MaxRingBufferSize - 1; // Start and End need to be distinguishable // ring buffer is used by Disruptor(.NET), seems to work well for them // typed based exchanges are 56ns (fixed as of .NET 7) hence why we use object based ones private readonly MTLoggerMessageDto[] _ringBuffer = new MTLoggerMessageDto[MaxRingBufferSize]; - // end - start = size // all indexes are "excluding" basically, 0 means 0 not yet written (or read) - // TODO how to avoid douple indexes? - private volatile int _nextWritingIndex; // sync in between writers - private volatile int _nextReadIndex; // sync between readers -> writers + // end - start = size + private volatile int _nextWriteIndex; + private volatile int _nextReadIndex; [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int Next(int index) { - return (index + 1) % MaxRingBufferSize; + return (index + 1) & FastModuloMaskForBitwiseAnd; } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int Size(int startIndex, int endIndex) { - return (endIndex - startIndex + MaxRingBufferSize) % MaxRingBufferSize; + return (endIndex - startIndex) & FastModuloMaskForBitwiseAnd; } - // the following trick is faster but less human consumption friendly - // my guess is that modulo is already optimized and only the addition for the Size calc is what makes a difference - // see https://en.wikipedia.org/wiki/Modulo#Performance_issues - // Bitwise AND is faster than modulo, just requires size to be power of 2 - // only gained like 1-2ns though (meaning it is within measurement error...) - // private const int MaxRingBufferSize = 1 << 16; // power of 2 required by FastModuloMaskForBitwiseAnd - // private const int FastModuloMaskForBitwiseAnd = MaxRingBufferSize - 1; - // [MethodImpl(MethodImplOptions.AggressiveInlining)] - // private static int Next(int index) - // { - // return (index + 1) & FastModuloMaskForBitwiseAnd; - // } - // [MethodImpl(MethodImplOptions.AggressiveInlining)] - // private static int Size(int startIndex, int endIndex) - // { - // return (endIndex - startIndex) & FastModuloMaskForBitwiseAnd; - // } - internal ref MTLoggerMessageDto AcquireCommittedOrWait() { var spinWait = new SpinWait(); while (true) { var index = _nextReadIndex; - if (Size(index, _nextWritingIndex) > 0) + if (Size(index, _nextWriteIndex) > 0) { ref var item = ref _ringBuffer[index]; + // makes sure no overtake on the ring happens if (item.CommittedToQueue) { if (Interlocked.CompareExchange(ref _nextReadIndex, Next(index), index) == index) @@ -71,19 +59,27 @@ internal ref MTLoggerMessageDto AcquireCommittedOrWait() return ref item; } } - } - - spinWait.SpinOnce(); // reader should yield and sleep if nothing comes in after some time + else + { + // this branch happens 292 times for 157187 dispatches (0.19%) + // for now not worth it to optimize + } - if (_shutdown) + spinWait.Reset(); // fast retry if something was found earlier + } + else { - // this can still drop logs, very unlikely but possible - Thread.Sleep(1); - if (Size(_nextReadIndex, _nextWritingIndex) == 0) + if (_shutdown) { - throw new ObjectDisposedException("Shutting down logging thread using an exception, this is harmless"); + // this can still drop logs, very unlikely but possible + Thread.Sleep(1); + if (Size(_nextReadIndex, _nextWriteIndex) == 0) + { + throw new ShutdownException(); + } } } + spinWait.SpinOnce(); // reader should yield and sleep if nothing comes in after some time } } @@ -91,13 +87,13 @@ internal ref MTLoggerMessageDto AcquireUncommitedOrWait() { while (true) { - var index = _nextWritingIndex; + var index = _nextWriteIndex; if (Size(_nextReadIndex, index) < MaxQueueSize) { ref var item = ref _ringBuffer[index]; if (!item.CommittedToQueue) { - if (Interlocked.CompareExchange(ref _nextWritingIndex, Next(index), index) == index) + if (Interlocked.CompareExchange(ref _nextWriteIndex, Next(index), index) == index) { return ref item; } @@ -107,4 +103,6 @@ internal ref MTLoggerMessageDto AcquireUncommitedOrWait() Thread.SpinWait(4); // main thread should always try to dispatch asap, never wait that much! } } + + internal class ShutdownException : Exception; } \ No newline at end of file diff --git a/ModTek/Features/Logging/MTLoggerAsyncQueue.cs b/ModTek/Features/Logging/MTLoggerAsyncQueue.cs index 389b000a..fc435d6b 100644 --- a/ModTek/Features/Logging/MTLoggerAsyncQueue.cs +++ b/ModTek/Features/Logging/MTLoggerAsyncQueue.cs @@ -67,25 +67,32 @@ Asynchronous logging offloaded {offloadedTime} from the main thread. private void LoggingLoop() { - while (true) + try { - ref var message = ref _queue.AcquireCommittedOrWait(); - - s_loggingStopwatch.Start(); - try - { - _processor.Process(ref message); - } - catch (Exception e) - { - LoggingFeature.WriteExceptionToFatalLog(e); - } - finally + while (true) { - message.Reset(); - s_loggingStopwatch.Stop(); + ref var message = ref _queue.AcquireCommittedOrWait(); + + s_loggingStopwatch.Start(); + try + { + _processor.Process(ref message); + } + catch (Exception e) + { + LoggingFeature.WriteExceptionToFatalLog(e); + } + finally + { + message.Reset(); + s_loggingStopwatch.Stop(); + } } } + catch (LightWeightBlockingQueue.ShutdownException) + { + // ignored + } } internal ref MTLoggerMessageDto AcquireUncommitedOrWait()