Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert sync over async network handling to async #835

Merged
merged 70 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
1b3dca6
test garnet benchmark
msft-paddy14 Oct 8, 2024
bb6f943
Merge branch 'main' of github.com:microsoft/garnet
msft-paddy14 Oct 29, 2024
2929295
Merge branch 'main' of github.com:microsoft/garnet
msft-paddy14 Nov 25, 2024
43c5c13
Revert "test garnet benchmark"
msft-paddy14 Nov 25, 2024
ab602de
add async socketprocessing
msft-paddy14 Nov 26, 2024
16e66f6
resolve conflict
msft-paddy14 Nov 26, 2024
2b7dd00
fix async socket processing
msft-paddy14 Nov 26, 2024
3de873f
remove bad files
msft-paddy14 Nov 27, 2024
771cba6
fix formatting
msft-paddy14 Nov 28, 2024
efbcabe
fix exception handling
msft-paddy14 Nov 28, 2024
b1f689c
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 3, 2024
b7fce4b
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 3, 2024
d390c39
1. convert Task->ValueTask for netowkr processing
msft-paddy14 Dec 5, 2024
95f8ae3
add condition await
msft-paddy14 Dec 6, 2024
3685fcc
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 9, 2024
0f61eee
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 9, 2024
1f3efb3
add bdn test for networking
msft-paddy14 Dec 16, 2024
5de4248
add async in test
msft-paddy14 Dec 16, 2024
2a30dd8
Merge branch 'users/padgupta/add_async_socket_processing' of github.c…
msft-paddy14 Dec 16, 2024
28cf2ac
remove visibility changes for testing
msft-paddy14 Dec 16, 2024
5dfa63f
add bdn test for networking
msft-paddy14 Dec 16, 2024
6708f98
add async in test
msft-paddy14 Dec 16, 2024
5982be1
fix tests for generic path
msft-paddy14 Dec 16, 2024
0ea355f
remove visibility changes for testing
msft-paddy14 Dec 16, 2024
3fc28b0
fix tests for generic path
msft-paddy14 Dec 16, 2024
6d21e2e
fix formatting
msft-paddy14 Dec 17, 2024
eb40b2f
fix formatting
msft-paddy14 Dec 17, 2024
5420f57
rewrite network test
msft-paddy14 Dec 17, 2024
d83009d
rewrite network test
msft-paddy14 Dec 17, 2024
f20e20a
set max/min threads
msft-paddy14 Dec 17, 2024
841407f
set max/min threads
msft-paddy14 Dec 17, 2024
7cc2d22
fix formatting
msft-paddy14 Dec 17, 2024
467614c
fix formatting
msft-paddy14 Dec 17, 2024
8f5625a
fix naming violations
msft-paddy14 Dec 17, 2024
3c34c7b
remove dead code
msft-paddy14 Dec 17, 2024
5707567
formatting
msft-paddy14 Dec 17, 2024
b2063a3
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 17, 2024
39c5ebe
replace with do while
msft-paddy14 Dec 17, 2024
4f5d3ca
remove networking test in BDN
msft-paddy14 Dec 18, 2024
6cae8b7
address formatting comments
msft-paddy14 Dec 19, 2024
1459249
remove bad include
msft-paddy14 Dec 19, 2024
848fcd6
remove more dead code
msft-paddy14 Dec 19, 2024
70e067c
Merge branch 'main' into users/padgupta/add_async_socket_processing
msft-paddy14 Dec 19, 2024
355a705
merge main
msft-paddy14 Dec 19, 2024
3315f67
use new NetworkRecive method for benchmark
msft-paddy14 Dec 19, 2024
a2b8cc5
Merge branch 'main' into users/padgupta/add_async_socket_processing
vazois Dec 19, 2024
f8f7c14
Adjust for async mdoe
msft-paddy14 Dec 19, 2024
cad1a4e
Change method names for clairty
msft-paddy14 Dec 19, 2024
d8650b9
update slowconsume to suync
msft-paddy14 Dec 20, 2024
5c74474
fix formatting
msft-paddy14 Dec 20, 2024
ff53cd4
Merge branch 'users/padgupta/add_async_socket_processing' of https://…
msft-paddy14 Dec 20, 2024
6d6dc00
remove tls cert copy
msft-paddy14 Dec 20, 2024
9973f9e
fix formatting
msft-paddy14 Dec 20, 2024
83b695f
remove whitespace
msft-paddy14 Dec 20, 2024
6311d97
Merge branch 'main' into users/padgupta/add_async_socket_processing
badrishc Dec 20, 2024
1162747
revert names of tests
badrishc Dec 20, 2024
073488f
resolve metge conflicts
msft-paddy14 Dec 21, 2024
5a3700d
bump version and bdn fixes
msft-paddy14 Dec 21, 2024
da8599c
formatting fixes
msft-paddy14 Dec 21, 2024
7b2d699
Merge branch 'users/padgupta/add_async_socket_processing' of https://…
msft-paddy14 Dec 21, 2024
21f7a86
revert unsafe class changes
msft-paddy14 Dec 21, 2024
e1d2f78
fix comments
badrishc Dec 21, 2024
5b2accf
Merge branch 'main' into users/padgupta/add_async_socket_processing
badrishc Dec 22, 2024
2488fc8
Merge branch 'main' of https://github.com/microsoft/garnet into users…
msft-paddy14 Dec 23, 2024
d09be21
Merge branch 'users/padgupta/add_async_socket_processing' of https://…
msft-paddy14 Dec 23, 2024
363bfb9
segregate TLS and non TLS paths
msft-paddy14 Dec 23, 2024
81503e8
separate based on TLS and no-TLS
badrishc Dec 28, 2024
816bb59
nit
badrishc Dec 28, 2024
ca7fedd
fix bdn
badrishc Dec 28, 2024
16ea215
Merge branch 'main' into users/padgupta/add_async_socket_processing
badrishc Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Version.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<!-- Versioning property for builds and packages -->
<PropertyGroup>
<VersionPrefix>1.0.48</VersionPrefix>
<VersionPrefix>1.0.49</VersionPrefix>
</PropertyGroup>
</Project>
14 changes: 10 additions & 4 deletions benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Logging;
Expand All @@ -11,8 +12,11 @@ namespace Embedded.server
{
internal class EmbeddedNetworkHandler : NetworkHandler<GarnetServerEmbedded, EmbeddedNetworkSender>
{
readonly bool useTLS;

public EmbeddedNetworkHandler(GarnetServerEmbedded serverHook, EmbeddedNetworkSender networkSender, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer = null, ILogger logger = null) : base(serverHook, networkSender, networkBufferSettings, networkPool, useTLS, messageConsumer, logger)
{
this.useTLS = useTLS;
}

public override string RemoteEndpointName => throw new NotImplementedException();
Expand All @@ -24,14 +28,16 @@ public override void Dispose()

public override bool TryClose() => throw new NotImplementedException();

public unsafe void Send(Request request)
public async ValueTask Send(Request request)
{
networkReceiveBuffer = request.buffer;
networkReceiveBufferPtr = request.bufferPtr;
unsafe { networkReceiveBufferPtr = request.bufferPtr; }

OnNetworkReceive(request.buffer.Length);
if (useTLS)
await OnNetworkReceiveWithTLSAsync(request.buffer.Length);
else
OnNetworkReceiveWithoutTLS(request.buffer.Length);

// We should have consumed the entire buffer
Debug.Assert(networkBytesRead == 0);
Debug.Assert(networkReadHead == 0);
}
badrishc marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
6 changes: 3 additions & 3 deletions benchmark/BDN.benchmark/Network/BasicOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace BDN.benchmark.Network
/// Benchmark for BasicOperations
/// </summary>
[MemoryDiagnoser]
public unsafe class BasicOperations : NetworkBase
public class BasicOperations : NetworkBase
{
static ReadOnlySpan<byte> INLINE_PING => "PING\r\n"u8;
Request ping;
Expand All @@ -22,9 +22,9 @@ public override void GlobalSetup()
}

[Benchmark]
public void InlinePing()
public async ValueTask InlinePing()
{
Send(ping);
await Send(ping);
}
}
}
5 changes: 1 addition & 4 deletions benchmark/BDN.benchmark/Network/NetworkBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ public virtual void GlobalCleanup()
server.Dispose();
}

