Skip to content

Commit

Permalink
Cancel ScriptExecution without calling CancelScript when the StartScr…
Browse files Browse the repository at this point in the history
…ipt request has never been transferred to Tentacle (#756)

* Enhancing how we check if we have cancelled while starting a script
* Additional tests to cover cancellation logic

---------

Co-authored-by: Stephen Burman <[email protected]>
  • Loading branch information
nathanwoctopusdeploy and sburmanoctopus authored Jan 8, 2024
1 parent 456d1c0 commit ca792e8
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 10 deletions.
16 changes: 16 additions & 0 deletions source/Octopus.Tentacle.Client/ExceptionExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using Halibut;
using Halibut.Exceptions;
using Halibut.Transport;

namespace Octopus.Tentacle.Client
{
public static class ExceptionExtensionMethods
{
public static bool IsConnectionException(this Exception exception)
{
return exception is ConnectingRequestCancelledException
|| exception is HalibutClientException {ConnectionState: ConnectionState.Connecting};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ async Task<ScriptStatusResponseV2> StartScriptAction(CancellationToken ct)
void OnErrorAction(Exception ex)
{
// If we can guarantee that the call to StartScript has not connected to the Service then we can decrement the count
if (ex is ConnectingRequestCancelledException ||
ex is HalibutClientException { ConnectionState: ConnectionState.Connecting })
if (ex.IsConnectionException())
{
--startScriptCallsConnectedCount;
}
Expand All @@ -109,7 +108,7 @@ void OnErrorAction(Exception ex)

// We determine if the call was connecting when cancelled, then assume it's transferring if it is not connecting.
// This is the safest option as it will default to the CancelScript CompleteScript path if we are unsure
var startScriptCallIsConnecting = ex is ConnectingRequestCancelledException;
var startScriptCallIsConnecting = ex.IsConnectionException();

if (!startScriptCallIsConnecting || startScriptCallIsBeingRetried)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ async Task<ScriptStatusResponseV3Alpha> StartScriptAction(CancellationToken ct)
void OnErrorAction(Exception ex)
{
// If we can guarantee that the call to StartScript has not connected to the Service then we can decrement the count
if (ex is ConnectingRequestCancelledException ||
ex is HalibutClientException { ConnectionState: ConnectionState.Connecting })
if (ex.IsConnectionException())
{
--startScriptCallsConnectedCount;
}
Expand All @@ -97,7 +96,7 @@ void OnErrorAction(Exception ex)

// We determine if the call was connecting when cancelled, then assume it's transferring if it is not connecting.
// This is the safest option as it will default to the CancelScript CompleteScript path if we are unsure
var startScriptCallIsConnecting = ex is ConnectingRequestCancelledException;
var startScriptCallIsConnecting = ex.IsConnectionException();

if (!startScriptCallIsConnecting || startScriptCallIsBeingRetried)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
</Otherwise>
</Choose>
<ItemGroup>
<PackageReference Include="Halibut" Version="7.0.459" />
<PackageReference Include="Halibut" Version="7.0.465" />
<PackageReference Include="Octopus.Diagnostics" Version="2.1.0" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using Halibut;
using Halibut.Diagnostics;
using NUnit.Framework;
using Octopus.Tentacle.Client.Scripts;
using Octopus.Tentacle.CommonTestUtils.Builders;
Expand All @@ -14,6 +15,7 @@
using Octopus.Tentacle.Contracts.ScriptServiceV3Alpha;
using Octopus.Tentacle.Tests.Integration.Support;
using Octopus.Tentacle.Tests.Integration.Support.ExtensionMethods;
using Octopus.Tentacle.Tests.Integration.Support.PendingRequestQueueFactories;
using Octopus.Tentacle.Tests.Integration.Util;
using Octopus.Tentacle.Tests.Integration.Util.Builders;
using Octopus.Tentacle.Tests.Integration.Util.Builders.Decorators;
Expand Down Expand Up @@ -124,6 +126,10 @@ public async Task DuringGetCapabilities_ScriptExecutionCanBeCancelled(TentacleCo
scriptMethodUsages.ForCompleteScriptAsync().Started.Should().Be(0, "Should not have tried to call CompleteScript");
}

/// <summary>
/// This test, and probably others in this test class do not correctly test the Connecting Scenario. The port forwarder is used to kill new and existing connections but this can
/// result in Halibut thinking a Request is transferring, where we want it to be connecting. These tests need to be rewritten to ensure they test the correct scenario.
/// </summary>
[Test]
[TentacleConfigurations(additionalParameterTypes: new object[] { typeof(RpcCall), typeof(RpcCallStage) })]
public async Task DuringStartScript_ScriptExecutionCanBeCancelled(TentacleConfigurationTestCase tentacleConfigurationTestCase, RpcCall rpcCall, RpcCallStage rpcCallStage)
Expand Down Expand Up @@ -254,6 +260,138 @@ public async Task DuringStartScript_ScriptExecutionCanBeCancelled(TentacleConfig
}
}

[Test]
[TentacleConfigurations(testListening: false)]
public async Task DuringStartScript_ForPollingTentacle_ThatIsRetryingTheRpc_AndConnecting_ScriptExecutionCanBeCancelled(TentacleConfigurationTestCase tentacleConfigurationTestCase)
{
var halibutTimeoutsAndLimits = HalibutTimeoutsAndLimits.RecommendedValues();
halibutTimeoutsAndLimits.PollingQueueWaitTimeout = TimeSpan.FromSeconds(4);
halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(3);

var started = false;
var cancelExecutionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);

await using var clientAndTentacle = await tentacleConfigurationTestCase.CreateBuilder()
.WithRetryDuration(TimeSpan.FromHours(1))
.WithTentacleServiceDecorator(new TentacleServiceDecoratorBuilder().RecordMethodUsages(tentacleConfigurationTestCase, out var scriptServiceRecordedUsages).Build())
.WithPendingRequestQueueFactory(new CancelWhenRequestQueuedPendingRequestQueueFactory(
cancelExecutionCancellationTokenSource,
halibutTimeoutsAndLimits,
// Cancel the execution when the StartScript RPC call is being retries
shouldCancel: async () =>
{
await Task.CompletedTask;

if (started && scriptServiceRecordedUsages.ForStartScriptAsync().Started >= 2)
{
Logger.Information("Cancelling Execute Script");
return true;
}

return false;
}))
.WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits)
.Build(CancellationToken);

// Arrange
Logger.Information("Execute a script so that GetCapabilities will be cached");
await clientAndTentacle.TentacleClient.ExecuteScript(
new LatestStartScriptCommandBuilder().WithScriptBody(b => b.Print("The script")).Build(),
cancelExecutionCancellationTokenSource.Token);

Logger.Information("Stop Tentacle so no more requests are picked up");
await clientAndTentacle.RunningTentacle.Stop(CancellationToken);

var delay = clientAndTentacle.Server.ServerHalibutRuntime.TimeoutsAndLimits.PollingQueueWaitTimeout + TimeSpan.FromSeconds(2);
Logger.Information($"Waiting for {delay} for any active PendingRequestQueues for the Tentacle to drop the latest poll connection");
await Task.Delay(delay, CancellationToken);

scriptServiceRecordedUsages.Reset();
started = true;

// ACT
var startScriptCommand = new LatestStartScriptCommandBuilder()
.WithScriptBody(b => b.Print("The script").Sleep(TimeSpan.FromHours(1)))
.Build();

Logger.Information("Start Executing the Script");
var executeScriptTask = clientAndTentacle.TentacleClient.ExecuteScript(startScriptCommand, cancelExecutionCancellationTokenSource.Token);
var expectedException = new ExceptionContractAssertionBuilder(FailureScenario.ScriptExecutionCancelled, tentacleConfigurationTestCase.TentacleType, clientAndTentacle).Build();
await AssertionExtensions.Should(async () => await executeScriptTask).ThrowExceptionContractAsync(expectedException);

// Assert
scriptServiceRecordedUsages.ForStartScriptAsync().Completed.Should().BeGreaterOrEqualTo(2);
scriptServiceRecordedUsages.ForGetStatusAsync().Started.Should().Be(0);
scriptServiceRecordedUsages.ForCancelScriptAsync().Started.Should().BeGreaterOrEqualTo(0, "Script Execution does not need to be cancelled on Tentacle as it has not started");
scriptServiceRecordedUsages.ForCompleteScriptAsync().Started.Should().Be(0);
}

[Test]
[TentacleConfigurations(testPolling: false)]
public async Task DuringStartScript_ForListeningTentacle_ThatIsRetryingTheRpc_AndConnecting_ScriptExecutionCanBeCancelled(TentacleConfigurationTestCase tentacleConfigurationTestCase)
{
var halibutTimeoutsAndLimits = HalibutTimeoutsAndLimits.RecommendedValues();
halibutTimeoutsAndLimits.RetryCountLimit = 1;
halibutTimeoutsAndLimits.ConnectionErrorRetryTimeout = TimeSpan.FromSeconds(4);
halibutTimeoutsAndLimits.RetryListeningSleepInterval = TimeSpan.Zero;
halibutTimeoutsAndLimits.TcpClientConnectTimeout = TimeSpan.FromSeconds(4);
halibutTimeoutsAndLimits.TcpClientPooledConnectionTimeout = TimeSpan.FromSeconds(5);

var cancelExecutionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);

await using var clientAndTentacle = await tentacleConfigurationTestCase.CreateBuilder()
.WithRetryDuration(TimeSpan.FromHours(1))
.WithTentacleServiceDecorator(new TentacleServiceDecoratorBuilder().RecordMethodUsages(tentacleConfigurationTestCase, out var scriptServiceRecordedUsages).Build())
.WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits)
.Build(CancellationToken);

// Arrange
Logger.Information("Execute a script so that GetCapabilities will be cached");
await clientAndTentacle.TentacleClient.ExecuteScript(
new LatestStartScriptCommandBuilder().WithScriptBody(b => b.Print("The script")).Build(),
cancelExecutionCancellationTokenSource.Token);

Logger.Information("Stop Tentacle so no more requests are picked up");
await clientAndTentacle.RunningTentacle.Stop(CancellationToken);

var delay = clientAndTentacle.Server.ServerHalibutRuntime.TimeoutsAndLimits.SafeTcpClientPooledConnectionTimeout + TimeSpan.FromSeconds(2);
Logger.Information($"Waiting for {delay} for any active Pooled Connections for the Tentacle to expire");
await Task.Delay(delay, CancellationToken);

scriptServiceRecordedUsages.Reset();

// ACT
var startScriptCommand = new LatestStartScriptCommandBuilder()
.WithScriptBody(b => b.Print("The script").Sleep(TimeSpan.FromHours(1)))
.Build();

Logger.Information("Start Executing the Script");
var executeScriptTask = clientAndTentacle.TentacleClient.ExecuteScript(startScriptCommand, cancelExecutionCancellationTokenSource.Token);
var cancellationTask = Task.Run(async () =>
{
while (true)
{
if (scriptServiceRecordedUsages.ForStartScriptAsync().Started >= 2)
{
cancelExecutionCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2));
return;
}

await Task.Delay(TimeSpan.FromSeconds(0.5), CancellationToken);
}
}, CancellationToken);

