diff --git a/Common/Networking/Channels/ChannelBase.cs b/Common/Networking/Channels/ChannelBase.cs index 4dc6cc9..1bc5f70 100644 --- a/Common/Networking/Channels/ChannelBase.cs +++ b/Common/Networking/Channels/ChannelBase.cs @@ -58,6 +58,6 @@ public async Task UpdateQueueAsync() /// /// /// - public abstract Task HandlePacketAsync(NetworkPacket packet); + public abstract ValueTask HandlePacketAsync(NetworkPacket packet); } } \ No newline at end of file diff --git a/Common/Networking/Channels/ReliableChannel.cs b/Common/Networking/Channels/ReliableChannel.cs index 7e53f2e..9e33007 100644 --- a/Common/Networking/Channels/ReliableChannel.cs +++ b/Common/Networking/Channels/ReliableChannel.cs @@ -14,14 +14,14 @@ internal sealed class ReliableChannel : ChannelBase private readonly byte _id; private readonly bool _ordered; - private readonly NetworkPacket _outgoingAcks; //for send acks + private NetworkPacket _outgoingAcks; //for send acks private readonly SemaphoreSlim _outgoingAcksSem = new(1, 1); private readonly PendingReliablePacket[] _pendingPackets; //for unacked packets and duplicates private readonly SemaphoreSlim _pendingPacketsSem = new(1, 1); private readonly SemaphoreSlim _pendingPacketsSemaphore = new(1, 1); - private readonly NetworkPacket[]? _receivedPackets; //for order + private readonly NetworkPacket?[]? _receivedPackets; //for order private readonly int _windowSize; private int _localSequence; @@ -43,7 +43,7 @@ public ReliableChannel(PeerBase peer, bool ordered, byte id) : base(peer) if (_ordered) { _deliveryMethod = DeliveryMethod.ReliableOrdered; - _receivedPackets = new NetworkPacket[_windowSize]; + _receivedPackets = new NetworkPacket?[_windowSize]; _earlyReceived = Array.Empty(); } else @@ -60,7 +60,7 @@ public ReliableChannel(PeerBase peer, bool ordered, byte id) : base(peer) _outgoingAcks.ChannelId = id; } - private async Task ProcessAckAsync(NetworkPacket packet) + private async ValueTask ProcessAckAsync(NetworkPacket packet) { if (packet.Data.Count != _outgoingAcks.Data.Count) { @@ -188,7 +188,7 @@ protected override async Task FlushQueueAsync() //Process incoming packet - public override async Task HandlePacketAsync(NetworkPacket packet) + public override async ValueTask HandlePacketAsync(NetworkPacket packet) { if (packet.Property == PacketProperty.Ack) { @@ -238,7 +238,9 @@ public override async Task HandlePacketAsync(NetworkPacket packet) { //New window position var newWindowStart = (_remoteWindowStart + relate - _windowSize + 1) % NetConstants.MaxSequence; - _outgoingAcks.Sequence = (ushort)newWindowStart; + var oa = _outgoingAcks; + oa.Sequence = (ushort)newWindowStart; + _outgoingAcks = oa; //Clean old data while (_remoteWindowStart != newWindowStart) @@ -285,12 +287,12 @@ public override async Task HandlePacketAsync(NetworkPacket packet) if (_ordered) { - NetworkPacket p; + NetworkPacket? p; while ((p = _receivedPackets[_remoteSequence % _windowSize]) != null) { //process holden packet _receivedPackets[_remoteSequence % _windowSize] = null; - await Peer.AddReliablePacket(_deliveryMethod, p); + await Peer.AddReliablePacket(_deliveryMethod, p.Value); _remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence; } } @@ -329,7 +331,7 @@ private class PendingReliablePacket public override string ToString() { - return _packet == null ? "Empty" : _packet.Sequence.ToString(); + return _packet == null ? "Empty" : _packet.Value.Sequence.ToString(); } public void Init(NetworkPacket packet) @@ -354,7 +356,7 @@ public async Task TrySendAsync(long utcNowTicks, PeerBase peer) _timeStamp = utcNowTicks; _isSent = true; - await peer.SendUserData(_packet); + await peer.SendUserData(_packet.Value); return true; } @@ -362,7 +364,7 @@ public async Task ClearAsync(PeerBase peer) { if (_packet != null) { - await peer.RecycleAndDeliver(_packet); + await peer.RecycleAndDeliver(_packet.Value); _packet = null; return true; } diff --git a/Common/Networking/Channels/SequencedChannel.cs b/Common/Networking/Channels/SequencedChannel.cs index 7a66c7e..a81f365 100644 --- a/Common/Networking/Channels/SequencedChannel.cs +++ b/Common/Networking/Channels/SequencedChannel.cs @@ -7,7 +7,7 @@ namespace Promul.Common.Networking.Channels { internal sealed class SequencedChannel : ChannelBase { - private readonly NetworkPacket? _ackPacket; + private NetworkPacket? _ackPacket; private readonly byte _id; private readonly SemaphoreSlim _outgoingQueueSem = new(1, 1); @@ -24,8 +24,9 @@ public SequencedChannel(PeerBase peer, bool reliable, byte id) : base(peer) _reliable = reliable; if (_reliable) { - _ackPacket = NetworkPacket.FromProperty(PacketProperty.Ack, 0); - _ackPacket.ChannelId = id; + var ap = NetworkPacket.FromProperty(PacketProperty.Ack, 0); + ap.ChannelId = id; + _ackPacket = ap; } } @@ -41,7 +42,7 @@ protected override async Task FlushQueueAsync() if (packet != null) { _lastPacketSendTime = currentTime; - await Peer.SendUserData(packet); + await Peer.SendUserData(packet.Value); } } } @@ -71,20 +72,22 @@ protected override async Task FlushQueueAsync() if (_reliable && _mustSendAck) { _mustSendAck = false; - _ackPacket!.Sequence = _remoteSequence; - await Peer.SendUserData(_ackPacket); + var p = _ackPacket.Value; + p.Sequence = _remoteSequence; + _ackPacket = p; + await Peer.SendUserData(_ackPacket.Value); } return _lastPacket != null; } - public override async Task HandlePacketAsync(NetworkPacket packet) + public override async ValueTask HandlePacketAsync(NetworkPacket packet) { if (packet.IsFragmented) return false; if (packet.Property == PacketProperty.Ack) { - if (_reliable && _lastPacket != null && packet.Sequence == _lastPacket.Sequence) + if (_reliable && _lastPacket != null && packet.Sequence == _lastPacket.Value.Sequence) _lastPacket = null; return false; } diff --git a/Common/Networking/Core/PromulManager.Events.cs b/Common/Networking/Core/PromulManager.Events.cs index 3348c32..ff6c7cc 100644 --- a/Common/Networking/Core/PromulManager.Events.cs +++ b/Common/Networking/Core/PromulManager.Events.cs @@ -7,27 +7,27 @@ namespace Promul.Common.Networking { public partial class PromulManager { - public delegate Task ConnectionlessReceiveEvent(IPEndPoint remoteEndPoint, CompositeReader reader, + public delegate ValueTask ConnectionlessReceiveEvent(IPEndPoint remoteEndPoint, CompositeReader reader, UnconnectedMessageType messageType); - public delegate Task ConnectionRequestEvent(ConnectionRequest request); + public delegate ValueTask ConnectionRequestEvent(ConnectionRequest request); - public delegate Task DeliveryEvent(PeerBase peer, object? userData); + public delegate ValueTask DeliveryEvent(PeerBase peer, object? userData); - public delegate Task NetworkErrorEvent(IPEndPoint endPoint, SocketError socketError); + public delegate ValueTask NetworkErrorEvent(IPEndPoint endPoint, SocketError socketError); - public delegate Task NetworkLatencyUpdateEvent(PeerBase peer, int latency); + public delegate ValueTask NetworkLatencyUpdateEvent(PeerBase peer, int latency); - public delegate Task NetworkReceiveEvent(PeerBase peer, CompositeReader reader, byte channel, + public delegate ValueTask NetworkReceiveEvent(PeerBase peer, CompositeReader reader, byte channel, DeliveryMethod deliveryMethod); - public delegate Task NtpResponseEvent(NtpPacket packet); + public delegate ValueTask NtpResponseEvent(NtpPacket packet); - public delegate Task PeerAddressChangedEvent(PeerBase peer, IPEndPoint previousAddress); + public delegate ValueTask PeerAddressChangedEvent(PeerBase peer, IPEndPoint previousAddress); - public delegate Task PeerConnectedEvent(PeerBase peer); + public delegate ValueTask PeerConnectedEvent(PeerBase peer); - public delegate Task PeerDisconnectedEvent(PeerBase peer, DisconnectInfo disconnectInfo); + public delegate ValueTask PeerDisconnectedEvent(PeerBase peer, DisconnectInfo disconnectInfo); /// /// Invoked when a connection is made with a remote peer. diff --git a/Common/Networking/Core/PromulManager.Socket.cs b/Common/Networking/Core/PromulManager.Socket.cs index ec9acf9..7756b61 100644 --- a/Common/Networking/Core/PromulManager.Socket.cs +++ b/Common/Networking/Core/PromulManager.Socket.cs @@ -79,7 +79,7 @@ private async Task ProcessError(SocketException ex) return false; } - private async Task ReceiveInternalAsync(Socket s, EndPoint bufferEndPoint, CancellationToken ct = default) + private async ValueTask ReceiveInternalAsync(Socket s, EndPoint bufferEndPoint, CancellationToken ct = default) { var receiveBuffer = new byte[NetConstants.MaxPacketSize]; var receive = await s.ReceiveFromAsync(receiveBuffer, SocketFlags.None, bufferEndPoint); @@ -106,8 +106,8 @@ public async Task ListenAsync(CancellationToken cancellationToken = default) await ReceiveInternalAsync(_udpSocketv4, bufferEndPoint4, cancellationToken); else await Task.WhenAll( - ReceiveInternalAsync(_udpSocketv4, bufferEndPoint4, cancellationToken), - ReceiveInternalAsync(_udpSocketv6, bufferEndPoint6, cancellationToken) + ReceiveInternalAsync(_udpSocketv4, bufferEndPoint4, cancellationToken).AsTask(), + ReceiveInternalAsync(_udpSocketv6, bufferEndPoint6, cancellationToken).AsTask() ); } catch (SocketException ex) @@ -294,7 +294,7 @@ private bool BindInternal(Socket socket, IPEndPoint ep) /// The remote endpoint to send to. /// The cancellation token used to cancel the send operation. /// The number of bytes sent. - internal async Task RawSendAsync(ArraySegment data, IPEndPoint remoteEndPoint, + internal async ValueTask RawSendAsync(ArraySegment data, IPEndPoint remoteEndPoint, CancellationToken ct = default) { var socket = _udpSocketv4; @@ -359,7 +359,7 @@ await ForceDisconnectPeerAsync( return result; } - public async Task SendBroadcast(ArraySegment data, int port) + public async ValueTask SendBroadcast(ArraySegment data, int port) { NetworkPacket packet; if (_extraPacketLayer != null) diff --git a/Common/Networking/Core/PromulManager.cs b/Common/Networking/Core/PromulManager.cs index 8ccf161..f6ee3c6 100644 --- a/Common/Networking/Core/PromulManager.cs +++ b/Common/Networking/Core/PromulManager.cs @@ -426,10 +426,10 @@ private async Task ProcessConnectRequest( if (OnConnectionRequest != null) await OnConnectionRequest(req); } - internal async Task CreateReceiveEvent(NetworkPacket packet, DeliveryMethod method, byte channelNumber, + internal ValueTask CreateReceiveEvent(NetworkPacket packet, DeliveryMethod method, byte channelNumber, int headerSize, PeerBase fromPeer) { - if (OnReceive != null) await OnReceive(fromPeer, packet.CreateReader(headerSize), channelNumber, method); + return OnReceive?.Invoke(fromPeer, packet.CreateReader(headerSize), channelNumber, method) ?? default; } /// @@ -655,7 +655,7 @@ private struct IncomingData private const int MinLatencyThreshold = 5; #endif - private async Task OnMessageReceived(NetworkPacket packet, IPEndPoint remoteEndPoint) + private async ValueTask OnMessageReceived(NetworkPacket packet, IPEndPoint remoteEndPoint) { #if DEBUG if (SimulatePacketLoss && _randomGenerator.NextDouble() * 100 < SimulatePacketLossChance) @@ -680,7 +680,7 @@ private async Task OnMessageReceived(NetworkPacket packet, IPEndPoint remoteEndP await DebugMessageReceived(packet, remoteEndPoint); } - private async Task DebugMessageReceived(NetworkPacket packet, IPEndPoint remoteEndPoint) + private async ValueTask DebugMessageReceived(NetworkPacket packet, IPEndPoint remoteEndPoint) { #endif var originalPacketSize = packet.Data.Count; @@ -731,7 +731,6 @@ private async Task DebugMessageReceived(NetworkPacket packet, IPEndPoint remoteE if (!packet.Verify()) { NetDebug.WriteError("[NM] DataReceived: bad!"); - //PoolRecycle(packet); return; } diff --git a/Common/Networking/Packets/NetworkPacket.cs b/Common/Networking/Packets/NetworkPacket.cs index 0acaa66..62f4bf2 100644 --- a/Common/Networking/Packets/NetworkPacket.cs +++ b/Common/Networking/Packets/NetworkPacket.cs @@ -25,7 +25,7 @@ public enum PacketProperty : byte InvalidProtocol } - public class NetworkPacket + public readonly struct NetworkPacket { private static readonly int PropertiesCount = Enum.GetValues(typeof(PacketProperty)).Length; private static readonly int[] HeaderSizes; diff --git a/Common/Networking/Peers/OutgoingPeer.cs b/Common/Networking/Peers/OutgoingPeer.cs index 6b097bd..7c630db 100644 --- a/Common/Networking/Peers/OutgoingPeer.cs +++ b/Common/Networking/Peers/OutgoingPeer.cs @@ -11,11 +11,11 @@ public class OutgoingPeer : PeerBase { private readonly NetworkPacket _connectRequestPacket; - public OutgoingPeer(PromulManager manager, IPEndPoint remote, int id, long connectTime, byte connectionNumber, + public OutgoingPeer(PromulManager manager, IPEndPoint remote, int id, byte connectionNumber, ArraySegment data) - : base(manager, remote, id, connectTime, connectionNumber) + : base(manager, remote, id, DateTime.UtcNow.Ticks, connectionNumber) { - var packet = NetConnectRequestPacket.Make(data, remote.Serialize(), connectTime, id); + var packet = NetConnectRequestPacket.Make(data, remote.Serialize(), ConnectTime, id); packet.ConnectionNumber = connectionNumber; ConnectionState = ConnectionState.Outgoing; _connectRequestPacket = packet; diff --git a/Common/Networking/Peers/PeerBase.cs b/Common/Networking/Peers/PeerBase.cs index 9877d41..bd4a7ed 100644 --- a/Common/Networking/Peers/PeerBase.cs +++ b/Common/Networking/Peers/PeerBase.cs @@ -30,12 +30,12 @@ public partial class PeerBase private readonly Dictionary _holdedFragments; // Merging - private readonly NetworkPacket _mergeData; + private NetworkPacket _mergeData; private readonly SemaphoreSlim _mtuMutex = new(1, 1); - private readonly NetworkPacket _pingPacket = NetworkPacket.FromProperty(PacketProperty.Pong, 0); + private NetworkPacket _pingPacket = NetworkPacket.FromProperty(PacketProperty.Pong, 0); private readonly Stopwatch _pingTimer = new(); - private readonly NetworkPacket _pongPacket = NetworkPacket.FromProperty(PacketProperty.Ping, 0); + private NetworkPacket _pongPacket = NetworkPacket.FromProperty(PacketProperty.Ping, 0); private readonly SemaphoreSlim _shutdownSemaphore // Locks for shutdown operations = new(1, 1); @@ -258,10 +258,7 @@ private ChannelBase CreateChannel(byte idx) internal static async Task ConnectToAsync(PromulManager manager, IPEndPoint remote, int id, byte connectionNumber, ArraySegment data) { - var time = DateTime.UtcNow.Ticks; - var packet = NetConnectRequestPacket.Make(data, remote.Serialize(), time, id); - packet.ConnectionNumber = connectionNumber; - var peer = new OutgoingPeer(manager, remote, id, time, connectionNumber, data); + var peer = new OutgoingPeer(manager, remote, id, connectionNumber, data); await peer.SendConnectionRequestAsync(); return peer; @@ -448,16 +445,17 @@ internal async Task ShutdownAsync(ArraySegment data, bool Interlocked.Exchange(ref _timeSinceLastPacket, 0); - _shutdownPacket = NetworkPacket.FromProperty(PacketProperty.Disconnect, data.Count); - _shutdownPacket.ConnectionNumber = _connectNumber; - FastBitConverter.GetBytes(_shutdownPacket.Data.Array, _shutdownPacket.Data.Offset + 1, ConnectTime); - if (_shutdownPacket.Data.Count >= MaximumTransferUnit) + var sp = NetworkPacket.FromProperty(PacketProperty.Disconnect, data.Count); + sp.ConnectionNumber = _connectNumber; + _shutdownPacket = sp; + FastBitConverter.GetBytes(_shutdownPacket.Value.Data.Array, _shutdownPacket.Value.Data.Offset + 1, ConnectTime); + if (_shutdownPacket.Value.Data.Count >= MaximumTransferUnit) //Drop additional data NetDebug.WriteError("[Peer] Disconnect additional data size more than MTU - 8!"); else if (data != null && data.Count > 0) - data.CopyTo(_shutdownPacket.Data.Array, _shutdownPacket.Data.Offset + 9); + data.CopyTo(_shutdownPacket.Value.Data.Array, _shutdownPacket.Value.Data.Offset + 9); ConnectionState = ConnectionState.ShutdownRequested; - await PromulManager.RawSendAsync(_shutdownPacket, EndPoint); + await PromulManager.RawSendAsync(_shutdownPacket.Value, EndPoint); return result; } finally @@ -474,7 +472,7 @@ private void UpdateRoundTripTime(int roundTripTime) ResendDelay = 25.0 + RoundTripTime * 2.1; // 25 ms + double rtt } - internal async Task AddReliablePacket(DeliveryMethod method, NetworkPacket p) + internal async ValueTask AddReliablePacket(DeliveryMethod method, NetworkPacket p) { if (p.IsFragmented) { @@ -485,7 +483,7 @@ internal async Task AddReliablePacket(DeliveryMethod method, NetworkPacket p) { incomingFragments = new IncomingFragments { - Fragments = new NetworkPacket[p.FragmentsTotal], + Fragments = new NetworkPacket?[p.FragmentsTotal], ChannelId = p.ChannelId }; _holdedFragments.Add(packetFragId, incomingFragments); @@ -526,34 +524,37 @@ internal async Task AddReliablePacket(DeliveryMethod method, NetworkPacket p) for (var i = 0; i < incomingFragments.ReceivedCount; i++) { var fragment = fragments[i]; - var writtenSize = fragment.Data.Count - NetConstants.FragmentedHeaderTotalSize; - - if (pos + writtenSize > resultingPacket.Data.Count) + if (fragment.HasValue) { - _holdedFragments.Remove(packetFragId); - NetDebug.WriteError( - $"Fragment error pos: {pos + writtenSize} >= resultPacketSize: {resultingPacket.Data.Count} , totalSize: {incomingFragments.TotalSize}"); - return; - } + var writtenSize = fragment.Value.Data.Count - NetConstants.FragmentedHeaderTotalSize; - if (fragment.Data.Count > fragment.Data.Count) - { - _holdedFragments.Remove(packetFragId); - NetDebug.WriteError( - $"Fragment error size: {fragment.Data.Count} > fragment.RawData.Length: {fragment.Data.Count}"); - return; - } + if (pos + writtenSize > resultingPacket.Data.Count) + { + _holdedFragments.Remove(packetFragId); + NetDebug.WriteError( + $"Fragment error pos: {pos + writtenSize} >= resultPacketSize: {resultingPacket.Data.Count} , totalSize: {incomingFragments.TotalSize}"); + return; + } - //Create resulting big packet - Buffer.BlockCopy( - fragment.Data.Array, - fragment.Data.Offset + NetConstants.FragmentedHeaderTotalSize, - resultingPacket.Data.Array, - resultingPacket.Data.Offset + pos, - writtenSize); - pos += writtenSize; + if (fragment.Value.Data.Count > fragment.Value.Data.Count) + { + _holdedFragments.Remove(packetFragId); + NetDebug.WriteError( + $"Fragment error size: {fragment.Value.Data.Count} > fragment.RawData.Length: {fragment.Value.Data.Count}"); + return; + } + + //Create resulting big packet + Buffer.BlockCopy( + fragment.Value.Data.Array, + fragment.Value.Data.Offset + NetConstants.FragmentedHeaderTotalSize, + resultingPacket.Data.Array, + resultingPacket.Data.Offset + pos, + writtenSize); + pos += writtenSize; - fragments[i] = null; + fragments[i] = null; + } } //Clear memory @@ -573,7 +574,7 @@ await PromulManager.CreateReceiveEvent(p, method, (byte)(p.ChannelId / NetConsta } //Process incoming packet - internal async Task ProcessPacket(NetworkPacket packet) + internal async ValueTask ProcessPacket(NetworkPacket packet) { if (ConnectionState == ConnectionState.Outgoing || ConnectionState == ConnectionState.Disconnected) return; if (packet.Property == PacketProperty.ShutdownOk) @@ -746,7 +747,7 @@ internal async Task Update(long deltaTime) if (_shutdownTimer >= ShutdownDelay) { _shutdownTimer = 0; - await PromulManager.RawSendAsync(_shutdownPacket, EndPoint); + await PromulManager.RawSendAsync(_shutdownPacket.Value, EndPoint); } } @@ -861,7 +862,7 @@ public void LogDebug(string text) // Fragments private struct IncomingFragments { - public NetworkPacket[] Fragments; + public NetworkPacket?[] Fragments; public int ReceivedCount; public int TotalSize; public byte ChannelId; diff --git a/Server~/Relay/RelayServer.cs b/Server~/Relay/RelayServer.cs index e318a68..03efbde 100644 --- a/Server~/Relay/RelayServer.cs +++ b/Server~/Relay/RelayServer.cs @@ -52,7 +52,7 @@ public async Task DestroySession(RelaySession session) _sessionsByCode.Remove(session.JoinCode); } - public async Task OnNetworkReceive(PeerBase peer, CompositeReader reader, byte channelNumber, + public async ValueTask OnNetworkReceive(PeerBase peer, CompositeReader reader, byte channelNumber, DeliveryMethod deliveryMethod) { var packet = reader.ReadRelayControlMessage(); @@ -68,7 +68,7 @@ public async Task OnNetworkReceive(PeerBase peer, CompositeReader reader, byte c await session.OnReceive(peer, packet, deliveryMethod); } - public async Task OnConnectionRequest(ConnectionRequest request) + public async ValueTask OnConnectionRequest(ConnectionRequest request) { var joinCode = request.Data.ReadString(); @@ -86,12 +86,12 @@ public async Task OnConnectionRequest(ConnectionRequest request) _sessionsByPeer[peer.Id] = keyedSession; } - public async Task OnPeerConnected(PeerBase peer) + public async ValueTask OnPeerConnected(PeerBase peer) { _logger.LogInformation($"Connected to {peer.EndPoint}"); } - public async Task OnPeerDisconnected(PeerBase peer, DisconnectInfo disconnectInfo) + public async ValueTask OnPeerDisconnected(PeerBase peer, DisconnectInfo disconnectInfo) { _logger.LogInformation( $"Peer {peer.Id} disconnected: {disconnectInfo.Reason} {disconnectInfo.SocketErrorCode}");