From 1606f7b131c423fa1698d01eaf4647b13445ff7a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 2 Mar 2023 15:54:05 -0800 Subject: [PATCH] Write to the writing Pipeline directly See: https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html#writes-and-wrongs Fixup API validation Add assertion Fix a missing Advance/1 call Add FUTURE --- RabbitMQ.Stream.Client/Client.cs | 2 +- RabbitMQ.Stream.Client/CloseRequest.cs | 6 +- RabbitMQ.Stream.Client/CloseResponse.cs | 3 +- RabbitMQ.Stream.Client/Connection.cs | 88 ++++++++++++++++--- .../ConsumerUpdateQueryResponse.cs | 2 +- .../ConsumerUpdateRequest.cs | 6 +- RabbitMQ.Stream.Client/Create.cs | 8 +- RabbitMQ.Stream.Client/Credit.cs | 6 +- RabbitMQ.Stream.Client/CreditResponse.cs | 2 +- .../DeclarePublisherRequest.cs | 6 +- .../DeclarePublisherResponse.cs | 2 +- RabbitMQ.Stream.Client/Delete.cs | 7 +- .../DeletePublisherRequest.cs | 6 +- .../DeletePublisherResponse.cs | 3 +- RabbitMQ.Stream.Client/Deliver.cs | 2 +- RabbitMQ.Stream.Client/HeartBeatRequest.cs | 6 +- RabbitMQ.Stream.Client/ICommand.cs | 6 +- RabbitMQ.Stream.Client/MetaData.cs | 9 +- RabbitMQ.Stream.Client/OpenRequest.cs | 6 +- RabbitMQ.Stream.Client/OpenResponse.cs | 2 +- .../PartitionsQueryRequest.cs | 6 +- .../PartitionsQueryResponse.cs | 2 +- RabbitMQ.Stream.Client/PeerProperties.cs | 6 +- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 22 ----- .../PublicAPI.Unshipped.txt | 24 ++++- RabbitMQ.Stream.Client/Publish.cs | 6 +- RabbitMQ.Stream.Client/PublishConfirm.cs | 2 +- RabbitMQ.Stream.Client/PublishError.cs | 2 +- RabbitMQ.Stream.Client/QueryOffsetRequest.cs | 6 +- RabbitMQ.Stream.Client/QueryOffsetResponse.cs | 2 +- .../QueryPublisherRequest.cs | 6 +- .../QueryPublisherResponse.cs | 3 +- .../SaslAuthenticateRequest.cs | 5 +- .../SaslAuthenticateResponse.cs | 2 +- .../SaslHandshakeRequest.cs | 6 +- .../SaslHandshakeResponse.cs | 3 +- RabbitMQ.Stream.Client/StoreOffsetRequest.cs | 6 +- RabbitMQ.Stream.Client/StreamStatsRequest.cs | 6 +- RabbitMQ.Stream.Client/StreamStatsResponse.cs | 2 +- RabbitMQ.Stream.Client/SubEntryPublish.cs | 6 +- RabbitMQ.Stream.Client/Subscribe.cs | 6 +- RabbitMQ.Stream.Client/Tune.cs | 5 +- RabbitMQ.Stream.Client/TuneRequest.cs | 6 +- RabbitMQ.Stream.Client/Unubscribe.cs | 6 +- RabbitMQ.Stream.Client/WireFormatting.cs | 5 +- 45 files changed, 223 insertions(+), 106 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 40689dba..871e58a6 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -85,7 +85,7 @@ public OutgoingMsg(byte publisherId, ulong publishingId, Message data) public Message Data => data; public int SizeNeeded => 0; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/CloseRequest.cs b/RabbitMQ.Stream.Client/CloseRequest.cs index d2830e84..c9c1f364 100644 --- a/RabbitMQ.Stream.Client/CloseRequest.cs +++ b/RabbitMQ.Stream.Client/CloseRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,13 +20,15 @@ public CloseRequest(uint correlationId, string reason) public int SizeNeeded => 10 + WireFormatting.StringSize(reason); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteUInt16(span.Slice(offset), 1); //ok code offset += WireFormatting.WriteString(span.Slice(offset), reason); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/CloseResponse.cs b/RabbitMQ.Stream.Client/CloseResponse.cs index 84232279..f33c3d73 100644 --- a/RabbitMQ.Stream.Client/CloseResponse.cs +++ b/RabbitMQ.Stream.Client/CloseResponse.cs @@ -25,10 +25,11 @@ public CloseResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out CloseResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index bf32c0e3..49a98931 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -83,34 +83,94 @@ public static async Task Create(EndPoint endpoint, Func return new Connection(socket, commandCallback, closedCallBack, sslOption); } - public async ValueTask Write(T command) where T : struct, ICommand + public ValueTask Write(T command) where T : struct, ICommand { - await WriteCommand(command).ConfigureAwait(false); - // we return true to indicate that the command was written - // In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220 - // we made all WriteCommand async so await is enough to indicate that the command was written - // We decided to keep the return value to avoid a breaking change - return true; + if (!_writeLock.Wait(0)) + { + // https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html + var writeSlowPath = WriteCommandAsyncSlowPath(command); + writeSlowPath.ConfigureAwait(false); + return writeSlowPath; + } + else + { + var release = true; + try + { + var payloadSize = WriteCommandPayloadSize(command); + var written = command.Write(writer); + Debug.Assert(payloadSize == written); + var flush = writer.FlushAsync(); + flush.ConfigureAwait(false); + if (flush.IsCompletedSuccessfully) + { + // we return true to indicate that the command was written + // In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220 + // we made all WriteCommand async so await is enough to indicate that the command was written + // We decided to keep the return value to avoid a breaking change + return ValueTask.FromResult(true); + } + else + { + release = false; + return AwaitFlushAndRelease(flush); + } + } + finally + { + if (release) + { + _writeLock.Release(); + } + } + } } - private async Task WriteCommand(T command) where T : struct, ICommand + private async ValueTask WriteCommandAsyncSlowPath(T command) where T : struct, ICommand { // Only one thread should be able to write to the output pipeline at a time. await _writeLock.WaitAsync().ConfigureAwait(false); try { - var size = command.SizeNeeded; - var mem = new byte[4 + size]; // + 4 to write the size - WireFormatting.WriteUInt32(mem, (uint)size); - var written = command.Write(mem.AsSpan()[4..]); - await writer.WriteAsync(new ReadOnlyMemory(mem)).ConfigureAwait(false); - Debug.Assert(size == written); + var payloadSize = WriteCommandPayloadSize(command); + var written = command.Write(writer); + Debug.Assert(payloadSize == written); await writer.FlushAsync().ConfigureAwait(false); } finally { _writeLock.Release(); } + + return true; + } + + private async ValueTask AwaitFlushAndRelease(ValueTask task) + { + try + { + await task.ConfigureAwait(false); + } + finally + { + _writeLock.Release(); + } + + return true; + } + + private int WriteCommandPayloadSize(T command) where T : struct, ICommand + { + /* + * TODO FUTURE + * This code could be moved into a common base class for all outgoing + * commands + */ + var payloadSize = command.SizeNeeded; + var mem = writer.GetSpan(WireFormatting.SizeofUInt32); + var written = WireFormatting.WriteUInt32(mem, (uint)payloadSize); + writer.Advance(written); + return payloadSize; } private async Task ProcessIncomingFrames() diff --git a/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs b/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs index ea7b529a..995da4eb 100644 --- a/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs +++ b/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs @@ -28,7 +28,7 @@ private ConsumerUpdateQueryResponse(uint correlationId, byte subscriptionId, byt public bool IsActive => active == 1; public int SizeNeeded => throw new NotImplementedException(); - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs b/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs index 5c2ac3d3..7d69d167 100644 --- a/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs +++ b/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -27,13 +27,15 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId); offset += WireFormatting.WriteUInt16(span.Slice(offset), (ushort)ResponseCode.Ok); offset += OffsetSpecification.Write(span.Slice(offset)); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/Create.cs b/RabbitMQ.Stream.Client/Create.cs index 3fa188de..7c5995f5 100644 --- a/RabbitMQ.Stream.Client/Create.cs +++ b/RabbitMQ.Stream.Client/Create.cs @@ -35,8 +35,10 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); + var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); @@ -49,6 +51,7 @@ public int Write(Span span) offset += WireFormatting.WriteString(span.Slice(offset), value); } + writer.Advance(offset); return offset; } } @@ -71,10 +74,11 @@ public CreateResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out CreateResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/Credit.cs b/RabbitMQ.Stream.Client/Credit.cs index cc183795..1d096ca7 100644 --- a/RabbitMQ.Stream.Client/Credit.cs +++ b/RabbitMQ.Stream.Client/Credit.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -19,13 +19,15 @@ public CreditRequest(byte subscriptionId, ushort credit) } public int SizeNeeded => 7; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId); offset += WireFormatting.WriteUInt16(span.Slice(offset), credit); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/CreditResponse.cs b/RabbitMQ.Stream.Client/CreditResponse.cs index 9c0a0979..9b7c1e35 100644 --- a/RabbitMQ.Stream.Client/CreditResponse.cs +++ b/RabbitMQ.Stream.Client/CreditResponse.cs @@ -24,7 +24,7 @@ private CreditResponse(ResponseCode responseCode, byte subscriptionId) private ResponseCode ResponseCode { get; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs b/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs index 0d41588a..c9069546 100644 --- a/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs +++ b/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -24,14 +24,16 @@ public DeclarePublisherRequest(uint correlationId, byte publisherId, string publ public int SizeNeeded => 8 + 1 + WireFormatting.StringSize(publisherRef) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); offset += WireFormatting.WriteString(span.Slice(offset), publisherRef); offset += WireFormatting.WriteString(span.Slice(offset), stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs b/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs index 62b1c318..260c6bb7 100644 --- a/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs +++ b/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs @@ -25,7 +25,7 @@ public DeclarePublisherResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/Delete.cs b/RabbitMQ.Stream.Client/Delete.cs index 8af1e48a..79f50ad0 100644 --- a/RabbitMQ.Stream.Client/Delete.cs +++ b/RabbitMQ.Stream.Client/Delete.cs @@ -21,12 +21,14 @@ public DeleteRequest(uint correlationId, string stream) public int SizeNeeded => 8 + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteString(span.Slice(offset), stream); + writer.Advance(offset); return offset; } } @@ -49,10 +51,11 @@ public DeleteResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out DeleteResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/DeletePublisherRequest.cs b/RabbitMQ.Stream.Client/DeletePublisherRequest.cs index df6b019b..b48d973e 100644 --- a/RabbitMQ.Stream.Client/DeletePublisherRequest.cs +++ b/RabbitMQ.Stream.Client/DeletePublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,12 +20,14 @@ public DeletePublisherRequest(uint correlationId, byte publisherId) public int SizeNeeded => 8 + 1; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/DeletePublisherResponse.cs b/RabbitMQ.Stream.Client/DeletePublisherResponse.cs index 1fabdaee..feb8f819 100644 --- a/RabbitMQ.Stream.Client/DeletePublisherResponse.cs +++ b/RabbitMQ.Stream.Client/DeletePublisherResponse.cs @@ -24,10 +24,11 @@ public DeletePublisherResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out DeletePublisherResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/Deliver.cs b/RabbitMQ.Stream.Client/Deliver.cs index 5d581b43..48b6dc8f 100644 --- a/RabbitMQ.Stream.Client/Deliver.cs +++ b/RabbitMQ.Stream.Client/Deliver.cs @@ -27,7 +27,7 @@ private Deliver(byte subscriptionId, Chunk chunk) public byte SubscriptionId => subscriptionId; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/HeartBeatRequest.cs b/RabbitMQ.Stream.Client/HeartBeatRequest.cs index 87f38d2b..b1607665 100644 --- a/RabbitMQ.Stream.Client/HeartBeatRequest.cs +++ b/RabbitMQ.Stream.Client/HeartBeatRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -12,10 +12,12 @@ namespace RabbitMQ.Stream.Client; public int SizeNeeded => 4; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/ICommand.cs b/RabbitMQ.Stream.Client/ICommand.cs index 1f035e76..85ccfbd3 100644 --- a/RabbitMQ.Stream.Client/ICommand.cs +++ b/RabbitMQ.Stream.Client/ICommand.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -10,7 +10,7 @@ public interface ICommand { ushort Version => 1; uint CorrelationId => uint.MaxValue; - public int SizeNeeded { get; } - int Write(Span span); + int SizeNeeded { get; } + int Write(IBufferWriter writer); } } diff --git a/RabbitMQ.Stream.Client/MetaData.cs b/RabbitMQ.Stream.Client/MetaData.cs index ad5ae841..a5c710c5 100644 --- a/RabbitMQ.Stream.Client/MetaData.cs +++ b/RabbitMQ.Stream.Client/MetaData.cs @@ -22,19 +22,22 @@ public MetaDataQuery(uint correlationId, IList streams) public int SizeNeeded => 12 + streams.Sum(WireFormatting.StringSize); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], command.Version); offset += WireFormatting.WriteUInt32(span[offset..], correlationId); // map offset += WireFormatting.WriteInt32(span[offset..], streams.Count()); + foreach (var s in streams) { offset += WireFormatting.WriteString(span[offset..], s); } + writer.Advance(offset); return offset; } } @@ -145,7 +148,7 @@ internal static int Read(ReadOnlySequence frame, out MetaDataResponse comm return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -180,7 +183,7 @@ internal static int Read(ReadOnlySequence frame, out MetaDataUpdate comman return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/OpenRequest.cs b/RabbitMQ.Stream.Client/OpenRequest.cs index 8f0c674b..f0ae217c 100644 --- a/RabbitMQ.Stream.Client/OpenRequest.cs +++ b/RabbitMQ.Stream.Client/OpenRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -19,12 +19,14 @@ public OpenRequest(uint correlationId, string vhost) } public int SizeNeeded => 8 + WireFormatting.StringSize(vhost); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteString(span.Slice(offset), vhost); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/OpenResponse.cs b/RabbitMQ.Stream.Client/OpenResponse.cs index 3528edcb..35bfd33d 100644 --- a/RabbitMQ.Stream.Client/OpenResponse.cs +++ b/RabbitMQ.Stream.Client/OpenResponse.cs @@ -31,7 +31,7 @@ private OpenResponse(uint correlationId, ResponseCode responseCode, public IDictionary ConnectionProperties => connectionProperties; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs b/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs index 4d7f754c..73a4a8c6 100644 --- a/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs +++ b/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -22,13 +22,15 @@ public PartitionsQueryRequest(uint correlationId, string superStream) 4 + WireFormatting.StringSize(_superStream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId); offset += WireFormatting.WriteString(span.Slice(offset), _superStream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs b/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs index e6b4f355..6824b284 100644 --- a/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs +++ b/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs @@ -26,7 +26,7 @@ public PartitionsQueryResponse(uint correlationId, ResponseCode responseCode, st public string[] Streams { get; } public int SizeNeeded { get; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PeerProperties.cs b/RabbitMQ.Stream.Client/PeerProperties.cs index 293aeef4..e7404684 100644 --- a/RabbitMQ.Stream.Client/PeerProperties.cs +++ b/RabbitMQ.Stream.Client/PeerProperties.cs @@ -34,8 +34,9 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); @@ -48,6 +49,7 @@ public int Write(Span span) offset += WireFormatting.WriteString(span.Slice(offset), v); } + writer.Advance(offset); return offset; } } @@ -94,7 +96,7 @@ internal static int Read(ReadOnlySequence frame, out PeerPropertiesRespons return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index cf089bd4..232ed1a9 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -221,7 +221,6 @@ RabbitMQ.Stream.Client.CloseResponse.CloseResponse(uint correlationId, RabbitMQ. RabbitMQ.Stream.Client.CloseResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.CloseResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CloseResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CloseResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CodecAlreadyExistException RabbitMQ.Stream.Client.CodecAlreadyExistException.CodecAlreadyExistException(string s) -> void RabbitMQ.Stream.Client.CodecNotFoundException @@ -251,7 +250,6 @@ RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.IsActive.get -> bool RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.SubscriptionId.get -> byte -RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CreateConsumerException RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s) -> void RabbitMQ.Stream.Client.CreateProducerException @@ -262,34 +260,29 @@ RabbitMQ.Stream.Client.CreateResponse.CreateResponse() -> void RabbitMQ.Stream.Client.CreateResponse.CreateResponse(uint correlationId, ushort responseCode) -> void RabbitMQ.Stream.Client.CreateResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CreateResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CreateResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CreateStreamException RabbitMQ.Stream.Client.CreateStreamException.CreateStreamException(string s) -> void RabbitMQ.Stream.Client.CreditResponse RabbitMQ.Stream.Client.CreditResponse.CreditResponse() -> void RabbitMQ.Stream.Client.CreditResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CreditResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeclarePublisherResponse RabbitMQ.Stream.Client.DeclarePublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeclarePublisherResponse.DeclarePublisherResponse() -> void RabbitMQ.Stream.Client.DeclarePublisherResponse.DeclarePublisherResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.DeclarePublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeclarePublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeclarePublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeletePublisherResponse RabbitMQ.Stream.Client.DeletePublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeletePublisherResponse.DeletePublisherResponse() -> void RabbitMQ.Stream.Client.DeletePublisherResponse.DeletePublisherResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.DeletePublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeletePublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeletePublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeleteResponse RabbitMQ.Stream.Client.DeleteResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeleteResponse.DeleteResponse() -> void RabbitMQ.Stream.Client.DeleteResponse.DeleteResponse(uint correlationId, ushort responseCode) -> void RabbitMQ.Stream.Client.DeleteResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeleteResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeleteResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeleteStreamException RabbitMQ.Stream.Client.DeleteStreamException.DeleteStreamException(string s) -> void RabbitMQ.Stream.Client.Deliver @@ -297,7 +290,6 @@ RabbitMQ.Stream.Client.Deliver.Chunk.get -> RabbitMQ.Stream.Client.Chunk RabbitMQ.Stream.Client.Deliver.Deliver() -> void RabbitMQ.Stream.Client.Deliver.SizeNeeded.get -> int RabbitMQ.Stream.Client.Deliver.SubscriptionId.get -> byte -RabbitMQ.Stream.Client.Deliver.Write(System.Span span) -> int RabbitMQ.Stream.Client.GenericProtocolException RabbitMQ.Stream.Client.GenericProtocolException.GenericProtocolException(RabbitMQ.Stream.Client.ResponseCode responseCode, string s) -> void RabbitMQ.Stream.Client.GzipCompressionCodec @@ -330,7 +322,6 @@ RabbitMQ.Stream.Client.ICommand RabbitMQ.Stream.Client.ICommand.CorrelationId.get -> uint RabbitMQ.Stream.Client.ICommand.SizeNeeded.get -> int RabbitMQ.Stream.Client.ICommand.Version.get -> ushort -RabbitMQ.Stream.Client.ICommand.Write(System.Span span) -> int RabbitMQ.Stream.Client.ICompressionCodec RabbitMQ.Stream.Client.ICompressionCodec.Compress(System.Collections.Generic.List messages) -> void RabbitMQ.Stream.Client.ICompressionCodec.CompressedSize.get -> int @@ -409,7 +400,6 @@ RabbitMQ.Stream.Client.Message.Properties.get -> RabbitMQ.Stream.Client.AMQP.Pro RabbitMQ.Stream.Client.Message.Properties.set -> void RabbitMQ.Stream.Client.Message.Serialize() -> System.Buffers.ReadOnlySequence RabbitMQ.Stream.Client.Message.Size.get -> int -RabbitMQ.Stream.Client.Message.Write(System.Span span) -> int RabbitMQ.Stream.Client.MessageContext RabbitMQ.Stream.Client.MessageContext.MessageContext() -> void RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp) -> void @@ -419,21 +409,18 @@ RabbitMQ.Stream.Client.MetaDataQuery RabbitMQ.Stream.Client.MetaDataQuery.MetaDataQuery() -> void RabbitMQ.Stream.Client.MetaDataQuery.MetaDataQuery(uint correlationId, System.Collections.Generic.IList streams) -> void RabbitMQ.Stream.Client.MetaDataQuery.SizeNeeded.get -> int -RabbitMQ.Stream.Client.MetaDataQuery.Write(System.Span span) -> int RabbitMQ.Stream.Client.MetaDataResponse RabbitMQ.Stream.Client.MetaDataResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.MetaDataResponse.MetaDataResponse() -> void RabbitMQ.Stream.Client.MetaDataResponse.MetaDataResponse(uint correlationId, System.Collections.Generic.IDictionary streamInfos) -> void RabbitMQ.Stream.Client.MetaDataResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.MetaDataResponse.StreamInfos.get -> System.Collections.Generic.IDictionary -RabbitMQ.Stream.Client.MetaDataResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.MetaDataUpdate RabbitMQ.Stream.Client.MetaDataUpdate.Code.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.MetaDataUpdate.MetaDataUpdate() -> void RabbitMQ.Stream.Client.MetaDataUpdate.MetaDataUpdate(string stream, RabbitMQ.Stream.Client.ResponseCode code) -> void RabbitMQ.Stream.Client.MetaDataUpdate.SizeNeeded.get -> int RabbitMQ.Stream.Client.MetaDataUpdate.Stream.get -> string -RabbitMQ.Stream.Client.MetaDataUpdate.Write(System.Span span) -> int RabbitMQ.Stream.Client.NoneCompressionCodec RabbitMQ.Stream.Client.NoneCompressionCodec.Compress(System.Collections.Generic.List messages) -> void RabbitMQ.Stream.Client.NoneCompressionCodec.CompressedSize.get -> int @@ -487,7 +474,6 @@ RabbitMQ.Stream.Client.PartitionsQueryResponse.PartitionsQueryResponse(uint corr RabbitMQ.Stream.Client.PartitionsQueryResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.PartitionsQueryResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.PartitionsQueryResponse.Streams.get -> string[] -RabbitMQ.Stream.Client.PartitionsQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.PeerPropertiesResponse RabbitMQ.Stream.Client.PeerPropertiesResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.PeerPropertiesResponse.PeerPropertiesResponse() -> void @@ -495,7 +481,6 @@ RabbitMQ.Stream.Client.PeerPropertiesResponse.PeerPropertiesResponse(uint correl RabbitMQ.Stream.Client.PeerPropertiesResponse.Properties.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.PeerPropertiesResponse.ResponseCode.get -> ushort RabbitMQ.Stream.Client.PeerPropertiesResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PeerPropertiesResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.ProtocolException RabbitMQ.Stream.Client.ProtocolException.ProtocolException(string s) -> void RabbitMQ.Stream.Client.Publish @@ -503,19 +488,16 @@ RabbitMQ.Stream.Client.Publish.MessageCount.get -> int RabbitMQ.Stream.Client.Publish.Publish() -> void RabbitMQ.Stream.Client.Publish.Publish(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages) -> void RabbitMQ.Stream.Client.Publish.SizeNeeded.get -> int -RabbitMQ.Stream.Client.Publish.Write(System.Span span) -> int RabbitMQ.Stream.Client.PublishConfirm RabbitMQ.Stream.Client.PublishConfirm.PublishConfirm() -> void RabbitMQ.Stream.Client.PublishConfirm.PublisherId.get -> byte RabbitMQ.Stream.Client.PublishConfirm.PublishingIds.get -> System.ReadOnlyMemory RabbitMQ.Stream.Client.PublishConfirm.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PublishConfirm.Write(System.Span span) -> int RabbitMQ.Stream.Client.PublishError RabbitMQ.Stream.Client.PublishError.PublisherId.get -> byte RabbitMQ.Stream.Client.PublishError.PublishError() -> void RabbitMQ.Stream.Client.PublishError.PublishingErrors.get -> (ulong, RabbitMQ.Stream.Client.ResponseCode)[] RabbitMQ.Stream.Client.PublishError.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PublishError.Write(System.Span span) -> int RabbitMQ.Stream.Client.QueryException RabbitMQ.Stream.Client.QueryException.QueryException(string s) -> void RabbitMQ.Stream.Client.QueryOffsetResponse @@ -525,7 +507,6 @@ RabbitMQ.Stream.Client.QueryOffsetResponse.QueryOffsetResponse() -> void RabbitMQ.Stream.Client.QueryOffsetResponse.QueryOffsetResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, ulong offsetValue) -> void RabbitMQ.Stream.Client.QueryOffsetResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.QueryOffsetResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.QueryOffsetResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.QueryPublisherResponse RabbitMQ.Stream.Client.QueryPublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.QueryPublisherResponse.QueryPublisherResponse() -> void @@ -533,7 +514,6 @@ RabbitMQ.Stream.Client.QueryPublisherResponse.QueryPublisherResponse(uint correl RabbitMQ.Stream.Client.QueryPublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.QueryPublisherResponse.Sequence.get -> ulong RabbitMQ.Stream.Client.QueryPublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.RawConsumer RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawConsumer.Dispose() -> void @@ -787,13 +767,11 @@ RabbitMQ.Stream.Client.SubscribeResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.SubscribeResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.SubscribeResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.SubscribeResponse.SubscribeResponse() -> void -RabbitMQ.Stream.Client.SubscribeResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.UnsubscribeResponse RabbitMQ.Stream.Client.UnsubscribeResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.UnsubscribeResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.UnsubscribeResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.UnsubscribeResponse.UnsubscribeResponse() -> void -RabbitMQ.Stream.Client.UnsubscribeResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.Version RabbitMQ.Stream.Client.VirtualHostAccessFailureException RabbitMQ.Stream.Client.VirtualHostAccessFailureException.VirtualHostAccessFailureException(string s) -> void diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 9ee92db4..b0fbc521 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -3,6 +3,26 @@ RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask +RabbitMQ.Stream.Client.CloseResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.CreateResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.CreditResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeclarePublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeletePublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeleteResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Deliver.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.ICommand.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Message.Write(System.Span span) -> int +RabbitMQ.Stream.Client.MetaDataQuery.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.MetaDataResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.MetaDataUpdate.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PartitionsQueryResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PeerPropertiesResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Publish.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PublishConfirm.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PublishError.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.QueryOffsetResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task @@ -22,6 +42,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void -RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.SubscribeResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.UnsubscribeResponse.Write(System.Buffers.IBufferWriter writer) -> int static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task \ No newline at end of file diff --git a/RabbitMQ.Stream.Client/Publish.cs b/RabbitMQ.Stream.Client/Publish.cs index 582bb589..af9eb463 100644 --- a/RabbitMQ.Stream.Client/Publish.cs +++ b/RabbitMQ.Stream.Client/Publish.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; using System.Collections.Generic; namespace RabbitMQ.Stream.Client @@ -37,8 +37,9 @@ public Publish(byte publisherId, List<(ulong, Message)> messages) MessageCount = messages.Count; } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], Version); offset += WireFormatting.WriteByte(span[offset..], publisherId); @@ -53,6 +54,7 @@ public int Write(Span span) offset += msg.Write(span[offset..]); } + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/PublishConfirm.cs b/RabbitMQ.Stream.Client/PublishConfirm.cs index a9e94acb..036f09a6 100644 --- a/RabbitMQ.Stream.Client/PublishConfirm.cs +++ b/RabbitMQ.Stream.Client/PublishConfirm.cs @@ -43,7 +43,7 @@ internal static int Read(ReadOnlySequence frame, out PublishConfirm comman return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PublishError.cs b/RabbitMQ.Stream.Client/PublishError.cs index 3fde5b22..1567c867 100644 --- a/RabbitMQ.Stream.Client/PublishError.cs +++ b/RabbitMQ.Stream.Client/PublishError.cs @@ -43,7 +43,7 @@ internal static int Read(ReadOnlySequence frame, out PublishError command) return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/QueryOffsetRequest.cs b/RabbitMQ.Stream.Client/QueryOffsetRequest.cs index 1cc6af81..70ede619 100644 --- a/RabbitMQ.Stream.Client/QueryOffsetRequest.cs +++ b/RabbitMQ.Stream.Client/QueryOffsetRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,14 +23,16 @@ public QueryOffsetRequest(string stream, uint corrId, string reference) public int SizeNeeded => 2 + 2 + 4 + WireFormatting.StringSize(reference) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), corrId); offset += WireFormatting.WriteString(span.Slice(offset), reference); offset += WireFormatting.WriteString(span.Slice(offset), stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/QueryOffsetResponse.cs b/RabbitMQ.Stream.Client/QueryOffsetResponse.cs index 689689d2..720b7643 100644 --- a/RabbitMQ.Stream.Client/QueryOffsetResponse.cs +++ b/RabbitMQ.Stream.Client/QueryOffsetResponse.cs @@ -28,7 +28,7 @@ public QueryOffsetResponse(uint correlationId, ResponseCode responseCode, ulong public ResponseCode ResponseCode => responseCode; public ulong Offset => offsetValue; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/QueryPublisherRequest.cs b/RabbitMQ.Stream.Client/QueryPublisherRequest.cs index 5e032509..34e0fcdf 100644 --- a/RabbitMQ.Stream.Client/QueryPublisherRequest.cs +++ b/RabbitMQ.Stream.Client/QueryPublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -21,13 +21,15 @@ public QueryPublisherRequest(uint correlationId, string publisherRef, string str } public int SizeNeeded => 8 + WireFormatting.StringSize(publisherRef) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteString(span.Slice(offset), publisherRef); offset += WireFormatting.WriteString(span.Slice(offset), stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/QueryPublisherResponse.cs b/RabbitMQ.Stream.Client/QueryPublisherResponse.cs index 3904915d..7f229c4c 100644 --- a/RabbitMQ.Stream.Client/QueryPublisherResponse.cs +++ b/RabbitMQ.Stream.Client/QueryPublisherResponse.cs @@ -27,10 +27,11 @@ public QueryPublisherResponse(uint correlationId, ResponseCode responseCode, ulo public ulong Sequence => sequence; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out QueryPublisherResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs b/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs index 0142774a..68318779 100644 --- a/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs +++ b/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; using System.Buffers; namespace RabbitMQ.Stream.Client @@ -23,13 +22,15 @@ public SaslAuthenticateRequest(uint correlationId, string mechanism, byte[] data public int SizeNeeded => 4 + 4 + WireFormatting.StringSize(mechanism) + 4 + data.Length; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteString(span.Slice(offset), mechanism); offset += WireFormatting.WriteBytes(span.Slice(offset), new ReadOnlySequence(data)); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs b/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs index 20fdec25..8c4a9b4f 100644 --- a/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs +++ b/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs @@ -46,7 +46,7 @@ internal static int Read(ReadOnlySequence frame, out SaslAuthenticateRespo return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs b/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs index f7c29c08..e4c9aa1c 100644 --- a/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs +++ b/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -18,11 +18,13 @@ public SaslHandshakeRequest(uint correlationId) public int SizeNeeded => 8; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs b/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs index ae7bc39a..6d2e441b 100644 --- a/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs +++ b/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs @@ -47,7 +47,8 @@ internal static int Read(ReadOnlySequence frame, out SaslHandshakeResponse command = new SaslHandshakeResponse(correlation, mechs); return offset; } - public int Write(Span span) + + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/StoreOffsetRequest.cs b/RabbitMQ.Stream.Client/StoreOffsetRequest.cs index 92b9e7ad..7170e59a 100644 --- a/RabbitMQ.Stream.Client/StoreOffsetRequest.cs +++ b/RabbitMQ.Stream.Client/StoreOffsetRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,14 +23,16 @@ internal StoreOffsetRequest(string stream, string reference, ulong offsetValue) public int SizeNeeded => 2 + 2 + WireFormatting.StringSize(reference) + WireFormatting.StringSize(stream) + 8; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); offset += WireFormatting.WriteString(span.Slice(offset), reference); offset += WireFormatting.WriteString(span.Slice(offset), stream); offset += WireFormatting.WriteUInt64(span.Slice(offset), offsetValue); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/StreamStatsRequest.cs b/RabbitMQ.Stream.Client/StreamStatsRequest.cs index 5bf09021..a8a0c046 100644 --- a/RabbitMQ.Stream.Client/StreamStatsRequest.cs +++ b/RabbitMQ.Stream.Client/StreamStatsRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,12 +20,14 @@ public StreamStatsRequest(uint correlationId, string stream) public int SizeNeeded => 8 + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteString(span.Slice(offset), stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/StreamStatsResponse.cs b/RabbitMQ.Stream.Client/StreamStatsResponse.cs index ce01c3c3..499fec94 100644 --- a/RabbitMQ.Stream.Client/StreamStatsResponse.cs +++ b/RabbitMQ.Stream.Client/StreamStatsResponse.cs @@ -28,7 +28,7 @@ public StreamStatsResponse(uint correlationId, ResponseCode responseCode, IDicti public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/SubEntryPublish.cs b/RabbitMQ.Stream.Client/SubEntryPublish.cs index 2f82e8a9..9887cba2 100644 --- a/RabbitMQ.Stream.Client/SubEntryPublish.cs +++ b/RabbitMQ.Stream.Client/SubEntryPublish.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -21,8 +21,9 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), Version); offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); @@ -48,6 +49,7 @@ public int Write(Span span) (uint)compressionCodec.CompressedSize); offset += compressionCodec.Write(span.Slice(offset)); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/Subscribe.cs b/RabbitMQ.Stream.Client/Subscribe.cs index 544ffe1c..1f60389a 100644 --- a/RabbitMQ.Stream.Client/Subscribe.cs +++ b/RabbitMQ.Stream.Client/Subscribe.cs @@ -122,7 +122,7 @@ private SubscribeResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -179,8 +179,9 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); @@ -198,6 +199,7 @@ public int Write(Span span) } } + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/Tune.cs b/RabbitMQ.Stream.Client/Tune.cs index 107791b4..9b56523e 100644 --- a/RabbitMQ.Stream.Client/Tune.cs +++ b/RabbitMQ.Stream.Client/Tune.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; using System.Buffers; namespace RabbitMQ.Stream.Client @@ -24,13 +23,15 @@ public TuneResponse(uint frameMax, uint heartbeat) public uint Heartbeat => heartbeat; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), frameMax); offset += WireFormatting.WriteUInt32(span.Slice(offset), heartbeat); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/TuneRequest.cs b/RabbitMQ.Stream.Client/TuneRequest.cs index cbe63252..21e2ea4f 100644 --- a/RabbitMQ.Stream.Client/TuneRequest.cs +++ b/RabbitMQ.Stream.Client/TuneRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2023 VMware, Inc. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,12 +23,14 @@ public TuneRequest(uint frameMax, uint heartbeat) public uint Heartbeat => heartbeat; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), frameMax); offset += WireFormatting.WriteUInt32(span.Slice(offset), heartbeat); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/Unubscribe.cs b/RabbitMQ.Stream.Client/Unubscribe.cs index 5cd61ef4..f20bcef4 100644 --- a/RabbitMQ.Stream.Client/Unubscribe.cs +++ b/RabbitMQ.Stream.Client/Unubscribe.cs @@ -25,7 +25,7 @@ private UnsubscribeResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -55,12 +55,14 @@ public UnsubscribeRequest(uint correlationId, byte subscriptionId) public int SizeNeeded => 9; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/WireFormatting.cs b/RabbitMQ.Stream.Client/WireFormatting.cs index 69624881..08092043 100644 --- a/RabbitMQ.Stream.Client/WireFormatting.cs +++ b/RabbitMQ.Stream.Client/WireFormatting.cs @@ -54,12 +54,15 @@ internal static int WriteInt64(Span span, long value) BinaryPrimitives.WriteInt64BigEndian(span, value); return 8; } + + public const int SizeofUInt32 = 4; [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static int WriteUInt32(Span span, uint value) { BinaryPrimitives.WriteUInt32BigEndian(span, value); - return 4; + return SizeofUInt32; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static int Write(Span span, ReadOnlySequence msg) {