Skip to content

Commit

Permalink
Support for new Slic ping/pong frame protocol (#3348)
Browse files Browse the repository at this point in the history
  • Loading branch information
bentoi authored Jun 13, 2023
1 parent ca49b5a commit 8aedc66
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async ValueTask PerformWriteAsync()
await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

// After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in
// the future. Since each ping is itself a write, if there is no application activity at all, we'll send
// the future. Since each ping is itself a write, if there is no application activity at all, we'll send
// successive pings at _writeIdleTimeout / 2 intervals.
ScheduleKeepAlive();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) ZeroC, Inc.

using IceRpc.Slice;

namespace IceRpc.Transports.Slic.Internal;

/// <summary>Provides an extension method for decoding a 64-bit opaque data value into a <see langword="long"
/// />.</summary>
public static class OpaqueDataSliceDecoderExtensions
{
/// <summary>Decodes a 64-bit opaque data value.</summary>
/// <param name="decoder">The Slice decoder.</param>
/// <returns>The opaque data value decoded as a <see langword="long"/>.</returns>
public static long DecodeOpaqueData(this ref SliceDecoder decoder) => decoder.DecodeInt64();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) ZeroC, Inc.

using IceRpc.Slice;

namespace IceRpc.Transports.Slic.Internal;

/// <summary>Provides an extension method for encoding a <see langword="long" /> into a 64-bit opaque data
/// value.</summary>
public static class OpaqueDataSliceEncoderExtensions
{
/// <summary>Encodes a <see langword="long" /> as a 64-bit opaque data value.</summary>
/// <param name="encoder">The Slice encoder.</param>
/// <param name="value">The value to encode.</param>
public static void EncodeOpaqueData(this ref SliceEncoder encoder, long value) => encoder.EncodeInt64(value);
}
76 changes: 60 additions & 16 deletions src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal class SlicConnection : IMultiplexedConnection
private readonly int _packetMaxSize;
private IceRpcError? _peerCloseError;
private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
private int _pongIsPending;
private Task? _readFramesTask;

private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
Expand All @@ -79,7 +80,6 @@ internal class SlicConnection : IMultiplexedConnection
// followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
// connection.
private bool _writerIsShutdown;

private readonly SemaphoreSlim _writeSemaphore = new(1, 1);

public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -297,18 +297,28 @@ static InitializeAckBody DecodeInitializeAckOrVersion(FrameType frameType, ReadO

void KeepAlive()
{
try
{
WriteConnectionFrame(FrameType.Ping, encode: null);
}
catch (IceRpcException)
{
// Expected if the connection failed.
}
catch (Exception exception)
// TODO: KeepAlive is called every IdleTimeout / 2 if it's not deferred by a write. Instead, we could
// consider disabling the KeepAlive timer until the pong frame is received.

// If not waiting for a pong frame (_pongIsPending=0), send a new ping frame.
if (Interlocked.CompareExchange(ref _pongIsPending, 1, 0) == 0)
{
Debug.Fail($"ping failed with an unexpected exception: {exception}");
throw;
try
{
// For now, the Ping frame payload is just a long which is always set to 0. In the future, it could
// be a ping frame type value if the ping frame is used for different purpose (e.g: a KeepAlive or
// RTT ping frame type).
var pingBody = new PingBody(0L);
WriteConnectionFrame(FrameType.Ping, pingBody.Encode);
}
catch (IceRpcException)
{
// Expected if the connection is closed.
}
catch (Exception exception)
{
Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {exception}");
}
}
}

Expand Down Expand Up @@ -1015,13 +1025,11 @@ private Task ReadFrameAsync(FrameType type, int size, ulong? streamId, Cancellat
}
case FrameType.Ping:
{
WriteConnectionFrame(FrameType.Pong, encode: null);
return Task.CompletedTask;
return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
}
case FrameType.Pong:
{
// Nothing to do, the duplex connection reader keeps track of the last activity time.
return Task.CompletedTask;
return ReadPongFrameAsync(size, cancellationToken);
}
case FrameType.Stream:
case FrameType.StreamLast:
Expand Down Expand Up @@ -1100,6 +1108,42 @@ async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
}
}

async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
{
// Read the ping frame.
PingBody pingBody = await ReadFrameBodyAsync(
size,
(ref SliceDecoder decoder) => new PingBody(ref decoder),
cancellationToken).ConfigureAwait(false);

// Return a pong frame with the ping payload.
WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
}

async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _pongIsPending, 0, 1) == 1)
{
// If waiting for a pong frame (_pongIsPending=1), ensure the pong frame payload value is expected.

PongBody pongBody = await ReadFrameBodyAsync(
size,
(ref SliceDecoder decoder) => new PongBody(ref decoder),
cancellationToken).ConfigureAwait(false);

// For now, we only send a 0 payload value.
if (pongBody.Payload != 0L)
{
throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
}
}
else
{
// If not waiting for a pong frame (_pongIsPending=0), this pong frame is unexpected.
throw new InvalidDataException($"Received an unexpected {nameof(FrameType.Pong)} frame.");
}
}

async Task ReadStreamConsumedFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
{
StreamConsumedBody frame = await ReadFrameBodyAsync(
Expand Down

0 comments on commit 8aedc66

Please sign in to comment.