Skip to content

Commit

Permalink
Set state to 'closed' when CloseAsync() is called (#3149) (#3160)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhipsaMisra authored Mar 14, 2023
1 parent 6463887 commit 008b3fd
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions iothub/device/src/Transport/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

namespace Microsoft.Azure.Devices.Client.Transport
{
internal enum ClientTransportStatus
{
Closed = 0,
Open = 1,
}

internal class RetryDelegatingHandler : DefaultDelegatingHandler
{
// RetryCount is used for testing purpose and is equal to MaxValue in prod.
Expand All @@ -37,7 +43,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler
private bool _eventsEnabled;
private bool _deviceReceiveMessageEnabled;
private bool _isDisposing;
private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0.
private long _clientTransportState; // references the current client transport status as the int value of ClientTransportStatus

private Task _transportClosedTask;

Expand Down Expand Up @@ -763,6 +769,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
}
finally
{
SetClientTransportStatus(ClientTransportStatus.Closed);
Dispose(true);

if (Logging.IsEnabled)
Expand All @@ -785,15 +792,15 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}

await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));

Expand All @@ -811,7 +818,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat

if (!_disposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -853,7 +860,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}
Expand All @@ -862,7 +869,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));

Expand All @@ -880,7 +887,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

if (!_disposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -1042,7 +1049,7 @@ private async Task HandleDisconnectAsync()
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));

await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
_ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed"
SetClientTransportStatus(ClientTransportStatus.Closed);

try
{
Expand Down Expand Up @@ -1111,7 +1118,7 @@ await _internalRetryPolicy.ExecuteAsync(async () =>
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();

_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

Logging.Info(this, "Subscriptions recovered.", nameof(HandleDisconnectAsync));
Expand Down Expand Up @@ -1175,6 +1182,16 @@ private void HandleConnectionStatusExceptions(Exception exception, bool connectF
Logging.Info(this, $"Connection status change: status={status}, reason={reason}", nameof(HandleConnectionStatusExceptions));
}

private ClientTransportStatus GetClientTransportStatus()
{
return (ClientTransportStatus)Interlocked.Read(ref _clientTransportState);
}

private void SetClientTransportStatus(ClientTransportStatus clientTransportStatus)
{
_ = Interlocked.Exchange(ref _clientTransportState, (int)clientTransportStatus);
}

protected override void Dispose(bool disposing)
{
try
Expand All @@ -1187,8 +1204,10 @@ protected override void Dispose(bool disposing)
if (!_disposed)
{
_isDisposing = true;
SetClientTransportStatus(ClientTransportStatus.Closed);

base.Dispose(disposing);

if (disposing)
{
var disposables = new List<IDisposable>
Expand Down

0 comments on commit 008b3fd

Please sign in to comment.