Skip to content

Commit

Permalink
Fixed internal thread stealing by TaskCompletionSource
Browse files Browse the repository at this point in the history
  • Loading branch information
Agasper committed Oct 17, 2022
1 parent 2e234de commit 32c974f
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Src/Neon.Networking/Tcp/TcpConfigurationPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ internal virtual void Validate()

public TcpConfigurationPeer()
{
synchronizationContext = new DummySynchronizationContext();
synchronizationContext = new NeonSynchronizationContext();
contextSynchronizationMode = ContextSynchronizationMode.Post;

memoryManager = Neon.Util.Pooling.MemoryManager.Shared;
Expand Down
4 changes: 2 additions & 2 deletions Src/Neon.Networking/Tcp/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ async Task SendMessageSkipSimulationAsync(TcpMessage message)
sendTask = sendTask.ContinueWith(
(task, msg) =>
{
return SendMessageInternalAsync(msg as TcpMessage);
return SendMessageImmediatelyInternalAsync(msg as TcpMessage);
}, message, TaskContinuationOptions.ExecuteSynchronously)
.Unwrap();

Expand All @@ -596,7 +596,7 @@ async Task SendMessageSkipSimulationAsync(TcpMessage message)
await newSendTask.ConfigureAwait(false);
}

async Task SendMessageInternalAsync(TcpMessage message)
async Task SendMessageImmediatelyInternalAsync(TcpMessage message)
{
await sendSemaphore.WaitAsync().ConfigureAwait(false);
try
Expand Down
2 changes: 1 addition & 1 deletion Src/Neon.Networking/Udp/Channels/DelayedPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct DelayedPacket : IDisposable
public DelayedPacket(Datagram datagram)
{
this.Datagram = datagram;
this.tcs = new TaskCompletionSource<object>();
this.tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion Src/Neon.Networking/Udp/UdpConfigurationPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public UdpConfigurationPeer()
sendBufferSize = 65535;
receiveBufferSize = 1048676;
reuseAddress = true;
synchronizationContext = new DummySynchronizationContext();
synchronizationContext = new NeonSynchronizationContext();
tooLargeMessageBehaviour = TooLargeMessageBehaviour.RaiseException;
contextSynchronizationMode = ContextSynchronizationMode.Post;
networkReceiveThreads = Math.Max(1, Environment.ProcessorCount - 1);
Expand Down
4 changes: 2 additions & 2 deletions Src/Neon.Networking/Udp/UdpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public UdpConnection(UdpPeer peer)
if (peer == null)
throw new ArgumentNullException(nameof(peer));
Random rnd = new Random();
this.connectingTcs = new TaskCompletionSource<object>();
this.disconnectingTcs = new TaskCompletionSource<object>();
this.connectingTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
this.disconnectingTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
this.connectionCancellationToken = new CancellationTokenSource();
this.Id = peer.GetNextConnectionId();
this.status = UdpConnectionStatus.InitialWaiting;
Expand Down
2 changes: 1 addition & 1 deletion Src/Neon.Rpc/Authorization/AuthSessionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal async Task<object> Start(object arg, CancellationToken cancellationToke
if (this.tcs != null)
throw new InvalidOperationException($"{nameof(Start)} could be called only once");

this.tcs = new TaskCompletionSource<object>();
this.tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

AuthenticationRequest request = new AuthenticationRequest();
request.Argument = arg;
Expand Down
2 changes: 1 addition & 1 deletion Src/Neon.Rpc/Middleware/EncryptionMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void Set(ICipher cipher)
Reset();
this.cipher = cipher ?? throw new ArgumentNullException(nameof(cipher));
this.dh = new DiffieHellmanImpl(cipher);
this.tcs = new TaskCompletionSource<object>();
this.tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}

void CheckCipher()
Expand Down
2 changes: 1 addition & 1 deletion Src/Neon.Rpc/Net/RpcConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public RpcConfiguration()
Assembly assembly = Assembly.GetEntryAssembly();
serializer = new RpcSerializer(MemoryManager.Shared);
remotingInvocationRules = RemotingInvocationRules.Default;
synchronizationContext = new DummySynchronizationContext();
synchronizationContext = new NeonSynchronizationContext();
contextSynchronizationMode = ContextSynchronizationMode.Post;
taskScheduler = TaskScheduler.Default;
logManager = Logging.LogManager.Dummy;
Expand Down
20 changes: 0 additions & 20 deletions Src/Neon.Util/DummySynchronizationContext.cs

This file was deleted.

33 changes: 33 additions & 0 deletions Src/Neon.Util/NeonSynchronizationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Neon.Util
{
/// <summary>
/// A placeholder for a synchronization context.
/// </summary>
public class NeonSynchronizationContext : SynchronizationContext
{
Task task = Task.CompletedTask;
object mutex = new object();

public override void Post(SendOrPostCallback d, object state)
{
lock (mutex)
{
task = task.ContinueWith(
(task, args_) =>
{
Tuple<SendOrPostCallback, object> args__ = (Tuple<SendOrPostCallback, object>) args_;
args__.Item1(args__.Item2);
}, new Tuple<SendOrPostCallback, object>(d, state), TaskContinuationOptions.RunContinuationsAsynchronously);
}
}

public override void Send(SendOrPostCallback d, object state)
{
d(state);
}
}
}

0 comments on commit 32c974f

Please sign in to comment.