Skip to content

Commit

Permalink
Write to the writing Pipeline directly
Browse files Browse the repository at this point in the history
See:
https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html#writes-and-wrongs

Fixup API validation

Add assertion

Fix a missing Advance/1 call

Add FUTURE
  • Loading branch information
lukebakken committed Mar 3, 2023
1 parent 9652280 commit 1606f7b
Show file tree
Hide file tree
Showing 45 changed files with 223 additions and 106 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public OutgoingMsg(byte publisherId, ulong publishingId, Message data)
public Message Data => data;
public int SizeNeeded => 0;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/CloseRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -20,13 +20,15 @@ public CloseRequest(uint correlationId, string reason)

public int SizeNeeded => 10 + WireFormatting.StringSize(reason);

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteUInt16(span.Slice(offset), 1); //ok code
offset += WireFormatting.WriteString(span.Slice(offset), reason);
writer.Advance(offset);
return offset;
}
}
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/CloseResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ public CloseResponse(uint correlationId, ResponseCode responseCode)

public ResponseCode ResponseCode => responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out CloseResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
88 changes: 74 additions & 14 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,34 +83,94 @@ public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>
return new Connection(socket, commandCallback, closedCallBack, sslOption);
}

public async ValueTask<bool> Write<T>(T command) where T : struct, ICommand
public ValueTask<bool> Write<T>(T command) where T : struct, ICommand
{
await WriteCommand(command).ConfigureAwait(false);
// we return true to indicate that the command was written
// In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220
// we made all WriteCommand async so await is enough to indicate that the command was written
// We decided to keep the return value to avoid a breaking change
return true;
if (!_writeLock.Wait(0))
{
// https://blog.marcgravell.com/2018/07/pipe-dreams-part-3.html
var writeSlowPath = WriteCommandAsyncSlowPath(command);
writeSlowPath.ConfigureAwait(false);
return writeSlowPath;
}
else
{
var release = true;
try
{
var payloadSize = WriteCommandPayloadSize(command);
var written = command.Write(writer);
Debug.Assert(payloadSize == written);
var flush = writer.FlushAsync();
flush.ConfigureAwait(false);
if (flush.IsCompletedSuccessfully)
{
// we return true to indicate that the command was written
// In this PR https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/220
// we made all WriteCommand async so await is enough to indicate that the command was written
// We decided to keep the return value to avoid a breaking change
return ValueTask.FromResult(true);
}
else
{
release = false;
return AwaitFlushAndRelease(flush);
}
}
finally
{
if (release)
{
_writeLock.Release();
}
}
}
}

private async Task WriteCommand<T>(T command) where T : struct, ICommand
private async ValueTask<bool> WriteCommandAsyncSlowPath<T>(T command) where T : struct, ICommand
{
// Only one thread should be able to write to the output pipeline at a time.
await _writeLock.WaitAsync().ConfigureAwait(false);
try
{
var size = command.SizeNeeded;
var mem = new byte[4 + size]; // + 4 to write the size
WireFormatting.WriteUInt32(mem, (uint)size);
var written = command.Write(mem.AsSpan()[4..]);
await writer.WriteAsync(new ReadOnlyMemory<byte>(mem)).ConfigureAwait(false);
Debug.Assert(size == written);
var payloadSize = WriteCommandPayloadSize(command);
var written = command.Write(writer);
Debug.Assert(payloadSize == written);
await writer.FlushAsync().ConfigureAwait(false);
}
finally
{
_writeLock.Release();
}

return true;
}

private async ValueTask<bool> AwaitFlushAndRelease(ValueTask<FlushResult> task)
{
try
{
await task.ConfigureAwait(false);
}
finally
{
_writeLock.Release();
}

return true;
}

private int WriteCommandPayloadSize<T>(T command) where T : struct, ICommand
{
/*
* TODO FUTURE
* This code could be moved into a common base class for all outgoing
* commands
*/
var payloadSize = command.SizeNeeded;
var mem = writer.GetSpan(WireFormatting.SizeofUInt32);
var written = WireFormatting.WriteUInt32(mem, (uint)payloadSize);
writer.Advance(written);
return payloadSize;
}

