Skip to content

Commit

Permalink
always use SocketAsyncEventArgs to send
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Jan 6, 2025
1 parent 7deb9a5 commit eb1aacf
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<LangVersion>10.0</LangVersion>
<DotNetTargetFrameworks>net6.0;net7.0;net8.0</DotNetTargetFrameworks>
<TargetFrameworks>netstandard2.1;$(DotNetTargetFrameworks)</TargetFrameworks>
<TargetFrameworks>$(DotNetTargetFrameworks)</TargetFrameworks>
<SamplesTargetFrameworks>net8.0</SamplesTargetFrameworks>
</PropertyGroup>
<PropertyGroup>
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Debug" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="BenchmarkDotNet" Version="0.13.1" />
<PackageVersion Include="Autofac.Extensions.DependencyInjection" Version="6.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
Expand Down
12 changes: 10 additions & 2 deletions src/SuperSocket.Connection/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,20 @@ protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)
var buffer = result.Buffer;
var end = buffer.End;

if (!buffer.IsEmpty)
while (!buffer.IsEmpty)
{
try
{
await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false); ;
var bytesToSend = buffer.Length;
var bytesSent = await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false);
UpdateLastActiveTime();

if (bytesSent == bytesToSend)
{
break;
}

buffer = buffer.Slice(bytesSent);
}
catch (Exception e)
{
Expand Down
126 changes: 126 additions & 0 deletions src/SuperSocket.Connection/Sockets/SocketSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.Extensions.ObjectPool;

namespace SuperSocket.Connection
{
public class SocketSender : SocketAsyncEventArgs, IValueTaskSource<int>, IResettable
{
private Action<object> _continuation;

private static readonly Action<object?> _continuationCompleted = _ => { };

public SocketSender()
: base(unsafeSuppressExecutionContextFlow: true)
{
}

internal ValueTask<int> SendAsync(Socket socket, in ReadOnlySequence<byte> buffer)
{
SetBuffer(buffer);

if (socket.SendAsync(this))
{
return new ValueTask<int>(this, 0);
}

return SocketError != SocketError.Success
? new ValueTask<int>(Task.FromException<int>(new SocketException((int)SocketError)))
: new ValueTask<int>(BytesTransferred);
}

private void SetBuffer(in ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
var segment = GetArrayByMemory(buffer.First);
SetBuffer(segment.Array, segment.Offset, segment.Count);
}
else
{
var bufferList = new List<ArraySegment<byte>>();

foreach (var piece in buffer)
{
bufferList.Add(GetArrayByMemory(piece));
}

BufferList = bufferList;
}
}

protected override void OnCompleted(SocketAsyncEventArgs e)
{
var continuation = _continuation;

if (continuation != null && Interlocked.CompareExchange(ref _continuation, _continuationCompleted, continuation) == continuation)
{
var state = UserToken;
UserToken = null;

ThreadPool.UnsafeQueueUserWorkItem(continuation, state, false);
}
}

public int GetResult(short token)
{
return BytesTransferred;
}

public ValueTaskSourceStatus GetStatus(short token)
{
if (!ReferenceEquals(_continuation, _continuationCompleted))
return ValueTaskSourceStatus.Pending;

return SocketError == SocketError.Success
? ValueTaskSourceStatus.Succeeded
: ValueTaskSourceStatus.Faulted;
}

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
UserToken = state;

var prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);

// The task has already completed, so trigger continuation immediately
if (ReferenceEquals(prevContinuation, _continuationCompleted))
{
UserToken = null;
ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true);
}
}

public bool TryReset()
{
_continuation = null;

if (BufferList != null)
{
BufferList = null;
}
else
{
SetBuffer(null, 0, 0);
}

return true;
}

private ArraySegment<byte> GetArrayByMemory(ReadOnlyMemory<byte> memory)
{
if (!MemoryMarshal.TryGetArray<byte>(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}

return result;
}
}
}
1 change: 1 addition & 0 deletions src/SuperSocket.Connection/SuperSocket.Connection.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<PackageReference Include="System.IO.Pipelines" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../SuperSocket.ProtoBase/SuperSocket.ProtoBase.csproj" />
Expand Down
51 changes: 21 additions & 30 deletions src/SuperSocket.Connection/TcpPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Buffers;
using System.Collections.Generic;
using SuperSocket.ProtoBase;
using Microsoft.Extensions.ObjectPool;

