Skip to content

Commit

Permalink
Fix against TimeSpan.Zero for max lifetime (#72)
Browse files Browse the repository at this point in the history
* Fix against TimeSpan.Zero for max lifetime

* Move blocking logic to WaitToWrriteAsync and use a more reliable implementation to ensure some drainage happened

* remove dead code

* ensure noop export waits signicicantly long to force concurrency

* model draining before accepting new writes more explicitly

* fix typo

* fix tests

* Ensure WaitToWriteAsync is deterministic (in that it will not wait forever if no drain happens)

* revert changes to benchmark project
  • Loading branch information
Mpdreamz authored Sep 18, 2024
1 parent 60713d1 commit 2037c14
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

//var bm = new BulkIngestion();
//bm.Setup();
//await bm.BulkAllAsync();
//await bm.BulkAllAsync()
//Console.WriteLine("DONE");

var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
Expand Down
5 changes: 3 additions & 2 deletions elastic-ingest-dotnet.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ See the LICENSE file in the project root for more information</s:String>
&lt;/Entry.Match&gt;&#xD;
&lt;Entry.SortBy&gt;&#xD;
&lt;Kind Is="Member" /&gt;&#xD;
&lt;Name Is="Enter Pattern Here" /&gt;&#xD;
&lt;Name /&gt;&#xD;
&lt;/Entry.SortBy&gt;&#xD;
&lt;/Entry&gt;&#xD;
&lt;Entry DisplayName="Readonly Fields"&gt;&#xD;
Expand All @@ -140,7 +140,7 @@ See the LICENSE file in the project root for more information</s:String>
&lt;Entry.SortBy&gt;&#xD;
&lt;Access /&gt;&#xD;
&lt;Readonly /&gt;&#xD;
&lt;Name Is="Enter Pattern Here" /&gt;&#xD;
&lt;Name /&gt;&#xD;
&lt;/Entry.SortBy&gt;&#xD;
&lt;/Entry&gt;&#xD;
&lt;Entry DisplayName="Constructors"&gt;&#xD;
Expand Down Expand Up @@ -415,6 +415,7 @@ See the LICENSE file in the project root for more information</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002EMemberReordering_002EMigrations_002ECSharpFileLayoutPatternRemoveIsAttributeUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/HighlightingManager/HighlightingEnabledByDefault/@EntryValue">False</s:Boolean>
Expand Down
30 changes: 16 additions & 14 deletions examples/Elastic.Channels.Continuous/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Threading.Channels;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;

Expand All @@ -13,30 +14,31 @@

var options = new NoopBufferedChannel.NoopChannelOptions
{
BufferOptions = new BufferOptions()
//TrackConcurrency = true,
BufferOptions = new BufferOptions
{
OutboundBufferMaxSize = 10_000,
InboundBufferMaxSize = 10_000_000,
OutboundBufferMaxLifetime = TimeSpan.Zero,
InboundBufferMaxSize = 1_000_000,
OutboundBufferMaxSize = 1_000_000
},
ExportBufferCallback = () => Console.Write("."),
ExportExceptionCallback = e => Console.Write("!"),
PublishToInboundChannelFailureCallback = () => Console.Write("I"),
PublishToOutboundChannelFailureCallback = () => Console.Write("O"),
ExportExceptionCallback = e => Console.Write("!")

};
Console.WriteLine("2");
var channel = new DiagnosticsBufferedChannel(options);
Console.WriteLine($"Begin: ({channel.OutboundStarted}) {channel.MaxConcurrency} {channel.BatchExportOperations} -> {channel.InflightEvents}");
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
var written = false;
//Console.Write('.');
var ready = await channel.WaitToWriteAsync(ctx);
if (ready) written = channel.TryWrite(e);
if (!written)
if (await channel.WaitToWriteAsync(e))
{
Console.WriteLine();
}
if (i % 10_000 == 0)
{
Console.Clear();
Console.WriteLine(channel);
Console.WriteLine(i);
Environment.Exit(1);
}
});
11 changes: 10 additions & 1 deletion src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ public class BufferOptions
/// </summary>
public int OutboundBufferMaxSize { get; set; } = 1_000;

private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5);
private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1);


/// <summary>
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>.
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> regardless of it's current size
/// <para>Defaults to <c>5 seconds</c></para>
/// <para>Any value less than <c>1 second</c> will be rounded back up to <c>1 second</c></para>
/// </summary>
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
public TimeSpan OutboundBufferMaxLifetime
{
get => _outboundBufferMaxLifetime;
set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime;
}

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
Expand Down
Loading

0 comments on commit 2037c14

Please sign in to comment.