diff --git a/Directory.Packages.props b/Directory.Packages.props index 609690110..df58dd5a3 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -18,6 +18,7 @@ + diff --git a/src/SuperSocket.Connection/PipeConnection.cs b/src/SuperSocket.Connection/PipeConnection.cs index 1e3d2922b..f1d3393a6 100644 --- a/src/SuperSocket.Connection/PipeConnection.cs +++ b/src/SuperSocket.Connection/PipeConnection.cs @@ -162,7 +162,7 @@ protected async ValueTask ProcessOutputRead(PipeReader reader) { try { - await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false); ; + await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false); UpdateLastActiveTime(); } catch (Exception e) diff --git a/src/SuperSocket.Connection/Sockets/SocketSender.cs b/src/SuperSocket.Connection/Sockets/SocketSender.cs new file mode 100644 index 000000000..f1d7b482e --- /dev/null +++ b/src/SuperSocket.Connection/Sockets/SocketSender.cs @@ -0,0 +1,108 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks.Sources; +using Microsoft.Extensions.ObjectPool; + +namespace SuperSocket.Connection +{ + public class SocketSender : SocketAsyncEventArgs, IValueTaskSource, IResettable + { + private Action _continuation; + + private static readonly Action _continuationCompleted = _ => { }; + + public SocketSender() + { + } + + internal void SetBuffer(in ReadOnlySequence buffer) + { + if (buffer.IsSingleSegment) + { + var segment = GetArrayByMemory(buffer.First); + SetBuffer(segment.Array, segment.Offset, segment.Count); + } + else + { + var bufferList = new List>(); + + foreach (var piece in buffer) + { + bufferList.Add(GetArrayByMemory(piece)); + } + + BufferList = bufferList; + } + } + + protected override void OnCompleted(SocketAsyncEventArgs e) + { + var continuation = _continuation; + + if (continuation != null && Interlocked.CompareExchange(ref _continuation, _continuationCompleted, continuation) != null) + { + var state = UserToken; + UserToken = null; + + ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); + } + } + + public int GetResult(short token) + { + return BytesTransferred; + } + + public ValueTaskSourceStatus GetStatus(short token) + { + if (!ReferenceEquals(_continuation, _continuationCompleted)) + return ValueTaskSourceStatus.Pending; + + return SocketError == SocketError.Success + ? ValueTaskSourceStatus.Succeeded + : ValueTaskSourceStatus.Faulted; + } + + public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) + { + UserToken = state; + + var prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); + + // The task has already completed, so trigger continuation immediately + if (ReferenceEquals(prevContinuation, _continuationCompleted)) + { + UserToken = null; + ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); + } + } + + public bool TryReset() + { + if (BufferList.Count > 0) + { + BufferList.Clear(); + } + else + { + SetBuffer(null, 0, 0); + } + + return true; + } + + private ArraySegment GetArrayByMemory(ReadOnlyMemory memory) + { + if (!MemoryMarshal.TryGetArray(memory, out var result)) + { + throw new InvalidOperationException("Buffer backed by array was expected"); + } + + return result; + } + } +} \ No newline at end of file diff --git a/src/SuperSocket.Connection/SuperSocket.Connection.csproj b/src/SuperSocket.Connection/SuperSocket.Connection.csproj index d2c02bdab..ab04d2a47 100644 --- a/src/SuperSocket.Connection/SuperSocket.Connection.csproj +++ b/src/SuperSocket.Connection/SuperSocket.Connection.csproj @@ -9,6 +9,7 @@ + diff --git a/src/SuperSocket.Connection/TcpPipeConnection.cs b/src/SuperSocket.Connection/TcpPipeConnection.cs index 2bb406852..34f411cf2 100644 --- a/src/SuperSocket.Connection/TcpPipeConnection.cs +++ b/src/SuperSocket.Connection/TcpPipeConnection.cs @@ -3,24 +3,24 @@ using System.Threading.Tasks; using System.Net.Sockets; using System.Buffers; -using System.Collections.Generic; -using SuperSocket.ProtoBase; +using Microsoft.Extensions.ObjectPool; namespace SuperSocket.Connection { public class TcpPipeConnection : PipeConnection { - private Socket _socket; - private List> _segmentsForSend; - - public TcpPipeConnection(Socket socket, ConnectionOptions options) + private readonly ObjectPool _socketSenderPool; + + public TcpPipeConnection(Socket socket, ConnectionOptions options, ObjectPool socketSenderPool = null) : base(options) { _socket = socket; RemoteEndPoint = socket.RemoteEndPoint; LocalEndPoint = socket.LocalEndPoint; + + _socketSenderPool = socketSenderPool; } protected override void OnClosed() @@ -42,37 +42,29 @@ private async ValueTask ReceiveAsync(Socket socket, Memory memory, So .ConfigureAwait(false); } - protected override async ValueTask SendOverIOAsync(ReadOnlySequence buffer, CancellationToken cancellationToken) + protected override ValueTask SendOverIOAsync(ReadOnlySequence buffer, CancellationToken cancellationToken) { - if (buffer.IsSingleSegment) - { - return await _socket - .SendAsync(GetArrayByMemory(buffer.First), SocketFlags.None, cancellationToken) - .ConfigureAwait(false); - } - - if (_segmentsForSend == null) - { - _segmentsForSend = new List>(); - } - else + var socketSender = _socketSenderPool?.Get() ?? new SocketSender(); + + try { - _segmentsForSend.Clear(); - } + socketSender.SetBuffer(buffer); - var segments = _segmentsForSend; + cancellationToken.ThrowIfCancellationRequested(); + + if (_socket.SendAsync(socketSender)) + { + return new ValueTask(socketSender, 0); + } - foreach (var piece in buffer) + return socketSender.SocketError != SocketError.Success + ? ValueTask.FromException(new SocketException((int)socketSender.SocketError)) + : new ValueTask(socketSender.BytesTransferred); + } + finally { - cancellationToken.ThrowIfCancellationRequested(); - _segmentsForSend.Add(GetArrayByMemory(piece)); + _socketSenderPool.Return(socketSender); } - - cancellationToken.ThrowIfCancellationRequested(); - - return await _socket - .SendAsync(_segmentsForSend, SocketFlags.None) - .ConfigureAwait(false); } protected override void Close() diff --git a/src/SuperSocket.Server/Connection/TcpConnectionFactory.cs b/src/SuperSocket.Server/Connection/TcpConnectionFactory.cs index 9c6ff92a9..d44343876 100644 --- a/src/SuperSocket.Server/Connection/TcpConnectionFactory.cs +++ b/src/SuperSocket.Server/Connection/TcpConnectionFactory.cs @@ -5,8 +5,8 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.ObjectPool; using SuperSocket.Connection; -using SuperSocket.ProtoBase; using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Connections; @@ -14,6 +14,8 @@ namespace SuperSocket.Server.Connection { public class TcpConnectionFactory : TcpConnectionFactoryBase { + private readonly ObjectPool _socketSenderPool; + public TcpConnectionFactory( ListenOptions listenOptions, ConnectionOptions connectionOptions, @@ -21,7 +23,12 @@ public TcpConnectionFactory( IConnectionStreamInitializersFactory connectionStreamInitializersFactory) : base(listenOptions, connectionOptions, socketOptionsSetter, connectionStreamInitializersFactory) { - + if (!(connectionOptions.Values?.TryGetValue("socketSenderPoolSize", out var socketSenderPoolSize) == true && int.TryParse(socketSenderPoolSize, out var socketSenderPoolSizeValue))) + { + socketSenderPoolSizeValue = 1000; + } + + _socketSenderPool = new DefaultObjectPool(new DefaultPooledObjectPolicy(), socketSenderPoolSizeValue); } public override async Task CreateConnection(object connection, CancellationToken cancellationToken) @@ -43,7 +50,7 @@ public override async Task CreateConnection(object connection, Canc return new StreamPipeConnection(stream, socket.RemoteEndPoint, socket.LocalEndPoint, ConnectionOptions); } - return new TcpPipeConnection(socket, ConnectionOptions); + return new TcpPipeConnection(socket, ConnectionOptions, _socketSenderPool); } } } \ No newline at end of file diff --git a/test/SuperSocket.Tests/MainTest.cs b/test/SuperSocket.Tests/MainTest.cs index f25d88682..d555ac6b0 100644 --- a/test/SuperSocket.Tests/MainTest.cs +++ b/test/SuperSocket.Tests/MainTest.cs @@ -1,29 +1,27 @@ using System; +using System.Collections.Generic; using System.IO; -using System.Net; +using System.IO.Pipelines; +using System.Linq; using System.Net.Sockets; +using System.Security.Authentication; using System.Text; +using System.Threading; using System.Threading.Tasks; -using SuperSocket.ProtoBase; -using Xunit; -using Xunit.Abstractions; -using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using SuperSocket; +using Microsoft.Extensions.Options; using SuperSocket.Connection; +using SuperSocket.Kestrel; +using SuperSocket.ProtoBase; using SuperSocket.Server; -using SuperSocket.Server.Host; using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Session; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Hosting.Internal; -using System.Linq; -using System.Collections.Generic; -using System.Security.Authentication; -using SuperSocket.Kestrel; -using System.Threading; +using SuperSocket.Server.Host; +using Xunit; +using Xunit.Abstractions; /// /// Run selected test case by command