protected void Send(Request request)
{
networkHandler.Send(request);
}
protected ValueTask Send(Request request) => networkHandler.Send(request);
badrishc marked this conversation as resolved.
Show resolved Hide resolved

protected unsafe void SetupOperation(ref Request request, ReadOnlySpan<byte> operation, int batchSize = batchSize)
{
Expand Down
42 changes: 21 additions & 21 deletions benchmark/BDN.benchmark/Network/RawStringOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace BDN.benchmark.Network
/// Benchmark for RawStringOperations
/// </summary>
[MemoryDiagnoser]
public unsafe class RawStringOperations : NetworkBase
public class RawStringOperations : NetworkBase
{
static ReadOnlySpan<byte> SET => "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n"u8;
Request set;
Expand Down Expand Up @@ -65,63 +65,63 @@ public override void GlobalSetup()
}

[Benchmark]
public void Set()
public async ValueTask Set()
{
Send(set);
await Send(set);
}

[Benchmark]
public void SetEx()
public async ValueTask SetEx()
{
Send(setex);
await Send(setex);
}

[Benchmark]
public void SetNx()
public async ValueTask SetNx()
{
Send(setnx);
await Send(setnx);
}

[Benchmark]
public void SetXx()
public async ValueTask SetXx()
{
Send(setxx);
await Send(setxx);
}

[Benchmark]
public void GetFound()
public async ValueTask GetFound()
{
Send(getf);
await Send(getf);
}

[Benchmark]
public void GetNotFound()
public async ValueTask GetNotFound()
{
Send(getnf);
await Send(getnf);
}

[Benchmark]
public void Increment()
public async ValueTask Increment()
{
Send(incr);
await Send(incr);
}

[Benchmark]
public void Decrement()
public async ValueTask Decrement()
{
Send(decr);
await Send(decr);
}

[Benchmark]
public void IncrementBy()
public async ValueTask IncrementBy()
{
Send(incrby);
await Send(incrby);
}

[Benchmark]
public void DecrementBy()
public async ValueTask DecrementBy()
{
Send(decrby);
await Send(decrby);
}
}
}
47 changes: 25 additions & 22 deletions libs/common/Networking/NetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,39 +267,40 @@ async Task AuthenticateAsClientAsync(SslClientAuthenticationOptions sslClientOpt
}
}

