From 32c974fd1d2e2f5f3f07ca9f837520fb26a590c4 Mon Sep 17 00:00:00 2001 From: Agasper Date: Mon, 17 Oct 2022 16:06:21 +0300 Subject: [PATCH] Fixed internal thread stealing by TaskCompletionSource --- .../Tcp/TcpConfigurationPeer.cs | 2 +- Src/Neon.Networking/Tcp/TcpConnection.cs | 4 +-- .../Udp/Channels/DelayedPacket.cs | 2 +- .../Udp/UdpConfigurationPeer.cs | 2 +- Src/Neon.Networking/Udp/UdpConnection.cs | 4 +-- .../Authorization/AuthSessionClient.cs | 2 +- .../Middleware/EncryptionMiddleware.cs | 2 +- Src/Neon.Rpc/Net/RpcConfiguration.cs | 2 +- Src/Neon.Util/DummySynchronizationContext.cs | 20 ----------- Src/Neon.Util/NeonSynchronizationContext.cs | 33 +++++++++++++++++++ 10 files changed, 43 insertions(+), 30 deletions(-) delete mode 100644 Src/Neon.Util/DummySynchronizationContext.cs create mode 100644 Src/Neon.Util/NeonSynchronizationContext.cs diff --git a/Src/Neon.Networking/Tcp/TcpConfigurationPeer.cs b/Src/Neon.Networking/Tcp/TcpConfigurationPeer.cs index 77b6719..93ee96d 100644 --- a/Src/Neon.Networking/Tcp/TcpConfigurationPeer.cs +++ b/Src/Neon.Networking/Tcp/TcpConfigurationPeer.cs @@ -149,7 +149,7 @@ internal virtual void Validate() public TcpConfigurationPeer() { - synchronizationContext = new DummySynchronizationContext(); + synchronizationContext = new NeonSynchronizationContext(); contextSynchronizationMode = ContextSynchronizationMode.Post; memoryManager = Neon.Util.Pooling.MemoryManager.Shared; diff --git a/Src/Neon.Networking/Tcp/TcpConnection.cs b/Src/Neon.Networking/Tcp/TcpConnection.cs index aa5159b..325e99c 100644 --- a/Src/Neon.Networking/Tcp/TcpConnection.cs +++ b/Src/Neon.Networking/Tcp/TcpConnection.cs @@ -586,7 +586,7 @@ async Task SendMessageSkipSimulationAsync(TcpMessage message) sendTask = sendTask.ContinueWith( (task, msg) => { - return SendMessageInternalAsync(msg as TcpMessage); + return SendMessageImmediatelyInternalAsync(msg as TcpMessage); }, message, TaskContinuationOptions.ExecuteSynchronously) .Unwrap(); @@ -596,7 +596,7 @@ async Task SendMessageSkipSimulationAsync(TcpMessage message) await newSendTask.ConfigureAwait(false); } - async Task SendMessageInternalAsync(TcpMessage message) + async Task SendMessageImmediatelyInternalAsync(TcpMessage message) { await sendSemaphore.WaitAsync().ConfigureAwait(false); try diff --git a/Src/Neon.Networking/Udp/Channels/DelayedPacket.cs b/Src/Neon.Networking/Udp/Channels/DelayedPacket.cs index 0cef77b..41f87ec 100644 --- a/Src/Neon.Networking/Udp/Channels/DelayedPacket.cs +++ b/Src/Neon.Networking/Udp/Channels/DelayedPacket.cs @@ -14,7 +14,7 @@ struct DelayedPacket : IDisposable public DelayedPacket(Datagram datagram) { this.Datagram = datagram; - this.tcs = new TaskCompletionSource(); + this.tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } public void Dispose() diff --git a/Src/Neon.Networking/Udp/UdpConfigurationPeer.cs b/Src/Neon.Networking/Udp/UdpConfigurationPeer.cs index 82e7108..2fc93ce 100644 --- a/Src/Neon.Networking/Udp/UdpConfigurationPeer.cs +++ b/Src/Neon.Networking/Udp/UdpConfigurationPeer.cs @@ -156,7 +156,7 @@ public UdpConfigurationPeer() sendBufferSize = 65535; receiveBufferSize = 1048676; reuseAddress = true; - synchronizationContext = new DummySynchronizationContext(); + synchronizationContext = new NeonSynchronizationContext(); tooLargeMessageBehaviour = TooLargeMessageBehaviour.RaiseException; contextSynchronizationMode = ContextSynchronizationMode.Post; networkReceiveThreads = Math.Max(1, Environment.ProcessorCount - 1); diff --git a/Src/Neon.Networking/Udp/UdpConnection.cs b/Src/Neon.Networking/Udp/UdpConnection.cs index 3ef3bbe..47e4375 100644 --- a/Src/Neon.Networking/Udp/UdpConnection.cs +++ b/Src/Neon.Networking/Udp/UdpConnection.cs @@ -90,8 +90,8 @@ public UdpConnection(UdpPeer peer) if (peer == null) throw new ArgumentNullException(nameof(peer)); Random rnd = new Random(); - this.connectingTcs = new TaskCompletionSource(); - this.disconnectingTcs = new TaskCompletionSource(); + this.connectingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + this.disconnectingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); this.connectionCancellationToken = new CancellationTokenSource(); this.Id = peer.GetNextConnectionId(); this.status = UdpConnectionStatus.InitialWaiting; diff --git a/Src/Neon.Rpc/Authorization/AuthSessionClient.cs b/Src/Neon.Rpc/Authorization/AuthSessionClient.cs index 6656c46..6c92ae3 100644 --- a/Src/Neon.Rpc/Authorization/AuthSessionClient.cs +++ b/Src/Neon.Rpc/Authorization/AuthSessionClient.cs @@ -27,7 +27,7 @@ internal async Task Start(object arg, CancellationToken cancellationToke if (this.tcs != null) throw new InvalidOperationException($"{nameof(Start)} could be called only once"); - this.tcs = new TaskCompletionSource(); + this.tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AuthenticationRequest request = new AuthenticationRequest(); request.Argument = arg; diff --git a/Src/Neon.Rpc/Middleware/EncryptionMiddleware.cs b/Src/Neon.Rpc/Middleware/EncryptionMiddleware.cs index d23e260..214ee0a 100644 --- a/Src/Neon.Rpc/Middleware/EncryptionMiddleware.cs +++ b/Src/Neon.Rpc/Middleware/EncryptionMiddleware.cs @@ -65,7 +65,7 @@ public void Set(ICipher cipher) Reset(); this.cipher = cipher ?? throw new ArgumentNullException(nameof(cipher)); this.dh = new DiffieHellmanImpl(cipher); - this.tcs = new TaskCompletionSource(); + this.tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } void CheckCipher() diff --git a/Src/Neon.Rpc/Net/RpcConfiguration.cs b/Src/Neon.Rpc/Net/RpcConfiguration.cs index 2a32b6e..4f8d459 100644 --- a/Src/Neon.Rpc/Net/RpcConfiguration.cs +++ b/Src/Neon.Rpc/Net/RpcConfiguration.cs @@ -131,7 +131,7 @@ public RpcConfiguration() Assembly assembly = Assembly.GetEntryAssembly(); serializer = new RpcSerializer(MemoryManager.Shared); remotingInvocationRules = RemotingInvocationRules.Default; - synchronizationContext = new DummySynchronizationContext(); + synchronizationContext = new NeonSynchronizationContext(); contextSynchronizationMode = ContextSynchronizationMode.Post; taskScheduler = TaskScheduler.Default; logManager = Logging.LogManager.Dummy; diff --git a/Src/Neon.Util/DummySynchronizationContext.cs b/Src/Neon.Util/DummySynchronizationContext.cs deleted file mode 100644 index 80265e3..0000000 --- a/Src/Neon.Util/DummySynchronizationContext.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Threading; - -namespace Neon.Util -{ - /// - /// A placeholder for a synchronization context. - /// - public class DummySynchronizationContext : SynchronizationContext - { - public override void Post(SendOrPostCallback d, object state) - { - d(state); - } - - public override void Send(SendOrPostCallback d, object state) - { - d(state); - } - } -} \ No newline at end of file diff --git a/Src/Neon.Util/NeonSynchronizationContext.cs b/Src/Neon.Util/NeonSynchronizationContext.cs new file mode 100644 index 0000000..a0f7e8d --- /dev/null +++ b/Src/Neon.Util/NeonSynchronizationContext.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Neon.Util +{ + /// + /// A placeholder for a synchronization context. + /// + public class NeonSynchronizationContext : SynchronizationContext + { + Task task = Task.CompletedTask; + object mutex = new object(); + + public override void Post(SendOrPostCallback d, object state) + { + lock (mutex) + { + task = task.ContinueWith( + (task, args_) => + { + Tuple args__ = (Tuple) args_; + args__.Item1(args__.Item2); + }, new Tuple(d, state), TaskContinuationOptions.RunContinuationsAsynchronously); + } + } + + public override void Send(SendOrPostCallback d, object state) + { + d(state); + } + } +} \ No newline at end of file