Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add iocp #719

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions SuperSocket.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.Connection", "s
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.Kestrel", "src\SuperSocket.Kestrel\SuperSocket.Kestrel.csproj", "{8C8507D6-903F-4786-8F18-ACA54257454B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.IOCP.Connection", "src\SuperSocket.IOCP.Connection\SuperSocket.IOCP.Connection.csproj", "{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.IOCP.ConnectionFactory", "src\SuperSocket.IOCP.ConnectionFactory\SuperSocket.IOCP.ConnectionFactory.csproj", "{76DDFACD-4930-4C3A-8035-773A8A83A02E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -241,6 +245,30 @@ Global
{8C8507D6-903F-4786-8F18-ACA54257454B}.Release|x64.Build.0 = Release|Any CPU
{8C8507D6-903F-4786-8F18-ACA54257454B}.Release|x86.ActiveCfg = Release|Any CPU
{8C8507D6-903F-4786-8F18-ACA54257454B}.Release|x86.Build.0 = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|x64.ActiveCfg = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|x64.Build.0 = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|x86.ActiveCfg = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Debug|x86.Build.0 = Debug|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|Any CPU.Build.0 = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|x64.ActiveCfg = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|x64.Build.0 = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|x86.ActiveCfg = Release|Any CPU
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD}.Release|x86.Build.0 = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|x64.ActiveCfg = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|x64.Build.0 = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|x86.ActiveCfg = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Debug|x86.Build.0 = Debug|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|Any CPU.Build.0 = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|x64.ActiveCfg = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|x64.Build.0 = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|x86.ActiveCfg = Release|Any CPU
{76DDFACD-4930-4C3A-8035-773A8A83A02E}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -262,6 +290,8 @@ Global
{8454E8D5-777D-46CB-B050-76C5119B624B} = {DDA4741E-097F-40C3-A252-2E1E3476C1A7}
{FC2B529F-4AF4-4C39-BC4F-A3836CC7B37C} = {DDA4741E-097F-40C3-A252-2E1E3476C1A7}
{8C8507D6-903F-4786-8F18-ACA54257454B} = {DDA4741E-097F-40C3-A252-2E1E3476C1A7}
{75C18F74-E1B0-4AB0-A2F7-82C499FD15AD} = {DDA4741E-097F-40C3-A252-2E1E3476C1A7}
{76DDFACD-4930-4C3A-8035-773A8A83A02E} = {DDA4741E-097F-40C3-A252-2E1E3476C1A7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {ADB30AA2-A848-4CB3-8A20-488C80F1BA9E}
Expand Down
23 changes: 23 additions & 0 deletions src/SuperSocket.IOCP.Connection/Internal/BufferExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Runtime.InteropServices;

namespace SuperSocket.IOCP.Connection;

internal static class BufferExtensions
{
public static ArraySegment<byte> GetArray(this Memory<byte> memory)
{
return ((ReadOnlyMemory<byte>)memory).GetArray();
}

public static ArraySegment<byte> GetArray(this ReadOnlyMemory<byte> memory)
{
if (!MemoryMarshal.TryGetArray(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}
return result;
}
}
76 changes: 76 additions & 0 deletions src/SuperSocket.IOCP.Connection/Internal/IOQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.IO.Pipelines;

#nullable enable

namespace SuperSocket.IOCP.Connection;

internal sealed class IOQueue : PipeScheduler, IThreadPoolWorkItem
{
private readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
private int _doingWork;

public override void Schedule(Action<object?> action, object? state)
{
_workItems.Enqueue(new Work(action, state));

// Set working if it wasn't (via atomic Interlocked).
if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
{
// Wasn't working, schedule.
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}

void IThreadPoolWorkItem.Execute()
{
while (true)
{
while (_workItems.TryDequeue(out Work item))
{
item.Callback(item.State);
}

// All work done.

// Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
// This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
_doingWork = 0;

// Ensure _doingWork is written before IsEmpty is read.
// As they are two different memory locations, we insert a barrier to guarantee ordering.
Thread.MemoryBarrier();

// Check if there is work to do
if (_workItems.IsEmpty)
{
// Nothing to do, exit.
break;
}

// Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
if (Interlocked.Exchange(ref _doingWork, 1) == 1)
{
// Execute has been rescheduled already, exit.
break;
}

// Is work, wasn't already scheduled so continue loop.
}
}

private readonly struct Work
{
public readonly Action<object?> Callback;
public readonly object? State;

public Work(Action<object?> callback, object? state)
{
Callback = callback;
State = state;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
namespace System.Buffers;

/// <summary>
/// Used to allocate and distribute re-usable blocks of memory.
/// </summary>
internal sealed class DiagnosticMemoryPool : MemoryPool<byte>
{
private readonly MemoryPool<byte> _pool;

private readonly bool _allowLateReturn;

private readonly bool _rentTracking;

private readonly object _syncObj;

private readonly HashSet<DiagnosticPoolBlock> _blocks;

private readonly List<Exception> _blockAccessExceptions;

private readonly TaskCompletionSource _allBlocksReturned;

private int _totalBlocks;

/// <summary>
/// This default value passed in to Rent to use the default value for the pool.
/// </summary>
private const int AnySize = -1;

public DiagnosticMemoryPool(MemoryPool<byte> pool, bool allowLateReturn = false, bool rentTracking = false)
{
_pool = pool;
_allowLateReturn = allowLateReturn;
_rentTracking = rentTracking;
_blocks = new HashSet<DiagnosticPoolBlock>();
_syncObj = new object();
_allBlocksReturned = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_blockAccessExceptions = new List<Exception>();
}

public bool IsDisposed { get; private set; }

public override IMemoryOwner<byte> Rent(int size = AnySize)
{
lock (_syncObj)
{
if (IsDisposed)
{
MemoryPoolThrowHelper.ThrowObjectDisposedException(MemoryPoolThrowHelper.ExceptionArgument.MemoryPool);
}

var diagnosticPoolBlock = new DiagnosticPoolBlock(this, _pool.Rent(size));
if (_rentTracking)
{
diagnosticPoolBlock.Track();
}
_totalBlocks++;
_blocks.Add(diagnosticPoolBlock);
return diagnosticPoolBlock;
}
}

public override int MaxBufferSize => _pool.MaxBufferSize;

internal void Return(DiagnosticPoolBlock block)
{
bool returnedAllBlocks;
lock (_syncObj)
{
_blocks.Remove(block);
returnedAllBlocks = _blocks.Count == 0;
}

if (IsDisposed)
{
if (!_allowLateReturn)
{
MemoryPoolThrowHelper.ThrowInvalidOperationException_BlockReturnedToDisposedPool(block);
}

if (returnedAllBlocks)
{
SetAllBlocksReturned();
}
}
}

internal void ReportException(Exception exception)
{
lock (_syncObj)
{
_blockAccessExceptions.Add(exception);
}
}

protected override void Dispose(bool disposing)
{
if (IsDisposed)
{
MemoryPoolThrowHelper.ThrowInvalidOperationException_DoubleDispose();
}

bool allBlocksReturned = false;
try
{
lock (_syncObj)
{
IsDisposed = true;
allBlocksReturned = _blocks.Count == 0;
if (!allBlocksReturned && !_allowLateReturn)
{
MemoryPoolThrowHelper.ThrowInvalidOperationException_DisposingPoolWithActiveBlocks(_totalBlocks - _blocks.Count, _totalBlocks, _blocks.ToArray());
}

if (_blockAccessExceptions.Count > 0)
{
throw CreateAccessExceptions();
}
}
}
finally
{
if (allBlocksReturned)
{
SetAllBlocksReturned();
}

_pool.Dispose();
}
}

private void SetAllBlocksReturned()
{
if (_blockAccessExceptions.Count > 0)
{
_allBlocksReturned.SetException(CreateAccessExceptions());
}
else
{
_allBlocksReturned.SetResult();
}
}

private AggregateException CreateAccessExceptions()
{
return new AggregateException("Exceptions occurred while accessing blocks", _blockAccessExceptions.ToArray());
}

public async Task WhenAllBlocksReturnedAsync(TimeSpan timeout)
{
var task = await Task.WhenAny(_allBlocksReturned.Task, Task.Delay(timeout));
if (task != _allBlocksReturned.Task)
{
MemoryPoolThrowHelper.ThrowInvalidOperationException_BlocksWereNotReturnedInTime(_totalBlocks - _blocks.Count, _totalBlocks, _blocks.ToArray());
}

await task;
}
}
Loading
Loading