From 2037c14f5d6cec4eed4fe7966b9c2cf52a63d800 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 18 Sep 2024 15:02:36 +0200 Subject: [PATCH] Fix against TimeSpan.Zero for max lifetime (#72) * Fix against TimeSpan.Zero for max lifetime * Move blocking logic to WaitToWrriteAsync and use a more reliable implementation to ensure some drainage happened * remove dead code * ensure noop export waits signicicantly long to force concurrency * model draining before accepting new writes more explicitly * fix typo * fix tests * Ensure WaitToWriteAsync is deterministic (in that it will not wait forever if no drain happens) * revert changes to benchmark project --- .../Program.cs | 2 +- elastic-ingest-dotnet.sln.DotSettings | 5 +- .../Elastic.Channels.Continuous/Program.cs | 30 +-- src/Elastic.Channels/BufferOptions.cs | 11 +- src/Elastic.Channels/BufferedChannelBase.cs | 192 ++++++++++++------ .../Diagnostics/DiagnosticsBufferedChannel.cs | 9 +- .../Diagnostics/NoopBufferedChannel.cs | 2 +- tests/Elastic.Channels.Tests/BehaviorTests.cs | 22 +- .../CalculatedPropertyTests.cs | 56 +++++ 9 files changed, 231 insertions(+), 98 deletions(-) create mode 100644 tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index 42ce2e1..2564764 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -19,7 +19,7 @@ //var bm = new BulkIngestion(); //bm.Setup(); -//await bm.BulkAllAsync(); +//await bm.BulkAllAsync() //Console.WriteLine("DONE"); var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); diff --git a/elastic-ingest-dotnet.sln.DotSettings b/elastic-ingest-dotnet.sln.DotSettings index 172fc64..673b0d8 100644 --- a/elastic-ingest-dotnet.sln.DotSettings +++ b/elastic-ingest-dotnet.sln.DotSettings @@ -122,7 +122,7 @@ See the LICENSE file in the project root for more information </Entry.Match> <Entry.SortBy> <Kind Is="Member" /> - <Name Is="Enter Pattern Here" /> + <Name /> </Entry.SortBy> </Entry> <Entry DisplayName="Readonly Fields"> @@ -140,7 +140,7 @@ See the LICENSE file in the project root for more information <Entry.SortBy> <Access /> <Readonly /> - <Name Is="Enter Pattern Here" /> + <Name /> </Entry.SortBy> </Entry> <Entry DisplayName="Constructors"> @@ -415,6 +415,7 @@ See the LICENSE file in the project root for more information True True True + True True True False diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs index 7aa6b89..5d2b1df 100644 --- a/examples/Elastic.Channels.Continuous/Program.cs +++ b/examples/Elastic.Channels.Continuous/Program.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Threading.Channels; using Elastic.Channels; using Elastic.Channels.Diagnostics; @@ -13,30 +14,31 @@ var options = new NoopBufferedChannel.NoopChannelOptions { - BufferOptions = new BufferOptions() + //TrackConcurrency = true, + BufferOptions = new BufferOptions { - OutboundBufferMaxSize = 10_000, - InboundBufferMaxSize = 10_000_000, + OutboundBufferMaxLifetime = TimeSpan.Zero, + InboundBufferMaxSize = 1_000_000, + OutboundBufferMaxSize = 1_000_000 }, ExportBufferCallback = () => Console.Write("."), - ExportExceptionCallback = e => Console.Write("!"), - PublishToInboundChannelFailureCallback = () => Console.Write("I"), - PublishToOutboundChannelFailureCallback = () => Console.Write("O"), + ExportExceptionCallback = e => Console.Write("!") }; +Console.WriteLine("2"); var channel = new DiagnosticsBufferedChannel(options); +Console.WriteLine($"Begin: ({channel.OutboundStarted}) {channel.MaxConcurrency} {channel.BatchExportOperations} -> {channel.InflightEvents}"); await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) => { var e = new NoopBufferedChannel.NoopEvent { Id = i }; - var written = false; - //Console.Write('.'); - var ready = await channel.WaitToWriteAsync(ctx); - if (ready) written = channel.TryWrite(e); - if (!written) + if (await channel.WaitToWriteAsync(e)) { - Console.WriteLine(); + + } + + if (i % 10_000 == 0) + { + Console.Clear(); Console.WriteLine(channel); - Console.WriteLine(i); - Environment.Exit(1); } }); diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index 2477558..3e4d3aa 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -25,13 +25,22 @@ public class BufferOptions /// public int OutboundBufferMaxSize { get; set; } = 1_000; + private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5); + private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1); + + /// /// The maximum lifetime of a buffer to export to . /// If a buffer is older then the configured it will be flushed to /// regardless of it's current size /// Defaults to 5 seconds + /// Any value less than 1 second will be rounded back up to 1 second /// - public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5); + public TimeSpan OutboundBufferMaxLifetime + { + get => _outboundBufferMaxLifetime; + set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime; + } /// /// The maximum number of consumers allowed to poll for new events on the channel. diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index fac03e5..ae2128c 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -60,8 +61,7 @@ public abstract class BufferedChannelBase { private readonly Task _inTask; private readonly Task _outTask; - private readonly int _maxConcurrency; - private readonly SemaphoreSlim _throttleTasks; + private readonly SemaphoreSlim _throttleExportTasks; private readonly CountdownEvent? _signal; private readonly ChannelCallbackInvoker _callbacks; @@ -69,6 +69,58 @@ public abstract class BufferedChannelBase /// public IChannelDiagnosticsListener? DiagnosticsListener { get; } + /// The channel options currently in use + public TChannelOptions Options { get; } + + /// An overall cancellation token that may be externally provided + protected CancellationTokenSource TokenSource { get; } + + /// Internal cancellation token for signalling that all publishing activity has completed. + private readonly CancellationTokenSource _exitCancelSource = new(); + + private Channel> OutChannel { get; } + private Channel InChannel { get; } + private BufferOptions BufferOptions => Options.BufferOptions; + + private long _inflightEvents; + /// The number of inflight events + public long InflightEvents => _inflightEvents; + + /// Current number of tasks handling exporting the events + public int ExportTasks => _taskList.Count; + + /// + /// The effective concurrency. + /// Either the configured concurrency or the calculated concurrency. + /// + public int MaxConcurrency { get; } + + /// + /// The effective batch export size . + /// Either the configured concurrency or the calculated size. + /// If the configured exceeds ( / ) + /// the batch export size will be lowered to ( / ) to ensure we saturate + /// + public int BatchExportSize { get; } + + /// + /// If is set to + /// and approaches + /// will block until drops with atleast this size + /// + public int DrainSize { get; } + + private int _ongoingExportOperations; + /// Outstanding export operations + public int BatchExportOperations => _ongoingExportOperations; + private readonly CountdownEvent _waitForOutboundRead; + private List _taskList; + + /// + public bool OutboundStarted => _waitForOutboundRead.IsSet; + + internal InboundBuffer InboundBuffer { get; } + /// protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } @@ -93,30 +145,23 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(listeners); - var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize); - // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize - var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize)); - var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut); - _maxConcurrency = - BufferOptions.ExportMaxConcurrency.HasValue - ? BufferOptions.ExportMaxConcurrency.Value - : Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2); + var maxIn = Math.Max(Math.Max(1, BufferOptions.InboundBufferMaxSize), BufferOptions.OutboundBufferMaxSize); + var defaultMaxOut = Math.Max(1, BufferOptions.OutboundBufferMaxSize); + var calculatedConcurrency = (int)Math.Ceiling(maxIn / (double)defaultMaxOut); + var defaultConcurrency = Environment.ProcessorCount * 2; + MaxConcurrency = BufferOptions.ExportMaxConcurrency ?? Math.Min(calculatedConcurrency, defaultConcurrency); + + // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize / (MaxConcurrency * 2) + BatchExportSize = Math.Min(BufferOptions.InboundBufferMaxSize / (MaxConcurrency), Math.Max(1, BufferOptions.OutboundBufferMaxSize)); + DrainSize = Math.Min(100_000, Math.Min(BatchExportSize * 2, maxIn / 2)); - _throttleTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency); + _taskList = new List(MaxConcurrency * 2); + + _throttleExportTasks = new SemaphoreSlim(MaxConcurrency, MaxConcurrency); _signal = options.BufferOptions.WaitHandle; - InChannel = Channel.CreateBounded(new BoundedChannelOptions(maxIn) - { - SingleReader = false, - SingleWriter = false, - // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727 - // AFAICT this is fine since we run in a dedicated long running task. - AllowSynchronousContinuations = true, - // wait does not block it simply signals that Writer.TryWrite should return false and be retried - // DropWrite will make `TryWrite` always return true, which is not what we want. - FullMode = options.BufferOptions.BoundedChannelFullMode - }); + _waitForOutboundRead = new CountdownEvent(1); OutChannel = Channel.CreateBounded>( - new BoundedChannelOptions(_maxConcurrency * 2) + new BoundedChannelOptions(MaxConcurrency * 4) { SingleReader = false, SingleWriter = true, @@ -127,8 +172,19 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(new BoundedChannelOptions(maxIn) + { + SingleReader = false, + SingleWriter = false, + // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727 + // AFAICT this is fine since we run in a dedicated long running task. + AllowSynchronousContinuations = true, + // wait does not block it simply signals that Writer.TryWrite should return false and be retried + // DropWrite will make `TryWrite` always return true, which is not what we want. + FullMode = options.BufferOptions.BoundedChannelFullMode + }); - InboundBuffer = new InboundBuffer(maxOut, BufferOptions.OutboundBufferMaxLifetime); + InboundBuffer = new InboundBuffer(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime); _outTask = Task.Factory.StartNew(async () => await ConsumeOutboundEventsAsync().ConfigureAwait(false), @@ -137,7 +193,7 @@ await ConsumeOutboundEventsAsync().ConfigureAwait(false), TaskScheduler.Default); _inTask = Task.Factory.StartNew(async () => - await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), + await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false), CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default); @@ -149,23 +205,14 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime) /// protected abstract Task ExportAsync(ArraySegment buffer, CancellationToken ctx = default); - /// The channel options currently in use - public TChannelOptions Options { get; } - - /// An overall cancellation token that may be externally provided - protected CancellationTokenSource TokenSource { get; } - - /// Internal cancellation token for signalling that all publishing activity has completed. - private readonly CancellationTokenSource _exitCancelSource = new CancellationTokenSource(); - - private Channel> OutChannel { get; } - private Channel InChannel { get; } - private BufferOptions BufferOptions => Options.BufferOptions; - - internal InboundBuffer InboundBuffer { get; } - /// - public override ValueTask WaitToWriteAsync(CancellationToken ctx = default) => InChannel.Writer.WaitToWriteAsync(ctx); + public override async ValueTask WaitToWriteAsync(CancellationToken ctx = default) + { + if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize) + for (var i = 0; i < 10 && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize; i++) + await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false); + return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false); + } /// public override bool TryComplete(Exception? error = null) => InChannel.Writer.TryComplete(error); @@ -175,6 +222,7 @@ public override bool TryWrite(TEvent item) { if (InChannel.Writer.TryWrite(item)) { + Interlocked.Increment(ref _inflightEvents); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } @@ -199,26 +247,29 @@ public async Task WaitToWriteManyAsync(IEnumerable events, Cancell /// public bool TryWriteMany(IEnumerable events) { - var written = true; + var allWritten = true; foreach (var @event in events) { - written = TryWrite(@event); + var written = TryWrite(@event); + if (!written) allWritten = written; } - return written; + return allWritten; } /// public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken ctx = default) { ctx = ctx == default ? TokenSource.Token : ctx; - if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) && - InChannel.Writer.TryWrite(item)) + + if (await WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item)) { + Interlocked.Increment(ref _inflightEvents); _callbacks.PublishToInboundChannelCallback?.Invoke(); return true; } + _callbacks.PublishToInboundChannelFailureCallback?.Invoke(); return false; } @@ -227,45 +278,45 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken /// Subclasses may override this to yield items from that can be retried. /// The default implementation of this simply always returns an empty collection /// - protected virtual ArraySegment RetryBuffer(TResponse response, - ArraySegment currentBuffer, - IWriteTrackingBuffer statistics - ) => EmptyArraySegments.Empty; + protected virtual ArraySegment RetryBuffer(TResponse response, ArraySegment currentBuffer, IWriteTrackingBuffer statistics) => + EmptyArraySegments.Empty; private async Task ConsumeOutboundEventsAsync() { _callbacks.OutboundChannelStartedCallback?.Invoke(); - var taskList = new List(_maxConcurrency); + _taskList = new List(MaxConcurrency * 2); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) - // ReSharper disable once RemoveRedundantBraces { + if (_waitForOutboundRead is { IsSet: false }) + _waitForOutboundRead.Signal(); if (TokenSource.Token.IsCancellationRequested) break; if (_signal is { IsSet: true }) break; while (OutChannel.Reader.TryRead(out var buffer)) { var items = buffer.GetArraySegment(); - await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); + await _throttleExportTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false); var t = ExportBufferAsync(items, buffer); - taskList.Add(t); + _taskList.Add(t); - if (taskList.Count >= _maxConcurrency) + if (_taskList.Count >= MaxConcurrency) { - var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false); - taskList.Remove(completedTask); + var completedTask = await Task.WhenAny(_taskList).ConfigureAwait(false); + _taskList.Remove(completedTask); } - _throttleTasks.Release(); + _throttleExportTasks.Release(); } } - await Task.WhenAll(taskList).ConfigureAwait(false); + await Task.WhenAll(_taskList).ConfigureAwait(false); _exitCancelSource.Cancel(); _callbacks.OutboundChannelExitedCallback?.Invoke(); } private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer buffer) { + Interlocked.Increment(ref _ongoingExportOperations); using var outboundBuffer = buffer; var maxRetries = Options.BufferOptions.ExportMaxRetries; for (var i = 0; i <= maxRetries && items.Count > 0; i++) @@ -302,10 +353,11 @@ private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false); _callbacks.ExportRetryCallback?.Invoke(items); } - // otherwise if retryable items still exist and the user wants to be notified notify the user + // otherwise if retryable items still exist and the user wants to be notified else if (items.Count > 0 && atEndOfRetries) _callbacks.ExportMaxRetriesCallback?.Invoke(items); } + Interlocked.Decrement(ref _ongoingExportOperations); _callbacks.ExportBufferCallback?.Invoke(); if (_signal is { IsSet: false }) _signal.Signal(); @@ -323,6 +375,7 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item)) { InboundBuffer.Add(item); + Interlocked.Decrement(ref _inflightEvents); if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval) break; @@ -357,10 +410,9 @@ async Task AsyncSlowPath(IOutboundBuffer b) var maxRetries = Options.BufferOptions.ExportMaxRetries; for (var i = 0; i <= maxRetries; i++) while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false)) - { if (OutChannel.Writer.TryWrite(b)) return true; - } + return false; } @@ -370,8 +422,20 @@ async Task AsyncSlowPath(IOutboundBuffer b) } /// > - public override string ToString() => - DiagnosticsListener != null ? DiagnosticsListener.ToString() : base.ToString(); + public override string ToString() + { + if (DiagnosticsListener == null) return base.ToString(); + var sb = new StringBuilder(); + sb.AppendLine(DiagnosticsListener.ToString()); + sb.AppendLine($"{nameof(InflightEvents)}: {InflightEvents:N0}"); + sb.AppendLine($"{nameof(BufferOptions.InboundBufferMaxSize)}: {BufferOptions.InboundBufferMaxSize:N0}"); + sb.AppendLine($"{nameof(BatchExportOperations)}: {BatchExportOperations:N0}"); + sb.AppendLine($"{nameof(BatchExportSize)}: {BatchExportSize:N0}"); + sb.AppendLine($"{nameof(DrainSize)}: {DrainSize:N0}"); + sb.AppendLine($"{nameof(MaxConcurrency)}: {MaxConcurrency:N0}"); + sb.AppendLine($"{nameof(ExportTasks)}: {ExportTasks:N0}"); + return sb.ToString(); + } /// public virtual void Dispose() diff --git a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs index 12c3df2..621becc 100644 --- a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs @@ -41,15 +41,8 @@ protected override Task ExportAsync(ArraySegment buffer #else IList b = buffer; #endif - if (Options.BufferOptions.OutboundBufferMaxSize != buffer.Count) - { + if (BatchExportSize != buffer.Count) Interlocked.Increment(ref _bufferMismatches); - } - else if (b.Count > 0 && b[0].Id.HasValue) - { - if (b[0].Id % Options.BufferOptions.OutboundBufferMaxSize != 0) - Interlocked.Increment(ref _bufferMismatches); - } return base.ExportAsync(buffer, ctx); } diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index fe2dbc4..4c461bf 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -67,7 +67,7 @@ protected override async Task ExportAsync(ArraySegment if (!Options.TrackConcurrency) return new NoopResponse(); var max = Interlocked.Increment(ref _currentMax); - await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false); + await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false); Interlocked.Decrement(ref _currentMax); if (max > ObservedConcurrency) ObservedConcurrency = max; return new NoopResponse(); diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 5af17e4..c2d33dc 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -15,7 +15,13 @@ namespace Elastic.Channels.Tests; public class BehaviorTests : IDisposable { - public BehaviorTests(ITestOutputHelper testOutput) => XunitContext.Register(testOutput); + private readonly ITestOutputHelper _testOutput; + + public BehaviorTests(ITestOutputHelper testOutput) + { + _testOutput = testOutput; + XunitContext.Register(testOutput); + } void IDisposable.Dispose() => XunitContext.Flush(); @@ -55,7 +61,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() WaitHandle = new CountdownEvent(1), InboundBufferMaxSize = maxInFlight, OutboundBufferMaxSize = bufferSize, - OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(500) + OutboundBufferMaxLifetime = TimeSpan.FromSeconds(1) }; var channel = new NoopBufferedChannel(bufferOptions); @@ -66,7 +72,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() if (await channel.WaitToWriteAsync(e)) written++; } - var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(1)); + var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(2)); signalled.Should().BeTrue("The channel was not drained in the expected time"); written.Should().Be(100); channel.ExportedBuffers.Should().Be(1); @@ -74,7 +80,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() [Fact] public async Task ConcurrencyIsApplied() { - int totalEvents = 5_000, maxInFlight = 5_000, bufferSize = 500; + int totalEvents = 50_000, maxInFlight = 50_000, bufferSize = 5000; var expectedPages = totalEvents / bufferSize; var bufferOptions = new BufferOptions { @@ -85,7 +91,9 @@ [Fact] public async Task ConcurrencyIsApplied() }; var channel = new NoopBufferedChannel(bufferOptions, observeConcurrency: true); + channel.MaxConcurrency.Should().BeGreaterThan(1); + _testOutput.WriteLine($"{channel.MaxConcurrency}"); var written = 0; for (var i = 0; i < totalEvents; i++) { @@ -145,13 +153,13 @@ Task StartChannel(int taskNumber) [Fact] public async Task SlowlyPushEvents() { int totalEvents = 50_000_000, maxInFlight = totalEvents / 5, bufferSize = maxInFlight / 10; - var expectedSentBuffers = totalEvents / bufferSize; + var expectedSentBuffers = totalEvents / 10_000; var bufferOptions = new BufferOptions { WaitHandle = new CountdownEvent(expectedSentBuffers), InboundBufferMaxSize = maxInFlight, OutboundBufferMaxSize = 10_000, - OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(100) + OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(1000) }; using var channel = new DiagnosticsBufferedChannel(bufferOptions, name: $"Slow push channel"); await Task.Delay(TimeSpan.FromMilliseconds(200)); @@ -167,7 +175,7 @@ [Fact] public async Task SlowlyPushEvents() } }, TaskCreationOptions.LongRunning); // wait for some work to have progressed - bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(500)); + bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(2000)); //Ensure we written to the channel but not enough to satisfy OutboundBufferMaxSize written.Should().BeGreaterThan(0).And.BeLessThan(10_000); //even though OutboundBufferMaxSize was not hit we should still observe an invocation to Export() diff --git a/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs new file mode 100644 index 0000000..1f2b707 --- /dev/null +++ b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Channels.Tests; + +public class CalculatedPropertyTests : IDisposable +{ + private readonly ITestOutputHelper _testOutput; + + public CalculatedPropertyTests(ITestOutputHelper testOutput) + { + _testOutput = testOutput; + XunitContext.Register(testOutput); + } + + void IDisposable.Dispose() => XunitContext.Flush(); + + [Theory] + [InlineData(500_000, 50_000, 100_000)] + [InlineData(10_00_000, 50_000, 100_000)] + [InlineData(50_000, 50_000, 25_000)] + [InlineData(10_000, 50_000, 20_000)] + [InlineData(10_00_000, 1_000, 2_000)] + public void BatchExportSizeAndDrainSizeConstraints(int maxInFlight, int bufferSize, int drainSize) + { + var bufferOptions = new BufferOptions + { + InboundBufferMaxSize = maxInFlight, + OutboundBufferMaxSize = bufferSize, + }; + var channel = new NoopBufferedChannel(bufferOptions); + + var expectedConcurrency = + Math.Max(1, Math.Min(maxInFlight / bufferSize, Environment.ProcessorCount * 2)); + channel.MaxConcurrency.Should().Be(expectedConcurrency); + if (maxInFlight >= bufferSize) + channel.BatchExportSize.Should().Be(bufferSize); + else + channel.BatchExportSize.Should().Be(maxInFlight / expectedConcurrency); + + // drain size is maxed out at 100_000 + channel.DrainSize.Should().Be(Math.Min(100_000, drainSize)); + + } + +}