namespace SuperSocket.Connection
{
public class TcpPipeConnection : PipeConnection
{

private Socket _socket;

private List<ArraySegment<byte>> _segmentsForSend;
public TcpPipeConnection(Socket socket, ConnectionOptions options)
private readonly ObjectPool<SocketSender> _socketSenderPool;

public TcpPipeConnection(Socket socket, ConnectionOptions options, ObjectPool<SocketSender> socketSenderPool = null)
: base(options)
{
_socket = socket;
RemoteEndPoint = socket.RemoteEndPoint;
LocalEndPoint = socket.LocalEndPoint;

_socketSenderPool = socketSenderPool;
}

protected override void OnClosed()
Expand All @@ -44,35 +44,26 @@ private async ValueTask<int> ReceiveAsync(Socket socket, Memory<byte> memory, So

protected override async ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
{
if (buffer.IsSingleSegment)
{
return await _socket
.SendAsync(GetArrayByMemory(buffer.First), SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
}

if (_segmentsForSend == null)
{
_segmentsForSend = new List<ArraySegment<byte>>();
}
else
var socketSenderPool = _socketSenderPool;

var socketSender = socketSenderPool?.Get() ?? new SocketSender();

try
{
_segmentsForSend.Clear();
}
var sentBytes = await socketSender.SendAsync(_socket, buffer).ConfigureAwait(false);

var segments = _segmentsForSend;
if (socketSenderPool != null)
{
socketSenderPool.Return(socketSender);
socketSender = null;
}

foreach (var piece in buffer)
return sentBytes;
}
finally
{
cancellationToken.ThrowIfCancellationRequested();
_segmentsForSend.Add(GetArrayByMemory(piece));
socketSender?.Dispose();
}

cancellationToken.ThrowIfCancellationRequested();

return await _socket
.SendAsync(_segmentsForSend, SocketFlags.None)
.ConfigureAwait(false);
}

protected override void Close()
Expand Down
13 changes: 10 additions & 3 deletions src/SuperSocket.Server/Connection/TcpConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,30 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using SuperSocket.Connection;
using SuperSocket.ProtoBase;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Connections;

namespace SuperSocket.Server.Connection
{
public class TcpConnectionFactory : TcpConnectionFactoryBase
{
private readonly ObjectPool<SocketSender> _socketSenderPool;

public TcpConnectionFactory(
ListenOptions listenOptions,
ConnectionOptions connectionOptions,
Action<Socket> socketOptionsSetter,
IConnectionStreamInitializersFactory connectionStreamInitializersFactory)
: base(listenOptions, connectionOptions, socketOptionsSetter, connectionStreamInitializersFactory)
{

if (!(connectionOptions.Values?.TryGetValue("socketSenderPoolSize", out var socketSenderPoolSize) == true && int.TryParse(socketSenderPoolSize, out var socketSenderPoolSizeValue)))
{
socketSenderPoolSizeValue = 1000;
}

_socketSenderPool = new DefaultObjectPool<SocketSender>(new DefaultPooledObjectPolicy<SocketSender>(), socketSenderPoolSizeValue);
}

public override async Task<IConnection> CreateConnection(object connection, CancellationToken cancellationToken)
Expand All @@ -43,7 +50,7 @@ public override async Task<IConnection> CreateConnection(object connection, Canc
return new StreamPipeConnection(stream, socket.RemoteEndPoint, socket.LocalEndPoint, ConnectionOptions);
}

return new TcpPipeConnection(socket, ConnectionOptions);
return new TcpPipeConnection(socket, ConnectionOptions, _socketSenderPool);
}
}
}
28 changes: 13 additions & 15 deletions test/SuperSocket.Tests/MainTest.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.ProtoBase;
using Xunit;
using Xunit.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SuperSocket;
using Microsoft.Extensions.Options;
using SuperSocket.Connection;
using SuperSocket.Kestrel;
using SuperSocket.ProtoBase;
using SuperSocket.Server;
using SuperSocket.Server.Host;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Session;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Hosting.Internal;
using System.Linq;
using System.Collections.Generic;
using System.Security.Authentication;
using SuperSocket.Kestrel;
using System.Threading;
using SuperSocket.Server.Host;
using Xunit;
using Xunit.Abstractions;

/// <summary>
/// Run selected test case by command
Expand Down

0 comments on commit eb1aacf

Please sign in to comment.