diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
index 42ce2e1..2564764 100644
--- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
+++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
@@ -19,7 +19,7 @@
//var bm = new BulkIngestion();
//bm.Setup();
-//await bm.BulkAllAsync();
+//await bm.BulkAllAsync()
//Console.WriteLine("DONE");
var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
diff --git a/elastic-ingest-dotnet.sln.DotSettings b/elastic-ingest-dotnet.sln.DotSettings
index 172fc64..673b0d8 100644
--- a/elastic-ingest-dotnet.sln.DotSettings
+++ b/elastic-ingest-dotnet.sln.DotSettings
@@ -122,7 +122,7 @@ See the LICENSE file in the project root for more information
</Entry.Match>
<Entry.SortBy>
<Kind Is="Member" />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Readonly Fields">
@@ -140,7 +140,7 @@ See the LICENSE file in the project root for more information
<Entry.SortBy>
<Access />
<Readonly />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Constructors">
@@ -415,6 +415,7 @@ See the LICENSE file in the project root for more information
True
True
True
+ True
True
True
False
diff --git a/examples/Elastic.Channels.Continuous/Program.cs b/examples/Elastic.Channels.Continuous/Program.cs
index 7aa6b89..5d2b1df 100644
--- a/examples/Elastic.Channels.Continuous/Program.cs
+++ b/examples/Elastic.Channels.Continuous/Program.cs
@@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
+using System.Threading.Channels;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
@@ -13,30 +14,31 @@
var options = new NoopBufferedChannel.NoopChannelOptions
{
- BufferOptions = new BufferOptions()
+ //TrackConcurrency = true,
+ BufferOptions = new BufferOptions
{
- OutboundBufferMaxSize = 10_000,
- InboundBufferMaxSize = 10_000_000,
+ OutboundBufferMaxLifetime = TimeSpan.Zero,
+ InboundBufferMaxSize = 1_000_000,
+ OutboundBufferMaxSize = 1_000_000
},
ExportBufferCallback = () => Console.Write("."),
- ExportExceptionCallback = e => Console.Write("!"),
- PublishToInboundChannelFailureCallback = () => Console.Write("I"),
- PublishToOutboundChannelFailureCallback = () => Console.Write("O"),
+ ExportExceptionCallback = e => Console.Write("!")
};
+Console.WriteLine("2");
var channel = new DiagnosticsBufferedChannel(options);
+Console.WriteLine($"Begin: ({channel.OutboundStarted}) {channel.MaxConcurrency} {channel.BatchExportOperations} -> {channel.InflightEvents}");
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
- var written = false;
- //Console.Write('.');
- var ready = await channel.WaitToWriteAsync(ctx);
- if (ready) written = channel.TryWrite(e);
- if (!written)
+ if (await channel.WaitToWriteAsync(e))
{
- Console.WriteLine();
+
+ }
+
+ if (i % 10_000 == 0)
+ {
+ Console.Clear();
Console.WriteLine(channel);
- Console.WriteLine(i);
- Environment.Exit(1);
}
});
diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs
index 2477558..3e4d3aa 100644
--- a/src/Elastic.Channels/BufferOptions.cs
+++ b/src/Elastic.Channels/BufferOptions.cs
@@ -25,13 +25,22 @@ public class BufferOptions
///
public int OutboundBufferMaxSize { get; set; } = 1_000;
+ private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5);
+ private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1);
+
+
///
/// The maximum lifetime of a buffer to export to .
/// If a buffer is older then the configured it will be flushed to
/// regardless of it's current size
/// Defaults to 5 seconds
+ /// Any value less than 1 second will be rounded back up to 1 second
///
- public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
+ public TimeSpan OutboundBufferMaxLifetime
+ {
+ get => _outboundBufferMaxLifetime;
+ set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime;
+ }
///
/// The maximum number of consumers allowed to poll for new events on the channel.
diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs
index fac03e5..ae2128c 100644
--- a/src/Elastic.Channels/BufferedChannelBase.cs
+++ b/src/Elastic.Channels/BufferedChannelBase.cs
@@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -60,8 +61,7 @@ public abstract class BufferedChannelBase
{
private readonly Task _inTask;
private readonly Task _outTask;
- private readonly int _maxConcurrency;
- private readonly SemaphoreSlim _throttleTasks;
+ private readonly SemaphoreSlim _throttleExportTasks;
private readonly CountdownEvent? _signal;
private readonly ChannelCallbackInvoker _callbacks;
@@ -69,6 +69,58 @@ public abstract class BufferedChannelBase
///
public IChannelDiagnosticsListener? DiagnosticsListener { get; }
+ /// The channel options currently in use
+ public TChannelOptions Options { get; }
+
+ /// An overall cancellation token that may be externally provided
+ protected CancellationTokenSource TokenSource { get; }
+
+ /// Internal cancellation token for signalling that all publishing activity has completed.
+ private readonly CancellationTokenSource _exitCancelSource = new();
+
+ private Channel> OutChannel { get; }
+ private Channel InChannel { get; }
+ private BufferOptions BufferOptions => Options.BufferOptions;
+
+ private long _inflightEvents;
+ /// The number of inflight events
+ public long InflightEvents => _inflightEvents;
+
+ /// Current number of tasks handling exporting the events
+ public int ExportTasks => _taskList.Count;
+
+ ///
+ /// The effective concurrency.
+ /// Either the configured concurrency or the calculated concurrency.
+ ///
+ public int MaxConcurrency { get; }
+
+ ///
+ /// The effective batch export size .
+ /// Either the configured concurrency or the calculated size.
+ /// If the configured exceeds ( / )
+ /// the batch export size will be lowered to ( / ) to ensure we saturate
+ ///
+ public int BatchExportSize { get; }
+
+ ///
+ /// If is set to
+ /// and approaches
+ /// will block until drops with atleast this size
+ ///
+ public int DrainSize { get; }
+
+ private int _ongoingExportOperations;
+ /// Outstanding export operations
+ public int BatchExportOperations => _ongoingExportOperations;
+ private readonly CountdownEvent _waitForOutboundRead;
+ private List _taskList;
+
+ ///
+ public bool OutboundStarted => _waitForOutboundRead.IsSet;
+
+ internal InboundBuffer InboundBuffer { get; }
+
///
protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }
@@ -93,30 +145,23 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(listeners);
- var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize);
- // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize
- var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
- var defaultMaxConcurrency = (int)Math.Ceiling(maxIn / (double)maxOut);
- _maxConcurrency =
- BufferOptions.ExportMaxConcurrency.HasValue
- ? BufferOptions.ExportMaxConcurrency.Value
- : Math.Min(defaultMaxConcurrency, Environment.ProcessorCount * 2);
+ var maxIn = Math.Max(Math.Max(1, BufferOptions.InboundBufferMaxSize), BufferOptions.OutboundBufferMaxSize);
+ var defaultMaxOut = Math.Max(1, BufferOptions.OutboundBufferMaxSize);
+ var calculatedConcurrency = (int)Math.Ceiling(maxIn / (double)defaultMaxOut);
+ var defaultConcurrency = Environment.ProcessorCount * 2;
+ MaxConcurrency = BufferOptions.ExportMaxConcurrency ?? Math.Min(calculatedConcurrency, defaultConcurrency);
+
+ // The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize / (MaxConcurrency * 2)
+ BatchExportSize = Math.Min(BufferOptions.InboundBufferMaxSize / (MaxConcurrency), Math.Max(1, BufferOptions.OutboundBufferMaxSize));
+ DrainSize = Math.Min(100_000, Math.Min(BatchExportSize * 2, maxIn / 2));
- _throttleTasks = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
+ _taskList = new List(MaxConcurrency * 2);
+
+ _throttleExportTasks = new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
_signal = options.BufferOptions.WaitHandle;
- InChannel = Channel.CreateBounded(new BoundedChannelOptions(maxIn)
- {
- SingleReader = false,
- SingleWriter = false,
- // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
- // AFAICT this is fine since we run in a dedicated long running task.
- AllowSynchronousContinuations = true,
- // wait does not block it simply signals that Writer.TryWrite should return false and be retried
- // DropWrite will make `TryWrite` always return true, which is not what we want.
- FullMode = options.BufferOptions.BoundedChannelFullMode
- });
+ _waitForOutboundRead = new CountdownEvent(1);
OutChannel = Channel.CreateBounded>(
- new BoundedChannelOptions(_maxConcurrency * 2)
+ new BoundedChannelOptions(MaxConcurrency * 4)
{
SingleReader = false,
SingleWriter = true,
@@ -127,8 +172,19 @@ protected BufferedChannelBase(TChannelOptions options, ICollection(new BoundedChannelOptions(maxIn)
+ {
+ SingleReader = false,
+ SingleWriter = false,
+ // Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
+ // AFAICT this is fine since we run in a dedicated long running task.
+ AllowSynchronousContinuations = true,
+ // wait does not block it simply signals that Writer.TryWrite should return false and be retried
+ // DropWrite will make `TryWrite` always return true, which is not what we want.
+ FullMode = options.BufferOptions.BoundedChannelFullMode
+ });
- InboundBuffer = new InboundBuffer(maxOut, BufferOptions.OutboundBufferMaxLifetime);
+ InboundBuffer = new InboundBuffer(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime);
_outTask = Task.Factory.StartNew(async () =>
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
@@ -137,7 +193,7 @@ await ConsumeOutboundEventsAsync().ConfigureAwait(false),
TaskScheduler.Default);
_inTask = Task.Factory.StartNew(async () =>
- await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
+ await ConsumeInboundEventsAsync(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);
@@ -149,23 +205,14 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime)
///
protected abstract Task ExportAsync(ArraySegment buffer, CancellationToken ctx = default);
- /// The channel options currently in use
- public TChannelOptions Options { get; }
-
- /// An overall cancellation token that may be externally provided
- protected CancellationTokenSource TokenSource { get; }
-
- /// Internal cancellation token for signalling that all publishing activity has completed.
- private readonly CancellationTokenSource _exitCancelSource = new CancellationTokenSource();
-
- private Channel> OutChannel { get; }
- private Channel InChannel { get; }
- private BufferOptions BufferOptions => Options.BufferOptions;
-
- internal InboundBuffer InboundBuffer { get; }
-
///
- public override ValueTask WaitToWriteAsync(CancellationToken ctx = default) => InChannel.Writer.WaitToWriteAsync(ctx);
+ public override async ValueTask WaitToWriteAsync(CancellationToken ctx = default)
+ {
+ if (BufferOptions.BoundedChannelFullMode == BoundedChannelFullMode.Wait && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize)
+ for (var i = 0; i < 10 && _inflightEvents >= BufferOptions.InboundBufferMaxSize - DrainSize; i++)
+ await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false);
+ return await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false);
+ }
///
public override bool TryComplete(Exception? error = null) => InChannel.Writer.TryComplete(error);
@@ -175,6 +222,7 @@ public override bool TryWrite(TEvent item)
{
if (InChannel.Writer.TryWrite(item))
{
+ Interlocked.Increment(ref _inflightEvents);
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}
@@ -199,26 +247,29 @@ public async Task WaitToWriteManyAsync(IEnumerable events, Cancell
///
public bool TryWriteMany(IEnumerable events)
{
- var written = true;
+ var allWritten = true;
foreach (var @event in events)
{
- written = TryWrite(@event);
+ var written = TryWrite(@event);
+ if (!written) allWritten = written;
}
- return written;
+ return allWritten;
}
///
public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
- if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) &&
- InChannel.Writer.TryWrite(item))
+
+ if (await WaitToWriteAsync(ctx).ConfigureAwait(false) && InChannel.Writer.TryWrite(item))
{
+ Interlocked.Increment(ref _inflightEvents);
_callbacks.PublishToInboundChannelCallback?.Invoke();
return true;
}
+
_callbacks.PublishToInboundChannelFailureCallback?.Invoke();
return false;
}
@@ -227,45 +278,45 @@ public virtual async Task WaitToWriteAsync(TEvent item, CancellationToken
/// Subclasses may override this to yield items from that can be retried.
/// The default implementation of this simply always returns an empty collection
///
- protected virtual ArraySegment RetryBuffer(TResponse response,
- ArraySegment currentBuffer,
- IWriteTrackingBuffer statistics
- ) => EmptyArraySegments.Empty;
+ protected virtual ArraySegment RetryBuffer(TResponse response, ArraySegment currentBuffer, IWriteTrackingBuffer statistics) =>
+ EmptyArraySegments.Empty;
private async Task ConsumeOutboundEventsAsync()
{
_callbacks.OutboundChannelStartedCallback?.Invoke();
- var taskList = new List(_maxConcurrency);
+ _taskList = new List(MaxConcurrency * 2);
while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
- // ReSharper disable once RemoveRedundantBraces
{
+ if (_waitForOutboundRead is { IsSet: false })
+ _waitForOutboundRead.Signal();
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;
while (OutChannel.Reader.TryRead(out var buffer))
{
var items = buffer.GetArraySegment();
- await _throttleTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false);
+ await _throttleExportTasks.WaitAsync(TokenSource.Token).ConfigureAwait(false);
var t = ExportBufferAsync(items, buffer);
- taskList.Add(t);
+ _taskList.Add(t);
- if (taskList.Count >= _maxConcurrency)
+ if (_taskList.Count >= MaxConcurrency)
{
- var completedTask = await Task.WhenAny(taskList).ConfigureAwait(false);
- taskList.Remove(completedTask);
+ var completedTask = await Task.WhenAny(_taskList).ConfigureAwait(false);
+ _taskList.Remove(completedTask);
}
- _throttleTasks.Release();
+ _throttleExportTasks.Release();
}
}
- await Task.WhenAll(taskList).ConfigureAwait(false);
+ await Task.WhenAll(_taskList).ConfigureAwait(false);
_exitCancelSource.Cancel();
_callbacks.OutboundChannelExitedCallback?.Invoke();
}
private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer buffer)
{
+ Interlocked.Increment(ref _ongoingExportOperations);
using var outboundBuffer = buffer;
var maxRetries = Options.BufferOptions.ExportMaxRetries;
for (var i = 0; i <= maxRetries && items.Count > 0; i++)
@@ -302,10 +353,11 @@ private async Task ExportBufferAsync(ArraySegment items, IOutboundBuffer
await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
_callbacks.ExportRetryCallback?.Invoke(items);
}
- // otherwise if retryable items still exist and the user wants to be notified notify the user
+ // otherwise if retryable items still exist and the user wants to be notified
else if (items.Count > 0 && atEndOfRetries)
_callbacks.ExportMaxRetriesCallback?.Invoke(items);
}
+ Interlocked.Decrement(ref _ongoingExportOperations);
_callbacks.ExportBufferCallback?.Invoke();
if (_signal is { IsSet: false })
_signal.Signal();
@@ -323,6 +375,7 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max
while (InboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item))
{
InboundBuffer.Add(item);
+ Interlocked.Decrement(ref _inflightEvents);
if (InboundBuffer.DurationSinceFirstWaitToRead >= maxInterval)
break;
@@ -357,10 +410,9 @@ async Task AsyncSlowPath(IOutboundBuffer b)
var maxRetries = Options.BufferOptions.ExportMaxRetries;
for (var i = 0; i <= maxRetries; i++)
while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false))
- {
if (OutChannel.Writer.TryWrite(b))
return true;
- }
+
return false;
}
@@ -370,8 +422,20 @@ async Task AsyncSlowPath(IOutboundBuffer b)
}
/// >
- public override string ToString() =>
- DiagnosticsListener != null ? DiagnosticsListener.ToString() : base.ToString();
+ public override string ToString()
+ {
+ if (DiagnosticsListener == null) return base.ToString();
+ var sb = new StringBuilder();
+ sb.AppendLine(DiagnosticsListener.ToString());
+ sb.AppendLine($"{nameof(InflightEvents)}: {InflightEvents:N0}");
+ sb.AppendLine($"{nameof(BufferOptions.InboundBufferMaxSize)}: {BufferOptions.InboundBufferMaxSize:N0}");
+ sb.AppendLine($"{nameof(BatchExportOperations)}: {BatchExportOperations:N0}");
+ sb.AppendLine($"{nameof(BatchExportSize)}: {BatchExportSize:N0}");
+ sb.AppendLine($"{nameof(DrainSize)}: {DrainSize:N0}");
+ sb.AppendLine($"{nameof(MaxConcurrency)}: {MaxConcurrency:N0}");
+ sb.AppendLine($"{nameof(ExportTasks)}: {ExportTasks:N0}");
+ return sb.ToString();
+ }
///
public virtual void Dispose()
diff --git a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs
index 12c3df2..621becc 100644
--- a/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs
+++ b/src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs
@@ -41,15 +41,8 @@ protected override Task ExportAsync(ArraySegment buffer
#else
IList b = buffer;
#endif
- if (Options.BufferOptions.OutboundBufferMaxSize != buffer.Count)
- {
+ if (BatchExportSize != buffer.Count)
Interlocked.Increment(ref _bufferMismatches);
- }
- else if (b.Count > 0 && b[0].Id.HasValue)
- {
- if (b[0].Id % Options.BufferOptions.OutboundBufferMaxSize != 0)
- Interlocked.Increment(ref _bufferMismatches);
- }
return base.ExportAsync(buffer, ctx);
}
diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
index fe2dbc4..4c461bf 100644
--- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
+++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
@@ -67,7 +67,7 @@ protected override async Task ExportAsync(ArraySegment
if (!Options.TrackConcurrency) return new NoopResponse();
var max = Interlocked.Increment(ref _currentMax);
- await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false);
+ await Task.Delay(TimeSpan.FromMilliseconds(100), ctx).ConfigureAwait(false);
Interlocked.Decrement(ref _currentMax);
if (max > ObservedConcurrency) ObservedConcurrency = max;
return new NoopResponse();
diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs
index 5af17e4..c2d33dc 100644
--- a/tests/Elastic.Channels.Tests/BehaviorTests.cs
+++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs
@@ -15,7 +15,13 @@ namespace Elastic.Channels.Tests;
public class BehaviorTests : IDisposable
{
- public BehaviorTests(ITestOutputHelper testOutput) => XunitContext.Register(testOutput);
+ private readonly ITestOutputHelper _testOutput;
+
+ public BehaviorTests(ITestOutputHelper testOutput)
+ {
+ _testOutput = testOutput;
+ XunitContext.Register(testOutput);
+ }
void IDisposable.Dispose() => XunitContext.Flush();
@@ -55,7 +61,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers()
WaitHandle = new CountdownEvent(1),
InboundBufferMaxSize = maxInFlight,
OutboundBufferMaxSize = bufferSize,
- OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(500)
+ OutboundBufferMaxLifetime = TimeSpan.FromSeconds(1)
};
var channel = new NoopBufferedChannel(bufferOptions);
@@ -66,7 +72,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers()
if (await channel.WaitToWriteAsync(e))
written++;
}
- var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(1));
+ var signalled = bufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(2));
signalled.Should().BeTrue("The channel was not drained in the expected time");
written.Should().Be(100);
channel.ExportedBuffers.Should().Be(1);
@@ -74,7 +80,7 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers()
[Fact] public async Task ConcurrencyIsApplied()
{
- int totalEvents = 5_000, maxInFlight = 5_000, bufferSize = 500;
+ int totalEvents = 50_000, maxInFlight = 50_000, bufferSize = 5000;
var expectedPages = totalEvents / bufferSize;
var bufferOptions = new BufferOptions
{
@@ -85,7 +91,9 @@ [Fact] public async Task ConcurrencyIsApplied()
};
var channel = new NoopBufferedChannel(bufferOptions, observeConcurrency: true);
+ channel.MaxConcurrency.Should().BeGreaterThan(1);
+ _testOutput.WriteLine($"{channel.MaxConcurrency}");
var written = 0;
for (var i = 0; i < totalEvents; i++)
{
@@ -145,13 +153,13 @@ Task StartChannel(int taskNumber)
[Fact] public async Task SlowlyPushEvents()
{
int totalEvents = 50_000_000, maxInFlight = totalEvents / 5, bufferSize = maxInFlight / 10;
- var expectedSentBuffers = totalEvents / bufferSize;
+ var expectedSentBuffers = totalEvents / 10_000;
var bufferOptions = new BufferOptions
{
WaitHandle = new CountdownEvent(expectedSentBuffers),
InboundBufferMaxSize = maxInFlight,
OutboundBufferMaxSize = 10_000,
- OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(100)
+ OutboundBufferMaxLifetime = TimeSpan.FromMilliseconds(1000)
};
using var channel = new DiagnosticsBufferedChannel(bufferOptions, name: $"Slow push channel");
await Task.Delay(TimeSpan.FromMilliseconds(200));
@@ -167,7 +175,7 @@ [Fact] public async Task SlowlyPushEvents()
}
}, TaskCreationOptions.LongRunning);
// wait for some work to have progressed
- bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(500));
+ bufferOptions.WaitHandle.Wait(TimeSpan.FromMilliseconds(2000));
//Ensure we written to the channel but not enough to satisfy OutboundBufferMaxSize
written.Should().BeGreaterThan(0).And.BeLessThan(10_000);
//even though OutboundBufferMaxSize was not hit we should still observe an invocation to Export()
diff --git a/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs
new file mode 100644
index 0000000..1f2b707
--- /dev/null
+++ b/tests/Elastic.Channels.Tests/CalculatedPropertyTests.cs
@@ -0,0 +1,56 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Channels.Diagnostics;
+using FluentAssertions;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Elastic.Channels.Tests;
+
+public class CalculatedPropertyTests : IDisposable
+{
+ private readonly ITestOutputHelper _testOutput;
+
+ public CalculatedPropertyTests(ITestOutputHelper testOutput)
+ {
+ _testOutput = testOutput;
+ XunitContext.Register(testOutput);
+ }
+
+ void IDisposable.Dispose() => XunitContext.Flush();
+
+ [Theory]
+ [InlineData(500_000, 50_000, 100_000)]
+ [InlineData(10_00_000, 50_000, 100_000)]
+ [InlineData(50_000, 50_000, 25_000)]
+ [InlineData(10_000, 50_000, 20_000)]
+ [InlineData(10_00_000, 1_000, 2_000)]
+ public void BatchExportSizeAndDrainSizeConstraints(int maxInFlight, int bufferSize, int drainSize)
+ {
+ var bufferOptions = new BufferOptions
+ {
+ InboundBufferMaxSize = maxInFlight,
+ OutboundBufferMaxSize = bufferSize,
+ };
+ var channel = new NoopBufferedChannel(bufferOptions);
+
+ var expectedConcurrency =
+ Math.Max(1, Math.Min(maxInFlight / bufferSize, Environment.ProcessorCount * 2));
+ channel.MaxConcurrency.Should().Be(expectedConcurrency);
+ if (maxInFlight >= bufferSize)
+ channel.BatchExportSize.Should().Be(bufferSize);
+ else
+ channel.BatchExportSize.Should().Be(maxInFlight / expectedConcurrency);
+
+ // drain size is maxed out at 100_000
+ channel.DrainSize.Should().Be(Math.Min(100_000, drainSize));
+
+ }
+
+}