Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS implementation - high performance #182

Draft
wants to merge 29 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b50bd85
Working on abstracting the channels out of the transport
Aaronontheweb Aug 16, 2024
47dcf7a
creating abstraction to make it easy to wrap `SslStream` around `IMqt…
Aaronontheweb Aug 17, 2024
d7fb1cc
Merge branch 'dev' into tls-support2
Aaronontheweb Aug 30, 2024
80ff505
does not compile
Aaronontheweb Sep 10, 2024
18551b8
Merge branch 'tls-support2' of https://github.com/Aaronontheweb/Turbo…
Aaronontheweb Sep 10, 2024
bf035db
Merge branch 'dev' into tls-support2
Aaronontheweb Sep 10, 2024
53ff70a
wrapping up Channel work
Aaronontheweb Sep 10, 2024
7764fe5
working on pipe sink
Aaronontheweb Sep 10, 2024
8feadd9
pipe processing
Aaronontheweb Sep 10, 2024
4ada867
defined initial Pipe-based stream stages
Aaronontheweb Sep 10, 2024
1540968
ported Pipe / Stream adapter code from ASP.NET
Aaronontheweb Sep 10, 2024
1c294e4
in mem
Aaronontheweb Sep 10, 2024
80414ee
fixed TCP and In memory transports
Aaronontheweb Sep 10, 2024
b2f76bf
fixed all compilation errors
Aaronontheweb Sep 10, 2024
53de977
fixed NotImplementedException
Aaronontheweb Sep 10, 2024
056defe
fixing some issues with concurrent read / write
Aaronontheweb Sep 10, 2024
7585fda
working on fixes related to Pipe buffer movement
Aaronontheweb Sep 10, 2024
4c83178
have some connect and disconnect packets working
Aaronontheweb Sep 10, 2024
b936d32
connect and reconnect now work
Aaronontheweb Sep 10, 2024
4fa9b43
fixed MqttEncoderSink
Aaronontheweb Sep 10, 2024
0f39fa8
buffer is not getting cleared in the MqttDecoderSource
Aaronontheweb Sep 10, 2024
b56569a
fixing some Akka.Streams issues
Aaronontheweb Sep 10, 2024
40b7736
re-arranged closure code
Aaronontheweb Sep 10, 2024
327c497
improved design of decoder stage
Aaronontheweb Sep 11, 2024
761bc67
have all in-memory specs passing
Aaronontheweb Sep 11, 2024
316dd4a
need to change how we write to socket
Aaronontheweb Sep 11, 2024
6589366
fixed issues with duplicate socket read
Aaronontheweb Sep 11, 2024
a770894
guarantee stream termination
Aaronontheweb Sep 11, 2024
51465ef
more unclean shutdown
Aaronontheweb Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/TurboMqtt/Client/ClientStreamInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ private void PrepareStream(MqttClientConnectOptions clientConnectOptions,

// begin inbound stream
inboundStream
.WatchTermination(async (_, task) =>
{
await task;

// guarantee that we kill the client in the event that the stream terminates
disconnectPromise.TrySetResult(DisconnectPacket.Instance);
})
// setting IsOwner to false here is crucial - otherwise, we can't reboot the client after broker disconnects
.To(ChannelSink.FromWriter(inboundPackets.Writer, false))
.Run(_materializer);
Expand Down Expand Up @@ -145,7 +152,6 @@ private static (Source<MqttMessage, NotUsed> inbound, Sink<MqttPacket, NotUsed>
var outboundMessages = MqttClientStreams.Mqtt311OutboundPacketSink(
clientConnectOptions.ClientId,
transport,
MemoryPool<byte>.Shared,
maxFrameSize, (int)clientConnectOptions.MaximumPacketSize,
clientConnectOptions.EnableOpenTelemetry);

Expand Down
28 changes: 28 additions & 0 deletions src/TurboMqtt/IO/DuplexChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// -----------------------------------------------------------------------
// <copyright file="DuplexChannel.cs" company="Petabridge, LLC">
// Copyright (C) 2024 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.Buffers;
using System.Threading.Channels;

namespace TurboMqtt.IO;

internal interface IDuplexChannel
{
public ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> Writer { get; }
public ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> Reader { get; }
}

internal sealed class DuplexChannel : IDuplexChannel
{
public DuplexChannel(ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> writer, ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> reader)
{
Writer = writer;
Reader = reader;
}

public ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> Writer { get; }
public ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> Reader { get; }
}
2 changes: 1 addition & 1 deletion src/TurboMqtt/IO/FakeServerHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public virtual void FlushPackets()
}
else
{
Log.Debug("Successfully wrote N packets {0} [{1} bytes] to transport.", _pendingPackets.Count, totalSize);
Log.Debug("Successfully wrote {0} packets [{1} bytes] to transport.", _pendingPackets.Count, totalSize);
}

bufferPooled.Dispose();
Expand Down
95 changes: 58 additions & 37 deletions src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
// -----------------------------------------------------------------------

using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Channels;
using Akka.Event;
using TurboMqtt.IO.Internal;
using TurboMqtt.IO.Tcp;
using TurboMqtt.PacketTypes;
using TurboMqtt.Protocol;
Expand Down Expand Up @@ -42,12 +44,10 @@ internal sealed class InMemoryMqttTransport : IMqttTransport
{
private readonly TaskCompletionSource<DisconnectReasonCode> _terminationSource = new();

private readonly Channel<(IMemoryOwner<byte> buffer, int readableBytes)> _writesToTransport =
Channel.CreateUnbounded<(IMemoryOwner<byte> buffer, int readableBytes)>();
private readonly IDuplexPipe _transport;

private readonly IDuplexPipe _application;

private readonly Channel<(IMemoryOwner<byte> buffer, int readableBytes)> _readsFromTransport =
Channel.CreateUnbounded<(IMemoryOwner<byte> buffer, int readableBytes)>();

private readonly CancellationTokenSource _shutdownTokenSource = new();
private readonly IFakeServerHandle _serverHandle;

Expand All @@ -56,8 +56,13 @@ public InMemoryMqttTransport(int maxFrameSize, ILoggingAdapter log, MqttProtocol
MaxFrameSize = maxFrameSize;
Log = log;
ProtocolVersion = protocolVersion;
Reader = _readsFromTransport;
Writer = _writesToTransport;
var pipeOptions = new PipeOptions(pauseWriterThreshold: MaxFrameSize,
resumeWriterThreshold: MaxFrameSize / 2,
useSynchronizationContext: false);
var pipes = DuplexPipe.CreateConnectionPair(pipeOptions, pipeOptions);
_transport = pipes.Transport;
_application = pipes.Application;
Channel = _application;

_serverHandle = protocolVersion switch
{
Expand All @@ -71,7 +76,20 @@ async Task CloseFn()
await CloseAsync();
}

bool PushFn((IMemoryOwner<byte> buffer, int estimatedSize) msg) => _readsFromTransport.Writer.TryWrite(msg);
bool PushFn((IMemoryOwner<byte> buffer, int estimatedSize) msg)
{
try
{
_transport.Output.Write(msg.buffer.Memory.Span);
_ = _transport.Output.FlushAsync();
return true;
}
finally
{
_waitForPendingWrites.TrySetResult(true);
msg.buffer.Dispose();
}
}
}

public MqttProtocolVersion ProtocolVersion { get; }
Expand All @@ -80,38 +98,36 @@ async Task CloseFn()
public ConnectionStatus Status { get; private set; } = ConnectionStatus.NotStarted;

public Task<DisconnectReasonCode> WhenTerminated => _terminationSource.Task;

private readonly TaskCompletionSource<bool> _waitForPendingWrites = new();
public Task WaitForPendingWrites => _waitForPendingWrites.Task;



public async Task<bool> CloseAsync(CancellationToken ct = default)
{
Status = ConnectionStatus.Disconnected;
_writesToTransport.Writer.TryComplete();
await _application.Output.CompleteAsync();
await _transport.Output.CompleteAsync();
await _waitForPendingWrites.Task;
await _shutdownTokenSource.CancelAsync();
_readsFromTransport.Writer.TryComplete();
_terminationSource.TrySetResult(DisconnectReasonCode.NormalDisconnection);
return true;
}

public Task AbortAsync(CancellationToken ct = default)
public async Task AbortAsync(CancellationToken ct = default)
{
Status = ConnectionStatus.Disconnected;
_writesToTransport.Writer.TryComplete();
_readsFromTransport.Writer.TryComplete();
await _application.Output.CompleteAsync();
await _transport.Output.CompleteAsync();
_terminationSource.TrySetResult(DisconnectReasonCode.UnspecifiedError);
return Task.CompletedTask;
}

public Task<bool> ConnectAsync(CancellationToken ct = default)
{
if (Status == ConnectionStatus.NotStarted)
{
Status = ConnectionStatus.Connected;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
DoByteWritesAsync(_shutdownTokenSource.Token);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Expand All @@ -128,31 +144,38 @@ public Task<bool> ConnectAsync(CancellationToken ct = default)
private async Task DoByteWritesAsync(CancellationToken ct)
{
Log.Debug("Starting to read from transport.");
while(!_writesToTransport.Reader.Completion.IsCompleted)
while (ct.IsCancellationRequested == false)
{
if (!_writesToTransport.Reader.TryRead(out var msg))
try
{
try
var msg = await _transport.Input.ReadAsync(ct);

var buffer = msg.Buffer;
Log.Debug("Received {0} bytes from transport.", buffer.Length);

if (!buffer.IsEmpty)
{
await _writesToTransport.Reader.WaitToReadAsync(ct);
var seqPosition = buffer.Start;
var copyBuffer = new ReadOnlyMemory<byte>(buffer.ToArray());
_serverHandle.HandleBytes(copyBuffer);

if (!msg.IsCompleted)
{
// we will throw if we try to advance past the end of the buffer
_transport.Input.AdvanceTo(buffer.End);
}
}
catch(OperationCanceledException)

if (msg.IsCompleted)
{

break;
}
continue;
}

try
catch (Exception exception)
{
ReadOnlyMemory<byte> buffer = msg.buffer.Memory.Slice(0, msg.readableBytes);
Log.Debug("Received {0} bytes from transport.", buffer.Length);
_serverHandle.HandleBytes(buffer);
}
finally
{
// have to free the shared buffer
msg.buffer.Dispose();
Log.Error(exception, "Error reading from transport.");
_transport.Input.CancelPendingRead();
break;
}
}

Expand All @@ -161,7 +184,5 @@ private async Task DoByteWritesAsync(CancellationToken ct)
}

public int MaxFrameSize { get; }
public ChannelWriter<(IMemoryOwner<byte> buffer, int readableBytes)> Writer { get; }
public ChannelReader<(IMemoryOwner<byte> buffer, int readableBytes)> Reader { get; }

public IDuplexPipe Channel { get; }
}
46 changes: 46 additions & 0 deletions src/TurboMqtt/IO/Internal/DuplexPipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// -----------------------------------------------------------------------
// <copyright file="DuplexPipe.cs" company="Petabridge, LLC">
// Copyright (C) 2024 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.IO.Pipelines;

namespace TurboMqtt.IO.Internal;

internal sealed class DuplexPipe : IDuplexPipe
{
public DuplexPipe(PipeReader reader, PipeWriter writer)
{
Input = reader;
Output = writer;
}

public PipeReader Input { get; }

public PipeWriter Output { get; }

public static DuplexPipePair CreateConnectionPair(PipeOptions inputOptions, PipeOptions outputOptions)
{
var input = new Pipe(inputOptions);
var output = new Pipe(outputOptions);

var transportToApplication = new DuplexPipe(output.Reader, input.Writer);
var applicationToTransport = new DuplexPipe(input.Reader, output.Writer);

return new DuplexPipePair(applicationToTransport, transportToApplication);
}

// This class exists to work around issues with value tuple on .NET Framework
public readonly struct DuplexPipePair
{
public IDuplexPipe Transport { get; }
public IDuplexPipe Application { get; }

public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application)
{
Transport = transport;
Application = application;
}
}
}
Loading
Loading