Skip to content

Commit

Permalink
feat: adding batching to unreliable messages
Browse files Browse the repository at this point in the history
  • Loading branch information
James-Frowen committed Apr 25, 2024
1 parent 4b77799 commit 186cd65
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,15 @@ private void UpdateConnected()
internal abstract void ReceiveNotifyAck(Packet packet);
internal abstract void ReceiveReliableFragment(Packet packet);

protected void HandleReliableBatched(byte[] array, int offset, int packetLength)
protected void HandleReliableBatched(byte[] array, int offset, int packetLength, PacketType packetType)
{
while (offset < packetLength)
{
var length = ByteUtils.ReadUShort(array, ref offset);
var message = new ArraySegment<byte>(array, offset, length);
offset += length;

_metrics?.OnReceiveMessageReliable(length);
_metrics?.OnReceiveMessage(packetType, length);
_dataHandler.ReceiveMessage(this, message);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public override void SendReliable(byte[] message, int offset, int length)

internal override void ReceiveReliablePacket(Packet packet)
{
HandleReliableBatched(packet.Buffer.array, 1, packet.Length);
HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Reliable);
}

internal override void ReceiveUnreliablePacket(Packet packet) => throw new NotSupportedException();
Expand Down
28 changes: 11 additions & 17 deletions Assets/Mirage/Runtime/SocketLayer/Connection/ReliableConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ namespace Mirage.SocketLayer
internal sealed class ReliableConnection : Connection, IRawConnection, IDisposable
{
private readonly AckSystem _ackSystem;
private readonly Batch _unreliableBatch;
private readonly Pool<ByteBuffer> _bufferPool;

internal ReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool<ByteBuffer> bufferPool, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics)
{

_bufferPool = bufferPool;
_unreliableBatch = new ArrayBatch(_maxPacketSize, SendBatchInternal, PacketType.Unreliable);
_ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, logger, metrics);
}

private void SendBatchInternal(byte[] batch, int length)
{
_peer.Send(this, batch, length);
}

public void Dispose()
{
_ackSystem.Dispose();
Expand Down Expand Up @@ -70,26 +76,13 @@ public override void SendUnreliable(byte[] packet, int offset, int length)
throw new ArgumentException($"Message is bigger than MTU, size:{length} but max Unreliable message size is {_maxPacketSize - 1}");
}

using (var buffer = _bufferPool.Take())
{
Buffer.BlockCopy(packet, offset, buffer.array, 1, length);
// set header
buffer.array[0] = (byte)PacketType.Unreliable;

_peer.Send(this, buffer.array, length + 1);
}

_unreliableBatch.AddMessage(packet, offset, length);
_metrics?.OnSendMessageUnreliable(length);
}

internal override void ReceiveUnreliablePacket(Packet packet)
{
var count = packet.Length - 1;
var segment = new ArraySegment<byte>(packet.Buffer.array, 1, count);
_dataHandler.ReceiveMessage(this, segment);


_metrics?.OnReceiveMessageUnreliable(count);
HandleReliableBatched(packet.Buffer.array, 1, packet.Length, PacketType.Unreliable);
}

internal override void ReceiveReliablePacket(Packet packet)
Expand Down Expand Up @@ -169,7 +162,7 @@ private void HandleBatchedMessageInPacket(AckSystem.ReliableReceived received)
var array = received.Buffer.array;
var packetLength = received.Length;
var offset = 0;
HandleReliableBatched(array, offset, packetLength);
HandleReliableBatched(array, offset, packetLength, PacketType.Reliable);

// release buffer after all its message have been handled
received.Buffer.Release();
Expand All @@ -193,6 +186,7 @@ internal override void ReceiveNotifyAck(Packet packet)
public override void FlushBatch()
{
_ackSystem.Update();
_unreliableBatch.Flush();
}

internal override bool IsValidSize(Packet packet)
Expand Down
16 changes: 16 additions & 0 deletions Assets/Mirage/Runtime/SocketLayer/Metrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,22 @@ public void OnReceiveMessageNotify(int length)
buffer[tick].receiveMessagesNotifyBytes += length;
}

public void OnReceiveMessage(PacketType packetType, int length)
{
switch (packetType)
{
case PacketType.Reliable:
OnReceiveMessageReliable(length);
break;
case PacketType.Unreliable:
OnReceiveMessageUnreliable(length);
break;
case PacketType.Notify:
OnReceiveMessageNotify(length);
break;
}
}

public struct Frame
{
/// <summary>
Expand Down

0 comments on commit 186cd65

Please sign in to comment.