From 6463887a31c37ceb32c45e6737d91e91c7e55994 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Tue, 7 Mar 2023 20:09:55 -0800 Subject: [PATCH] Bring semaphore synchronization updates in retry delegating handler to LTS (#3148) * fix(iot-dev): Fix bug where client's retry policy applied n^2 times rather than n times (#2209) * Fix synchronization of semaphore in retry delegating handler (#3135) --------- Co-authored-by: timtay-microsoft --- build.ps1 | 2 +- e2e/test/iothub/FileUploadE2ETests.cs | 1 + .../iothub/service/RegistryManagerE2ETests.cs | 1 + .../src/Transport/DefaultDelegatingHandler.cs | 5 +- .../src/Transport/RetryDelegatingHandler.cs | 553 +++++++++++++----- .../tests/RetryDelegatingHandlerTests.cs | 358 +++++++----- iothub/service/src/IoTHubExceptionResult.cs | 3 + vsts/vsts.yaml | 9 +- 8 files changed, 616 insertions(+), 316 deletions(-) diff --git a/build.ps1 b/build.ps1 index b8c450b0c3..eb9ae45ef0 100644 --- a/build.ps1 +++ b/build.ps1 @@ -151,7 +151,7 @@ Function BuildPackage($path, $message) SignDotNetBinary $filesToSign } - & dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --output $localPackages + & dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --property:PackageOutputPath=$localPackages if ($LASTEXITCODE -ne 0) { diff --git a/e2e/test/iothub/FileUploadE2ETests.cs b/e2e/test/iothub/FileUploadE2ETests.cs index 968ebb2ca4..4bf182f4bb 100644 --- a/e2e/test/iothub/FileUploadE2ETests.cs +++ b/e2e/test/iothub/FileUploadE2ETests.cs @@ -97,6 +97,7 @@ public async Task FileUpload_SmallFile_Http_GranularSteps_x509() [LoggedTestMethod] [TestCategory("LongRunning")] + [TestCategory("Proxy")] public async Task FileUpload_SmallFile_Http_GranularSteps_Proxy() { string filename = await GetTestFileNameAsync(FileSizeSmall).ConfigureAwait(false); diff --git a/e2e/test/iothub/service/RegistryManagerE2ETests.cs b/e2e/test/iothub/service/RegistryManagerE2ETests.cs index 4af2101231..11aff2deaf 100644 --- a/e2e/test/iothub/service/RegistryManagerE2ETests.cs +++ b/e2e/test/iothub/service/RegistryManagerE2ETests.cs @@ -117,6 +117,7 @@ public async Task RegistryManager_AddDeviceWithTwinWithDeviceCapabilities() } [LoggedTestMethod] + [TestCategory("Proxy")] public async Task RegistryManager_AddDeviceWithProxy() { string deviceId = _devicePrefix + Guid.NewGuid(); diff --git a/iothub/device/src/Transport/DefaultDelegatingHandler.cs b/iothub/device/src/Transport/DefaultDelegatingHandler.cs index 6db75b1e18..2182c11733 100644 --- a/iothub/device/src/Transport/DefaultDelegatingHandler.cs +++ b/iothub/device/src/Transport/DefaultDelegatingHandler.cs @@ -11,8 +11,9 @@ namespace Microsoft.Azure.Devices.Client.Transport { internal abstract class DefaultDelegatingHandler : IDelegatingHandler { - private volatile IDelegatingHandler _innerHandler; + protected const string ClientDisposedMessage = "The client has been disposed and is no longer usable."; protected volatile bool _disposed; + private volatile IDelegatingHandler _innerHandler; protected DefaultDelegatingHandler(IPipelineContext context, IDelegatingHandler innerHandler) { @@ -208,7 +209,7 @@ protected void ThrowIfDisposed() { if (_disposed) { - throw new ObjectDisposedException("IoT Client"); + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } } diff --git a/iothub/device/src/Transport/RetryDelegatingHandler.cs b/iothub/device/src/Transport/RetryDelegatingHandler.cs index a4c62e3343..62eefaabc3 100644 --- a/iothub/device/src/Transport/RetryDelegatingHandler.cs +++ b/iothub/device/src/Transport/RetryDelegatingHandler.cs @@ -20,16 +20,34 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler private RetryPolicy _internalRetryPolicy; - private SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1); +#pragma warning disable CA2213 + // The semaphores are getting disposed in the Dispose() block below but it seems like + // Microsoft.CodeAnalysis.FxCopAnalyzers isn't able to analyze and interpret our code successfully. + // We've moved to Microsoft.CodeAnalysis.NetAnalyzers in main and it understands the below disposal block successfully. + private readonly SemaphoreSlim _clientOpenSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _cloudToDeviceMessageSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _cloudToDeviceEventSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _directMethodSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _twinEventsSubscriptionSemaphore = new SemaphoreSlim(1, 1); +#pragma warning restore CA2213 + private bool _openCalled; - private bool _opened; private bool _methodsEnabled; private bool _twinEnabled; private bool _eventsEnabled; private bool _deviceReceiveMessageEnabled; + private bool _isDisposing; + private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0. private Task _transportClosedTask; + +#pragma warning disable CA2213 + // The cancellation token sources are getting canceled and disposed in the Dispose() block below but it seems like + // Microsoft.CodeAnalysis.FxCopAnalyzers isn't able to analyze and interpret our code successfully. + // We've moved to Microsoft.CodeAnalysis.NetAnalyzers in main and it understands the below disposal block successfully. private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancelPendingOperationsCts = new CancellationTokenSource(); +#pragma warning restore CA2213 private readonly ConnectionStatusChangesHandler _onConnectionStatusChanged; @@ -73,20 +91,22 @@ public override async Task SendEventAsync(Message message, CancellationToken can { Logging.Enter(this, message, cancellationToken, nameof(SendEventAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); if (message.IsBodyCalled) { message.ResetBody(); } - await base.SendEventAsync(message, cancellationToken).ConfigureAwait(false); + await base.SendEventAsync(message, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -101,11 +121,13 @@ public override async Task SendEventAsync(IEnumerable messages, Cancell { Logging.Enter(this, messages, cancellationToken, nameof(SendEventAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); foreach (Message m in messages) { @@ -115,9 +137,9 @@ await _internalRetryPolicy } } - await base.SendEventAsync(messages, cancellationToken).ConfigureAwait(false); + await base.SendEventAsync(messages, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -132,14 +154,16 @@ public override async Task SendMethodResponseAsync(MethodResponseInternal method { Logging.Enter(this, method, cancellationToken, nameof(SendMethodResponseAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await base.SendMethodResponseAsync(method, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.SendMethodResponseAsync(method, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -154,14 +178,16 @@ public override async Task ReceiveAsync(CancellationToken cancellationT { Logging.Enter(this, cancellationToken, nameof(ReceiveAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - return await base.ReceiveAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + return await base.ReceiveAsync(operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -177,14 +203,16 @@ public override async Task ReceiveAsync(TimeoutHelper timeoutHelper) Logging.Enter(this, timeoutHelper, nameof(ReceiveAsync)); using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(timeoutHelper).ConfigureAwait(false); + await EnsureOpenedAsync(false, timeoutHelper).ConfigureAwait(false); return await base.ReceiveAsync(timeoutHelper).ConfigureAwait(false); }, - cts.Token) + operationCts.Token) .ConfigureAwait(false); } finally @@ -199,28 +227,38 @@ public override async Task EnableReceiveMessageAsync(CancellationToken cancellat { Logging.Enter(this, cancellationToken, nameof(EnableReceiveMessageAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - // Ensure that the connection has been opened, before enabling the callback for receiving messages. - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // The telemetry downlink needs to be enabled only for the first time that the callback is set. Debug.Assert(!_deviceReceiveMessageEnabled); - await base.EnableReceiveMessageAsync(cancellationToken).ConfigureAwait(false); + await base.EnableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false); _deviceReceiveMessageEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -237,28 +275,38 @@ public override async Task EnsurePendingMessagesAreDeliveredAsync(CancellationTo { if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnsurePendingMessagesAreDeliveredAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - // Ensure that the connection has been opened before returning pending messages to the callback. - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // Ensure that a callback for receiving messages has been previously set. Debug.Assert(_deviceReceiveMessageEnabled); - await base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false); + await base.EnsurePendingMessagesAreDeliveredAsync(operationCts.Token).ConfigureAwait(false); } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -273,28 +321,38 @@ public override async Task DisableReceiveMessageAsync(CancellationToken cancella { Logging.Enter(this, cancellationToken, nameof(DisableReceiveMessageAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - // Ensure that the connection has been opened, before disabling the callback for receiving messages. - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // Ensure that a callback for receiving messages has been previously set. Debug.Assert(_deviceReceiveMessageEnabled); - await base.DisableReceiveMessageAsync(cancellationToken).ConfigureAwait(false); + await base.DisableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false); _deviceReceiveMessageEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -309,25 +367,35 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke { Logging.Enter(this, cancellationToken, nameof(EnableMethodsAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(!_methodsEnabled); - await base.EnableMethodsAsync(cancellationToken).ConfigureAwait(false); + await base.EnableMethodsAsync(operationCts.Token).ConfigureAwait(false); _methodsEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _directMethodSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -342,24 +410,35 @@ public override async Task DisableMethodsAsync(CancellationToken cancellationTok { Logging.Enter(this, cancellationToken, nameof(DisableMethodsAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_methodsEnabled); - await base.DisableMethodsAsync(cancellationToken).ConfigureAwait(false); + await base.DisableMethodsAsync(operationCts.Token).ConfigureAwait(false); _methodsEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _directMethodSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -372,26 +451,38 @@ public override async Task EnableEventReceiveAsync(CancellationToken cancellatio { try { - Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync)); + + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { - await base.EnableEventReceiveAsync(cancellationToken).ConfigureAwait(false); Debug.Assert(!_eventsEnabled); + await base.EnableEventReceiveAsync(operationCts.Token).ConfigureAwait(false); _eventsEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceEventSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -404,26 +495,38 @@ public override async Task DisableEventReceiveAsync(CancellationToken cancellati { try { - Logging.Enter(this, cancellationToken, nameof(DisableEventReceiveAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, cancellationToken, nameof(DisableEventReceiveAsync)); + + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_eventsEnabled); - await base.DisableEventReceiveAsync(cancellationToken).ConfigureAwait(false); + await base.DisableEventReceiveAsync(operationCts.Token).ConfigureAwait(false); _eventsEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceEventSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -438,24 +541,35 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo { Logging.Enter(this, cancellationToken, nameof(EnableTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(!_twinEnabled); - await base.EnableTwinPatchAsync(cancellationToken).ConfigureAwait(false); + await base.EnableTwinPatchAsync(operationCts.Token).ConfigureAwait(false); _twinEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _twinEventsSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -470,24 +584,35 @@ public override async Task DisableTwinPatchAsync(CancellationToken cancellationT { Logging.Enter(this, cancellationToken, nameof(DisableTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_twinEnabled); - await base.DisableTwinPatchAsync(cancellationToken).ConfigureAwait(false); + await base.DisableTwinPatchAsync(operationCts.Token).ConfigureAwait(false); _twinEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _twinEventsSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -502,14 +627,16 @@ public override async Task SendTwinGetAsync(CancellationToken cancellation { Logging.Enter(this, cancellationToken, nameof(SendTwinGetAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - return await base.SendTwinGetAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + return await base.SendTwinGetAsync(operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -524,14 +651,16 @@ public override async Task SendTwinPatchAsync(TwinCollection reportedProperties, { Logging.Enter(this, reportedProperties, cancellationToken, nameof(SendTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await base.SendTwinPatchAsync(reportedProperties, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.SendTwinPatchAsync(reportedProperties, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -546,14 +675,16 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can { Logging.Enter(this, lockToken, cancellationToken, nameof(CompleteAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await base.CompleteAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.CompleteAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -568,14 +699,16 @@ public override async Task AbandonAsync(string lockToken, CancellationToken canc { Logging.Enter(this, lockToken, cancellationToken, nameof(AbandonAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await base.AbandonAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.AbandonAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -590,14 +723,16 @@ public override async Task RejectAsync(string lockToken, CancellationToken cance { Logging.Enter(this, lockToken, cancellationToken, nameof(RejectAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .ExecuteAsync( async () => { - await EnsureOpenedAsync(cancellationToken).ConfigureAwait(false); - await base.RejectAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.RejectAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -608,12 +743,11 @@ await _internalRetryPolicy public override Task OpenAsync(CancellationToken cancellationToken) { - return EnsureOpenedAsync(cancellationToken); + return EnsureOpenedAsync(true, cancellationToken); } public override async Task CloseAsync(CancellationToken cancellationToken) { - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (!_openCalled) @@ -624,39 +758,42 @@ public override async Task CloseAsync(CancellationToken cancellationToken) Logging.Enter(this, cancellationToken, nameof(CloseAsync)); _handleDisconnectCts.Cancel(); + _cancelPendingOperationsCts.Cancel(); await base.CloseAsync(cancellationToken).ConfigureAwait(false); } finally { - Logging.Exit(this, cancellationToken, nameof(CloseAsync)); - - _handlerSemaphore?.Release(); Dispose(true); + + if (Logging.IsEnabled) + Logging.Exit(this, cancellationToken, nameof(CloseAsync)); } } /// /// Implicit open handler. /// - private async Task EnsureOpenedAsync(CancellationToken cancellationToken) + private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellationToken) { + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + // If this object has already been disposed, we will throw an exception indicating that. // This is the entry point for interacting with the client and this safety check should be done here. // The current behavior does not support open->close->open if (_disposed) { - throw new ObjectDisposedException(nameof(RetryDelegatingHandler)); + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - if (Volatile.Read(ref _opened)) + if (Interlocked.Read(ref _isOpened) == 1) { return; } - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { - if (!_opened) + if (Interlocked.Read(ref _isOpened) == 0) { Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -664,7 +801,7 @@ private async Task EnsureOpenedAsync(CancellationToken cancellationToken) // we are returning the corresponding connection status change event => disconnected: retry_expired. try { - await OpenInternalAsync(cancellationToken).ConfigureAwait(false); + await OpenInternalAsync(withRetry, operationCts.Token).ConfigureAwait(false); } catch (Exception ex) when (!ex.IsFatal()) { @@ -674,7 +811,7 @@ private async Task EnsureOpenedAsync(CancellationToken cancellationToken) if (!_disposed) { - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _openCalled = true; // Send the request for transport close notification. @@ -690,26 +827,42 @@ private async Task EnsureOpenedAsync(CancellationToken cancellationToken) } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } - private async Task EnsureOpenedAsync(TimeoutHelper timeoutHelper) + private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper) { - if (Volatile.Read(ref _opened)) + using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); + + // If this object has already been disposed, we will throw an exception indicating that. + // This is the entry point for interacting with the client and this safety check should be done here. + // The current behavior does not support open->close->open + if (_disposed) { - return; + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - bool gain = await _handlerSemaphore.WaitAsync(timeoutHelper.GetRemainingTime()).ConfigureAwait(false); - if (!gain) + if (Interlocked.Read(ref _isOpened) == 1) { - throw new TimeoutException("Timed out to acquire handler lock."); + return; } + await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); + try { - if (!_opened) + if (Interlocked.Read(ref _isOpened) == 0) { Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -717,7 +870,7 @@ private async Task EnsureOpenedAsync(TimeoutHelper timeoutHelper) // we are returning the corresponding connection status change event => disconnected: retry_expired. try { - await OpenInternalAsync(timeoutHelper).ConfigureAwait(false); + await OpenInternalAsync(withRetry, timeoutHelper).ConfigureAwait(false); } catch (Exception ex) when (!ex.IsFatal()) { @@ -727,7 +880,7 @@ private async Task EnsureOpenedAsync(TimeoutHelper timeoutHelper) if (!_disposed) { - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _openCalled = true; // Send the request for transport close notification. @@ -743,41 +896,79 @@ private async Task EnsureOpenedAsync(TimeoutHelper timeoutHelper) } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } - private Task OpenInternalAsync(CancellationToken cancellationToken) + private async Task OpenInternalAsync(bool withRetry, CancellationToken cancellationToken) { - return _internalRetryPolicy - .ExecuteAsync( - async () => - { - try - { - Logging.Enter(this, cancellationToken, nameof(OpenAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); - // Will throw on error. - await base.OpenAsync(cancellationToken).ConfigureAwait(false); - _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); - } - catch (Exception ex) when (!ex.IsFatal()) - { - HandleConnectionStatusExceptions(ex); - throw; - } - finally + if (withRetry) + { + await _internalRetryPolicy + .ExecuteAsync( + async () => { - Logging.Exit(this, cancellationToken, nameof(OpenAsync)); - } - }, - cancellationToken); + try + { + Logging.Enter(this, cancellationToken, nameof(OpenAsync)); + + // Will throw on error. + await base.OpenAsync(operationCts.Token).ConfigureAwait(false); + _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); + } + catch (Exception ex) when (!ex.IsFatal()) + { + HandleConnectionStatusExceptions(ex); + throw; + } + finally + { + Logging.Exit(this, cancellationToken, nameof(OpenAsync)); + } + }, + operationCts.Token).ConfigureAwait(false); + } + else + { + try + { + Logging.Enter(this, cancellationToken, nameof(OpenAsync)); + + // Will throw on error. + await base.OpenAsync(operationCts.Token).ConfigureAwait(false); + _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); + } + catch (Exception ex) when (!ex.IsFatal()) + { + HandleConnectionStatusExceptions(ex); + throw; + } + finally + { + Logging.Exit(this, cancellationToken, nameof(OpenAsync)); + } + } } - private async Task OpenInternalAsync(TimeoutHelper timeoutHelper) + private async Task OpenInternalAsync(bool withRetry, TimeoutHelper timeoutHelper) { using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); - await _internalRetryPolicy + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); + + if (withRetry) + { + await _internalRetryPolicy .ExecuteAsync( async () => { @@ -799,8 +990,29 @@ await _internalRetryPolicy Logging.Exit(this, timeoutHelper, nameof(OpenAsync)); } }, - cts.Token) + operationCts.Token) .ConfigureAwait(false); + } + else + { + try + { + Logging.Enter(this, timeoutHelper, nameof(OpenAsync)); + + // Will throw on error. + await base.OpenAsync(timeoutHelper).ConfigureAwait(false); + _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); + } + catch (Exception ex) when (!ex.IsFatal()) + { + HandleConnectionStatusExceptions(ex); + throw; + } + finally + { + Logging.Exit(this, timeoutHelper, nameof(OpenAsync)); + } + } } // Triggered from connection loss event @@ -829,8 +1041,8 @@ private async Task HandleDisconnectAsync() Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync)); - await _handlerSemaphore.WaitAsync().ConfigureAwait(false); - _opened = false; + await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false); + _ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed" try { @@ -899,7 +1111,7 @@ await _internalRetryPolicy.ExecuteAsync(async () => // Send the request for transport close notification. _transportClosedTask = HandleDisconnectAsync(); - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); Logging.Info(this, "Subscriptions recovered.", nameof(HandleDisconnectAsync)); @@ -914,7 +1126,16 @@ await _internalRetryPolicy.ExecuteAsync(async () => } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } @@ -956,22 +1177,58 @@ private void HandleConnectionStatusExceptions(Exception exception, bool connectF protected override void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_disposed}; disposing={disposing}", $"{nameof(RetryDelegatingHandler)}.{nameof(Dispose)}"); + } + + if (!_disposed) + { + _isDisposing = true; - base.Dispose(disposing); - if (disposing) + base.Dispose(disposing); + if (disposing) + { + var disposables = new List + { + _handleDisconnectCts, + _cancelPendingOperationsCts, + _clientOpenSemaphore, + _cloudToDeviceMessageSubscriptionSemaphore, + _cloudToDeviceEventSubscriptionSemaphore, + _directMethodSubscriptionSemaphore, + _twinEventsSubscriptionSemaphore, + }; + + _handleDisconnectCts?.Cancel(); + _cancelPendingOperationsCts?.Cancel(); + + foreach (IDisposable disposable in disposables) + { + try + { + disposable?.Dispose(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Tried disposing the IDisposable {disposable} but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } + } + } + + // the _disposed flag is inherited from the base class DefaultDelegatingHandler and is finally set to true there. + } + } + finally { - _handleDisconnectCts?.Cancel(); - _handleDisconnectCts?.Dispose(); - if (_handlerSemaphore != null && _handlerSemaphore.CurrentCount == 0) + if (Logging.IsEnabled) { - _handlerSemaphore.Release(); + Logging.Exit(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_disposed}; disposing={disposing}", $"{nameof(RetryDelegatingHandler)}.{nameof(Dispose)}"); } - _handlerSemaphore?.Dispose(); - _handlerSemaphore = null; } } } diff --git a/iothub/device/tests/RetryDelegatingHandlerTests.cs b/iothub/device/tests/RetryDelegatingHandlerTests.cs index 89621b6bb9..b885f36a61 100644 --- a/iothub/device/tests/RetryDelegatingHandlerTests.cs +++ b/iothub/device/tests/RetryDelegatingHandlerTests.cs @@ -11,6 +11,7 @@ using Microsoft.Azure.Devices.Client.Transport; using Microsoft.VisualStudio.TestTools.UnitTesting; using NSubstitute; +using FluentAssertions; namespace Microsoft.Azure.Devices.Client.Test { @@ -29,21 +30,23 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries() IPipelineContext contextMock = Substitute.For(); IDelegatingHandler innerHandlerMock = Substitute.For(); - innerHandlerMock.OpenAsync(Arg.Any()).Returns(t => - { - return ++callCounter == 1 - ? throw new IotHubException("Test transient exception", isTransient: true) - : TaskHelpers.CompletedTask; - }); + innerHandlerMock + .OpenAsync(Arg.Any()) + .Returns(t => + { + return ++callCounter == 1 + ? throw new IotHubException("Test transient exception", isTransient: true) + : TaskHelpers.CompletedTask; + }); innerHandlerMock.WaitForTransportClosedAsync().Returns(Task.Delay(TimeSpan.FromSeconds(10))); var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock); // act - await retryDelegatingHandler.OpenAsync(new CancellationToken()).ConfigureAwait(false); + await retryDelegatingHandler.OpenAsync(CancellationToken.None).ConfigureAwait(false); // assert - Assert.AreEqual(2, callCounter); + callCounter.Should().Be(2); } [TestMethod] @@ -54,29 +57,31 @@ public async Task RetryDelegatingHandler_SendEventAsyncRetries() IPipelineContext contextMock = Substitute.For(); IDelegatingHandler innerHandlerMock = Substitute.For(); - var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); - innerHandlerMock.SendEventAsync(Arg.Is(message), Arg.Any()).Returns(t => - { - callCounter++; - - Message m = t.Arg(); - Stream stream = m.GetBodyStream(); - if (callCounter == 1) - { - throw new IotHubException(TestExceptionMessage, isTransient: true); - } - byte[] buffer = new byte[3]; - stream.Read(buffer, 0, 3); - return TaskHelpers.CompletedTask; ; - }); + using var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); + innerHandlerMock + .SendEventAsync(Arg.Is(message), Arg.Any()) + .Returns(t => + { + callCounter++; + + Message m = t.Arg(); + Stream stream = m.GetBodyStream(); + if (callCounter == 1) + { + throw new IotHubException(TestExceptionMessage, isTransient: true); + } + byte[] buffer = new byte[3]; + stream.Read(buffer, 0, 3); + return TaskHelpers.CompletedTask; + }); var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock); // act - await retryDelegatingHandler.SendEventAsync(message, new CancellationToken()).ConfigureAwait(false); + await retryDelegatingHandler.SendEventAsync(message, CancellationToken.None).ConfigureAwait(false); // assert - Assert.AreEqual(2, callCounter); + callCounter.Should().Be(2); } [TestMethod] @@ -88,114 +93,130 @@ public async Task RetryDelegatingHandler_DoesNotRetryOnNotSupportedException() IPipelineContext contextMock = Substitute.For(); IDelegatingHandler innerHandlerMock = Substitute.For(); var memoryStream = new NotSeekableStream(new byte[] { 1, 2, 3 }); - var message = new Message(memoryStream); - innerHandlerMock.SendEventAsync(Arg.Is(message), Arg.Any()).Returns(t => - { - callCounter++; - Message m = t.Arg(); - Stream stream = m.GetBodyStream(); - byte[] buffer = new byte[3]; - stream.Read(buffer, 0, 3); - throw new IotHubException(TestExceptionMessage, isTransient: true); - }); + using var message = new Message(memoryStream); + innerHandlerMock + .SendEventAsync(Arg.Is(message), Arg.Any()) + .Returns(t => + { + callCounter++; + Message m = t.Arg(); + Stream stream = m.GetBodyStream(); + byte[] buffer = new byte[3]; + stream.Read(buffer, 0, 3); + throw new IotHubException(TestExceptionMessage, isTransient: true); + }); var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock); // act - NotSupportedException exception = await retryDelegatingHandler.SendEventAsync(message, new CancellationToken()).ExpectedAsync().ConfigureAwait(false); + NotSupportedException exception = await retryDelegatingHandler + .SendEventAsync(message, CancellationToken.None) + .ExpectedAsync() + .ConfigureAwait(false); // assert - Assert.AreEqual(callCounter, 1); + callCounter.Should().Be(1); } [TestMethod] public async Task RetryOneMessageHasBeenTouchedTransientExceptionOccuredSuccess() { + // arrange int callCounter = 0; var contextMock = Substitute.For(); var innerHandlerMock = Substitute.For(); - var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); + using var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); IEnumerable messages = new[] { message }; - innerHandlerMock.SendEventAsync(Arg.Is(messages), Arg.Any()).Returns(t => - { - callCounter++; - - Message m = t.Arg>().First(); - Stream stream = m.GetBodyStream(); - if (callCounter == 1) - { - throw new IotHubException(TestExceptionMessage, isTransient: true); - } - var buffer = new byte[3]; - stream.Read(buffer, 0, 3); - return TaskHelpers.CompletedTask; ; - }); + innerHandlerMock + .SendEventAsync(Arg.Is(messages), Arg.Any()) + .Returns(t => + { + Message m = t.Arg>().First(); + Stream stream = m.GetBodyStream(); + if (++callCounter == 1) + { + throw new IotHubException(TestExceptionMessage, isTransient: true); + } + var buffer = new byte[3]; + stream.Read(buffer, 0, 3); + return TaskHelpers.CompletedTask; + }); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - var cancellationToken = new CancellationToken(); - await sut.SendEventAsync(messages, cancellationToken).ConfigureAwait(false); - Assert.AreEqual(2, callCounter); + // act + await sut.SendEventAsync(messages, CancellationToken.None).ConfigureAwait(false); + + // assert + callCounter.Should().Be(2); } [TestMethod] public async Task RetryMessageWithSeekableStreamHasBeenReadTransientExceptionOccuredThrows() { + // arrange int callCounter = 0; var contextMock = Substitute.For(); var innerHandlerMock = Substitute.For(); - var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); - innerHandlerMock.SendEventAsync(Arg.Is(message), Arg.Any()).Returns(t => - { - callCounter++; - var m = t.Arg(); - Stream stream = m.GetBodyStream(); - var buffer = new byte[3]; - stream.Read(buffer, 0, 3); - if (callCounter == 1) - { - throw new IotHubException(TestExceptionMessage, isTransient: true); - } - return TaskHelpers.CompletedTask; ; - }); + using var message = new Message(new MemoryStream(new byte[] { 1, 2, 3 })); + innerHandlerMock + .SendEventAsync(Arg.Is(message), Arg.Any()) + .Returns(t => + { + var m = t.Arg(); + Stream stream = m.GetBodyStream(); + var buffer = new byte[3]; + stream.Read(buffer, 0, 3); + if (++callCounter == 1) + { + throw new IotHubException(TestExceptionMessage, isTransient: true); + } + return TaskHelpers.CompletedTask; + }); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - var cancellationToken = new CancellationToken(); - await sut.SendEventAsync(message, cancellationToken).ConfigureAwait(false); - Assert.AreEqual(callCounter, 2); + // act + await sut.SendEventAsync(message, CancellationToken.None).ConfigureAwait(false); + + // assert + callCounter.Should().Be(2); } [TestMethod] public async Task RetryNonTransientErrorThrownThrows() { + // arrange int callCounter = 0; var contextMock = Substitute.For(); var innerHandlerMock = Substitute.For(); - innerHandlerMock.OpenAsync(Arg.Any()).Returns(t => - { - callCounter++; - - if (callCounter == 1) + innerHandlerMock + .OpenAsync(Arg.Any()) + .Returns(t => { - throw new InvalidOperationException(""); - } - return TaskHelpers.CompletedTask; ; - }); + if (++callCounter == 1) + { + throw new InvalidOperationException(""); + } + return TaskHelpers.CompletedTask; + }); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - var cancellationToken = new CancellationToken(); - await sut.OpenAsync(cancellationToken).ExpectedAsync().ConfigureAwait(false); - Assert.AreEqual(callCounter, 1); + // arrange + await sut.OpenAsync(CancellationToken.None).ExpectedAsync().ConfigureAwait(false); + + // act + callCounter.Should().Be(1); } [TestMethod] public async Task DeviceNotFoundExceptionReturnsDeviceDisabledStatus() { + // arrange var contextMock = Substitute.For(); var innerHandlerMock = Substitute.For(); innerHandlerMock.OpenAsync(Arg.Any()).Returns(t => throw new DeviceNotFoundException()); @@ -210,116 +231,118 @@ public async Task DeviceNotFoundExceptionReturnsDeviceDisabledStatus() contextMock.Get().Returns(statusChangeHandler); - var cancellationToken = new CancellationToken(); - var testee = new RetryDelegatingHandler(contextMock, innerHandlerMock); - await ((Func)(() => testee.OpenAsync(cancellationToken))).ExpectedAsync().ConfigureAwait(false); + var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); + + // act + await ((Func)(() => sut + .OpenAsync(CancellationToken.None))) + .ExpectedAsync() + .ConfigureAwait(false); - Assert.AreEqual(ConnectionStatus.Disconnected, status); - Assert.AreEqual(ConnectionStatusChangeReason.Device_Disabled, statusChangeReason); + // assert + status.Should().Be(ConnectionStatus.Disconnected); + statusChangeReason.Should().Be(ConnectionStatusChangeReason.Device_Disabled); } [TestMethod] public async Task RetryTransientErrorThrownAfterNumberOfRetriesThrows() { + // arrange + using var cts = new CancellationTokenSource(1000); var contextMock = Substitute.For(); var innerHandlerMock = Substitute.For(); - innerHandlerMock.OpenAsync(Arg.Any()).Returns(t => - { - throw new IotHubException(TestExceptionMessage, isTransient: true); - }); + innerHandlerMock + .OpenAsync(Arg.Any()) + .Returns(t => throw new IotHubException(TestExceptionMessage, isTransient: true)); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - using (var cts = new CancellationTokenSource(100)) - { - IotHubException exception = await sut.OpenAsync(cts.Token).ExpectedAsync().ConfigureAwait(false); - Assert.AreEqual(TestExceptionMessage, exception.Message); - } + IotHubException exception = await sut + .OpenAsync(cts.Token) + .ExpectedAsync() + .ConfigureAwait(false); + + // act + + // assert + exception.Message.Should().Be(TestExceptionMessage); } [TestMethod] public async Task RetryCancellationTokenCanceledOpen() { + // arrange var innerHandlerMock = Substitute.For(); - using var cancellationTokenSource = new CancellationTokenSource(); - - cancellationTokenSource.Cancel(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); innerHandlerMock.OpenAsync(Arg.Any()).Returns(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - await sut.OpenAsync(cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + // act and assert + await sut.OpenAsync(cts.Token).ExpectedAsync().ConfigureAwait(false); } [TestMethod] public async Task RetryCancellationTokenCanceledSendEvent() { + // arrange var innerHandlerMock = Substitute.For(); innerHandlerMock.SendEventAsync((Message)null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - using var cancellationTokenSource = new CancellationTokenSource(); - cancellationTokenSource.Cancel(); - await sut.SendEventAsync(Arg.Any(), cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // act and assert + await sut.SendEventAsync(Arg.Any(), cts.Token).ExpectedAsync().ConfigureAwait(false); } [TestMethod] public async Task RetryCancellationTokenCanceledSendEventWithIEnumMessage() { + // arrange var innerHandlerMock = Substitute.For(); innerHandlerMock.SendEventAsync((IEnumerable)null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - using var cancellationTokenSource = new CancellationTokenSource(); - cancellationTokenSource.Cancel(); - await sut.SendEventAsync(new List(), cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + // act + await sut.SendEventAsync(new List(), cts.Token).ExpectedAsync().ConfigureAwait(false); + + // assert await innerHandlerMock.Received(0).SendEventAsync(new List(), Arg.Any()).ConfigureAwait(false); } [TestMethod] public async Task RetryCancellationTokenCanceledReceive() { + // arrange var innerHandlerMock = Substitute.For(); - using var cancellationTokenSource = new CancellationTokenSource(); + using var cts = new CancellationTokenSource(); - cancellationTokenSource.Cancel(); - innerHandlerMock.ReceiveAsync(cancellationTokenSource.Token).Returns(new Task(() => new Message(new byte[0]))); + cts.Cancel(); + innerHandlerMock.ReceiveAsync(cts.Token).Returns(new Task(() => new Message(new byte[0]))); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - await sut.ReceiveAsync(cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); - } - - private class TestRetryPolicy : IRetryPolicy - { - public int Counter - { - get; - private set; - } - - public bool ShouldRetry(int currentRetryCount, Exception lastException, out TimeSpan retryInterval) - { - Counter++; - Assert.IsInstanceOfType(lastException, typeof(IotHubCommunicationException)); - - retryInterval = TimeSpan.MinValue; - if (Counter < 2) return true; - return false; - } + // act and assert + await sut.ReceiveAsync(cts.Token).ExpectedAsync().ConfigureAwait(false); } [TestMethod] public async Task RetrySetRetryPolicyVerifyInternalsSuccess() { + // arrange var innerHandlerMock = Substitute.For(); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - var retryPolicy = new TestRetryPolicy(); + var retryPolicy = new TestRetryPolicyRetryTwice(); sut.SetRetryPolicy(retryPolicy); int innerHandlerCallCounter = 0; @@ -330,57 +353,82 @@ public async Task RetrySetRetryPolicyVerifyInternalsSuccess() throw new IotHubCommunicationException(); }); + // act and assert await sut.OpenAsync(CancellationToken.None).ExpectedAsync().ConfigureAwait(false); - Assert.AreEqual(2, innerHandlerCallCounter); - Assert.AreEqual(2, retryPolicy.Counter); + innerHandlerCallCounter.Should().Be(2); + retryPolicy.Counter.Should().Be(2); var noretry = new NoRetry(); sut.SetRetryPolicy(noretry); await sut.OpenAsync(CancellationToken.None).ExpectedAsync().ConfigureAwait(false); - Assert.AreEqual(3, innerHandlerCallCounter); - Assert.AreEqual(2, retryPolicy.Counter); + innerHandlerCallCounter.Should().Be(3); + retryPolicy.Counter.Should().Be(2); } [TestMethod] public async Task RetryCancellationTokenCanceledComplete() { + // arrange var innerHandlerMock = Substitute.For(); - using var cancellationTokenSource = new CancellationTokenSource(); - - cancellationTokenSource.Cancel(); - innerHandlerMock.CompleteAsync(Arg.Any(), cancellationTokenSource.Token).Returns(TaskHelpers.CompletedTask); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + innerHandlerMock.CompleteAsync(Arg.Any(), cts.Token).Returns(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - await sut.CompleteAsync("", cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + // act and assert + await sut.CompleteAsync("", cts.Token).ExpectedAsync().ConfigureAwait(false); } [TestMethod] public async Task RetryCancellationTokenCanceledAbandon() { + // arrange var innerHandlerMock = Substitute.For(); - innerHandlerMock.AbandonAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); + innerHandlerMock.AbandonAsync(null, Arg.Any()).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - using var cancellationTokenSource = new CancellationTokenSource(); - cancellationTokenSource.Cancel(); - await sut.AbandonAsync(Arg.Any(), cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // act and assert + await sut + .AbandonAsync(Arg.Any(), cts.Token) + .ExpectedAsync() + .ConfigureAwait(false); } [TestMethod] public async Task RetryCancellationTokenCanceledReject() { + // arrange var innerHandlerMock = Substitute.For(); - innerHandlerMock.RejectAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); + innerHandlerMock.RejectAsync(null, Arg.Any()).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - using var cancellationTokenSource = new CancellationTokenSource(); - cancellationTokenSource.Cancel(); - await sut.RejectAsync(Arg.Any(), cancellationTokenSource.Token).ExpectedAsync().ConfigureAwait(false); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // act and assert + await sut.RejectAsync(Arg.Any(), cts.Token).ExpectedAsync().ConfigureAwait(false); + } + + private class TestRetryPolicyRetryTwice : IRetryPolicy + { + public int Counter { get; private set; } + + public bool ShouldRetry(int currentRetryCount, Exception lastException, out TimeSpan retryInterval) + { + Counter++; + lastException.Should().BeOfType(typeof(IotHubCommunicationException)); + + retryInterval = TimeSpan.MinValue; + return Counter < 2; + } } private class NotSeekableStream : MemoryStream @@ -393,7 +441,7 @@ public NotSeekableStream(byte[] buffer) : base(buffer) public override long Length { - get { throw new NotSupportedException(); } + get => throw new NotSupportedException(); } public override void SetLength(long value) @@ -403,14 +451,8 @@ public override void SetLength(long value) public override long Position { - get - { - throw new NotSupportedException(); - } - set - { - throw new NotSupportedException(); - } + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); } public override long Seek(long offset, SeekOrigin loc) diff --git a/iothub/service/src/IoTHubExceptionResult.cs b/iothub/service/src/IoTHubExceptionResult.cs index 5ccfe48c29..45e8404527 100644 --- a/iothub/service/src/IoTHubExceptionResult.cs +++ b/iothub/service/src/IoTHubExceptionResult.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics.CodeAnalysis; using Newtonsoft.Json; namespace Microsoft.Azure.Devices @@ -10,6 +11,8 @@ namespace Microsoft.Azure.Devices /// internal class IoTHubExceptionResult { + [SuppressMessage("Usage", "CA1507: Use nameof in place of string literal 'Message'", + Justification = "This JsonProperty annotation depends on service-defined contract (name) and is independent of the property name selected by the SDK.")] [JsonProperty("Message")] internal string Message { get; set; } } diff --git a/vsts/vsts.yaml b/vsts/vsts.yaml index 445d9caa4a..61927cfa62 100644 --- a/vsts/vsts.yaml +++ b/vsts/vsts.yaml @@ -532,13 +532,8 @@ jobs: pool: vmImage: windows-2022 steps: - - script: | - rem Run dotnet first experience. - dotnet new - rem Start build - build.cmd -clean -build -configuration Debug -package - - displayName: build + - powershell: .\build.ps1 -clean -build -configutaion Debug -package + displayName: Build Package - task: ComponentGovernanceComponentDetection@0 displayName: "Component Detection"