var expectedException = new ExceptionContractAssertionBuilder(FailureScenario.ScriptExecutionCancelled, tentacleConfigurationTestCase.TentacleType, clientAndTentacle).Build();
await AssertionExtensions.Should(async () => await executeScriptTask).ThrowExceptionContractAsync(expectedException);
await cancellationTask;

// Assert
scriptServiceRecordedUsages.ForStartScriptAsync().Completed.Should().BeGreaterOrEqualTo(2);
scriptServiceRecordedUsages.ForGetStatusAsync().Started.Should().Be(0);
scriptServiceRecordedUsages.ForCancelScriptAsync().Started.Should().BeGreaterOrEqualTo(0, "Script Execution does not need to be cancelled on Tentacle as it has not started");
scriptServiceRecordedUsages.ForCompleteScriptAsync().Started.Should().Be(0);
}

[Test]
[TentacleConfigurations(additionalParameterTypes: new object[] { typeof(RpcCall), typeof(RpcCallStage) })]
public async Task DuringGetStatus_ScriptExecutionCanBeCancelled(TentacleConfigurationTestCase tentacleConfigurationTestCase, RpcCall rpcCall, RpcCallStage rpcCallStage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ClientAndTentacleBuilder
TcpConnectionUtilities? tcpConnectionUtilities;
bool installAsAService = false;
bool useDefaultMachineConfigurationHomeDirectory = false;
HalibutTimeoutsAndLimits? halibutTimeoutsAndLimits;

public ClientAndTentacleBuilder(TentacleType tentacleType)
{
Expand Down Expand Up @@ -157,6 +158,12 @@ public ClientAndTentacleBuilder UseDefaultMachineConfigurationHomeDirectory()
return this;
}

public ClientAndTentacleBuilder WithHalibutTimeoutsAndLimits(HalibutTimeoutsAndLimits halibutTimeoutsAndLimits)
{
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
return this;
}

PortForwarder? BuildPortForwarder(int localPort, int? listeningPort)
{
if (portForwarderModifiers.Count == 0) return null;
Expand All @@ -175,7 +182,7 @@ public async Task<ClientAndTentacle> Build(CancellationToken cancellationToken)
// Server
var serverHalibutRuntimeBuilder = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.Server)
.WithHalibutTimeoutsAndLimits(HalibutTimeoutsAndLimits.RecommendedValues())
.WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits ?? HalibutTimeoutsAndLimits.RecommendedValues())
.WithLegacyContractSupport();

