From 8aedc6680967e8177f77f988416213e8648e4ad8 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Tue, 13 Jun 2023 17:27:42 +0200 Subject: [PATCH] Support for new Slic ping/pong frame protocol (#3348) --- .../IdleTimeoutDuplexConnectionDecorator.cs | 2 +- .../OpaqueDataSliceDecoderExtensions.cs | 15 ++++ .../OpaqueDataSliceEncoderExtensions.cs | 15 ++++ .../Slic/Internal/SlicConnection.cs | 76 +++++++++++++++---- 4 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceDecoderExtensions.cs create mode 100644 src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceEncoderExtensions.cs diff --git a/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs b/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs index 17b50f3f4e..031c1bf95c 100644 --- a/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs +++ b/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs @@ -73,7 +73,7 @@ async ValueTask PerformWriteAsync() await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); // After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in - // the future. Since each ping is itself a write, if there is no application activity at all, we'll send + // the future. Since each ping is itself a write, if there is no application activity at all, we'll send // successive pings at _writeIdleTimeout / 2 intervals. ScheduleKeepAlive(); } diff --git a/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceDecoderExtensions.cs b/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceDecoderExtensions.cs new file mode 100644 index 0000000000..d12fdd7d45 --- /dev/null +++ b/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceDecoderExtensions.cs @@ -0,0 +1,15 @@ +// Copyright (c) ZeroC, Inc. + +using IceRpc.Slice; + +namespace IceRpc.Transports.Slic.Internal; + +/// Provides an extension method for decoding a 64-bit opaque data value into a . +public static class OpaqueDataSliceDecoderExtensions +{ + /// Decodes a 64-bit opaque data value. + /// The Slice decoder. + /// The opaque data value decoded as a . + public static long DecodeOpaqueData(this ref SliceDecoder decoder) => decoder.DecodeInt64(); +} diff --git a/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceEncoderExtensions.cs b/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceEncoderExtensions.cs new file mode 100644 index 0000000000..201edb8e3e --- /dev/null +++ b/src/IceRpc/Transports/Slic/Internal/OpaqueDataSliceEncoderExtensions.cs @@ -0,0 +1,15 @@ +// Copyright (c) ZeroC, Inc. + +using IceRpc.Slice; + +namespace IceRpc.Transports.Slic.Internal; + +/// Provides an extension method for encoding a into a 64-bit opaque data +/// value. +public static class OpaqueDataSliceEncoderExtensions +{ + /// Encodes a as a 64-bit opaque data value. + /// The Slice encoder. + /// The value to encode. + public static void EncodeOpaqueData(this ref SliceEncoder encoder, long value) => encoder.EncodeInt64(value); +} diff --git a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs index df8d3e4318..fa4a929071 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs @@ -64,6 +64,7 @@ internal class SlicConnection : IMultiplexedConnection private readonly int _packetMaxSize; private IceRpcError? _peerCloseError; private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan; + private int _pongIsPending; private Task? _readFramesTask; private readonly ConcurrentDictionary _streams = new(); @@ -79,7 +80,6 @@ internal class SlicConnection : IMultiplexedConnection // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server // connection. private bool _writerIsShutdown; - private readonly SemaphoreSlim _writeSemaphore = new(1, 1); public async ValueTask AcceptStreamAsync(CancellationToken cancellationToken) @@ -297,18 +297,28 @@ static InitializeAckBody DecodeInitializeAckOrVersion(FrameType frameType, ReadO void KeepAlive() { - try - { - WriteConnectionFrame(FrameType.Ping, encode: null); - } - catch (IceRpcException) - { - // Expected if the connection failed. - } - catch (Exception exception) + // TODO: KeepAlive is called every IdleTimeout / 2 if it's not deferred by a write. Instead, we could + // consider disabling the KeepAlive timer until the pong frame is received. + + // If not waiting for a pong frame (_pongIsPending=0), send a new ping frame. + if (Interlocked.CompareExchange(ref _pongIsPending, 1, 0) == 0) { - Debug.Fail($"ping failed with an unexpected exception: {exception}"); - throw; + try + { + // For now, the Ping frame payload is just a long which is always set to 0. In the future, it could + // be a ping frame type value if the ping frame is used for different purpose (e.g: a KeepAlive or + // RTT ping frame type). + var pingBody = new PingBody(0L); + WriteConnectionFrame(FrameType.Ping, pingBody.Encode); + } + catch (IceRpcException) + { + // Expected if the connection is closed. + } + catch (Exception exception) + { + Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {exception}"); + } } } @@ -1015,13 +1025,11 @@ private Task ReadFrameAsync(FrameType type, int size, ulong? streamId, Cancellat } case FrameType.Ping: { - WriteConnectionFrame(FrameType.Pong, encode: null); - return Task.CompletedTask; + return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken); } case FrameType.Pong: { - // Nothing to do, the duplex connection reader keeps track of the last activity time. - return Task.CompletedTask; + return ReadPongFrameAsync(size, cancellationToken); } case FrameType.Stream: case FrameType.StreamLast: @@ -1100,6 +1108,42 @@ async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken) } } + async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken) + { + // Read the ping frame. + PingBody pingBody = await ReadFrameBodyAsync( + size, + (ref SliceDecoder decoder) => new PingBody(ref decoder), + cancellationToken).ConfigureAwait(false); + + // Return a pong frame with the ping payload. + WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode); + } + + async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken) + { + if (Interlocked.CompareExchange(ref _pongIsPending, 0, 1) == 1) + { + // If waiting for a pong frame (_pongIsPending=1), ensure the pong frame payload value is expected. + + PongBody pongBody = await ReadFrameBodyAsync( + size, + (ref SliceDecoder decoder) => new PongBody(ref decoder), + cancellationToken).ConfigureAwait(false); + + // For now, we only send a 0 payload value. + if (pongBody.Payload != 0L) + { + throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload."); + } + } + else + { + // If not waiting for a pong frame (_pongIsPending=0), this pong frame is unexpected. + throw new InvalidDataException($"Received an unexpected {nameof(FrameType.Pong)} frame."); + } + } + async Task ReadStreamConsumedFrameAsync(int size, ulong streamId, CancellationToken cancellationToken) { StreamConsumedBody frame = await ReadFrameBodyAsync(