diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 019bc979..5aeba374 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -105,7 +105,7 @@ public OutgoingMsg(byte publisherId, ulong publishingId, Message data) public Message Data => data; public int SizeNeeded => 0; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/CloseRequest.cs b/RabbitMQ.Stream.Client/CloseRequest.cs index 9b7b6073..ae8a9baf 100644 --- a/RabbitMQ.Stream.Client/CloseRequest.cs +++ b/RabbitMQ.Stream.Client/CloseRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,13 +20,15 @@ public CloseRequest(uint correlationId, string reason) public int SizeNeeded => 10 + WireFormatting.StringSize(reason); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteUInt16(span.Slice(offset), 1); //ok code - offset += WireFormatting.WriteString(span.Slice(offset), reason); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteUInt16(span[offset..], 1); //ok code + offset += WireFormatting.WriteString(span[offset..], reason); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/CloseResponse.cs b/RabbitMQ.Stream.Client/CloseResponse.cs index 11fd2797..28f3933c 100644 --- a/RabbitMQ.Stream.Client/CloseResponse.cs +++ b/RabbitMQ.Stream.Client/CloseResponse.cs @@ -25,10 +25,11 @@ public CloseResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out CloseResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/CommandVersionsRequest.cs b/RabbitMQ.Stream.Client/CommandVersionsRequest.cs index 9ced044a..240e0b10 100644 --- a/RabbitMQ.Stream.Client/CommandVersionsRequest.cs +++ b/RabbitMQ.Stream.Client/CommandVersionsRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -29,8 +29,9 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span[offset..], _correlationId); diff --git a/RabbitMQ.Stream.Client/CommandVersionsResponse.cs b/RabbitMQ.Stream.Client/CommandVersionsResponse.cs index 054bd947..cedf6d36 100644 --- a/RabbitMQ.Stream.Client/CommandVersionsResponse.cs +++ b/RabbitMQ.Stream.Client/CommandVersionsResponse.cs @@ -20,7 +20,6 @@ private CommandVersionsResponse(uint correlationId, ResponseCode responseCode, L } public int SizeNeeded { get => throw new NotImplementedException(); } - public int Write(Span span) => throw new NotImplementedException(); public uint CorrelationId { get; } public ResponseCode ResponseCode { get; } @@ -47,4 +46,9 @@ internal static int Read(ReadOnlySequence frame, out CommandVersionsRespon command = new CommandVersionsResponse(correlation, (ResponseCode)responseCode, commands); return offset; } + + public int Write(IBufferWriter writer) + { + throw new NotImplementedException(); + } } diff --git a/RabbitMQ.Stream.Client/Compression.cs b/RabbitMQ.Stream.Client/Compression.cs index 99f45cd8..e4387289 100644 --- a/RabbitMQ.Stream.Client/Compression.cs +++ b/RabbitMQ.Stream.Client/Compression.cs @@ -58,8 +58,8 @@ public int Write(Span span) var offset = 0; foreach (var msg in messages) { - offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)msg.Size); - offset += msg.Write(span.Slice(offset)); + offset += WireFormatting.WriteUInt32(span[offset..], (uint)msg.Size); + offset += msg.Write(span[offset..]); } return offset; @@ -89,8 +89,8 @@ public void Compress(List messages) var offset = 0; foreach (var msg in messages) { - offset += WireFormatting.WriteUInt32(span.Slice(offset), (uint)msg.Size); - offset += msg.Write(span.Slice(offset)); + offset += WireFormatting.WriteUInt32(span[offset..], (uint)msg.Size); + offset += msg.Write(span[offset..]); } using var compressedMemory = new MemoryStream(); diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index d31de0ac..5d08c82a 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -22,12 +22,12 @@ internal static class ConnectionClosedReason public class Connection : IDisposable { - private readonly Socket socket; - private readonly PipeWriter writer; - private readonly PipeReader reader; + private readonly Socket _socket; + private readonly PipeWriter _writer; + private readonly PipeReader _reader; private readonly Task _incomingFramesTask; - private readonly Func, Task> commandCallback; - private readonly Func closedCallback; + private readonly Func, Task> _commandCallback; + private readonly Func _closedCallback; private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); private int numFrames; private bool isClosed = false; @@ -56,13 +56,13 @@ private Connection(Socket socket, Func, Task> callback, Func closedCallBack, SslOption sslOption, ILogger logger) { _logger = logger; - this.socket = socket; - commandCallback = callback; - closedCallback = closedCallBack; + _socket = socket; + _commandCallback = callback; + _closedCallback = closedCallBack; var networkStream = new NetworkStream(socket); var stream = MaybeTcpUpgrade(networkStream, sslOption); - writer = PipeWriter.Create(stream); - reader = PipeReader.Create(stream); + _writer = PipeWriter.Create(stream); + _reader = PipeReader.Create(stream); // ProcessIncomingFrames is dropped as soon as the connection is closed // no need to stop it manually when the connection is closed _incomingFramesTask = Task.Run(ProcessIncomingFrames); @@ -101,17 +101,50 @@ public static async Task Create(EndPoint endpoint, Func return new Connection(socket, commandCallback, closedCallBack, sslOption, logger); } - public async ValueTask Write(T command) where T : struct, ICommand + public ValueTask Write(T command) where T : struct, ICommand { - await WriteCommand(command).ConfigureAwait(false); - // we return true to indicate that the command was written - // In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220 - // we made all WriteCommand async so await is enough to indicate that the command was written - // We decided to keep the return value to avoid a breaking change - return true; + if (!_writeLock.Wait(0)) + { + // https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html + var writeSlowPath = WriteCommandAsyncSlowPath(command); + writeSlowPath.ConfigureAwait(false); + return writeSlowPath; + } + else + { + var release = true; + try + { + var payloadSize = WriteCommandPayloadSize(command); + var written = command.Write(_writer); + Debug.Assert(payloadSize == written); + var flush = _writer.FlushAsync(); + flush.ConfigureAwait(false); + if (flush.IsCompletedSuccessfully) + { + // we return true to indicate that the command was written + // In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220 + // we made all WriteCommand async so await is enough to indicate that the command was written + // We decided to keep the return value to avoid a breaking change + return ValueTask.FromResult(true); + } + else + { + release = false; + return AwaitFlushThenRelease(flush); + } + } + finally + { + if (release) + { + _writeLock.Release(); + } + } + } } - private async Task WriteCommand(T command) where T : struct, ICommand + private async ValueTask WriteCommandAsyncSlowPath(T command) where T : struct, ICommand { if (Token.IsCancellationRequested) { @@ -127,18 +160,45 @@ private async Task WriteCommand(T command) where T : struct, ICommand await _writeLock.WaitAsync(Token).ConfigureAwait(false); try { - var size = command.SizeNeeded; - var mem = new byte[4 + size]; // + 4 to write the size - WireFormatting.WriteUInt32(mem, (uint)size); - var written = command.Write(mem.AsSpan()[4..]); - await writer.WriteAsync(new ReadOnlyMemory(mem), Token).ConfigureAwait(false); - Debug.Assert(size == written); - await writer.FlushAsync(Token).ConfigureAwait(false); + var payloadSize = WriteCommandPayloadSize(command); + var written = command.Write(_writer); + Debug.Assert(payloadSize == written); + await _writer.FlushAsync().ConfigureAwait(false); } finally { _writeLock.Release(); } + + return true; + } + + private async ValueTask AwaitFlushThenRelease(ValueTask task) + { + try + { + await task.ConfigureAwait(false); + } + finally + { + _writeLock.Release(); + } + + return true; + } + + private int WriteCommandPayloadSize(T command) where T : struct, ICommand + { + /* + * TODO FUTURE + * This code could be moved into a common base class for all outgoing + * commands + */ + var payloadSize = command.SizeNeeded; + var mem = new byte[4 + payloadSize]; // + 4 to write the size + var written = WireFormatting.WriteUInt32(mem, (uint)payloadSize); + _writer.Advance(written); + return payloadSize; } private async Task ProcessIncomingFrames() @@ -148,9 +208,9 @@ private async Task ProcessIncomingFrames() { while (!isClosed) { - if (!reader.TryRead(out var result)) + if (!_reader.TryRead(out var result)) { - result = await reader.ReadAsync(Token).ConfigureAwait(false); + result = await _reader.ReadAsync(Token).ConfigureAwait(false); } var buffer = result.Buffer; @@ -166,16 +226,15 @@ private async Task ProcessIncomingFrames() while (TryReadFrame(ref buffer, out var frame) && !isClosed) { // Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled. - var memory = ArrayPool.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length); frame.CopyTo(memory.Span); - await commandCallback(memory).ConfigureAwait(false); + await _commandCallback(memory).ConfigureAwait(false); numFrames += 1; } - reader.AdvanceTo(buffer.Start, buffer.End); + _reader.AdvanceTo(buffer.Start, buffer.End); } } catch (OperationCanceledException e) @@ -206,8 +265,8 @@ private async Task ProcessIncomingFrames() "TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ", ClientId, _closedReason, Token.IsCancellationRequested); // Mark the PipeReader as complete - await reader.CompleteAsync(caught).ConfigureAwait(false); - closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false); + await _reader.CompleteAsync(caught).ConfigureAwait(false); + _closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false); } } @@ -247,9 +306,9 @@ public void Dispose() } isClosed = true; - writer.Complete(); - reader.Complete(); - socket.Close(); + _writer.Complete(); + _reader.Complete(); + _socket.Close(); if (!_incomingFramesTask.Wait(Consts.MidWait)) { _logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}", diff --git a/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs b/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs index d02d5e0a..e78a35bc 100644 --- a/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs +++ b/RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs @@ -28,7 +28,7 @@ private ConsumerUpdateQueryResponse(uint correlationId, byte subscriptionId, byt public bool IsActive => active == 1; public int SizeNeeded => throw new NotImplementedException(); - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs b/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs index 7fa98786..125eb457 100644 --- a/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs +++ b/RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -27,13 +27,15 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId); - offset += WireFormatting.WriteUInt16(span.Slice(offset), (ushort)ResponseCode.Ok); - offset += OffsetSpecification.Write(span.Slice(offset)); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], _correlationId); + offset += WireFormatting.WriteUInt16(span[offset..], (ushort)ResponseCode.Ok); + offset += OffsetSpecification.Write(span[offset..]); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/Create.cs b/RabbitMQ.Stream.Client/Create.cs index e88f0ded..edf88330 100644 --- a/RabbitMQ.Stream.Client/Create.cs +++ b/RabbitMQ.Stream.Client/Create.cs @@ -35,20 +35,23 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); + var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), stream); - offset += WireFormatting.WriteInt32(span.Slice(offset), arguments.Count); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], stream); + offset += WireFormatting.WriteInt32(span[offset..], arguments.Count); foreach (var (key, value) in arguments) { - offset += WireFormatting.WriteString(span.Slice(offset), key); - offset += WireFormatting.WriteString(span.Slice(offset), value); + offset += WireFormatting.WriteString(span[offset..], key); + offset += WireFormatting.WriteString(span[offset..], value); } + writer.Advance(offset); return offset; } } @@ -71,10 +74,11 @@ public CreateResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out CreateResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/CreateSuperStream.cs b/RabbitMQ.Stream.Client/CreateSuperStream.cs index 83ffac55..6efc485a 100644 --- a/RabbitMQ.Stream.Client/CreateSuperStream.cs +++ b/RabbitMQ.Stream.Client/CreateSuperStream.cs @@ -46,8 +46,9 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], command.Version); @@ -94,11 +95,6 @@ private CreateSuperStreamResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)_responseCode; - public int Write(Span span) - { - throw new NotImplementedException(); - } - internal static int Read(ReadOnlySequence frame, out CreateSuperStreamResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); @@ -108,4 +104,9 @@ internal static int Read(ReadOnlySequence frame, out CreateSuperStreamResp command = new CreateSuperStreamResponse(correlation, responseCode); return offset; } + + public int Write(IBufferWriter writer) + { + throw new NotImplementedException(); + } } diff --git a/RabbitMQ.Stream.Client/Credit.cs b/RabbitMQ.Stream.Client/Credit.cs index 455e21a4..1d0ba4b8 100644 --- a/RabbitMQ.Stream.Client/Credit.cs +++ b/RabbitMQ.Stream.Client/Credit.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -19,13 +19,15 @@ public CreditRequest(byte subscriptionId, ushort credit) } public int SizeNeeded => 7; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId); - offset += WireFormatting.WriteUInt16(span.Slice(offset), credit); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteByte(span[offset..], subscriptionId); + offset += WireFormatting.WriteUInt16(span[offset..], credit); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/CreditResponse.cs b/RabbitMQ.Stream.Client/CreditResponse.cs index 7467ddab..192355d7 100644 --- a/RabbitMQ.Stream.Client/CreditResponse.cs +++ b/RabbitMQ.Stream.Client/CreditResponse.cs @@ -24,7 +24,7 @@ private CreditResponse(ResponseCode responseCode, byte subscriptionId) private ResponseCode ResponseCode { get; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs b/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs index 4f1cec1f..384aeb8d 100644 --- a/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs +++ b/RabbitMQ.Stream.Client/DeclarePublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -24,14 +24,16 @@ public DeclarePublisherRequest(uint correlationId, byte publisherId, string publ public int SizeNeeded => 8 + 1 + WireFormatting.StringSize(publisherRef) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); - offset += WireFormatting.WriteString(span.Slice(offset), publisherRef); - offset += WireFormatting.WriteString(span.Slice(offset), stream); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteByte(span[offset..], publisherId); + offset += WireFormatting.WriteString(span[offset..], publisherRef); + offset += WireFormatting.WriteString(span[offset..], stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs b/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs index dd758905..d587eed3 100644 --- a/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs +++ b/RabbitMQ.Stream.Client/DeclarePublisherResponse.cs @@ -25,7 +25,7 @@ public DeclarePublisherResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/Delete.cs b/RabbitMQ.Stream.Client/Delete.cs index 8ee71624..e04468b8 100644 --- a/RabbitMQ.Stream.Client/Delete.cs +++ b/RabbitMQ.Stream.Client/Delete.cs @@ -21,12 +21,14 @@ public DeleteRequest(uint correlationId, string stream) public int SizeNeeded => 8 + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), stream); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], stream); + writer.Advance(offset); return offset; } } @@ -49,10 +51,11 @@ public DeleteResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out DeleteResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/DeletePublisherRequest.cs b/RabbitMQ.Stream.Client/DeletePublisherRequest.cs index 38dd0b22..4f31000f 100644 --- a/RabbitMQ.Stream.Client/DeletePublisherRequest.cs +++ b/RabbitMQ.Stream.Client/DeletePublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,12 +20,14 @@ public DeletePublisherRequest(uint correlationId, byte publisherId) public int SizeNeeded => 8 + 1; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteByte(span[offset..], publisherId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/DeletePublisherResponse.cs b/RabbitMQ.Stream.Client/DeletePublisherResponse.cs index 94c6f330..3d20360e 100644 --- a/RabbitMQ.Stream.Client/DeletePublisherResponse.cs +++ b/RabbitMQ.Stream.Client/DeletePublisherResponse.cs @@ -24,10 +24,11 @@ public DeletePublisherResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out DeletePublisherResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/DeleteSuperStream.cs b/RabbitMQ.Stream.Client/DeleteSuperStream.cs index 18c1778b..5bd984d1 100644 --- a/RabbitMQ.Stream.Client/DeleteSuperStream.cs +++ b/RabbitMQ.Stream.Client/DeleteSuperStream.cs @@ -21,8 +21,9 @@ public DeleteSuperStreamRequest(uint correlationId, string superStream) public int SizeNeeded => 8 + WireFormatting.StringSize(_superStream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); offset += WireFormatting.WriteUInt32(span[offset..], _correlationId); @@ -49,7 +50,7 @@ public DeleteSuperStreamResponse(uint correlationId, ushort responseCode) public ResponseCode ResponseCode => (ResponseCode)_responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/Deliver.cs b/RabbitMQ.Stream.Client/Deliver.cs index 88322aef..4c318a16 100644 --- a/RabbitMQ.Stream.Client/Deliver.cs +++ b/RabbitMQ.Stream.Client/Deliver.cs @@ -27,7 +27,7 @@ private Deliver(byte subscriptionId, Chunk chunk) public byte SubscriptionId => subscriptionId; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/HeartBeatRequest.cs b/RabbitMQ.Stream.Client/HeartBeatRequest.cs index 572c4083..1b4232b6 100644 --- a/RabbitMQ.Stream.Client/HeartBeatRequest.cs +++ b/RabbitMQ.Stream.Client/HeartBeatRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -12,10 +12,12 @@ namespace RabbitMQ.Stream.Client; public int SizeNeeded => 4; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/ICommand.cs b/RabbitMQ.Stream.Client/ICommand.cs index 0c4ea0b8..dbbe2ae6 100644 --- a/RabbitMQ.Stream.Client/ICommand.cs +++ b/RabbitMQ.Stream.Client/ICommand.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -10,7 +10,7 @@ public interface ICommand { ushort Version => 1; uint CorrelationId => uint.MaxValue; - public int SizeNeeded { get; } - int Write(Span span); + int SizeNeeded { get; } + int Write(IBufferWriter writer); } } diff --git a/RabbitMQ.Stream.Client/MetaData.cs b/RabbitMQ.Stream.Client/MetaData.cs index a5a70778..147bdb32 100644 --- a/RabbitMQ.Stream.Client/MetaData.cs +++ b/RabbitMQ.Stream.Client/MetaData.cs @@ -23,19 +23,22 @@ public MetaDataQuery(uint correlationId, IList streams) public int SizeNeeded => 12 + streams.Sum(WireFormatting.StringSize); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], command.Version); offset += WireFormatting.WriteUInt32(span[offset..], correlationId); // map offset += WireFormatting.WriteInt32(span[offset..], streams.Count()); + foreach (var s in streams) { offset += WireFormatting.WriteString(span[offset..], s); } + writer.Advance(offset); return offset; } } @@ -151,7 +154,7 @@ internal static int Read(ReadOnlySequence frame, out MetaDataResponse comm return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -186,7 +189,7 @@ internal static int Read(ReadOnlySequence frame, out MetaDataUpdate comman return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/OpenRequest.cs b/RabbitMQ.Stream.Client/OpenRequest.cs index 8826f9e1..3a8b45e5 100644 --- a/RabbitMQ.Stream.Client/OpenRequest.cs +++ b/RabbitMQ.Stream.Client/OpenRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -19,12 +19,14 @@ public OpenRequest(uint correlationId, string vhost) } public int SizeNeeded => 8 + WireFormatting.StringSize(vhost); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), vhost); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], vhost); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/OpenResponse.cs b/RabbitMQ.Stream.Client/OpenResponse.cs index 0901aeae..a094f12f 100644 --- a/RabbitMQ.Stream.Client/OpenResponse.cs +++ b/RabbitMQ.Stream.Client/OpenResponse.cs @@ -31,7 +31,7 @@ private OpenResponse(uint correlationId, ResponseCode responseCode, public IDictionary ConnectionProperties => connectionProperties; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs b/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs index a043491a..50a64a7a 100644 --- a/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs +++ b/RabbitMQ.Stream.Client/PartitionsQueryRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -22,13 +22,15 @@ public PartitionsQueryRequest(uint correlationId, string superStream) 4 + WireFormatting.StringSize(_superStream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), _superStream); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteUInt32(span[offset..], _correlationId); + offset += WireFormatting.WriteString(span[offset..], _superStream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs b/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs index 4a9abf56..683ee330 100644 --- a/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs +++ b/RabbitMQ.Stream.Client/PartitionsQueryResponse.cs @@ -26,7 +26,7 @@ public PartitionsQueryResponse(uint correlationId, ResponseCode responseCode, st public string[] Streams { get; } public int SizeNeeded { get; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PeerProperties.cs b/RabbitMQ.Stream.Client/PeerProperties.cs index c97e5d89..8a72d063 100644 --- a/RabbitMQ.Stream.Client/PeerProperties.cs +++ b/RabbitMQ.Stream.Client/PeerProperties.cs @@ -34,20 +34,22 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); // map - offset += WireFormatting.WriteInt32(span.Slice(offset), properties.Count); + offset += WireFormatting.WriteInt32(span[offset..], properties.Count); foreach (var (k, v) in properties) { - offset += WireFormatting.WriteString(span.Slice(offset), k); - offset += WireFormatting.WriteString(span.Slice(offset), v); + offset += WireFormatting.WriteString(span[offset..], k); + offset += WireFormatting.WriteString(span[offset..], v); } + writer.Advance(offset); return offset; } } @@ -94,7 +96,7 @@ internal static int Read(ReadOnlySequence frame, out PeerPropertiesRespons return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 69e44e3d..2cb0b747 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -212,7 +212,6 @@ RabbitMQ.Stream.Client.CloseResponse.CloseResponse(uint correlationId, RabbitMQ. RabbitMQ.Stream.Client.CloseResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.CloseResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CloseResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CloseResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CodecAlreadyExistException RabbitMQ.Stream.Client.CodecAlreadyExistException.CodecAlreadyExistException(string s) -> void RabbitMQ.Stream.Client.CodecNotFoundException @@ -242,7 +241,6 @@ RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.IsActive.get -> bool RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.SubscriptionId.get -> byte -RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CreateConsumerException RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s) -> void RabbitMQ.Stream.Client.CreateProducerException @@ -253,34 +251,29 @@ RabbitMQ.Stream.Client.CreateResponse.CreateResponse() -> void RabbitMQ.Stream.Client.CreateResponse.CreateResponse(uint correlationId, ushort responseCode) -> void RabbitMQ.Stream.Client.CreateResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CreateResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CreateResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.CreateStreamException RabbitMQ.Stream.Client.CreateStreamException.CreateStreamException(string s) -> void RabbitMQ.Stream.Client.CreditResponse RabbitMQ.Stream.Client.CreditResponse.CreditResponse() -> void RabbitMQ.Stream.Client.CreditResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CreditResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeclarePublisherResponse RabbitMQ.Stream.Client.DeclarePublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeclarePublisherResponse.DeclarePublisherResponse() -> void RabbitMQ.Stream.Client.DeclarePublisherResponse.DeclarePublisherResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.DeclarePublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeclarePublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeclarePublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeletePublisherResponse RabbitMQ.Stream.Client.DeletePublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeletePublisherResponse.DeletePublisherResponse() -> void RabbitMQ.Stream.Client.DeletePublisherResponse.DeletePublisherResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void RabbitMQ.Stream.Client.DeletePublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeletePublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeletePublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeleteResponse RabbitMQ.Stream.Client.DeleteResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeleteResponse.DeleteResponse() -> void RabbitMQ.Stream.Client.DeleteResponse.DeleteResponse(uint correlationId, ushort responseCode) -> void RabbitMQ.Stream.Client.DeleteResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeleteResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeleteResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeleteStreamException RabbitMQ.Stream.Client.DeleteStreamException.DeleteStreamException(string s) -> void RabbitMQ.Stream.Client.Deliver @@ -288,7 +281,6 @@ RabbitMQ.Stream.Client.Deliver.Chunk.get -> RabbitMQ.Stream.Client.Chunk RabbitMQ.Stream.Client.Deliver.Deliver() -> void RabbitMQ.Stream.Client.Deliver.SizeNeeded.get -> int RabbitMQ.Stream.Client.Deliver.SubscriptionId.get -> byte -RabbitMQ.Stream.Client.Deliver.Write(System.Span span) -> int RabbitMQ.Stream.Client.GenericProtocolException RabbitMQ.Stream.Client.GenericProtocolException.GenericProtocolException(RabbitMQ.Stream.Client.ResponseCode responseCode, string s) -> void RabbitMQ.Stream.Client.GzipCompressionCodec @@ -320,7 +312,6 @@ RabbitMQ.Stream.Client.ICommand RabbitMQ.Stream.Client.ICommand.CorrelationId.get -> uint RabbitMQ.Stream.Client.ICommand.SizeNeeded.get -> int RabbitMQ.Stream.Client.ICommand.Version.get -> ushort -RabbitMQ.Stream.Client.ICommand.Write(System.Span span) -> int RabbitMQ.Stream.Client.ICompressionCodec RabbitMQ.Stream.Client.ICompressionCodec.Compress(System.Collections.Generic.List messages) -> void RabbitMQ.Stream.Client.ICompressionCodec.CompressedSize.get -> int @@ -392,7 +383,6 @@ RabbitMQ.Stream.Client.Message.Properties.get -> RabbitMQ.Stream.Client.AMQP.Pro RabbitMQ.Stream.Client.Message.Properties.set -> void RabbitMQ.Stream.Client.Message.Serialize() -> System.Buffers.ReadOnlySequence RabbitMQ.Stream.Client.Message.Size.get -> int -RabbitMQ.Stream.Client.Message.Write(System.Span span) -> int RabbitMQ.Stream.Client.MessageContext RabbitMQ.Stream.Client.MessageContext.MessageContext() -> void RabbitMQ.Stream.Client.MessageContext.Offset.get -> ulong @@ -401,21 +391,18 @@ RabbitMQ.Stream.Client.MetaDataQuery RabbitMQ.Stream.Client.MetaDataQuery.MetaDataQuery() -> void RabbitMQ.Stream.Client.MetaDataQuery.MetaDataQuery(uint correlationId, System.Collections.Generic.IList streams) -> void RabbitMQ.Stream.Client.MetaDataQuery.SizeNeeded.get -> int -RabbitMQ.Stream.Client.MetaDataQuery.Write(System.Span span) -> int RabbitMQ.Stream.Client.MetaDataResponse RabbitMQ.Stream.Client.MetaDataResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.MetaDataResponse.MetaDataResponse() -> void RabbitMQ.Stream.Client.MetaDataResponse.MetaDataResponse(uint correlationId, System.Collections.Generic.IDictionary streamInfos) -> void RabbitMQ.Stream.Client.MetaDataResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.MetaDataResponse.StreamInfos.get -> System.Collections.Generic.IDictionary -RabbitMQ.Stream.Client.MetaDataResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.MetaDataUpdate RabbitMQ.Stream.Client.MetaDataUpdate.Code.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.MetaDataUpdate.MetaDataUpdate() -> void RabbitMQ.Stream.Client.MetaDataUpdate.MetaDataUpdate(string stream, RabbitMQ.Stream.Client.ResponseCode code) -> void RabbitMQ.Stream.Client.MetaDataUpdate.SizeNeeded.get -> int RabbitMQ.Stream.Client.MetaDataUpdate.Stream.get -> string -RabbitMQ.Stream.Client.MetaDataUpdate.Write(System.Span span) -> int RabbitMQ.Stream.Client.NoneCompressionCodec RabbitMQ.Stream.Client.NoneCompressionCodec.Compress(System.Collections.Generic.List messages) -> void RabbitMQ.Stream.Client.NoneCompressionCodec.CompressedSize.get -> int @@ -469,7 +456,6 @@ RabbitMQ.Stream.Client.PartitionsQueryResponse.PartitionsQueryResponse(uint corr RabbitMQ.Stream.Client.PartitionsQueryResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.PartitionsQueryResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.PartitionsQueryResponse.Streams.get -> string[] -RabbitMQ.Stream.Client.PartitionsQueryResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.PeerPropertiesResponse RabbitMQ.Stream.Client.PeerPropertiesResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.PeerPropertiesResponse.PeerPropertiesResponse() -> void @@ -477,7 +463,6 @@ RabbitMQ.Stream.Client.PeerPropertiesResponse.PeerPropertiesResponse(uint correl RabbitMQ.Stream.Client.PeerPropertiesResponse.Properties.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.PeerPropertiesResponse.ResponseCode.get -> ushort RabbitMQ.Stream.Client.PeerPropertiesResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PeerPropertiesResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.ProtocolException RabbitMQ.Stream.Client.ProtocolException.ProtocolException(string s) -> void RabbitMQ.Stream.Client.Publish @@ -485,19 +470,16 @@ RabbitMQ.Stream.Client.Publish.MessageCount.get -> int RabbitMQ.Stream.Client.Publish.Publish() -> void RabbitMQ.Stream.Client.Publish.Publish(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages) -> void RabbitMQ.Stream.Client.Publish.SizeNeeded.get -> int -RabbitMQ.Stream.Client.Publish.Write(System.Span span) -> int RabbitMQ.Stream.Client.PublishConfirm RabbitMQ.Stream.Client.PublishConfirm.PublishConfirm() -> void RabbitMQ.Stream.Client.PublishConfirm.PublisherId.get -> byte RabbitMQ.Stream.Client.PublishConfirm.PublishingIds.get -> System.ReadOnlyMemory RabbitMQ.Stream.Client.PublishConfirm.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PublishConfirm.Write(System.Span span) -> int RabbitMQ.Stream.Client.PublishError RabbitMQ.Stream.Client.PublishError.PublisherId.get -> byte RabbitMQ.Stream.Client.PublishError.PublishError() -> void RabbitMQ.Stream.Client.PublishError.PublishingErrors.get -> (ulong, RabbitMQ.Stream.Client.ResponseCode)[] RabbitMQ.Stream.Client.PublishError.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PublishError.Write(System.Span span) -> int RabbitMQ.Stream.Client.QueryException RabbitMQ.Stream.Client.QueryException.QueryException(string s) -> void RabbitMQ.Stream.Client.QueryOffsetResponse @@ -507,7 +489,6 @@ RabbitMQ.Stream.Client.QueryOffsetResponse.QueryOffsetResponse() -> void RabbitMQ.Stream.Client.QueryOffsetResponse.QueryOffsetResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, ulong offsetValue) -> void RabbitMQ.Stream.Client.QueryOffsetResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.QueryOffsetResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.QueryOffsetResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.QueryPublisherResponse RabbitMQ.Stream.Client.QueryPublisherResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.QueryPublisherResponse.QueryPublisherResponse() -> void @@ -515,7 +496,6 @@ RabbitMQ.Stream.Client.QueryPublisherResponse.QueryPublisherResponse(uint correl RabbitMQ.Stream.Client.QueryPublisherResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.QueryPublisherResponse.Sequence.get -> ulong RabbitMQ.Stream.Client.QueryPublisherResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.RawConsumer RabbitMQ.Stream.Client.RawConsumer.Dispose() -> void RabbitMQ.Stream.Client.RawConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task @@ -750,13 +730,11 @@ RabbitMQ.Stream.Client.SubscribeResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.SubscribeResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.SubscribeResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.SubscribeResponse.SubscribeResponse() -> void -RabbitMQ.Stream.Client.SubscribeResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.UnsubscribeResponse RabbitMQ.Stream.Client.UnsubscribeResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.UnsubscribeResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.UnsubscribeResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.UnsubscribeResponse.UnsubscribeResponse() -> void -RabbitMQ.Stream.Client.UnsubscribeResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.Version RabbitMQ.Stream.Client.VirtualHostAccessFailureException RabbitMQ.Stream.Client.VirtualHostAccessFailureException.VirtualHostAccessFailureException(string s) -> void diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index aeace579..8f3dfc8d 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -56,6 +56,8 @@ RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void +RabbitMQ.Stream.Client.CommandVersionsRequest.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.CommandVersionsResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void RabbitMQ.Stream.Client.ConnectionItem RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool @@ -110,14 +112,14 @@ RabbitMQ.Stream.Client.CreateSuperStreamResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.CreateSuperStreamResponse.CreateSuperStreamResponse() -> void RabbitMQ.Stream.Client.CreateSuperStreamResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CreateSuperStreamResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CreateSuperStreamResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.CreateSuperStreamResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.DeleteSuperStreamResponse RabbitMQ.Stream.Client.DeleteSuperStreamResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.DeleteSuperStreamResponse.DeleteSuperStreamResponse() -> void RabbitMQ.Stream.Client.DeleteSuperStreamResponse.DeleteSuperStreamResponse(uint correlationId, ushort responseCode) -> void RabbitMQ.Stream.Client.DeleteSuperStreamResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.DeleteSuperStreamResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.DeleteSuperStreamResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.DeleteSuperStreamResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.EntityCommonConfig RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void @@ -145,14 +147,12 @@ RabbitMQ.Stream.Client.CommandVersionsRequest RabbitMQ.Stream.Client.CommandVersionsRequest.CommandVersionsRequest() -> void RabbitMQ.Stream.Client.CommandVersionsRequest.CommandVersionsRequest(uint correlationId) -> void RabbitMQ.Stream.Client.CommandVersionsRequest.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CommandVersionsRequest.Write(System.Span span) -> int RabbitMQ.Stream.Client.CommandVersionsResponse RabbitMQ.Stream.Client.CommandVersionsResponse.Commands.get -> System.Collections.Generic.List RabbitMQ.Stream.Client.CommandVersionsResponse.CommandVersionsResponse() -> void RabbitMQ.Stream.Client.CommandVersionsResponse.CorrelationId.get -> uint RabbitMQ.Stream.Client.CommandVersionsResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode RabbitMQ.Stream.Client.CommandVersionsResponse.SizeNeeded.get -> int -RabbitMQ.Stream.Client.CommandVersionsResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.ICommandVersions RabbitMQ.Stream.Client.ICommandVersions.Command.get -> ushort RabbitMQ.Stream.Client.ICommandVersions.MaxVersion.get -> ushort @@ -207,7 +207,7 @@ RabbitMQ.Stream.Client.PublishFilter.MinVersion.get -> ushort RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int -RabbitMQ.Stream.Client.PublishFilter.Write(System.Span span) -> int +RabbitMQ.Stream.Client.PublishFilter.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void @@ -239,6 +239,26 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.Reliable.ConsumerFactory._consumer -> RabbitMQ.Stream.Client.IConsumer +RabbitMQ.Stream.Client.CloseResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.CreateResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.CreditResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeclarePublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeletePublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.DeleteResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Deliver.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.ICommand.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Message.Write(System.Span span) -> int +RabbitMQ.Stream.Client.MetaDataQuery.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.MetaDataResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.MetaDataUpdate.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PartitionsQueryResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PeerPropertiesResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.Publish.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PublishConfirm.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.PublishError.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.QueryOffsetResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task @@ -298,7 +318,7 @@ RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse() -> void RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.List streams) -> void RabbitMQ.Stream.Client.RouteQueryResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.RouteQueryResponse.Streams.get -> System.Collections.Generic.List -RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType @@ -313,12 +333,12 @@ RabbitMQ.Stream.Client.StreamStatsResponse.SizeNeeded.get -> int RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void -RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateSuperStream(RabbitMQ.Stream.Client.SuperStreamSpec spec) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.DeleteSuperStream(string superStream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Buffers.IBufferWriter writer) -> int RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task @@ -355,3 +375,5 @@ static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Str static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.SubscribeResponse.Write(System.Buffers.IBufferWriter writer) -> int +RabbitMQ.Stream.Client.UnsubscribeResponse.Write(System.Buffers.IBufferWriter writer) -> int diff --git a/RabbitMQ.Stream.Client/Publish.cs b/RabbitMQ.Stream.Client/Publish.cs index 1a229dff..8f66badd 100644 --- a/RabbitMQ.Stream.Client/Publish.cs +++ b/RabbitMQ.Stream.Client/Publish.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; using System.Collections.Generic; namespace RabbitMQ.Stream.Client @@ -37,8 +37,9 @@ public Publish(byte publisherId, List<(ulong, Message)> messages) MessageCount = messages.Count; } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], Version); offset += WireFormatting.WriteByte(span[offset..], publisherId); @@ -53,6 +54,7 @@ public int Write(Span span) offset += msg.Write(span[offset..]); } + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/PublishConfirm.cs b/RabbitMQ.Stream.Client/PublishConfirm.cs index 901c9659..787113ef 100644 --- a/RabbitMQ.Stream.Client/PublishConfirm.cs +++ b/RabbitMQ.Stream.Client/PublishConfirm.cs @@ -43,7 +43,7 @@ internal static int Read(ReadOnlySequence frame, out PublishConfirm comman return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PublishError.cs b/RabbitMQ.Stream.Client/PublishError.cs index 591522de..e78da8c5 100644 --- a/RabbitMQ.Stream.Client/PublishError.cs +++ b/RabbitMQ.Stream.Client/PublishError.cs @@ -43,7 +43,7 @@ internal static int Read(ReadOnlySequence frame, out PublishError command) return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/PublishFilter.cs b/RabbitMQ.Stream.Client/PublishFilter.cs index 3b0291d9..703efb19 100644 --- a/RabbitMQ.Stream.Client/PublishFilter.cs +++ b/RabbitMQ.Stream.Client/PublishFilter.cs @@ -3,6 +3,7 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Buffers; using System.Collections.Generic; using Microsoft.Extensions.Logging; @@ -70,8 +71,9 @@ public PublishFilter(byte publisherId, List<(ulong, Message)> messages, MessageCount = messages.Count; } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], Version); offset += WireFormatting.WriteByte(span[offset..], publisherId); diff --git a/RabbitMQ.Stream.Client/QueryOffsetRequest.cs b/RabbitMQ.Stream.Client/QueryOffsetRequest.cs index f129c76b..11c56060 100644 --- a/RabbitMQ.Stream.Client/QueryOffsetRequest.cs +++ b/RabbitMQ.Stream.Client/QueryOffsetRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,14 +23,16 @@ public QueryOffsetRequest(string stream, uint corrId, string reference) public int SizeNeeded => 2 + 2 + 4 + WireFormatting.StringSize(reference) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), corrId); - offset += WireFormatting.WriteString(span.Slice(offset), reference); - offset += WireFormatting.WriteString(span.Slice(offset), stream); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteUInt32(span[offset..], corrId); + offset += WireFormatting.WriteString(span[offset..], reference); + offset += WireFormatting.WriteString(span[offset..], stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/QueryOffsetResponse.cs b/RabbitMQ.Stream.Client/QueryOffsetResponse.cs index cafcd703..d29c78dc 100644 --- a/RabbitMQ.Stream.Client/QueryOffsetResponse.cs +++ b/RabbitMQ.Stream.Client/QueryOffsetResponse.cs @@ -28,7 +28,7 @@ public QueryOffsetResponse(uint correlationId, ResponseCode responseCode, ulong public ResponseCode ResponseCode => responseCode; public ulong Offset => offsetValue; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/QueryPublisherRequest.cs b/RabbitMQ.Stream.Client/QueryPublisherRequest.cs index ad0cf8a3..34321def 100644 --- a/RabbitMQ.Stream.Client/QueryPublisherRequest.cs +++ b/RabbitMQ.Stream.Client/QueryPublisherRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -21,13 +21,15 @@ public QueryPublisherRequest(uint correlationId, string publisherRef, string str } public int SizeNeeded => 8 + WireFormatting.StringSize(publisherRef) + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), publisherRef); - offset += WireFormatting.WriteString(span.Slice(offset), stream); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], publisherRef); + offset += WireFormatting.WriteString(span[offset..], stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/QueryPublisherResponse.cs b/RabbitMQ.Stream.Client/QueryPublisherResponse.cs index bfa206f9..9f01a0a0 100644 --- a/RabbitMQ.Stream.Client/QueryPublisherResponse.cs +++ b/RabbitMQ.Stream.Client/QueryPublisherResponse.cs @@ -27,10 +27,11 @@ public QueryPublisherResponse(uint correlationId, ResponseCode responseCode, ulo public ulong Sequence => sequence; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } + internal static int Read(ReadOnlySequence frame, out QueryPublisherResponse command) { var offset = WireFormatting.ReadUInt16(frame, out _); diff --git a/RabbitMQ.Stream.Client/RouteQueryRequest.cs b/RabbitMQ.Stream.Client/RouteQueryRequest.cs index f3f55841..773b3900 100644 --- a/RabbitMQ.Stream.Client/RouteQueryRequest.cs +++ b/RabbitMQ.Stream.Client/RouteQueryRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client; @@ -24,8 +24,9 @@ public RouteQueryRequest(uint corrId, string superStream, string routingKey) WireFormatting.StringSize(_superStream) + WireFormatting.StringSize(_routingKey); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); offset += WireFormatting.WriteUInt16(span[offset..], command.Version); diff --git a/RabbitMQ.Stream.Client/RouteQueryResponse.cs b/RabbitMQ.Stream.Client/RouteQueryResponse.cs index 9d5de300..740a38a8 100644 --- a/RabbitMQ.Stream.Client/RouteQueryResponse.cs +++ b/RabbitMQ.Stream.Client/RouteQueryResponse.cs @@ -21,7 +21,7 @@ public RouteQueryResponse(uint correlationId, ResponseCode responseCode, List Streams { get; } public int SizeNeeded => throw new NotImplementedException(); - public int Write(Span span) => throw new NotImplementedException(); + public int Write(IBufferWriter writer) => throw new NotImplementedException(); public uint CorrelationId { get; } public ResponseCode ResponseCode { get; } diff --git a/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs b/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs index a7cb0f3f..3421636c 100644 --- a/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs +++ b/RabbitMQ.Stream.Client/SaslAuthenticateRequest.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; using System.Buffers; namespace RabbitMQ.Stream.Client @@ -23,13 +22,15 @@ public SaslAuthenticateRequest(uint correlationId, string mechanism, byte[] data public int SizeNeeded => 4 + 4 + WireFormatting.StringSize(mechanism) + 4 + data.Length; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), mechanism); - offset += WireFormatting.WriteBytes(span.Slice(offset), new ReadOnlySequence(data)); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], mechanism); + offset += WireFormatting.WriteBytes(span[offset..], new ReadOnlySequence(data)); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs b/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs index 78c84ddf..36da74ee 100644 --- a/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs +++ b/RabbitMQ.Stream.Client/SaslAuthenticateResponse.cs @@ -46,7 +46,7 @@ internal static int Read(ReadOnlySequence frame, out SaslAuthenticateRespo return offset; } - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs b/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs index ecb04bcd..bc92e358 100644 --- a/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs +++ b/RabbitMQ.Stream.Client/SaslHandshakeRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -18,11 +18,13 @@ public SaslHandshakeRequest(uint correlationId) public int SizeNeeded => 8; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs b/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs index 32f69836..7b1313af 100644 --- a/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs +++ b/RabbitMQ.Stream.Client/SaslHandshakeResponse.cs @@ -47,7 +47,8 @@ internal static int Read(ReadOnlySequence frame, out SaslHandshakeResponse command = new SaslHandshakeResponse(correlation, mechs); return offset; } - public int Write(Span span) + + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/StoreOffsetRequest.cs b/RabbitMQ.Stream.Client/StoreOffsetRequest.cs index 2ff4f41c..43ddb5c0 100644 --- a/RabbitMQ.Stream.Client/StoreOffsetRequest.cs +++ b/RabbitMQ.Stream.Client/StoreOffsetRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,14 +23,16 @@ internal StoreOffsetRequest(string stream, string reference, ulong offsetValue) public int SizeNeeded => 2 + 2 + WireFormatting.StringSize(reference) + WireFormatting.StringSize(stream) + 8; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteString(span.Slice(offset), reference); - offset += WireFormatting.WriteString(span.Slice(offset), stream); - offset += WireFormatting.WriteUInt64(span.Slice(offset), offsetValue); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteString(span[offset..], reference); + offset += WireFormatting.WriteString(span[offset..], stream); + offset += WireFormatting.WriteUInt64(span[offset..], offsetValue); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/StreamStatsRequest.cs b/RabbitMQ.Stream.Client/StreamStatsRequest.cs index 5c87b09d..53905763 100644 --- a/RabbitMQ.Stream.Client/StreamStatsRequest.cs +++ b/RabbitMQ.Stream.Client/StreamStatsRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -20,12 +20,14 @@ public StreamStatsRequest(uint correlationId, string stream) public int SizeNeeded => 8 + WireFormatting.StringSize(stream); - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteString(span.Slice(offset), stream); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteString(span[offset..], stream); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/StreamStatsResponse.cs b/RabbitMQ.Stream.Client/StreamStatsResponse.cs index f88c7225..dad62279 100644 --- a/RabbitMQ.Stream.Client/StreamStatsResponse.cs +++ b/RabbitMQ.Stream.Client/StreamStatsResponse.cs @@ -28,7 +28,7 @@ public StreamStatsResponse(uint correlationId, ResponseCode responseCode, IDicti public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } diff --git a/RabbitMQ.Stream.Client/SubEntryPublish.cs b/RabbitMQ.Stream.Client/SubEntryPublish.cs index f03a4158..3a6bb1f3 100644 --- a/RabbitMQ.Stream.Client/SubEntryPublish.cs +++ b/RabbitMQ.Stream.Client/SubEntryPublish.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -21,33 +21,35 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), Version); - offset += WireFormatting.WriteByte(span.Slice(offset), publisherId); + offset += WireFormatting.WriteUInt16(span[offset..], Version); + offset += WireFormatting.WriteByte(span[offset..], publisherId); // number of root messages. In this case will be always 1. - offset += WireFormatting.WriteInt32(span.Slice(offset), 1); + offset += WireFormatting.WriteInt32(span[offset..], 1); // publishingId for all the messages // so there is publishingId --> []messages - offset += WireFormatting.WriteUInt64(span.Slice(offset), publishingId); + offset += WireFormatting.WriteUInt64(span[offset..], publishingId); // compress mode see CompressMode var agg = (byte)compressionCodec.CompressionType << 4; offset += WireFormatting.WriteByte( - span.Slice(offset), (byte)(0x80 | agg)); + span[offset..], (byte)(0x80 | agg)); // sub Messages number - offset += WireFormatting.WriteUInt16(span.Slice(offset), (ushort)compressionCodec.MessagesCount); + offset += WireFormatting.WriteUInt16(span[offset..], (ushort)compressionCodec.MessagesCount); // uncompressed byte size value - offset += WireFormatting.WriteUInt32(span.Slice(offset), + offset += WireFormatting.WriteUInt32(span[offset..], (uint)compressionCodec.UnCompressedSize); // compressed byte size value - offset += WireFormatting.WriteUInt32(span.Slice(offset), + offset += WireFormatting.WriteUInt32(span[offset..], (uint)compressionCodec.CompressedSize); - offset += compressionCodec.Write(span.Slice(offset)); + offset += compressionCodec.Write(span[offset..]); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/Subscribe.cs b/RabbitMQ.Stream.Client/Subscribe.cs index 8a4f7d6e..0f8e61bf 100644 --- a/RabbitMQ.Stream.Client/Subscribe.cs +++ b/RabbitMQ.Stream.Client/Subscribe.cs @@ -146,7 +146,7 @@ internal SubscribeResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -203,25 +203,27 @@ public int SizeNeeded } } - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId); - offset += WireFormatting.WriteString(span.Slice(offset), stream); - offset += offsetType.Write(span.Slice(offset)); - offset += WireFormatting.WriteUInt16(span.Slice(offset), credit); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteByte(span[offset..], subscriptionId); + offset += WireFormatting.WriteString(span[offset..], stream); + offset += offsetType.Write(span[offset..]); + offset += WireFormatting.WriteUInt16(span[offset..], credit); if (properties.Count > 0) { - offset += WireFormatting.WriteInt32(span.Slice(offset), properties.Count); + offset += WireFormatting.WriteInt32(span[offset..], properties.Count); foreach (var (k, v) in properties) { - offset += WireFormatting.WriteString(span.Slice(offset), k); - offset += WireFormatting.WriteString(span.Slice(offset), v); + offset += WireFormatting.WriteString(span[offset..], k); + offset += WireFormatting.WriteString(span[offset..], v); } } + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/Tune.cs b/RabbitMQ.Stream.Client/Tune.cs index 8e963406..523c1572 100644 --- a/RabbitMQ.Stream.Client/Tune.cs +++ b/RabbitMQ.Stream.Client/Tune.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; using System.Buffers; namespace RabbitMQ.Stream.Client @@ -24,13 +23,15 @@ public TuneResponse(uint frameMax, uint heartbeat) public uint Heartbeat => heartbeat; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var command = (ICommand)this; var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), frameMax); - offset += WireFormatting.WriteUInt32(span.Slice(offset), heartbeat); + offset += WireFormatting.WriteUInt16(span[offset..], command.Version); + offset += WireFormatting.WriteUInt32(span[offset..], frameMax); + offset += WireFormatting.WriteUInt32(span[offset..], heartbeat); + writer.Advance(offset); return offset; } diff --git a/RabbitMQ.Stream.Client/TuneRequest.cs b/RabbitMQ.Stream.Client/TuneRequest.cs index 4eee4614..6e674898 100644 --- a/RabbitMQ.Stream.Client/TuneRequest.cs +++ b/RabbitMQ.Stream.Client/TuneRequest.cs @@ -2,7 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; +using System.Buffers; namespace RabbitMQ.Stream.Client { @@ -23,12 +23,14 @@ public TuneRequest(uint frameMax, uint heartbeat) public uint Heartbeat => heartbeat; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), frameMax); - offset += WireFormatting.WriteUInt32(span.Slice(offset), heartbeat); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], frameMax); + offset += WireFormatting.WriteUInt32(span[offset..], heartbeat); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/Unubscribe.cs b/RabbitMQ.Stream.Client/Unubscribe.cs index 5f0f7ecc..078d8081 100644 --- a/RabbitMQ.Stream.Client/Unubscribe.cs +++ b/RabbitMQ.Stream.Client/Unubscribe.cs @@ -25,7 +25,7 @@ private UnsubscribeResponse(uint correlationId, ResponseCode responseCode) public ResponseCode ResponseCode => responseCode; - public int Write(Span span) + public int Write(IBufferWriter writer) { throw new NotImplementedException(); } @@ -55,12 +55,14 @@ public UnsubscribeRequest(uint correlationId, byte subscriptionId) public int SizeNeeded => 9; - public int Write(Span span) + public int Write(IBufferWriter writer) { + var span = writer.GetSpan(SizeNeeded); var offset = WireFormatting.WriteUInt16(span, Key); - offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version); - offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId); - offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId); + offset += WireFormatting.WriteUInt16(span[offset..], ((ICommand)this).Version); + offset += WireFormatting.WriteUInt32(span[offset..], correlationId); + offset += WireFormatting.WriteByte(span[offset..], subscriptionId); + writer.Advance(offset); return offset; } } diff --git a/RabbitMQ.Stream.Client/WireFormatting.cs b/RabbitMQ.Stream.Client/WireFormatting.cs index e9ed8a0b..63495fc4 100644 --- a/RabbitMQ.Stream.Client/WireFormatting.cs +++ b/RabbitMQ.Stream.Client/WireFormatting.cs @@ -54,12 +54,15 @@ internal static int WriteInt64(Span span, long value) BinaryPrimitives.WriteInt64BigEndian(span, value); return 8; } + + public const int SizeofUInt32 = 4; [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static int WriteUInt32(Span span, uint value) { BinaryPrimitives.WriteUInt32BigEndian(span, value); - return 4; + return SizeofUInt32; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static int Write(Span span, ReadOnlySequence msg) {