public unsafe void OnNetworkReceiveWithoutTLS(int bytesTransferred)
{
networkBytesRead += bytesTransferred;
transportReceiveBuffer = networkReceiveBuffer;
transportReceiveBufferPtr = networkReceiveBufferPtr;
transportBytesRead = networkBytesRead;

// Process non-TLS code on the synchronous thread
Process();

EndTransformNetworkToTransport();
UpdateNetworkBuffers();
}

/// <summary>
/// On network receive
/// </summary>
/// <param name="bytesTransferred">Number of bytes transferred</param>
public unsafe void OnNetworkReceive(int bytesTransferred)
public async ValueTask OnNetworkReceiveWithTLSAsync(int bytesTransferred)
{
// Wait for SslStream async processing to complete, if any (e.g., authentication phase)
while (readerStatus == TlsReaderStatus.Active)
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);

// Increment network bytes read
networkBytesRead += bytesTransferred;

switch (readerStatus)
{
case TlsReaderStatus.Rest:
// Synchronously try to process the received data
if (sslStream == null)
{
transportReceiveBuffer = networkReceiveBuffer;
transportReceiveBufferPtr = networkReceiveBufferPtr;
transportBytesRead = networkBytesRead;

// We do not have an active read task, so we will process on the network thread
Process();
}
else
{
readerStatus = TlsReaderStatus.Active;
Read();
while (readerStatus == TlsReaderStatus.Active)
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
}
readerStatus = TlsReaderStatus.Active;
Read();
while (readerStatus == TlsReaderStatus.Active)
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
break;
case TlsReaderStatus.Waiting:
// We have a ReadAsync task waiting for new data, set it to active status
Expand All @@ -309,17 +310,19 @@ public unsafe void OnNetworkReceive(int bytesTransferred)
_ = receivedData.Release();

while (readerStatus == TlsReaderStatus.Active)
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
break;
default:
ThrowInvalidOperationException($"Unexpected reader status {readerStatus}");
break;
}

Debug.Assert(readerStatus != TlsReaderStatus.Active);
UpdateNetworkBuffers();
}

EndTransformNetworkToTransport();

void UpdateNetworkBuffers()
{
// Shift network buffer after processing is done
if (networkReadHead > 0)
ShiftNetworkReceiveBuffer();
Expand Down
71 changes: 56 additions & 15 deletions libs/common/Networking/TcpNetworkHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende

remoteEndpoint = socket.RemoteEndPoint is IPEndPoint remote ? $"{remote.Address}:{remote.Port}" : "";
localEndpoint = socket.LocalEndPoint is IPEndPoint local ? $"{local.Address}:{local.Port}" : "";

AllocateNetworkReceiveBuffer();
}

