Skip to content

Commit

Permalink
akways use SocketAsyncEventArgs to send
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Jan 4, 2025
1 parent 7deb9a5 commit 5622efb
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 50 deletions.
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Debug" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="$(MSExtensionsVersion)" />
<PackageVersion Include="BenchmarkDotNet" Version="0.13.1" />
<PackageVersion Include="Autofac.Extensions.DependencyInjection" Version="6.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/SuperSocket.Connection/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected async ValueTask<bool> ProcessOutputRead(PipeReader reader)
{
try
{
await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false); ;
await SendOverIOAsync(buffer, CancellationToken.None).ConfigureAwait(false);
UpdateLastActiveTime();
}
catch (Exception e)
Expand Down
108 changes: 108 additions & 0 deletions src/SuperSocket.Connection/Sockets/SocketSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Sources;
using Microsoft.Extensions.ObjectPool;

namespace SuperSocket.Connection
{
public class SocketSender : SocketAsyncEventArgs, IValueTaskSource<int>, IResettable
{
private Action<object> _continuation;

private static readonly Action<object?> _continuationCompleted = _ => { };

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

public SocketSender()
{
}

internal void SetBuffer(in ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
var segment = GetArrayByMemory(buffer.First);
SetBuffer(segment.Array, segment.Offset, segment.Count);
}
else
{
var bufferList = new List<ArraySegment<byte>>();

foreach (var piece in buffer)
{
bufferList.Add(GetArrayByMemory(piece));
}

BufferList = bufferList;
}
}

protected override void OnCompleted(SocketAsyncEventArgs e)
{
var continuation = _continuation;

if (continuation != null && Interlocked.CompareExchange(ref _continuation, _continuationCompleted, continuation) != null)
{
var state = UserToken;
UserToken = null;

ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true);

Check failure on line 51 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'

Check failure on line 51 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'

Check failure on line 51 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'
}
}

public int GetResult(short token)
{
return BytesTransferred;
}

public ValueTaskSourceStatus GetStatus(short token)
{
if (!ReferenceEquals(_continuation, _continuationCompleted))
return ValueTaskSourceStatus.Pending;

return SocketError == SocketError.Success
? ValueTaskSourceStatus.Succeeded
: ValueTaskSourceStatus.Faulted;
}

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
UserToken = state;

var prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);

// The task has already completed, so trigger continuation immediately
if (ReferenceEquals(prevContinuation, _continuationCompleted))
{
UserToken = null;
ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true);

Check failure on line 80 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / push

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'

Check failure on line 80 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'

Check failure on line 80 in src/SuperSocket.Connection/Sockets/SocketSender.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

The best overload for 'UnsafeQueueUserWorkItem' does not have a parameter named 'preferLocal'
}
}

public bool TryReset()
{
if (BufferList.Count > 0)
{
BufferList.Clear();
}
else
{
SetBuffer(null, 0, 0);
}

return true;
}

private ArraySegment<byte> GetArrayByMemory(ReadOnlyMemory<byte> memory)
{
if (!MemoryMarshal.TryGetArray<byte>(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}

return result;
}
}
}
1 change: 1 addition & 0 deletions src/SuperSocket.Connection/SuperSocket.Connection.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<PackageReference Include="System.IO.Pipelines" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../SuperSocket.ProtoBase/SuperSocket.ProtoBase.csproj" />
Expand Down
54 changes: 23 additions & 31 deletions src/SuperSocket.Connection/TcpPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Buffers;
using System.Collections.Generic;
using SuperSocket.ProtoBase;
using Microsoft.Extensions.ObjectPool;