if (queueFactory != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut;
using Halibut.Diagnostics;
using Halibut.ServiceModel;
using Halibut.Transport.Protocol;

namespace Octopus.Tentacle.Tests.Integration.Support.PendingRequestQueueFactories
{
/// <summary>
/// CancelWhenRequestQueuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued
/// </summary>
public class CancelWhenRequestQueuedPendingRequestQueueFactory : IPendingRequestQueueFactory
{
readonly CancellationTokenSource cancellationTokenSource;
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
readonly Func<Task<bool>> shouldCancel;

public CancelWhenRequestQueuedPendingRequestQueueFactory(CancellationTokenSource cancellationTokenSource, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, Func<Task<bool>> shouldCancel)
{
this.cancellationTokenSource = cancellationTokenSource;
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
this.shouldCancel = shouldCancel;
}

public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return new Decorator(new PendingRequestQueueAsync(halibutTimeoutsAndLimits, new LogFactory().ForEndpoint(endpoint)), cancellationTokenSource, shouldCancel);
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource cancellationTokenSource;
readonly Func<Task<bool>> shouldCancel;
readonly IPendingRequestQueue inner;

public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource, Func<Task<bool>>? shouldCancel)
{
this.inner = inner;
this.cancellationTokenSource = cancellationTokenSource;
this.shouldCancel = shouldCancel;
}

public bool IsEmpty => inner.IsEmpty;
public int Count => inner.Count;
public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination);
public async Task<RequestMessageWithCancellationToken?> DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken);

public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationTokens)
{
var queueAndWait = inner.QueueAndWaitAsync(request, cancellationTokens);
var cancel = Task.Run(async () =>
{
if (await shouldCancel())
{
// Allow the PendingRRequest to be queued
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(1));
}
});

await Task.WhenAll(queueAndWait, cancel);

return await queueAndWait;
}
}
}
}
Loading

0 comments on commit ca792e8

Please sign in to comment.