From b50bd851e81f33b9e9dafd91eebad8a9cf963c8f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 16 Aug 2024 16:42:38 -0500 Subject: [PATCH 01/26] Working on abstracting the channels out of the transport --- src/TurboMqtt/IO/MqttTransport.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/TurboMqtt/IO/MqttTransport.cs b/src/TurboMqtt/IO/MqttTransport.cs index c0c78497..ff2d6d68 100644 --- a/src/TurboMqtt/IO/MqttTransport.cs +++ b/src/TurboMqtt/IO/MqttTransport.cs @@ -11,6 +11,12 @@ namespace TurboMqtt.IO; +internal interface IDuplexTransport +{ + public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } +} + /// /// Represents the underlying transport mechanism used to send and receive MQTT messages. /// From 47dcf7ac5eed24b66138a79341b90e6d8209a0f3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 16 Aug 2024 19:00:54 -0500 Subject: [PATCH 02/26] creating abstraction to make it easy to wrap `SslStream` around `IMqttTransport` --- .../IO/InMem/InMemoryMqttTransport.cs | 6 ++-- src/TurboMqtt/IO/MqttTransport.cs | 29 +++++++++++++++---- src/TurboMqtt/IO/Tcp/TcpTransport.cs | 6 ++-- src/TurboMqtt/Streams/MqttClientStreams.cs | 6 ++-- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index 5cf1233e..da5d8ce2 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -56,8 +56,7 @@ public InMemoryMqttTransport(int maxFrameSize, ILoggingAdapter log, MqttProtocol MaxFrameSize = maxFrameSize; Log = log; ProtocolVersion = protocolVersion; - Reader = _readsFromTransport; - Writer = _writesToTransport; + Transport = new DuplexTransport(_writesToTransport, _readsFromTransport); _serverHandle = protocolVersion switch { @@ -161,7 +160,6 @@ private async Task DoByteWritesAsync(CancellationToken ct) } public int MaxFrameSize { get; } - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } + public IDuplexTransport Transport { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/IO/MqttTransport.cs b/src/TurboMqtt/IO/MqttTransport.cs index ff2d6d68..05b262a8 100644 --- a/src/TurboMqtt/IO/MqttTransport.cs +++ b/src/TurboMqtt/IO/MqttTransport.cs @@ -17,6 +17,18 @@ internal interface IDuplexTransport public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } } +internal sealed class DuplexTransport : IDuplexTransport +{ + public DuplexTransport(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> writer, ChannelReader<(IMemoryOwner buffer, int readableBytes)> reader) + { + Writer = writer; + Reader = reader; + } + + public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } +} + /// /// Represents the underlying transport mechanism used to send and receive MQTT messages. /// @@ -95,14 +107,19 @@ internal interface IMqttTransport public int MaxFrameSize { get; } /// - /// Used to write data to the underlying transport. + /// Contains a reader and a writer used to send and receive data over the transport. /// - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public IDuplexTransport Transport { get; } - /// - /// Used to read data from the underlying transport. - /// - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } + // /// + // /// Used to write data to the underlying transport. + // /// + // public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + // + // /// + // /// Used to read data from the underlying transport. + // /// + // public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } } public enum ConnectionStatus diff --git a/src/TurboMqtt/IO/Tcp/TcpTransport.cs b/src/TurboMqtt/IO/Tcp/TcpTransport.cs index 5f7748a3..7bbd1f65 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransport.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransport.cs @@ -54,8 +54,7 @@ internal TcpTransport(ILoggingAdapter log, TcpTransportActor.ConnectionState sta Log = log; State = state; _connectionActor = connectionActor; - Reader = state.Reader; - Writer = state.Writer; + Transport = new DuplexTransport(State.Writer, State.Reader); MaxFrameSize = state.MaxFrameSize; } @@ -94,6 +93,5 @@ public async Task ConnectAsync(CancellationToken ct = default) } public int MaxFrameSize { get; } - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } + public IDuplexTransport Transport { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/Streams/MqttClientStreams.cs b/src/TurboMqtt/Streams/MqttClientStreams.cs index fa517345..d7514afd 100644 --- a/src/TurboMqtt/Streams/MqttClientStreams.cs +++ b/src/TurboMqtt/Streams/MqttClientStreams.cs @@ -23,7 +23,7 @@ internal static class MqttClientStreams public static Sink Mqtt311OutboundPacketSink(string clientId, IMqttTransport transport, MemoryPool memoryPool, int maxFrameSize, int maxPacketSize, bool withTelemetry = true) { - var finalSink = Sink.FromWriter(transport.Writer, true); + var finalSink = Sink.FromWriter(transport.Transport.Writer, true); if (withTelemetry) return Flow.Create() .Via(OpenTelemetryFlows.MqttPacketRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, @@ -44,7 +44,7 @@ public static Source Mqtt311InboundMessageSource(string cl { if (withTelemetry) - return (ChannelSource.FromReader(transport.Reader) + return (ChannelSource.FromReader(transport.Transport.Reader) .Via(OpenTelemetryFlows.MqttBitRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, OpenTelemetrySupport.Direction.Inbound)) .Via(MqttDecodingFlows.Mqtt311Decoding()) @@ -57,7 +57,7 @@ public static Source Mqtt311InboundMessageSource(string cl .Where(c => c.PacketType == MqttPacketType.Publish) .Select(c => ((PublishPacket)c).FromPacket())); - return (ChannelSource.FromReader(transport.Reader) + return (ChannelSource.FromReader(transport.Transport.Reader) .Via(MqttDecodingFlows.Mqtt311Decoding()) .Async() .Via(MqttReceiverFlows.ClientAckingFlow(outboundPackets, actors, disconnectPromise)) From 80ff505f6b274f674ab094d29b77359ee04b8458 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 9 Sep 2024 19:26:39 -0500 Subject: [PATCH 03/26] does not compile --- src/TurboMqtt/IO/DuplexChannel.cs | 28 ++++++++++++++++ .../IO/InMem/InMemoryMqttTransport.cs | 8 ++--- .../IO/Internal/DuplexChannelStream.cs | 32 +++++++++++++++++++ src/TurboMqtt/IO/MqttTransport.cs | 20 +----------- src/TurboMqtt/IO/Tcp/TcpTransport.cs | 4 +-- src/TurboMqtt/Streams/MqttClientStreams.cs | 6 ++-- 6 files changed, 70 insertions(+), 28 deletions(-) create mode 100644 src/TurboMqtt/IO/DuplexChannel.cs create mode 100644 src/TurboMqtt/IO/Internal/DuplexChannelStream.cs diff --git a/src/TurboMqtt/IO/DuplexChannel.cs b/src/TurboMqtt/IO/DuplexChannel.cs new file mode 100644 index 00000000..7f1d4164 --- /dev/null +++ b/src/TurboMqtt/IO/DuplexChannel.cs @@ -0,0 +1,28 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.Buffers; +using System.Threading.Channels; + +namespace TurboMqtt.IO; + +internal interface IDuplexChannel +{ + public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } +} + +internal sealed class DuplexChannel : IDuplexChannel +{ + public DuplexChannel(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> writer, ChannelReader<(IMemoryOwner buffer, int readableBytes)> reader) + { + Writer = writer; + Reader = reader; + } + + public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } +} \ No newline at end of file diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index da5d8ce2..e360468c 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -43,10 +43,10 @@ internal sealed class InMemoryMqttTransport : IMqttTransport private readonly TaskCompletionSource _terminationSource = new(); private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _writesToTransport = - Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); + System.Threading.Channels.Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _readsFromTransport = - Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); + System.Threading.Channels.Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); private readonly CancellationTokenSource _shutdownTokenSource = new(); private readonly IFakeServerHandle _serverHandle; @@ -56,7 +56,7 @@ public InMemoryMqttTransport(int maxFrameSize, ILoggingAdapter log, MqttProtocol MaxFrameSize = maxFrameSize; Log = log; ProtocolVersion = protocolVersion; - Transport = new DuplexTransport(_writesToTransport, _readsFromTransport); + Channel = new DuplexChannel(_writesToTransport, _readsFromTransport); _serverHandle = protocolVersion switch { @@ -160,6 +160,6 @@ private async Task DoByteWritesAsync(CancellationToken ct) } public int MaxFrameSize { get; } - public IDuplexTransport Transport { get; } + public IDuplexChannel Channel { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs new file mode 100644 index 00000000..b87b1b26 --- /dev/null +++ b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs @@ -0,0 +1,32 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.Buffers; +using System.Threading.Channels; + +namespace TurboMqtt.IO.Internal; + +public class DuplexChannelStream : Stream +{ + private readonly ChannelWriter<(IMemoryOwner buffer, int readableBytes)> _output; + private readonly ChannelReader<(IMemoryOwner buffer, int readableBytes)> _input; + private readonly bool _throwOnCancelled; + private volatile bool _cancelCalled; + + public DuplexChannelStream(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> output, + ChannelReader<(IMemoryOwner buffer, int readableBytes)> input, bool throwOnCancelled) + { + _output = output; + _input = input; + _throwOnCancelled = throwOnCancelled; + } + + public void CancelPendingRead() + { + _cancelCalled = true; + } + +} \ No newline at end of file diff --git a/src/TurboMqtt/IO/MqttTransport.cs b/src/TurboMqtt/IO/MqttTransport.cs index 05b262a8..f7d9f45a 100644 --- a/src/TurboMqtt/IO/MqttTransport.cs +++ b/src/TurboMqtt/IO/MqttTransport.cs @@ -11,24 +11,6 @@ namespace TurboMqtt.IO; -internal interface IDuplexTransport -{ - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } -} - -internal sealed class DuplexTransport : IDuplexTransport -{ - public DuplexTransport(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> writer, ChannelReader<(IMemoryOwner buffer, int readableBytes)> reader) - { - Writer = writer; - Reader = reader; - } - - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } -} - /// /// Represents the underlying transport mechanism used to send and receive MQTT messages. /// @@ -109,7 +91,7 @@ internal interface IMqttTransport /// /// Contains a reader and a writer used to send and receive data over the transport. /// - public IDuplexTransport Transport { get; } + public IDuplexChannel Channel { get; } // /// // /// Used to write data to the underlying transport. diff --git a/src/TurboMqtt/IO/Tcp/TcpTransport.cs b/src/TurboMqtt/IO/Tcp/TcpTransport.cs index 7bbd1f65..ff8ed311 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransport.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransport.cs @@ -54,7 +54,7 @@ internal TcpTransport(ILoggingAdapter log, TcpTransportActor.ConnectionState sta Log = log; State = state; _connectionActor = connectionActor; - Transport = new DuplexTransport(State.Writer, State.Reader); + Channel = new DuplexChannel(State.Writer, State.Reader); MaxFrameSize = state.MaxFrameSize; } @@ -93,5 +93,5 @@ public async Task ConnectAsync(CancellationToken ct = default) } public int MaxFrameSize { get; } - public IDuplexTransport Transport { get; } + public IDuplexChannel Channel { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/Streams/MqttClientStreams.cs b/src/TurboMqtt/Streams/MqttClientStreams.cs index d7514afd..b5b2b778 100644 --- a/src/TurboMqtt/Streams/MqttClientStreams.cs +++ b/src/TurboMqtt/Streams/MqttClientStreams.cs @@ -23,7 +23,7 @@ internal static class MqttClientStreams public static Sink Mqtt311OutboundPacketSink(string clientId, IMqttTransport transport, MemoryPool memoryPool, int maxFrameSize, int maxPacketSize, bool withTelemetry = true) { - var finalSink = Sink.FromWriter(transport.Transport.Writer, true); + var finalSink = Sink.FromWriter(transport.Channel.Writer, true); if (withTelemetry) return Flow.Create() .Via(OpenTelemetryFlows.MqttPacketRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, @@ -44,7 +44,7 @@ public static Source Mqtt311InboundMessageSource(string cl { if (withTelemetry) - return (ChannelSource.FromReader(transport.Transport.Reader) + return (ChannelSource.FromReader(transport.Channel.Reader) .Via(OpenTelemetryFlows.MqttBitRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, OpenTelemetrySupport.Direction.Inbound)) .Via(MqttDecodingFlows.Mqtt311Decoding()) @@ -57,7 +57,7 @@ public static Source Mqtt311InboundMessageSource(string cl .Where(c => c.PacketType == MqttPacketType.Publish) .Select(c => ((PublishPacket)c).FromPacket())); - return (ChannelSource.FromReader(transport.Transport.Reader) + return (ChannelSource.FromReader(transport.Channel.Reader) .Via(MqttDecodingFlows.Mqtt311Decoding()) .Async() .Via(MqttReceiverFlows.ClientAckingFlow(outboundPackets, actors, disconnectPromise)) From 53ff70a6515ffeb4cede6b5ef63e49d086a3dce5 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 9 Sep 2024 20:22:00 -0500 Subject: [PATCH 04/26] wrapping up Channel work --- .../IO/Internal/DuplexChannelStream.cs | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs index b87b1b26..ffcd15d2 100644 --- a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs +++ b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs @@ -16,17 +16,58 @@ public class DuplexChannelStream : Stream private readonly bool _throwOnCancelled; private volatile bool _cancelCalled; - public DuplexChannelStream(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> output, + public DuplexChannelStream(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> output, ChannelReader<(IMemoryOwner buffer, int readableBytes)> input, bool throwOnCancelled) { _output = output; _input = input; _throwOnCancelled = throwOnCancelled; } - + public void CancelPendingRead() { _cancelCalled = true; } - + + public override void Flush() + { + // channels don't really have a concept of Flush + } + + public override int Read(byte[] buffer, int offset, int count) + { + + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) + { + return _output.WriteAsync((new UnsharedMemoryOwner(buffer), buffer.Length), cancellationToken); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } } \ No newline at end of file From 7764fe5256f7c320bcccb39a5f2d9273b12abccb Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 9 Sep 2024 21:18:24 -0500 Subject: [PATCH 05/26] working on pipe sink --- src/TurboMqtt/Streams/MqttEncodingFlows.cs | 40 ++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/TurboMqtt/Streams/MqttEncodingFlows.cs b/src/TurboMqtt/Streams/MqttEncodingFlows.cs index 0b87d9c5..9a02555a 100644 --- a/src/TurboMqtt/Streams/MqttEncodingFlows.cs +++ b/src/TurboMqtt/Streams/MqttEncodingFlows.cs @@ -5,6 +5,7 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; using Akka; using Akka.Event; using Akka.Streams; @@ -46,6 +47,45 @@ public static class MqttEncodingFlows } } +internal sealed class MqttEncoderSink : GraphStage>> +{ + private readonly PipeWriter _writer; + private readonly MqttProtocolVersion _protocolVersion; + + public MqttEncoderSink(PipeWriter writer, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V3_1_1) + { + _writer = writer; + _protocolVersion = protocolVersion; + In = new Inlet>($"MqttEncoderFlow{_protocolVersion}.In"); + Shape = new SinkShape>(In); + } + + public Inlet> In { get; } + + public override SinkShape> Shape { get; } + protected override Attributes InitialAttributes => DefaultAttributes.Select; + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + throw new NotImplementedException(); + } + + private sealed class Logic : InGraphStageLogic + { + private readonly PipeWriter _pipeWriter; + + public Logic(MqttEncoderSink encoderSink) : base(encoderSink.Shape) + { + _pipeWriter = encoderSink._writer; + SetHandler(encoderSink.In, this); + } + + public override void OnPush() + { + + } + } +} + /// /// Accepts a range of MqttPackets and uses a shared memory pool for encode them into an /// From 8feadd9c39c804ac3803326c4b5b92343791e1fa Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 9 Sep 2024 21:22:39 -0500 Subject: [PATCH 06/26] pipe processing --- src/TurboMqtt/Streams/MqttEncodingFlows.cs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/TurboMqtt/Streams/MqttEncodingFlows.cs b/src/TurboMqtt/Streams/MqttEncodingFlows.cs index 9a02555a..fe3c6b71 100644 --- a/src/TurboMqtt/Streams/MqttEncodingFlows.cs +++ b/src/TurboMqtt/Streams/MqttEncodingFlows.cs @@ -72,16 +72,37 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) private sealed class Logic : InGraphStageLogic { private readonly PipeWriter _pipeWriter; + private readonly MqttProtocolVersion _protocolVersion; + private readonly MqttEncoderSink _graphStage; public Logic(MqttEncoderSink encoderSink) : base(encoderSink.Shape) { _pipeWriter = encoderSink._writer; + _protocolVersion = encoderSink._protocolVersion; + _graphStage = encoderSink; SetHandler(encoderSink.In, this); } public override void OnPush() { + var elements = Grab(_graphStage.In); + var totalSize = elements.Sum(c => c.predictedSize.TotalSize); + var buffer = _pipeWriter.GetMemory(totalSize); + var bytesWritten = Mqtt311Encoder.EncodePackets(elements, ref buffer); + CODE BOMB + } + + public override void PreStart() + { + Pull(_graphStage.In); + base.PreStart(); + } + + public override void OnUpstreamFailure(Exception e) + { + base.OnUpstreamFailure(e); + _pipeWriter.Complete(e); } } } From 4ada8673b1a18082ce1c33acecc50cd20810ed06 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 12:07:58 -0500 Subject: [PATCH 07/26] defined initial Pipe-based stream stages --- src/TurboMqtt/Streams/MqttDecoderSource.cs | 115 +++++++++++++++++++++ src/TurboMqtt/Streams/MqttEncoderSink.cs | 108 +++++++++++++++++++ src/TurboMqtt/Streams/MqttEncodingFlows.cs | 61 ----------- 3 files changed, 223 insertions(+), 61 deletions(-) create mode 100644 src/TurboMqtt/Streams/MqttDecoderSource.cs create mode 100644 src/TurboMqtt/Streams/MqttEncoderSink.cs diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs new file mode 100644 index 00000000..27d3d5ad --- /dev/null +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -0,0 +1,115 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.Buffers; +using System.Collections.Immutable; +using System.IO.Pipelines; +using Akka.Event; +using Akka.Streams; +using Akka.Streams.Implementation.Stages; +using Akka.Streams.Stage; +using TurboMqtt.PacketTypes; +using TurboMqtt.Protocol; + +namespace TurboMqtt.Streams; + +internal sealed class MqttDecoderSource : GraphStage>> +{ + private readonly PipeReader _reader; + private readonly MqttProtocolVersion _protocolVersion; + + public MqttDecoderSource(PipeReader reader, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V3_1_1) + { + _reader = reader; + _protocolVersion = protocolVersion; + Out = new Outlet>($"MqttDecoderSink{_protocolVersion}.Out"); + Shape = new SourceShape>(Out); + } + + protected override Attributes InitialAttributes => DefaultAttributes.Select; + + public Outlet> Out { get; } + + public override SourceShape> Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + throw new NotImplementedException(); + } + + private class Logic : OutGraphStageLogic + { + private readonly PipeReader _pipeReader; + private readonly MqttProtocolVersion _protocolVersion; + private readonly MqttDecoderSource _graphStage; + private readonly Mqtt311Decoder _mqtt311Decoder; + private readonly Action _onReadReady; + + public Logic(MqttDecoderSource graphStage) : base(graphStage.Shape) + { + _graphStage = graphStage; + _pipeReader = graphStage._reader; + _protocolVersion = graphStage._protocolVersion; + _mqtt311Decoder = new Mqtt311Decoder(); + _onReadReady = GetAsyncCallback(HandleReadResult); + SetHandler(graphStage.Out, this); + } + + public override void OnPull() + { + if (_pipeReader.TryRead(out var readResult)) + { + HandleReadResult(readResult); + } + else + { + var continuation = _pipeReader.ReadAsync(); + if (continuation.IsCompletedSuccessfully) + { + HandleReadResult(continuation.GetAwaiter().GetResult()); + } + else + { + async Task WaitForRead() + { + _onReadReady(await continuation); + } + + _ = WaitForRead(); + } + } + } + + private void HandleReadResult(ReadResult readResult) + { + if (!readResult.Buffer.IsEmpty) + { + PushReadBytes(readResult.Buffer); + } + + if (readResult.IsCompleted) + { + // we are done reading + CompleteStage(); + } + } + + private void PushReadBytes(in ReadOnlySequence buffer) + { + // consume this entire sequence by copying it into a new buffer + // have to copy because there's no guarantee we can safely release a shared buffer + // once we hand the message over to the end-user. + var newMemory = new Memory(new byte[buffer.Length]); + buffer.CopyTo(newMemory.Span); + + if (_mqtt311Decoder.TryDecode(newMemory, out var decoded)) + { + Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count, newMemory.Length); + Push(_graphStage.Out, decoded); + } + } + } +} \ No newline at end of file diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs new file mode 100644 index 00000000..793492fc --- /dev/null +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -0,0 +1,108 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.Buffers; +using System.IO.Pipelines; +using Akka.Event; +using Akka.Streams; +using Akka.Streams.Implementation.Stages; +using Akka.Streams.Stage; +using TurboMqtt.PacketTypes; +using TurboMqtt.Protocol; + +namespace TurboMqtt.Streams; + +internal sealed class MqttEncoderSink : GraphStage>> +{ + private readonly PipeWriter _writer; + private readonly MqttProtocolVersion _protocolVersion; + + public MqttEncoderSink(PipeWriter writer, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V3_1_1) + { + _writer = writer; + _protocolVersion = protocolVersion; + In = new Inlet>($"MqttEncoderSink{_protocolVersion}.In"); + Shape = new SinkShape>(In); + } + + public Inlet> In { get; } + + public override SinkShape> Shape { get; } + protected override Attributes InitialAttributes => DefaultAttributes.Select; + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new Logic(this); + } + + private sealed class Logic : InGraphStageLogic + { + private readonly PipeWriter _pipeWriter; + private readonly MqttProtocolVersion _protocolVersion; + private readonly MqttEncoderSink _graphStage; + private readonly Action _flushCallback; + + public Logic(MqttEncoderSink encoderSink) : base(encoderSink.Shape) + { + _pipeWriter = encoderSink._writer; + _protocolVersion = encoderSink._protocolVersion; + _graphStage = encoderSink; + _flushCallback = GetAsyncCallback(OnFlushComplete); + SetHandler(encoderSink.In, this); + } + + private void OnFlushComplete(FlushResult obj) + { + if (obj.IsCompleted) + { + Log.Info("PipeWriter completed - terminating stage"); + CompleteStage(); + return; + } + + if (obj.IsCanceled) + { + Log.Debug("Pending flush was cancelled - we'll catch it on next attempt"); + } + } + + protected override object LogSource => Akka.Event.LogSource.Create($"Mqtt{_protocolVersion}EncoderFlow"); + + public override void OnPush() + { + var packets = Grab(_graphStage.In); + var totalBytes = packets.Sum(c => c.predictedSize.TotalSize); + + var buffer = _pipeWriter.GetMemory(totalBytes); + // TODO: handle switching between MQTT 3.1.1 and 5 + var bytesWritten = Mqtt311Encoder.EncodePackets(packets, ref buffer); + + System.Diagnostics.Debug.Assert(bytesWritten == totalBytes, "bytesWritten == predictedBytes"); + + Log.Debug("Encoded {0} messages using {1} bytes", packets.Count, bytesWritten); + + DoFlush().GetAwaiter().GetResult(); + return; + + async Task DoFlush() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + _flushCallback(await _pipeWriter.WriteAsync(buffer, cts.Token)); + } + } + + public override void PreStart() + { + Pull(_graphStage.In); + base.PreStart(); + } + + public override void OnUpstreamFailure(Exception e) + { + base.OnUpstreamFailure(e); + _pipeWriter.Complete(e); + } + } +} \ No newline at end of file diff --git a/src/TurboMqtt/Streams/MqttEncodingFlows.cs b/src/TurboMqtt/Streams/MqttEncodingFlows.cs index fe3c6b71..0b87d9c5 100644 --- a/src/TurboMqtt/Streams/MqttEncodingFlows.cs +++ b/src/TurboMqtt/Streams/MqttEncodingFlows.cs @@ -5,7 +5,6 @@ // ----------------------------------------------------------------------- using System.Buffers; -using System.IO.Pipelines; using Akka; using Akka.Event; using Akka.Streams; @@ -47,66 +46,6 @@ public static class MqttEncodingFlows } } -internal sealed class MqttEncoderSink : GraphStage>> -{ - private readonly PipeWriter _writer; - private readonly MqttProtocolVersion _protocolVersion; - - public MqttEncoderSink(PipeWriter writer, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V3_1_1) - { - _writer = writer; - _protocolVersion = protocolVersion; - In = new Inlet>($"MqttEncoderFlow{_protocolVersion}.In"); - Shape = new SinkShape>(In); - } - - public Inlet> In { get; } - - public override SinkShape> Shape { get; } - protected override Attributes InitialAttributes => DefaultAttributes.Select; - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - throw new NotImplementedException(); - } - - private sealed class Logic : InGraphStageLogic - { - private readonly PipeWriter _pipeWriter; - private readonly MqttProtocolVersion _protocolVersion; - private readonly MqttEncoderSink _graphStage; - - public Logic(MqttEncoderSink encoderSink) : base(encoderSink.Shape) - { - _pipeWriter = encoderSink._writer; - _protocolVersion = encoderSink._protocolVersion; - _graphStage = encoderSink; - SetHandler(encoderSink.In, this); - } - - public override void OnPush() - { - var elements = Grab(_graphStage.In); - var totalSize = elements.Sum(c => c.predictedSize.TotalSize); - var buffer = _pipeWriter.GetMemory(totalSize); - var bytesWritten = Mqtt311Encoder.EncodePackets(elements, ref buffer); - - CODE BOMB - } - - public override void PreStart() - { - Pull(_graphStage.In); - base.PreStart(); - } - - public override void OnUpstreamFailure(Exception e) - { - base.OnUpstreamFailure(e); - _pipeWriter.Complete(e); - } - } -} - /// /// Accepts a range of MqttPackets and uses a shared memory pool for encode them into an /// From 154096882a01045f2d2a114988668bb7e2900c59 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 12:10:59 -0500 Subject: [PATCH 08/26] ported Pipe / Stream adapter code from ASP.NET --- .../IO/Internal/DuplexChannelStream.cs | 140 +++++++++++++++--- src/TurboMqtt/IO/Internal/TaskToApm.cs | 111 ++++++++++++++ .../IO/Internal/ValueTaskExtensions.cs | 45 ++++++ 3 files changed, 274 insertions(+), 22 deletions(-) create mode 100644 src/TurboMqtt/IO/Internal/TaskToApm.cs create mode 100644 src/TurboMqtt/IO/Internal/ValueTaskExtensions.cs diff --git a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs index ffcd15d2..eb59c09c 100644 --- a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs +++ b/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs @@ -5,69 +5,165 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; using System.Threading.Channels; namespace TurboMqtt.IO.Internal; -public class DuplexChannelStream : Stream +internal class DuplexPipeStream : Stream { - private readonly ChannelWriter<(IMemoryOwner buffer, int readableBytes)> _output; - private readonly ChannelReader<(IMemoryOwner buffer, int readableBytes)> _input; + private readonly PipeReader _input; + private readonly PipeWriter _output; private readonly bool _throwOnCancelled; private volatile bool _cancelCalled; - public DuplexChannelStream(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> output, - ChannelReader<(IMemoryOwner buffer, int readableBytes)> input, bool throwOnCancelled) + public DuplexPipeStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false) { - _output = output; _input = input; + _output = output; _throwOnCancelled = throwOnCancelled; } public void CancelPendingRead() { _cancelCalled = true; + _input.CancelPendingRead(); } - public override void Flush() + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length { - // channels don't really have a concept of Flush + get + { + throw new NotSupportedException(); + } } - public override int Read(byte[] buffer, int offset, int count) + public override long Position { - + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } } public override long Seek(long offset, SeekOrigin origin) { - throw new NotImplementedException(); + throw new NotSupportedException(); } public override void SetLength(long value) { - throw new NotImplementedException(); + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + ValueTask vt = ReadAsyncInternal(new Memory(buffer, offset, count), default); + return vt.IsCompleted ? + vt.Result : + vt.AsTask().GetAwaiter().GetResult(); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination, cancellationToken); } public override void Write(byte[] buffer, int offset, int count) { - throw new NotImplementedException(); + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) + public override Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) { - return _output.WriteAsync((new UnsharedMemoryOwner(buffer), buffer.Length), cancellationToken); + return _output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).GetAsTask(); } - public override bool CanRead => true; - public override bool CanSeek => false; - public override bool CanWrite => true; + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return _output.WriteAsync(source, cancellationToken).GetAsValueTask(); + } - public override long Length => throw new NotSupportedException(); + public override void Flush() + { + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); + } - public override long Position + public override Task FlushAsync(CancellationToken cancellationToken) + { + return _output.FlushAsync(cancellationToken).GetAsTask(); + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) + { + while (true) + { + var result = await _input.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + try + { + if (_throwOnCancelled && result.IsCanceled && _cancelCalled) + { + // Reset the bool + _cancelCalled = false; + throw new OperationCanceledException(); + } + + if (!readableBuffer.IsEmpty) + { + // buffer.Count is int + var count = (int)Math.Min(readableBuffer.Length, destination.Length); + readableBuffer = readableBuffer.Slice(0, count); + readableBuffer.CopyTo(destination.Span); + return count; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _input.AdvanceTo(readableBuffer.End, readableBuffer.End); + } + } + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); + TaskToApm.End(asyncResult); } } \ No newline at end of file diff --git a/src/TurboMqtt/IO/Internal/TaskToApm.cs b/src/TurboMqtt/IO/Internal/TaskToApm.cs new file mode 100644 index 00000000..be0afeae --- /dev/null +++ b/src/TurboMqtt/IO/Internal/TaskToApm.cs @@ -0,0 +1,111 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.Diagnostics; + +namespace TurboMqtt.IO.Internal; + +/// +/// Provides support for efficiently using Tasks to implement the APM (Begin/End) pattern. +/// +internal static class TaskToApm +{ + /// + /// Marshals the Task as an IAsyncResult, using the supplied callback and state + /// to implement the APM pattern. + /// + /// The Task to be marshaled. + /// The callback to be invoked upon completion. + /// The state to be stored in the IAsyncResult. + /// An IAsyncResult to represent the task's asynchronous operation. + public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state) => + new TaskAsyncResult(task, state, callback); + + /// Processes an IAsyncResult returned by Begin. + /// The IAsyncResult to unwrap. + public static void End(IAsyncResult asyncResult) + { + if (asyncResult is TaskAsyncResult twar) + { + twar._task.GetAwaiter().GetResult(); + return; + } + + ArgumentNullException.ThrowIfNull(asyncResult, nameof(asyncResult)); + } + + /// Processes an IAsyncResult returned by Begin. + /// The IAsyncResult to unwrap. + public static TResult End(IAsyncResult asyncResult) + { + if (asyncResult is TaskAsyncResult twar && twar._task is Task task) + { + return task.GetAwaiter().GetResult(); + } + + throw new ArgumentException($"{nameof(asyncResult)} must be a TaskAsyncResult wrapping a Task returning a {typeof(TResult).Name}.", nameof(asyncResult)); + } + + /// Provides a simple IAsyncResult that wraps a Task. + /// + /// We could use the Task as the IAsyncResult if the Task's AsyncState is the same as the object state, + /// but that's very rare, in particular in a situation where someone cares about allocation, and always + /// using TaskAsyncResult simplifies things and enables additional optimizations. + /// + internal sealed class TaskAsyncResult : IAsyncResult + { + /// The wrapped Task. + internal readonly Task _task; + /// Callback to invoke when the wrapped task completes. + private readonly AsyncCallback? _callback; + + /// Initializes the IAsyncResult with the Task to wrap and the associated object state. + /// The Task to wrap. + /// The new AsyncState value. + /// Callback to invoke when the wrapped task completes. + internal TaskAsyncResult(Task task, object? state, AsyncCallback? callback) + { + Debug.Assert(task != null); + _task = task; + AsyncState = state; + + if (task.IsCompleted) + { + // Synchronous completion. Invoke the callback. No need to store it. + CompletedSynchronously = true; + callback?.Invoke(this); + } + else if (callback != null) + { + // Asynchronous completion, and we have a callback; schedule it. We use OnCompleted rather than ContinueWith in + // order to avoid running synchronously if the task has already completed by the time we get here but still run + // synchronously as part of the task's completion if the task completes after (the more common case). + _callback = callback; + _task.ConfigureAwait(continueOnCapturedContext: false) + .GetAwaiter() + .OnCompleted(InvokeCallback); // allocates a delegate, but avoids a closure + } + } + + /// Invokes the callback. + private void InvokeCallback() + { + Debug.Assert(!CompletedSynchronously); + Debug.Assert(_callback != null); + _callback.Invoke(this); + } + + /// Gets a user-defined object that qualifies or contains information about an asynchronous operation. + public object? AsyncState { get; } + /// Gets a value that indicates whether the asynchronous operation completed synchronously. + /// This is set lazily based on whether the has completed by the time this object is created. + public bool CompletedSynchronously { get; } + /// Gets a value that indicates whether the asynchronous operation has completed. + public bool IsCompleted => _task.IsCompleted; + /// Gets a that is used to wait for an asynchronous operation to complete. + public WaitHandle AsyncWaitHandle => ((IAsyncResult)_task).AsyncWaitHandle; + } +} \ No newline at end of file diff --git a/src/TurboMqtt/IO/Internal/ValueTaskExtensions.cs b/src/TurboMqtt/IO/Internal/ValueTaskExtensions.cs new file mode 100644 index 00000000..e9f1b02d --- /dev/null +++ b/src/TurboMqtt/IO/Internal/ValueTaskExtensions.cs @@ -0,0 +1,45 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.IO.Pipelines; +using System.Runtime.CompilerServices; + +namespace TurboMqtt.IO.Internal; + +internal static class ValueTaskExtensions +{ + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static Task GetAsTask(this in ValueTask valueTask) + { + // Try to avoid the allocation from AsTask + if (valueTask.IsCompletedSuccessfully) + { + // Signal consumption to the IValueTaskSource + valueTask.GetAwaiter().GetResult(); + return Task.CompletedTask; + } + else + { + return valueTask.AsTask(); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ValueTask GetAsValueTask(this in ValueTask valueTask) + { + // Try to avoid the allocation from AsTask + if (valueTask.IsCompletedSuccessfully) + { + // Signal consumption to the IValueTaskSource + valueTask.GetAwaiter().GetResult(); + return default; + } + else + { + return new ValueTask(valueTask.AsTask()); + } + } +} \ No newline at end of file From 1c294e4a9378e1bf8fc2c73fec9a2d8d146b9e19 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 13:51:15 -0500 Subject: [PATCH 09/26] in mem --- .../IO/InMem/InMemoryMqttTransport.cs | 77 +++++++++++-------- src/TurboMqtt/IO/Internal/DuplexPipe.cs | 46 +++++++++++ ...exChannelStream.cs => DuplexPipeStream.cs} | 51 +++++++++++- src/TurboMqtt/IO/MqttTransport.cs | 3 +- 4 files changed, 144 insertions(+), 33 deletions(-) create mode 100644 src/TurboMqtt/IO/Internal/DuplexPipe.cs rename src/TurboMqtt/IO/Internal/{DuplexChannelStream.cs => DuplexPipeStream.cs} (77%) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index e360468c..7342afe4 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -5,8 +5,10 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; using System.Threading.Channels; using Akka.Event; +using TurboMqtt.IO.Internal; using TurboMqtt.IO.Tcp; using TurboMqtt.PacketTypes; using TurboMqtt.Protocol; @@ -42,12 +44,10 @@ internal sealed class InMemoryMqttTransport : IMqttTransport { private readonly TaskCompletionSource _terminationSource = new(); - private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _writesToTransport = - System.Threading.Channels.Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); + private readonly IDuplexPipe _transport; + + private readonly IDuplexPipe _application; - private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _readsFromTransport = - System.Threading.Channels.Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); - private readonly CancellationTokenSource _shutdownTokenSource = new(); private readonly IFakeServerHandle _serverHandle; @@ -56,7 +56,13 @@ public InMemoryMqttTransport(int maxFrameSize, ILoggingAdapter log, MqttProtocol MaxFrameSize = maxFrameSize; Log = log; ProtocolVersion = protocolVersion; - Channel = new DuplexChannel(_writesToTransport, _readsFromTransport); + var pipeOptions = new PipeOptions(pauseWriterThreshold: MaxFrameSize, + resumeWriterThreshold: MaxFrameSize / 2, + useSynchronizationContext: false); + var pipes = DuplexPipe.CreateConnectionPair(pipeOptions, pipeOptions); + _transport = pipes.Transport; + _application = pipes.Application; + Channel = _transport; _serverHandle = protocolVersion switch { @@ -70,7 +76,18 @@ async Task CloseFn() await CloseAsync(); } - bool PushFn((IMemoryOwner buffer, int estimatedSize) msg) => _readsFromTransport.Writer.TryWrite(msg); + bool PushFn((IMemoryOwner buffer, int estimatedSize) msg) + { + try + { + _application.Output.Write(msg.buffer.Memory.Span); + return true; + } + finally + { + msg.buffer.Dispose(); + } + } } public MqttProtocolVersion ProtocolVersion { get; } @@ -79,30 +96,28 @@ async Task CloseFn() public ConnectionStatus Status { get; private set; } = ConnectionStatus.NotStarted; public Task WhenTerminated => _terminationSource.Task; - + private readonly TaskCompletionSource _waitForPendingWrites = new(); public Task WaitForPendingWrites => _waitForPendingWrites.Task; - public async Task CloseAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; - _writesToTransport.Writer.TryComplete(); + await _application.Output.CompleteAsync(); + await _transport.Output.CompleteAsync(); await _waitForPendingWrites.Task; await _shutdownTokenSource.CancelAsync(); - _readsFromTransport.Writer.TryComplete(); _terminationSource.TrySetResult(DisconnectReasonCode.NormalDisconnection); return true; } - public Task AbortAsync(CancellationToken ct = default) + public async Task AbortAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; - _writesToTransport.Writer.TryComplete(); - _readsFromTransport.Writer.TryComplete(); + await _application.Output.CompleteAsync(); + await _transport.Output.CompleteAsync(); _terminationSource.TrySetResult(DisconnectReasonCode.UnspecifiedError); - return Task.CompletedTask; } public Task ConnectAsync(CancellationToken ct = default) @@ -110,7 +125,7 @@ public Task ConnectAsync(CancellationToken ct = default) if (Status == ConnectionStatus.NotStarted) { Status = ConnectionStatus.Connected; - + #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed DoByteWritesAsync(_shutdownTokenSource.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed @@ -127,31 +142,32 @@ public Task ConnectAsync(CancellationToken ct = default) private async Task DoByteWritesAsync(CancellationToken ct) { Log.Debug("Starting to read from transport."); - while(!_writesToTransport.Reader.Completion.IsCompleted) + while (ct.IsCancellationRequested == false) { - if (!_writesToTransport.Reader.TryRead(out var msg)) + if (!_transport.Input.TryRead(out var msg)) { try { - await _writesToTransport.Reader.WaitToReadAsync(ct); + msg = await _transport.Input.ReadAsync(ct); } - catch(OperationCanceledException) + catch (OperationCanceledException) { - } - continue; } - try + var buffer = msg.Buffer; + Log.Debug("Received {0} bytes from transport.", buffer.Length); + + if (!buffer.IsEmpty) { - ReadOnlyMemory buffer = msg.buffer.Memory.Slice(0, msg.readableBytes); - Log.Debug("Received {0} bytes from transport.", buffer.Length); - _serverHandle.HandleBytes(buffer); + // TODO: eliminate copying + var newMem = new ReadOnlyMemory(buffer.ToArray()); + _serverHandle.HandleBytes(newMem); } - finally + + if (msg.IsCompleted) { - // have to free the shared buffer - msg.buffer.Dispose(); + break; } } @@ -160,6 +176,5 @@ private async Task DoByteWritesAsync(CancellationToken ct) } public int MaxFrameSize { get; } - public IDuplexChannel Channel { get; } - + public IDuplexPipe Channel { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/IO/Internal/DuplexPipe.cs b/src/TurboMqtt/IO/Internal/DuplexPipe.cs new file mode 100644 index 00000000..9d6293af --- /dev/null +++ b/src/TurboMqtt/IO/Internal/DuplexPipe.cs @@ -0,0 +1,46 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2024 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System.IO.Pipelines; + +namespace TurboMqtt.IO.Internal; + +internal sealed class DuplexPipe : IDuplexPipe +{ + public DuplexPipe(PipeReader reader, PipeWriter writer) + { + Input = reader; + Output = writer; + } + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + + public static DuplexPipePair CreateConnectionPair(PipeOptions inputOptions, PipeOptions outputOptions) + { + var input = new Pipe(inputOptions); + var output = new Pipe(outputOptions); + + var transportToApplication = new DuplexPipe(output.Reader, input.Writer); + var applicationToTransport = new DuplexPipe(input.Reader, output.Writer); + + return new DuplexPipePair(applicationToTransport, transportToApplication); + } + + // This class exists to work around issues with value tuple on .NET Framework + public readonly struct DuplexPipePair + { + public IDuplexPipe Transport { get; } + public IDuplexPipe Application { get; } + + public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application) + { + Transport = transport; + Application = application; + } + } +} \ No newline at end of file diff --git a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs b/src/TurboMqtt/IO/Internal/DuplexPipeStream.cs similarity index 77% rename from src/TurboMqtt/IO/Internal/DuplexChannelStream.cs rename to src/TurboMqtt/IO/Internal/DuplexPipeStream.cs index eb59c09c..c05a6dab 100644 --- a/src/TurboMqtt/IO/Internal/DuplexChannelStream.cs +++ b/src/TurboMqtt/IO/Internal/DuplexPipeStream.cs @@ -7,10 +7,59 @@ using System.Buffers; using System.IO.Pipelines; using System.Runtime.CompilerServices; -using System.Threading.Channels; namespace TurboMqtt.IO.Internal; +/// +/// A helper for wrapping a Stream decorator from an . +/// +/// +internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream +{ + private bool _disposed; + private readonly object _disposeLock = new object(); + + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : + this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) + { + } + + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : + base(duplexPipe.Input, duplexPipe.Output) + { + var stream = createStream(this); + Stream = stream; + Input = PipeReader.Create(stream, readerOptions); + Output = PipeWriter.Create(stream, writerOptions); + } + + public TStream Stream { get; } + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + + public override async ValueTask DisposeAsync() + { + lock (_disposeLock) + { + if (_disposed) + { + return; + } + _disposed = true; + } + + await Input.CompleteAsync(); + await Output.CompleteAsync(); + } + + protected override void Dispose(bool disposing) + { + throw new NotSupportedException(); + } +} + internal class DuplexPipeStream : Stream { private readonly PipeReader _input; diff --git a/src/TurboMqtt/IO/MqttTransport.cs b/src/TurboMqtt/IO/MqttTransport.cs index f7d9f45a..8eedcb83 100644 --- a/src/TurboMqtt/IO/MqttTransport.cs +++ b/src/TurboMqtt/IO/MqttTransport.cs @@ -5,6 +5,7 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; using System.Threading.Channels; using Akka.Event; using TurboMqtt.PacketTypes; @@ -91,7 +92,7 @@ internal interface IMqttTransport /// /// Contains a reader and a writer used to send and receive data over the transport. /// - public IDuplexChannel Channel { get; } + public IDuplexPipe Channel { get; } // /// // /// Used to write data to the underlying transport. From 80414ee1d19617d7ae8db320e28bf7e7c4572ce1 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 14:41:56 -0500 Subject: [PATCH 10/26] fixed TCP and In memory transports --- .../IO/InMem/InMemoryMqttTransport.cs | 13 +- src/TurboMqtt/IO/Tcp/TcpTransport.cs | 5 +- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 182 +++++++++--------- 3 files changed, 99 insertions(+), 101 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index 7342afe4..9c034f84 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -105,7 +105,7 @@ public async Task CloseAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; await _application.Output.CompleteAsync(); - await _transport.Output.CompleteAsync(); + await _transport.Input.CompleteAsync(); await _waitForPendingWrites.Task; await _shutdownTokenSource.CancelAsync(); _terminationSource.TrySetResult(DisconnectReasonCode.NormalDisconnection); @@ -116,7 +116,7 @@ public async Task AbortAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; await _application.Output.CompleteAsync(); - await _transport.Output.CompleteAsync(); + await _transport.Input.CompleteAsync(); _terminationSource.TrySetResult(DisconnectReasonCode.UnspecifiedError); } @@ -160,9 +160,12 @@ private async Task DoByteWritesAsync(CancellationToken ct) if (!buffer.IsEmpty) { - // TODO: eliminate copying - var newMem = new ReadOnlyMemory(buffer.ToArray()); - _serverHandle.HandleBytes(newMem); + var seqPosition = buffer.Start; + while (buffer.TryGet(ref seqPosition, out var memory)) + { + _serverHandle.HandleBytes(memory); + } + } if (msg.IsCompleted) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransport.cs b/src/TurboMqtt/IO/Tcp/TcpTransport.cs index ff8ed311..df51dd84 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransport.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransport.cs @@ -5,6 +5,7 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; using System.Threading.Channels; using Akka.Actor; using Akka.Event; @@ -54,7 +55,7 @@ internal TcpTransport(ILoggingAdapter log, TcpTransportActor.ConnectionState sta Log = log; State = state; _connectionActor = connectionActor; - Channel = new DuplexChannel(State.Writer, State.Reader); + Channel = state.Pipe; MaxFrameSize = state.MaxFrameSize; } @@ -93,5 +94,5 @@ public async Task ConnectAsync(CancellationToken ct = default) } public int MaxFrameSize { get; } - public IDuplexChannel Channel { get; } + public IDuplexPipe Channel { get; } } \ No newline at end of file diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 64333fd6..5f527ad1 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -11,7 +11,9 @@ using System.Threading.Channels; using Akka.Actor; using Akka.Event; +using Akka.Streams.Implementation.Fusing; using TurboMqtt.Client; +using TurboMqtt.IO.Internal; using TurboMqtt.PacketTypes; using TurboMqtt.Protocol; using Debug = System.Diagnostics.Debug; @@ -33,21 +35,22 @@ internal sealed class TcpTransportActor : UntypedActor /// public sealed class ConnectionState { - public ConnectionState(ChannelWriter<(IMemoryOwner buffer, int readableBytes)> writer, - ChannelReader<(IMemoryOwner buffer, int readableBytes)> reader, + public ConnectionState(IDuplexPipe dataPipes, Task whenTerminated, int maxFrameSize, Task waitForPendingWrites) { - Writer = writer; - Reader = reader; + Pipe = dataPipes; WhenTerminated = whenTerminated; MaxFrameSize = maxFrameSize; WaitForPendingWrites = waitForPendingWrites; } - + private volatile ConnectionStatus _status = ConnectionStatus.NotStarted; - public ConnectionStatus Status { get => _status; - set => _status = value; } + public ConnectionStatus Status + { + get => _status; + set => _status = value; + } public CancellationTokenSource ShutDownCts { get; set; } = new(); @@ -57,12 +60,14 @@ public ConnectionState(ChannelWriter<(IMemoryOwner buffer, int readableByt public Task WaitForPendingWrites { get; } - public ChannelWriter<(IMemoryOwner buffer, int readableBytes)> Writer { get; } + public IDuplexPipe Pipe { get; } + + public PipeWriter Writer => Pipe.Output; /// /// Used to read data from the underlying transport. /// - public ChannelReader<(IMemoryOwner buffer, int readableBytes)> Reader { get; } + public PipeReader Reader => Pipe.Input; } /// @@ -111,27 +116,29 @@ public sealed record ConnectionUnexpectedlyClosed(DisconnectReasonCode Reason, s private Socket? _tcpClient; - private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _writesToTransport = - Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); + private readonly IDuplexPipe _transport; - private readonly Channel<(IMemoryOwner buffer, int readableBytes)> _readsFromTransport = - Channel.CreateUnbounded<(IMemoryOwner buffer, int readableBytes)>(); + private readonly IDuplexPipe _application; private readonly TaskCompletionSource _whenTerminated = new(); + private readonly TaskCompletionSource _writingCompleted = new(); private readonly ILoggingAdapter _log = Context.GetLogger(); - private readonly Pipe _pipe; - public TcpTransportActor(MqttClientTcpOptions tcpOptions) { TcpOptions = tcpOptions; MaxFrameSize = tcpOptions.MaxFrameSize; - State = new ConnectionState(_writesToTransport.Writer, _readsFromTransport.Reader, _whenTerminated.Task, - MaxFrameSize, _writesToTransport.Reader.Completion); // we signal completion when _writesToTransport is done + var pipeOptions = new PipeOptions(pauseWriterThreshold: ScaleBufferSize(MaxFrameSize), + resumeWriterThreshold: ScaleBufferSize(MaxFrameSize) / 2, + useSynchronizationContext: false); + var pipes = DuplexPipe.CreateConnectionPair(pipeOptions, pipeOptions); + + _transport = pipes.Transport; + _application = pipes.Application; - _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: ScaleBufferSize(MaxFrameSize), resumeWriterThreshold: ScaleBufferSize(MaxFrameSize) / 2, - useSynchronizationContext: false)); + State = new ConnectionState(pipes.Application, _whenTerminated.Task, + MaxFrameSize, _writingCompleted.Task); // we signal completion when _writesToTransport is done } /* @@ -140,7 +147,7 @@ public TcpTransportActor(MqttClientTcpOptions tcpOptions) * Connecting --> DoConnect --> Connecting (already connecting) --> ConnectResult (Connected) BECOME Running * Running --> DoWriteToPipeAsync --> Running (read data from socket) --> DoWriteToSocketAsync --> Running (write data to socket) */ - + /// /// Performs the max buffer size scaling for the socket. /// @@ -150,11 +157,11 @@ internal static int ScaleBufferSize(int maxFrameSize) // if the max frame size is under 128kb, scale it up to 512kb if (maxFrameSize <= 128 * 1024) return 512 * 1024; - + // between 128kb and 1mb, scale it up to 2mb if (maxFrameSize <= 1024 * 1024) return 2 * 1024 * 1024; - + // if the max frame size is above 1mb, 2x it return maxFrameSize * 2; } @@ -244,7 +251,7 @@ private void TransportCreated(object message) _log.Info("Attempting to connect to [{0}:{1}]", TcpOptions.Host, TcpOptions.Port); var sender = Sender; - + // set status to connecting State.Status = ConnectionStatus.Connecting; @@ -264,7 +271,7 @@ async Task ResolveAndConnect(CancellationToken ct) await DoConnectAsync(resolved, TcpOptions.Port, sender, ct).ConfigureAwait(false); } }); - + break; } case DoConnect: @@ -303,46 +310,62 @@ private void BecomeRunning() Become(Running); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - DoWriteToPipeAsync(State.ShutDownCts.Token); - ReadFromPipeAsync(State.ShutDownCts.Token); + DoWriteToOutputPipeAsync(State.ShutDownCts.Token); DoWriteToSocketAsync(State.ShutDownCts.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } private async Task DoWriteToSocketAsync(CancellationToken ct) { - while (!_writesToTransport.Reader.Completion.IsCompleted) + while (!ct.IsCancellationRequested) { try { - while (await _writesToTransport.Reader.WaitToReadAsync(ct).ConfigureAwait(false)) - while (_writesToTransport.Reader.TryRead(out var item)) + if (!_transport.Input.TryRead(out var readResult)) { - var (buffer, readableBytes) = item; try { - var workingBuffer = buffer.Memory; - while (readableBytes > 0 && _tcpClient is { Connected: true }) + readResult = await _transport.Input.ReadAsync(ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + + + var buffer = readResult.Buffer; + + var totalSent = 0; + try + { + var seqPosition = buffer.Start; + + while (buffer.TryGet(ref seqPosition, out var workingBuffer) && _tcpClient is { Connected: true }) + { + var sent = await _tcpClient!.SendAsync(workingBuffer, ct) + .ConfigureAwait(false); + + totalSent += sent; + + if (sent == 0) { - var sent = await _tcpClient!.SendAsync(workingBuffer.Slice(0, readableBytes), ct) - .ConfigureAwait(false); - if (sent == 0) - { - _log.Warning("Failed to write to socket - no bytes written."); - _closureSelf.Tell(ReadFinished.Instance); - goto WritesFinished; - } - - readableBytes -= sent; - workingBuffer = workingBuffer.Slice(sent); + _log.Warning("Failed to write to socket - no bytes written."); + _closureSelf.Tell(ReadFinished.Instance); + goto WritesFinished; } } - finally + + if (readResult.IsCompleted) { - // free the pooled buffer - buffer.Dispose(); + // reader is done reading + break; } } + finally + { + // free the pooled buffer + _log.Debug("Sent {0} bytes to socket.", totalSent); + } } catch (OperationCanceledException) { @@ -364,14 +387,16 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) } WritesFinished: - _writesToTransport.Writer.TryComplete(); // can't write anymore either + await _application.Output.CompleteAsync(); // can't write anymore either + await _transport.Output.CompleteAsync(); + _writingCompleted.TrySetResult(); // signal that we are done writing } - private async Task DoWriteToPipeAsync(CancellationToken ct) + private async Task DoWriteToOutputPipeAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) { - var memory = _pipe.Writer.GetMemory(TcpOptions.MaxFrameSize / 4); + var memory = _transport.Output.GetMemory(TcpOptions.MaxFrameSize / 4); try { int bytesRead = await _tcpClient!.ReceiveAsync(memory, SocketFlags.None, ct); @@ -384,7 +409,7 @@ private async Task DoWriteToPipeAsync(CancellationToken ct) return; } - _pipe.Writer.Advance(bytesRead); + _transport.Output.Advance(bytesRead); } catch (OperationCanceledException) { @@ -404,7 +429,7 @@ private async Task DoWriteToPipeAsync(CancellationToken ct) } // make data available to PipeReader - var result = await _pipe.Writer.FlushAsync(ct); + var result = await _transport.Output.FlushAsync(ct); if (result.IsCompleted) { _closureSelf.Tell(ReadFinished.Instance); @@ -412,40 +437,8 @@ private async Task DoWriteToPipeAsync(CancellationToken ct) } } - await _pipe.Writer.CompleteAsync(); - } - - private async Task ReadFromPipeAsync(CancellationToken ct) - { - while (!ct.IsCancellationRequested) - { - try - { - var result = await _pipe.Reader.ReadAsync(ct); - var buffer = result.Buffer; - - // consume this entire sequence by copying it into a new buffer - // have to copy because there's no guarantee we can safely release a shared buffer - // once we hand the message over to the end-user. - var newMemory = new Memory(new byte[buffer.Length]); - var unshared = new UnsharedMemoryOwner(newMemory); - buffer.CopyTo(newMemory.Span); - _readsFromTransport.Writer.TryWrite((unshared, newMemory.Length)); - - // tell the pipe we're done with this data - _pipe.Reader.AdvanceTo(buffer.End); - - if (result.IsCompleted) - { - _closureSelf.Tell(ReadFinished.Instance); - return; - } - } - catch (OperationCanceledException) - { - _closureSelf.Tell(ReadFinished.Instance); - } - } + // we are done reading from the transport + await _transport.Output.CompleteAsync(); } private void Running(object message) @@ -476,12 +469,13 @@ private void Running(object message) private async Task CleanUpGracefully(bool waitOnReads = false) { // add a simulated DisconnectPacket to help ensure the stream gets terminated - _readsFromTransport.Writer.TryWrite(DisconnectToBinary.NormalDisconnectPacket.ToBinary(MqttProtocolVersion.V3_1_1)); + // _readsFromTransport.Writer.TryWrite( + // DisconnectToBinary.NormalDisconnectPacket.ToBinary(MqttProtocolVersion.V3_1_1)); State.Status = ConnectionStatus.Disconnected; - + // no more writes to transport - _writesToTransport.Writer.TryComplete(); + await _transport.Input.CompleteAsync(); // wait for any pending writes to finish await State.WaitForPendingWrites; @@ -489,11 +483,11 @@ private async Task CleanUpGracefully(bool waitOnReads = false) if (waitOnReads) { // wait for any reads to finish (should be terminated by Akka.Streams once the `DisconnectPacket` is processed.) - await _readsFromTransport.Reader.Completion; + await _application.Input.CompleteAsync(); } else // if we're not waiting on reads, just complete the reader { - _readsFromTransport.Writer.TryComplete(); + await _application.Output.CompleteAsync(); } _closureSelf.Tell(PoisonPill.Instance); @@ -513,8 +507,8 @@ private void DisposeSocket(ConnectionStatus newStatus) // stop reading from the socket State.ShutDownCts.Cancel(); - _pipe.Reader.Complete(); - _pipe.Writer.Complete(); + _transport.Input.Complete(); + _transport.Output.Complete(); _tcpClient?.Close(); _tcpClient?.Dispose(); } @@ -534,8 +528,8 @@ private void DisposeSocket(ConnectionStatus newStatus) private void FullShutdown(DisconnectReasonCode reason = DisconnectReasonCode.NormalDisconnection) { // mark the channels as complete (should have already been done by the time we get here, but doesn't hurt) - _writesToTransport.Writer.TryComplete(); - _readsFromTransport.Writer.TryComplete(); + _application.Output.Complete(); + _transport.Output.Complete(); var newStatus = reason switch { From b2f76bf483c84aaf3834ca620a0a27808f0a7a7a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 15:03:38 -0500 Subject: [PATCH 11/26] fixed all compilation errors --- src/TurboMqtt/Client/ClientStreamInstance.cs | 1 - src/TurboMqtt/Streams/MqttClientStreams.cs | 22 +++++++------------- src/TurboMqtt/Streams/MqttDecodingFlows.cs | 7 +++++++ src/TurboMqtt/Streams/MqttEncoderSink.cs | 1 + src/TurboMqtt/Streams/MqttEncodingFlows.cs | 19 +++++++++++++++++ 5 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/TurboMqtt/Client/ClientStreamInstance.cs b/src/TurboMqtt/Client/ClientStreamInstance.cs index 0d88c506..bd1ea24f 100644 --- a/src/TurboMqtt/Client/ClientStreamInstance.cs +++ b/src/TurboMqtt/Client/ClientStreamInstance.cs @@ -145,7 +145,6 @@ private static (Source inbound, Sink var outboundMessages = MqttClientStreams.Mqtt311OutboundPacketSink( clientConnectOptions.ClientId, transport, - MemoryPool.Shared, maxFrameSize, (int)clientConnectOptions.MaximumPacketSize, clientConnectOptions.EnableOpenTelemetry); diff --git a/src/TurboMqtt/Streams/MqttClientStreams.cs b/src/TurboMqtt/Streams/MqttClientStreams.cs index b5b2b778..9f69b93f 100644 --- a/src/TurboMqtt/Streams/MqttClientStreams.cs +++ b/src/TurboMqtt/Streams/MqttClientStreams.cs @@ -21,20 +21,18 @@ namespace TurboMqtt.Streams; internal static class MqttClientStreams { public static Sink Mqtt311OutboundPacketSink(string clientId, IMqttTransport transport, - MemoryPool memoryPool, int maxFrameSize, int maxPacketSize, bool withTelemetry = true) + int maxFrameSize, int maxPacketSize, bool withTelemetry = true) { - var finalSink = Sink.FromWriter(transport.Channel.Writer, true); + var finalSink = MqttEncodingFlows.Mqtt311EncodingSink(transport.Channel.Output, maxFrameSize, maxPacketSize); if (withTelemetry) return Flow.Create() .Via(OpenTelemetryFlows.MqttPacketRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, OpenTelemetrySupport.Direction.Outbound)) - .Via(MqttEncodingFlows.Mqtt311Encoding(memoryPool, maxFrameSize, maxPacketSize)) - .Via(OpenTelemetryFlows.MqttBitRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, - OpenTelemetrySupport.Direction.Outbound)) + // .Via(OpenTelemetryFlows.MqttBitRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, + // OpenTelemetrySupport.Direction.Outbound)) .To(finalSink); return Flow.Create() - .Via(MqttEncodingFlows.Mqtt311Encoding(memoryPool, maxFrameSize, maxPacketSize)) .To(finalSink); } @@ -44,10 +42,7 @@ public static Source Mqtt311InboundMessageSource(string cl { if (withTelemetry) - return (ChannelSource.FromReader(transport.Channel.Reader) - .Via(OpenTelemetryFlows.MqttBitRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, - OpenTelemetrySupport.Direction.Inbound)) - .Via(MqttDecodingFlows.Mqtt311Decoding()) + return MqttDecodingFlows.Mqtt311DecoderSource(transport.Channel.Input) .Async() .Via(OpenTelemetryFlows.MqttMultiPacketRateTelemetryFlow(MqttProtocolVersion.V3_1_1, clientId, OpenTelemetrySupport.Direction.Inbound)) @@ -55,14 +50,13 @@ public static Source Mqtt311InboundMessageSource(string cl actors, disconnectPromise)) .Async() .Where(c => c.PacketType == MqttPacketType.Publish) - .Select(c => ((PublishPacket)c).FromPacket())); + .Select(c => ((PublishPacket)c).FromPacket()); - return (ChannelSource.FromReader(transport.Channel.Reader) - .Via(MqttDecodingFlows.Mqtt311Decoding()) + return MqttDecodingFlows.Mqtt311DecoderSource(transport.Channel.Input) .Async() .Via(MqttReceiverFlows.ClientAckingFlow(outboundPackets, actors, disconnectPromise)) .Async() .Where(c => c.PacketType == MqttPacketType.Publish) - .Select(c => ((PublishPacket)c).FromPacket())); + .Select(c => ((PublishPacket)c).FromPacket()); } } \ No newline at end of file diff --git a/src/TurboMqtt/Streams/MqttDecodingFlows.cs b/src/TurboMqtt/Streams/MqttDecodingFlows.cs index f8b3cd7c..bb53d82c 100644 --- a/src/TurboMqtt/Streams/MqttDecodingFlows.cs +++ b/src/TurboMqtt/Streams/MqttDecodingFlows.cs @@ -6,6 +6,7 @@ using System.Buffers; using System.Collections.Immutable; +using System.IO.Pipelines; using Akka; using Akka.Event; using Akka.Streams; @@ -37,6 +38,12 @@ public static class MqttDecodingFlows return g; } + + public static Source, NotUsed> Mqtt311DecoderSource(PipeReader reader) + { + var g = new MqttDecoderSource(reader); + return Source.FromGraph(g); + } } /// diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 793492fc..d0bd74ce 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -6,6 +6,7 @@ using System.Buffers; using System.IO.Pipelines; +using Akka; using Akka.Event; using Akka.Streams; using Akka.Streams.Implementation.Stages; diff --git a/src/TurboMqtt/Streams/MqttEncodingFlows.cs b/src/TurboMqtt/Streams/MqttEncodingFlows.cs index 0b87d9c5..19000c52 100644 --- a/src/TurboMqtt/Streams/MqttEncodingFlows.cs +++ b/src/TurboMqtt/Streams/MqttEncodingFlows.cs @@ -5,6 +5,7 @@ // ----------------------------------------------------------------------- using System.Buffers; +using System.IO.Pipelines; using Akka; using Akka.Event; using Akka.Streams; @@ -44,6 +45,24 @@ public static class MqttEncodingFlows return g; } + + public static IGraph, NotUsed> Mqtt311EncodingSink( + PipeWriter writer, int maxFrameSize, int maxPacketSize) + { + var g = Flow.Create() + .Select(c => (c, MqttPacketSizeEstimator.EstimateMqtt3PacketSize(c))) + .Via(new PacketSizeFilter( + maxPacketSize)) // drops any packets bigger than the maximum frame size with a warning (accounts for header too) + .BatchWeighted(maxFrameSize, tuple => tuple.Item2.TotalSize, tuple => new List<(MqttPacket packet, PacketSize readableBytes)>(){ tuple }, + (list, tuple) => + { + list.Add(tuple); + return list; + }) // group packets into a single frame + .To(new MqttEncoderSink(writer)); + + return g; + } } /// From 53de9779e7ecccf064fbd4d37bac08a8a26dfc3b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 15:08:48 -0500 Subject: [PATCH 12/26] fixed NotImplementedException --- src/TurboMqtt/Streams/MqttDecoderSource.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index 27d3d5ad..94ebf6bc 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -37,7 +37,7 @@ public MqttDecoderSource(PipeReader reader, MqttProtocolVersion protocolVersion protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) { - throw new NotImplementedException(); + return new Logic(this); } private class Logic : OutGraphStageLogic From 056defe2f185b3c0304d66d6d446012c541aa055 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 15:24:36 -0500 Subject: [PATCH 13/26] fixing some issues with concurrent read / write --- .../IO/InMem/InMemoryMqttTransport.cs | 38 +++++++++---------- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 11 +----- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index 9c034f84..5e21e822 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -62,7 +62,7 @@ public InMemoryMqttTransport(int maxFrameSize, ILoggingAdapter log, MqttProtocol var pipes = DuplexPipe.CreateConnectionPair(pipeOptions, pipeOptions); _transport = pipes.Transport; _application = pipes.Application; - Channel = _transport; + Channel = _application; _serverHandle = protocolVersion switch { @@ -144,32 +144,32 @@ private async Task DoByteWritesAsync(CancellationToken ct) Log.Debug("Starting to read from transport."); while (ct.IsCancellationRequested == false) { - if (!_transport.Input.TryRead(out var msg)) + try { - try - { - msg = await _transport.Input.ReadAsync(ct); - } - catch (OperationCanceledException) + var msg = await _transport.Input.ReadAsync(ct); + + var buffer = msg.Buffer; + Log.Debug("Received {0} bytes from transport.", buffer.Length); + + if (!buffer.IsEmpty) { + var seqPosition = buffer.Start; + while (buffer.TryGet(ref seqPosition, out var memory)) + { + _serverHandle.HandleBytes(memory); + } + } - } - - var buffer = msg.Buffer; - Log.Debug("Received {0} bytes from transport.", buffer.Length); - if (!buffer.IsEmpty) - { - var seqPosition = buffer.Start; - while (buffer.TryGet(ref seqPosition, out var memory)) + if (msg.IsCompleted) { - _serverHandle.HandleBytes(memory); + break; } - } - - if (msg.IsCompleted) + catch (Exception exception) { + Log.Error(exception, "Error reading from transport."); + _transport.Input.CancelPendingRead(); break; } } diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 5f527ad1..0a73815f 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -321,16 +321,7 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) { try { - if (!_transport.Input.TryRead(out var readResult)) - { - try - { - readResult = await _transport.Input.ReadAsync(ct).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - } - } + var readResult = await _transport.Input.ReadAsync(ct).ConfigureAwait(false); var buffer = readResult.Buffer; From 7585fda2431a7cc94ba576ef2e778a0ef2b625fc Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 15:44:12 -0500 Subject: [PATCH 14/26] working on fixes related to Pipe buffer movement --- src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs | 1 + src/TurboMqtt/Streams/MqttDecoderSource.cs | 1 + src/TurboMqtt/Streams/MqttEncoderSink.cs | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index 5e21e822..78b124fd 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -157,6 +157,7 @@ private async Task DoByteWritesAsync(CancellationToken ct) while (buffer.TryGet(ref seqPosition, out var memory)) { _serverHandle.HandleBytes(memory); + _transport.Input.AdvanceTo(seqPosition); } } diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index 94ebf6bc..707df41f 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -104,6 +104,7 @@ private void PushReadBytes(in ReadOnlySequence buffer) // once we hand the message over to the end-user. var newMemory = new Memory(new byte[buffer.Length]); buffer.CopyTo(newMemory.Span); + _pipeReader.AdvanceTo(buffer.End); if (_mqtt311Decoder.TryDecode(newMemory, out var decoded)) { diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index d0bd74ce..9f1337d6 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -90,7 +90,8 @@ public override void OnPush() async Task DoFlush() { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - _flushCallback(await _pipeWriter.WriteAsync(buffer, cts.Token)); + _pipeWriter.Advance(totalBytes); + _flushCallback(await _pipeWriter.FlushAsync(cts.Token)); } } From 4c83178b85f47394b738b9fbd1249e6a21ecfe11 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 16:07:38 -0500 Subject: [PATCH 15/26] have some connect and disconnect packets working --- src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs | 11 ++++++----- src/TurboMqtt/Streams/MqttDecoderSource.cs | 5 +++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index 78b124fd..eb8bd8a2 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -80,7 +80,8 @@ bool PushFn((IMemoryOwner buffer, int estimatedSize) msg) { try { - _application.Output.Write(msg.buffer.Memory.Span); + _transport.Output.Write(msg.buffer.Memory.Span); + _ = _transport.Output.FlushAsync(); return true; } finally @@ -105,7 +106,7 @@ public async Task CloseAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; await _application.Output.CompleteAsync(); - await _transport.Input.CompleteAsync(); + await _transport.Output.CompleteAsync(); await _waitForPendingWrites.Task; await _shutdownTokenSource.CancelAsync(); _terminationSource.TrySetResult(DisconnectReasonCode.NormalDisconnection); @@ -116,7 +117,7 @@ public async Task AbortAsync(CancellationToken ct = default) { Status = ConnectionStatus.Disconnected; await _application.Output.CompleteAsync(); - await _transport.Input.CompleteAsync(); + await _transport.Output.CompleteAsync(); _terminationSource.TrySetResult(DisconnectReasonCode.UnspecifiedError); } @@ -157,9 +158,9 @@ private async Task DoByteWritesAsync(CancellationToken ct) while (buffer.TryGet(ref seqPosition, out var memory)) { _serverHandle.HandleBytes(memory); - _transport.Input.AdvanceTo(seqPosition); + var nextPost = buffer.GetPosition(memory.Length); + _transport.Input.AdvanceTo(seqPosition, nextPost); } - } if (msg.IsCompleted) diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index 707df41f..b7076c23 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -75,7 +75,8 @@ public override void OnPull() { async Task WaitForRead() { - _onReadReady(await continuation); + var r = await continuation; + _onReadReady(r); } _ = WaitForRead(); @@ -104,7 +105,7 @@ private void PushReadBytes(in ReadOnlySequence buffer) // once we hand the message over to the end-user. var newMemory = new Memory(new byte[buffer.Length]); buffer.CopyTo(newMemory.Span); - _pipeReader.AdvanceTo(buffer.End); + _pipeReader.AdvanceTo(buffer.Start, buffer.End); if (_mqtt311Decoder.TryDecode(newMemory, out var decoded)) { From b936d323c87a3fefdce55d1ff09ae5b8d1b60b9a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 16:24:10 -0500 Subject: [PATCH 16/26] connect and reconnect now work --- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 0a73815f..34e542fa 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -337,7 +337,8 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) .ConfigureAwait(false); totalSent += sent; - + var position = buffer.GetPosition(workingBuffer.Length); + _transport.Input.AdvanceTo(seqPosition, position); if (sent == 0) { _log.Warning("Failed to write to socket - no bytes written."); From 4fa9b43886f398a7b0201a19415b4070bccd3246 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 16:26:46 -0500 Subject: [PATCH 17/26] fixed MqttEncoderSink --- src/TurboMqtt/Streams/MqttEncoderSink.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 9f1337d6..348e777c 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -85,6 +85,7 @@ public override void OnPush() Log.Debug("Encoded {0} messages using {1} bytes", packets.Count, bytesWritten); DoFlush().GetAwaiter().GetResult(); + Pull(_graphStage.In); return; async Task DoFlush() From 0f39fa87625ba584c4f229607689939f44d8b556 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 16:53:48 -0500 Subject: [PATCH 18/26] buffer is not getting cleared in the MqttDecoderSource --- .../IO/InMem/InMemoryMqttTransport.cs | 1 + src/TurboMqtt/Streams/MqttDecoderSource.cs | 22 ++++++++++++------- src/TurboMqtt/Streams/MqttEncoderSink.cs | 13 ++++------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index eb8bd8a2..c68f6f57 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -86,6 +86,7 @@ bool PushFn((IMemoryOwner buffer, int estimatedSize) msg) } finally { + _waitForPendingWrites.TrySetResult(true); msg.buffer.Dispose(); } } diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index b7076c23..a54cb09f 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -79,7 +79,7 @@ async Task WaitForRead() _onReadReady(r); } - _ = WaitForRead(); + WaitForRead().GetAwaiter().GetResult(); } } } @@ -103,14 +103,20 @@ private void PushReadBytes(in ReadOnlySequence buffer) // consume this entire sequence by copying it into a new buffer // have to copy because there's no guarantee we can safely release a shared buffer // once we hand the message over to the end-user. - var newMemory = new Memory(new byte[buffer.Length]); - buffer.CopyTo(newMemory.Span); - _pipeReader.AdvanceTo(buffer.Start, buffer.End); - - if (_mqtt311Decoder.TryDecode(newMemory, out var decoded)) + // var newMemory = new Memory(new byte[buffer.Length]); + // buffer.CopyTo(newMemory.Span); + // _pipeReader.AdvanceTo(buffer.End); + + var seqPosition = buffer.Start; + while (buffer.TryGet(ref seqPosition, out var memory)) { - Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count, newMemory.Length); - Push(_graphStage.Out, decoded); + if (_mqtt311Decoder.TryDecode(memory, out var decoded)) + { + Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count,memory.Length); + Push(_graphStage.Out, decoded); + } + var nextPost = buffer.GetPosition(memory.Length); + _pipeReader.AdvanceTo(seqPosition, nextPost); } } } diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 348e777c..559a0276 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -84,16 +84,11 @@ public override void OnPush() Log.Debug("Encoded {0} messages using {1} bytes", packets.Count, bytesWritten); - DoFlush().GetAwaiter().GetResult(); + _pipeWriter.Advance(totalBytes); + OnFlushComplete(_pipeWriter.FlushAsync().GetAwaiter().GetResult()); + + Pull(_graphStage.In); - return; - - async Task DoFlush() - { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - _pipeWriter.Advance(totalBytes); - _flushCallback(await _pipeWriter.FlushAsync(cts.Token)); - } } public override void PreStart() From b56569a2222ba40818b308afca82fea8be63f34a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 17:03:17 -0500 Subject: [PATCH 19/26] fixing some Akka.Streams issues --- src/TurboMqtt/IO/FakeServerHandle.cs | 2 +- src/TurboMqtt/Streams/MqttDecoderSource.cs | 2 +- src/TurboMqtt/Streams/MqttEncoderSink.cs | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/TurboMqtt/IO/FakeServerHandle.cs b/src/TurboMqtt/IO/FakeServerHandle.cs index 8bfaa4de..4f468bec 100644 --- a/src/TurboMqtt/IO/FakeServerHandle.cs +++ b/src/TurboMqtt/IO/FakeServerHandle.cs @@ -91,7 +91,7 @@ public virtual void FlushPackets() } else { - Log.Debug("Successfully wrote N packets {0} [{1} bytes] to transport.", _pendingPackets.Count, totalSize); + Log.Debug("Successfully wrote {0} packets [{1} bytes] to transport.", _pendingPackets.Count, totalSize); } bufferPooled.Dispose(); diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index a54cb09f..7a9ba2bf 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -116,7 +116,7 @@ private void PushReadBytes(in ReadOnlySequence buffer) Push(_graphStage.Out, decoded); } var nextPost = buffer.GetPosition(memory.Length); - _pipeReader.AdvanceTo(seqPosition, nextPost); + _pipeReader.AdvanceTo(nextPost); } } } diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 559a0276..55f395b0 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -87,8 +87,11 @@ public override void OnPush() _pipeWriter.Advance(totalBytes); OnFlushComplete(_pipeWriter.FlushAsync().GetAwaiter().GetResult()); + if(!HasBeenPulled(_graphStage.In)) + Pull(_graphStage.In); - Pull(_graphStage.In); + if(IsClosed(_graphStage.In)) + CompleteStage(); } public override void PreStart() From 40b7736e607617fcd23ebcf7d3bc469e5102388b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 10 Sep 2024 17:06:07 -0500 Subject: [PATCH 20/26] re-arranged closure code --- src/TurboMqtt/Streams/MqttEncoderSink.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 55f395b0..8b427cbc 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -86,12 +86,15 @@ public override void OnPush() _pipeWriter.Advance(totalBytes); OnFlushComplete(_pipeWriter.FlushAsync().GetAwaiter().GetResult()); + + if (IsClosed(_graphStage.In)) + { + CompleteStage(); + return; + } if(!HasBeenPulled(_graphStage.In)) Pull(_graphStage.In); - - if(IsClosed(_graphStage.In)) - CompleteStage(); } public override void PreStart() From 327c49716780f76cd3f48d4428c67cc611642219 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 07:49:06 -0500 Subject: [PATCH 21/26] improved design of decoder stage --- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 10 +- src/TurboMqtt/Streams/MqttDecoderSource.cs | 106 +++++++++++++-------- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 34e542fa..00a7f06c 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -337,8 +337,14 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) .ConfigureAwait(false); totalSent += sent; - var position = buffer.GetPosition(workingBuffer.Length); - _transport.Input.AdvanceTo(seqPosition, position); + + if (!readResult.IsCompleted) + { + // can't advance the pipe reader if we're done reading + var position = buffer.GetPosition(workingBuffer.Length); + _transport.Input.AdvanceTo(seqPosition, position); + } + if (sent == 0) { _log.Warning("Failed to write to socket - no bytes written."); diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index 7a9ba2bf..e8791dcc 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -46,7 +46,11 @@ private class Logic : OutGraphStageLogic private readonly MqttProtocolVersion _protocolVersion; private readonly MqttDecoderSource _graphStage; private readonly Mqtt311Decoder _mqtt311Decoder; - private readonly Action _onReadReady; + private readonly Action> _onReadReady; + private readonly CancellationTokenSource _shutdownCts = new(); + + // ImmutableLists are returned by the decoder, and we queue them to preserve order before emission + private ImmutableQueue> _packets = ImmutableQueue>.Empty; public Logic(MqttDecoderSource graphStage) : base(graphStage.Shape) { @@ -54,69 +58,89 @@ public Logic(MqttDecoderSource graphStage) : base(graphStage.Shape) _pipeReader = graphStage._reader; _protocolVersion = graphStage._protocolVersion; _mqtt311Decoder = new Mqtt311Decoder(); - _onReadReady = GetAsyncCallback(HandleReadResult); + _onReadReady = GetAsyncCallback>(HandleReadResult); SetHandler(graphStage.Out, this); } - public override void OnPull() + private async Task ReadLoop() { - if (_pipeReader.TryRead(out var readResult)) + while (!_shutdownCts.IsCancellationRequested) { - HandleReadResult(readResult); - } - else - { - var continuation = _pipeReader.ReadAsync(); - if (continuation.IsCompletedSuccessfully) + var result = await _pipeReader.ReadAsync(_shutdownCts.Token); + + if (result.IsCompleted) + { + CompleteStage(); + } + + if (result.IsCanceled) { - HandleReadResult(continuation.GetAwaiter().GetResult()); + continue; } - else + + var buffer = result.Buffer; + + if (!buffer.IsEmpty) { - async Task WaitForRead() + // consume this entire sequence by copying it into a new buffer + // have to copy because there's no guarantee we can safely release a shared buffer + // once we hand the message over to the end-user. + // var newMemory = new Memory(new byte[buffer.Length]); + // buffer.CopyTo(newMemory.Span); + // _pipeReader.AdvanceTo(buffer.End); + + var seqPosition = buffer.Start; + while (buffer.TryGet(ref seqPosition, out var memory)) { - var r = await continuation; - _onReadReady(r); + if (_mqtt311Decoder.TryDecode(memory, out var decoded)) + { + Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count, memory.Length); + _onReadReady(decoded); + } + var nextPost = buffer.GetPosition(memory.Length); + _pipeReader.AdvanceTo(nextPost); } - - WaitForRead().GetAwaiter().GetResult(); } } } - private void HandleReadResult(ReadResult readResult) + public override void PostStop() { - if (!readResult.Buffer.IsEmpty) - { - PushReadBytes(readResult.Buffer); - } + _shutdownCts.Cancel(); + base.PostStop(); + } - if (readResult.IsCompleted) + public override void PreStart() + { + // start the read loop asynchronously + _ = ReadLoop(); + base.PreStart(); + } + + public override void OnPull() + { + if (_packets.IsEmpty) return; + + // aggregate all the packets we've decoded so far + var builder = ImmutableList.CreateBuilder(); + foreach (var packetSet in _packets) { - // we are done reading - CompleteStage(); + builder.AddRange(packetSet); } + _packets = ImmutableQueue>.Empty; + Push(_graphStage.Out, builder.ToImmutable()); } - private void PushReadBytes(in ReadOnlySequence buffer) + private void HandleReadResult(ImmutableList decoded) { - // consume this entire sequence by copying it into a new buffer - // have to copy because there's no guarantee we can safely release a shared buffer - // once we hand the message over to the end-user. - // var newMemory = new Memory(new byte[buffer.Length]); - // buffer.CopyTo(newMemory.Span); - // _pipeReader.AdvanceTo(buffer.End); - var seqPosition = buffer.Start; - while (buffer.TryGet(ref seqPosition, out var memory)) + if (IsAvailable(_graphStage.Out)) { - if (_mqtt311Decoder.TryDecode(memory, out var decoded)) - { - Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count,memory.Length); - Push(_graphStage.Out, decoded); - } - var nextPost = buffer.GetPosition(memory.Length); - _pipeReader.AdvanceTo(nextPost); + Push(_graphStage.Out, decoded); + } + else + { + _packets = _packets.Enqueue(decoded); } } } From 761bc67b7a335f2166637d99f667c53546a621fd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 08:06:55 -0500 Subject: [PATCH 22/26] have all in-memory specs passing --- .../IO/InMem/InMemoryMqttTransport.cs | 10 ++++++---- src/TurboMqtt/Streams/MqttDecoderSource.cs | 18 ++++++------------ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs index c68f6f57..9865ef01 100644 --- a/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs +++ b/src/TurboMqtt/IO/InMem/InMemoryMqttTransport.cs @@ -156,11 +156,13 @@ private async Task DoByteWritesAsync(CancellationToken ct) if (!buffer.IsEmpty) { var seqPosition = buffer.Start; - while (buffer.TryGet(ref seqPosition, out var memory)) + var copyBuffer = new ReadOnlyMemory(buffer.ToArray()); + _serverHandle.HandleBytes(copyBuffer); + + if (!msg.IsCompleted) { - _serverHandle.HandleBytes(memory); - var nextPost = buffer.GetPosition(memory.Length); - _transport.Input.AdvanceTo(seqPosition, nextPost); + // we will throw if we try to advance past the end of the buffer + _transport.Input.AdvanceTo(buffer.End); } } diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index e8791dcc..1f23fb75 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -85,20 +85,14 @@ private async Task ReadLoop() // consume this entire sequence by copying it into a new buffer // have to copy because there's no guarantee we can safely release a shared buffer // once we hand the message over to the end-user. - // var newMemory = new Memory(new byte[buffer.Length]); - // buffer.CopyTo(newMemory.Span); - // _pipeReader.AdvanceTo(buffer.End); + var newMemory = new Memory(new byte[buffer.Length]); + buffer.CopyTo(newMemory.Span); + _pipeReader.AdvanceTo(buffer.End); - var seqPosition = buffer.Start; - while (buffer.TryGet(ref seqPosition, out var memory)) + if (_mqtt311Decoder.TryDecode(newMemory, out var decoded)) { - if (_mqtt311Decoder.TryDecode(memory, out var decoded)) - { - Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count, memory.Length); - _onReadReady(decoded); - } - var nextPost = buffer.GetPosition(memory.Length); - _pipeReader.AdvanceTo(nextPost); + Log.Debug("Decoded [{0}] packets totaling [{1}] bytes", decoded.Count, newMemory.Length); + _onReadReady(decoded); } } } From 316dd4aabdb38ed9b555dd5cfd593f7499a7c167 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 08:13:54 -0500 Subject: [PATCH 23/26] need to change how we write to socket --- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 00a7f06c..b3bf0afc 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -330,9 +330,14 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) try { var seqPosition = buffer.Start; - - while (buffer.TryGet(ref seqPosition, out var workingBuffer) && _tcpClient is { Connected: true }) + var keepDelivering = true; + var remainingLength = buffer.Length; + while (keepDelivering && _tcpClient is { Connected: true }) { + var frameToSend = buffer.IsSingleSegment + ? buffer.First + : buffer.Slice(seqPosition, Math.Min(MaxFrameSize, buffer.Length)).; + var sent = await _tcpClient!.SendAsync(workingBuffer, ct) .ConfigureAwait(false); From 658936617c828b5b7f4baab31c3aa9e0f19dcd63 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 10:11:51 -0500 Subject: [PATCH 24/26] fixed issues with duplicate socket read --- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 51 ++++++++++------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index b3bf0afc..2404981b 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -329,46 +329,41 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) var totalSent = 0; try { - var seqPosition = buffer.Start; - var keepDelivering = true; - var remainingLength = buffer.Length; - while (keepDelivering && _tcpClient is { Connected: true }) + if(_tcpClient is { Connected: true }) { - var frameToSend = buffer.IsSingleSegment - ? buffer.First - : buffer.Slice(seqPosition, Math.Min(MaxFrameSize, buffer.Length)).; - - var sent = await _tcpClient!.SendAsync(workingBuffer, ct) - .ConfigureAwait(false); - - totalSent += sent; - - if (!readResult.IsCompleted) + foreach (var workingBuffer in buffer) { - // can't advance the pipe reader if we're done reading - var position = buffer.GetPosition(workingBuffer.Length); - _transport.Input.AdvanceTo(seqPosition, position); + var sent = await _tcpClient!.SendAsync(workingBuffer, ct) + .ConfigureAwait(false); + + totalSent += sent; + + if (sent == 0) + { + _log.Warning("Failed to write to socket - no bytes written."); + _closureSelf.Tell(ReadFinished.Instance); + goto WritesFinished; + } } - - if (sent == 0) + + if (!readResult.IsCompleted) { - _log.Warning("Failed to write to socket - no bytes written."); - _closureSelf.Tell(ReadFinished.Instance); - goto WritesFinished; + //advance the pipe reader if we're not done reading + _transport.Input.AdvanceTo(buffer.End); } } - - if (readResult.IsCompleted) - { - // reader is done reading - break; - } } finally { // free the pooled buffer _log.Debug("Sent {0} bytes to socket.", totalSent); } + + if (readResult.IsCompleted) + { + // reader is done reading + break; + } } catch (OperationCanceledException) { From a77089428945ccd199f62715b75821b764173837 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 10:41:58 -0500 Subject: [PATCH 25/26] guarantee stream termination --- src/TurboMqtt/Client/ClientStreamInstance.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/TurboMqtt/Client/ClientStreamInstance.cs b/src/TurboMqtt/Client/ClientStreamInstance.cs index bd1ea24f..72283e2e 100644 --- a/src/TurboMqtt/Client/ClientStreamInstance.cs +++ b/src/TurboMqtt/Client/ClientStreamInstance.cs @@ -107,6 +107,13 @@ private void PrepareStream(MqttClientConnectOptions clientConnectOptions, // begin inbound stream inboundStream + .WatchTermination(async (_, task) => + { + await task; + + // guarantee that we kill the client in the event that the stream terminates + disconnectPromise.TrySetResult(DisconnectPacket.Instance); + }) // setting IsOwner to false here is crucial - otherwise, we can't reboot the client after broker disconnects .To(ChannelSink.FromWriter(inboundPackets.Writer, false)) .Run(_materializer); From 51465efc068ed1b97e69fa003399560fff6373a3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 11 Sep 2024 10:54:02 -0500 Subject: [PATCH 26/26] more unclean shutdown --- src/TurboMqtt/IO/Tcp/TcpTransportActor.cs | 19 +++++++++++++------ src/TurboMqtt/Streams/MqttDecoderSource.cs | 3 +++ src/TurboMqtt/Streams/MqttEncoderSink.cs | 11 ++++++++++- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs index 2404981b..2ba0aba8 100644 --- a/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs +++ b/src/TurboMqtt/IO/Tcp/TcpTransportActor.cs @@ -322,22 +322,22 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) try { var readResult = await _transport.Input.ReadAsync(ct).ConfigureAwait(false); - + var buffer = readResult.Buffer; - + var totalSent = 0; try { - if(_tcpClient is { Connected: true }) + if (_tcpClient is { Connected: true }) { foreach (var workingBuffer in buffer) { var sent = await _tcpClient!.SendAsync(workingBuffer, ct) .ConfigureAwait(false); - + totalSent += sent; - + if (sent == 0) { _log.Warning("Failed to write to socket - no bytes written."); @@ -358,13 +358,20 @@ private async Task DoWriteToSocketAsync(CancellationToken ct) // free the pooled buffer _log.Debug("Sent {0} bytes to socket.", totalSent); } - + if (readResult.IsCompleted) { // reader is done reading break; } } + catch (InvalidOperationException) + { + // reads were finished without signaling `.IsCompleted` + _log.Debug("Shutting down write to socket."); + _closureSelf.Tell(ReadFinished.Instance); + goto WritesFinished; + } catch (OperationCanceledException) { // we're being shut down diff --git a/src/TurboMqtt/Streams/MqttDecoderSource.cs b/src/TurboMqtt/Streams/MqttDecoderSource.cs index 1f23fb75..6cda3c44 100644 --- a/src/TurboMqtt/Streams/MqttDecoderSource.cs +++ b/src/TurboMqtt/Streams/MqttDecoderSource.cs @@ -70,7 +70,10 @@ private async Task ReadLoop() if (result.IsCompleted) { + _onReadReady(ImmutableList.Empty.Add(new DisconnectPacket(){ ReasonCode = DisconnectReasonCode.NormalDisconnection})); + SetKeepGoing(true); CompleteStage(); + await _shutdownCts.CancelAsync(); // stop us from reading again } if (result.IsCanceled) diff --git a/src/TurboMqtt/Streams/MqttEncoderSink.cs b/src/TurboMqtt/Streams/MqttEncoderSink.cs index 8b427cbc..62c8d425 100644 --- a/src/TurboMqtt/Streams/MqttEncoderSink.cs +++ b/src/TurboMqtt/Streams/MqttEncoderSink.cs @@ -85,7 +85,9 @@ public override void OnPush() Log.Debug("Encoded {0} messages using {1} bytes", packets.Count, bytesWritten); _pipeWriter.Advance(totalBytes); - OnFlushComplete(_pipeWriter.FlushAsync().GetAwaiter().GetResult()); + + _ = DoFlush(); + if (IsClosed(_graphStage.In)) { @@ -95,6 +97,13 @@ public override void OnPush() if(!HasBeenPulled(_graphStage.In)) Pull(_graphStage.In); + return; + + async Task DoFlush() + { + var flushResult = await _pipeWriter.FlushAsync(); + _flushCallback(flushResult); + } } public override void PreStart()