private async Task ProcessIncomingFrames()
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private ConsumerUpdateQueryResponse(uint correlationId, byte subscriptionId, byt
public bool IsActive => active == 1;
public int SizeNeeded => throw new NotImplementedException();

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/ConsumerUpdateRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client;

Expand All @@ -27,13 +27,15 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), _correlationId);
offset += WireFormatting.WriteUInt16(span.Slice(offset), (ushort)ResponseCode.Ok);
offset += OffsetSpecification.Write(span.Slice(offset));
writer.Advance(offset);
return offset;
}

Expand Down
8 changes: 6 additions & 2 deletions RabbitMQ.Stream.Client/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public int SizeNeeded
}
}

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);

var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
Expand All @@ -49,6 +51,7 @@ public int Write(Span<byte> span)
offset += WireFormatting.WriteString(span.Slice(offset), value);
}

writer.Advance(offset);
return offset;
}
}
Expand All @@ -71,10 +74,11 @@ public CreateResponse(uint correlationId, ushort responseCode)

public ResponseCode ResponseCode => (ResponseCode)responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out CreateResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/Credit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -19,13 +19,15 @@ public CreditRequest(byte subscriptionId, ushort credit)
}
public int SizeNeeded => 7;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var command = (ICommand)this;
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), command.Version);
offset += WireFormatting.WriteByte(span.Slice(offset), subscriptionId);
offset += WireFormatting.WriteUInt16(span.Slice(offset), credit);
writer.Advance(offset);
return offset;
}
}
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/CreditResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private CreditResponse(ResponseCode responseCode, byte subscriptionId)

private ResponseCode ResponseCode { get; }

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/DeclarePublisherRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -24,14 +24,16 @@ public DeclarePublisherRequest(uint correlationId, byte publisherId, string publ

public int SizeNeeded => 8 + 1 + WireFormatting.StringSize(publisherRef) + WireFormatting.StringSize(stream);

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteByte(span.Slice(offset), publisherId);
offset += WireFormatting.WriteString(span.Slice(offset), publisherRef);
offset += WireFormatting.WriteString(span.Slice(offset), stream);
writer.Advance(offset);
return offset;
}
}
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/DeclarePublisherResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public DeclarePublisherResponse(uint correlationId, ResponseCode responseCode)

public ResponseCode ResponseCode => responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
7 changes: 5 additions & 2 deletions RabbitMQ.Stream.Client/Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ public DeleteRequest(uint correlationId, string stream)

public int SizeNeeded => 8 + WireFormatting.StringSize(stream);

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteString(span.Slice(offset), stream);
writer.Advance(offset);
return offset;
}
}
Expand All @@ -49,10 +51,11 @@ public DeleteResponse(uint correlationId, ushort responseCode)

public ResponseCode ResponseCode => (ResponseCode)responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out DeleteResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/DeletePublisherRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
Expand All @@ -20,12 +20,14 @@ public DeletePublisherRequest(uint correlationId, byte publisherId)

public int SizeNeeded => 8 + 1;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
offset += WireFormatting.WriteUInt32(span.Slice(offset), correlationId);
offset += WireFormatting.WriteByte(span.Slice(offset), publisherId);
writer.Advance(offset);
return offset;
}
}
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/DeletePublisherResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ public DeletePublisherResponse(uint correlationId, ResponseCode responseCode)

public ResponseCode ResponseCode => responseCode;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out DeletePublisherResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private Deliver(byte subscriptionId, Chunk chunk)

public byte SubscriptionId => subscriptionId;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
throw new NotImplementedException();
}
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/HeartBeatRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client;

Expand All @@ -12,10 +12,12 @@ namespace RabbitMQ.Stream.Client;

public int SizeNeeded => 4;

public int Write(Span<byte> span)
public int Write(IBufferWriter<byte> writer)
{
var span = writer.GetSpan(SizeNeeded);
var offset = WireFormatting.WriteUInt16(span, Key);
offset += WireFormatting.WriteUInt16(span.Slice(offset), ((ICommand)this).Version);
writer.Advance(offset);
return offset;
}
}
Loading

0 comments on commit 1606f7b

Please sign in to comment.