Expand All @@ -52,28 +51,28 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende
/// <inheritdoc />
public override void Start(SslServerAuthenticationOptions tlsOptions = null, string remoteEndpointName = null, CancellationToken token = default)
{
Start();
Start(tlsOptions != null);
base.Start(tlsOptions, remoteEndpointName, token);
}

/// <inheritdoc />
public override async Task StartAsync(SslServerAuthenticationOptions tlsOptions = null, string remoteEndpointName = null, CancellationToken token = default)
{
Start();
Start(tlsOptions != null);
await base.StartAsync(tlsOptions, remoteEndpointName, token).ConfigureAwait(false);
}

/// <inheritdoc />
public override void Start(SslClientAuthenticationOptions tlsOptions, string remoteEndpointName = null, CancellationToken token = default)
{
Start();
Start(tlsOptions != null);
base.Start(tlsOptions, remoteEndpointName, token);
}

/// <inheritdoc />
public override async Task StartAsync(SslClientAuthenticationOptions tlsOptions, string remoteEndpointName = null, CancellationToken token = default)
{
Start();
Start(tlsOptions != null);
await base.StartAsync(tlsOptions, remoteEndpointName, token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -102,17 +101,22 @@ public override bool TryClose()
return true;
}

void Start()
void Start(bool useTLS)
{
var receiveEventArgs = new SocketAsyncEventArgs { AcceptSocket = socket };
receiveEventArgs.SetBuffer(networkReceiveBuffer, 0, networkReceiveBuffer.Length);
receiveEventArgs.Completed += RecvEventArg_Completed;
receiveEventArgs.Completed += useTLS ? RecvEventArgCompletedWithTLS : RecvEventArgCompletedWithoutTLS;

// If the client already have packets, avoid handling it here on the handler so we don't block future accepts.
try
{
if (!socket.ReceiveAsync(receiveEventArgs))
Task.Run(() => RecvEventArg_Completed(null, receiveEventArgs));
{
if (useTLS)
Task.Run(() => RecvEventArgCompletedWithTLS(null, receiveEventArgs));
else
Task.Run(() => RecvEventArgCompletedWithoutTLS(null, receiveEventArgs));
}
}
catch (Exception ex)
{
Expand All @@ -134,7 +138,35 @@ void Dispose(SocketAsyncEventArgs e)
e.Dispose();
}

void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
void RecvEventArgCompletedWithTLS(object sender, SocketAsyncEventArgs e) =>
_ = HandleReceiveWithTLSAsync(sender, e);

void RecvEventArgCompletedWithoutTLS(object sender, SocketAsyncEventArgs e) =>
HandleReceiveWithoutTLS(sender, e);

private void HandleReceiveWithoutTLS(object sender, SocketAsyncEventArgs e)
{
try
{
do
{
if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || serverHook.Disposed)
{
// No more things to receive
Dispose(e);
break;
}
OnNetworkReceiveWithoutTLS(e.BytesTransferred);
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
} while (!e.AcceptSocket.ReceiveAsync(e));
}
catch (Exception ex)
{
HandleReceiveFailure(ex, e);
}
}

private async ValueTask HandleReceiveWithTLSAsync(object sender, SocketAsyncEventArgs e)
{
try
{
Expand All @@ -146,20 +178,29 @@ void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
Dispose(e);
break;
}
OnNetworkReceive(e.BytesTransferred);
var receiveTask = OnNetworkReceiveWithTLSAsync(e.BytesTransferred);
if (!receiveTask.IsCompletedSuccessfully)
{
await receiveTask;
}
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
} while (!e.AcceptSocket.ReceiveAsync(e));
}
catch (Exception ex)
{
if (ex is ObjectDisposedException ex2 && ex2.ObjectName == "System.Net.Sockets.Socket")
logger?.LogTrace("Accept socket was disposed at RecvEventArg_Completed");
else
logger?.LogError(ex, "An error occurred at RecvEventArg_Completed");
Dispose(e);
HandleReceiveFailure(ex, e);
}
}

void HandleReceiveFailure(Exception ex, SocketAsyncEventArgs e)
{
if (ex is ObjectDisposedException ex2 && ex2.ObjectName == "System.Net.Sockets.Socket")
logger?.LogTrace("Accept socket was disposed at RecvEventArg_Completed");
else
logger?.LogError(ex, "An error occurred at RecvEventArg_Completed");
Dispose(e);
}

unsafe void AllocateNetworkReceiveBuffer()
{
networkReceiveBufferEntry = networkPool.Get(networkBufferSettings.initialReceiveBufferSize);
Expand Down
Loading