namespace SuperSocket.Connection
{
public class TcpPipeConnection : PipeConnection
{

private Socket _socket;

private List<ArraySegment<byte>> _segmentsForSend;
public TcpPipeConnection(Socket socket, ConnectionOptions options)
private readonly ObjectPool<SocketSender> _socketSenderPool;

public TcpPipeConnection(Socket socket, ConnectionOptions options, ObjectPool<SocketSender> socketSenderPool = null)
: base(options)
{
_socket = socket;
RemoteEndPoint = socket.RemoteEndPoint;
LocalEndPoint = socket.LocalEndPoint;

_socketSenderPool = socketSenderPool;
}

protected override void OnClosed()
Expand All @@ -42,37 +42,29 @@ private async ValueTask<int> ReceiveAsync(Socket socket, Memory<byte> memory, So
.ConfigureAwait(false);
}

protected override async ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
protected override ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
{
if (buffer.IsSingleSegment)
{
return await _socket
.SendAsync(GetArrayByMemory(buffer.First), SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
}

if (_segmentsForSend == null)
{
_segmentsForSend = new List<ArraySegment<byte>>();
}
else
var socketSender = _socketSenderPool?.Get() ?? new SocketSender();

try
{
_segmentsForSend.Clear();
}
socketSender.SetBuffer(buffer);

var segments = _segmentsForSend;
cancellationToken.ThrowIfCancellationRequested();

if (_socket.SendAsync(socketSender))
{
return new ValueTask<int>(socketSender, 0);
}

foreach (var piece in buffer)
return socketSender.SocketError != SocketError.Success
? ValueTask.FromException<int>(new SocketException((int)socketSender.SocketError))

Check failure on line 61 in src/SuperSocket.Connection/TcpPipeConnection.cs

View workflow job for this annotation

GitHub Actions / push

'ValueTask' does not contain a definition for 'FromException'

Check failure on line 61 in src/SuperSocket.Connection/TcpPipeConnection.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

'ValueTask' does not contain a definition for 'FromException'

Check failure on line 61 in src/SuperSocket.Connection/TcpPipeConnection.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

'ValueTask' does not contain a definition for 'FromException'
: new ValueTask<int>(socketSender.BytesTransferred);
}
finally
{
cancellationToken.ThrowIfCancellationRequested();
_segmentsForSend.Add(GetArrayByMemory(piece));
_socketSenderPool.Return(socketSender);
}

cancellationToken.ThrowIfCancellationRequested();

return await _socket
.SendAsync(_segmentsForSend, SocketFlags.None)
.ConfigureAwait(false);
}

protected override void Close()
Expand Down
13 changes: 10 additions & 3 deletions src/SuperSocket.Server/Connection/TcpConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,30 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using SuperSocket.Connection;
using SuperSocket.ProtoBase;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Connections;

namespace SuperSocket.Server.Connection
{
public class TcpConnectionFactory : TcpConnectionFactoryBase
{
private readonly ObjectPool<SocketSender> _socketSenderPool;

public TcpConnectionFactory(
ListenOptions listenOptions,
ConnectionOptions connectionOptions,
Action<Socket> socketOptionsSetter,
IConnectionStreamInitializersFactory connectionStreamInitializersFactory)
: base(listenOptions, connectionOptions, socketOptionsSetter, connectionStreamInitializersFactory)
{

if (!(connectionOptions.Values?.TryGetValue("socketSenderPoolSize", out var socketSenderPoolSize) == true && int.TryParse(socketSenderPoolSize, out var socketSenderPoolSizeValue)))
{
socketSenderPoolSizeValue = 1000;
}

_socketSenderPool = new DefaultObjectPool<SocketSender>(new DefaultPooledObjectPolicy<SocketSender>(), socketSenderPoolSizeValue);
}

public override async Task<IConnection> CreateConnection(object connection, CancellationToken cancellationToken)
Expand All @@ -43,7 +50,7 @@ public override async Task<IConnection> CreateConnection(object connection, Canc
return new StreamPipeConnection(stream, socket.RemoteEndPoint, socket.LocalEndPoint, ConnectionOptions);
}

return new TcpPipeConnection(socket, ConnectionOptions);
return new TcpPipeConnection(socket, ConnectionOptions, _socketSenderPool);
}
}
}
28 changes: 13 additions & 15 deletions test/SuperSocket.Tests/MainTest.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.ProtoBase;
using Xunit;
using Xunit.Abstractions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SuperSocket;
using Microsoft.Extensions.Options;
using SuperSocket.Connection;
using SuperSocket.Kestrel;
using SuperSocket.ProtoBase;
using SuperSocket.Server;
using SuperSocket.Server.Host;
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Session;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Hosting.Internal;
using System.Linq;
using System.Collections.Generic;
using System.Security.Authentication;
using SuperSocket.Kestrel;
using System.Threading;
using SuperSocket.Server.Host;
using Xunit;
using Xunit.Abstractions;

/// <summary>
/// Run selected test case by command
Expand Down

0 comments on commit 5622efb

Please sign in to comment.