Skip to content

Commit

Permalink
Safe message handler sample (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Aug 12, 2024
1 parent f281d33 commit 175feb5
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 16 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ dotnet_diagnostic.SA1515.severity = none

# Do not require XML doc in samples
dotnet_diagnostic.SA1600.severity = none
dotnet_diagnostic.SA1602.severity = none

# Do not require file header
dotnet_diagnostic.SA1633.severity = none
Expand Down
8 changes: 4 additions & 4 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.1.2" />
<PackageReference Include="Temporalio" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.2.0" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Prerequisites:
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [SafeMessageHandlers](src/SafeMessageHandlers) - Use `Semaphore` to ensure operations are atomically processed in a workflow.
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -149,6 +151,10 @@ Global
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -178,5 +184,6 @@ Global
{B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
44 changes: 44 additions & 0 deletions src/SafeMessageHandlers/ClusterManagerActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace TemporalioSamples.SafeMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public class ClusterManagerActivities
{
public record AllocateNodesToJobInput(
IList<string> Nodes,
string JobName);

[Activity]
public async Task AllocateNodesToJobAsync(AllocateNodesToJobInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Assigning nodes {Nodes} to job {TaskName}", input.Nodes, input.JobName);
await Task.Delay(100);
}

public record DeallocateNodesFromJobInput(
IList<string> Nodes,
string JobName);

[Activity]
public async Task DeallocateNodesFromJobAsync(DeallocateNodesFromJobInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Deallocating nodes {Nodes} from job {TaskName}", input.Nodes, input.JobName);
await Task.Delay(100);
}

public record FindBadNodesInput(
IList<string> Nodes);

[Activity]
public async Task<List<string>> FindBadNodesAsync(FindBadNodesInput input)
{
await Task.Delay(100);
return input.Nodes.
Select((node, index) => index % 5 == 0 ? null : node).
OfType<string>().
ToList();
}
}
209 changes: 209 additions & 0 deletions src/SafeMessageHandlers/ClusterManagerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
namespace TemporalioSamples.SafeMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Exceptions;
using Temporalio.Workflows;

[Workflow]
public class ClusterManagerWorkflow
{
public record State
{
public bool ClusterStarted { get; set; }

public bool ClusterShutdown { get; set; }

public IDictionary<string, string?> Nodes { get; init; } = new Dictionary<string, string?>();

public int MaxAssignedNodes { get; set; }
}

public record Input
{
public State State { get; init; } = new();

public bool TestContinueAsNew { get; init; }
}

public record Result(
int MaxAssignedNodes,
int NumAssignedNodes);

private readonly Semaphore nodesLock = new(1);
private readonly int maxHistoryLength;
private readonly TimeSpan sleepInterval;

[WorkflowInit]
public ClusterManagerWorkflow(Input input)
{
CurrentState = input.State;
maxHistoryLength = input.TestContinueAsNew ? 120 : int.MaxValue;
sleepInterval = TimeSpan.FromSeconds(input.TestContinueAsNew ? 1 : 600);
}

[WorkflowQuery]
public State CurrentState { get; init; }

[WorkflowRun]
public async Task<Result> RunAsync(Input input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);

// Perform health checks at intervals
do
{
await PerformHealthChecksAsync();
await Workflow.WaitConditionAsync(
() => CurrentState.ClusterShutdown || ShouldContinueAsNew,
sleepInterval);

// Continue as new if needed
if (ShouldContinueAsNew)
{
Workflow.Logger.LogInformation("Continuing as new");
throw Workflow.CreateContinueAsNewException((ClusterManagerWorkflow wf) => wf.RunAsync(new()
{
State = CurrentState,
TestContinueAsNew = input.TestContinueAsNew,
}));
}
}
while (!CurrentState.ClusterShutdown);
return new(CurrentState.MaxAssignedNodes, NumAssignedNodes);
}

[WorkflowSignal]
public async Task StartClusterAsync()
{
CurrentState.ClusterStarted = true;
foreach (var node in Enumerable.Range(0, 25))
{
CurrentState.Nodes[$"{node}"] = null;
}
Workflow.Logger.LogInformation("Cluster started");
}

[WorkflowSignal]
public async Task ShutdownClusterAsync()
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
CurrentState.ClusterShutdown = true;
Workflow.Logger.LogInformation("Cluster shut down");
}

public record AllocateNodesToJobInput(int NumNodes, string JobName);

[WorkflowUpdate]
public async Task<List<string>> AllocateNodesToJobAsync(AllocateNodesToJobInput input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
if (CurrentState.ClusterShutdown)
{
throw new ApplicationFailureException(
"Cannot allocate nodes to a job, cluster is already shut down");
}
await nodesLock.WaitAsync();
try
{
var unassignedNodes = CurrentState.Nodes.
Where(kvp => kvp.Value == null).
Select(kvp => kvp.Key).
ToList();
if (unassignedNodes.Count < input.NumNodes)
{
throw new ApplicationFailureException(
$"Cannot allocate {input.NumNodes} nodes, have only {unassignedNodes.Count} available");
}
var assignedNodes = unassignedNodes[..input.NumNodes];
// This await would be dangerous without nodesLock because it yields control and allows
// interleaving
await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.AllocateNodesToJobAsync(new(assignedNodes, input.JobName)),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
foreach (var node in assignedNodes)
{
CurrentState.Nodes[node] = input.JobName;
}
CurrentState.MaxAssignedNodes = int.Max(CurrentState.MaxAssignedNodes, NumAssignedNodes);
return assignedNodes;
}
finally
{
nodesLock.Release();
}
}

public record DeleteJobInput(string JobName);

[WorkflowUpdate]
public async Task DeleteJobAsync(DeleteJobInput input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
if (CurrentState.ClusterShutdown)
{
throw new ApplicationFailureException(
"Cannot delete job, cluster is already shut down");
}
await nodesLock.WaitAsync();
try
{
var toUnassign = CurrentState.Nodes.
Where(kvp => kvp.Value == input.JobName).
Select(kvp => kvp.Key).
ToList();
// This await would be dangerous without nodesLock because it yields control and allows
// interleaving
await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.DeallocateNodesFromJobAsync(new(toUnassign, input.JobName)),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
foreach (var node in toUnassign)
{
CurrentState.Nodes[node] = null;
}
}
finally
{
nodesLock.Release();
}
}

private int NumAssignedNodes =>
CurrentState.Nodes.Count(kvp => kvp.Value is { } val && val != "BAD!");

private bool ShouldContinueAsNew =>
// Don't continue as new while update running
nodesLock.CurrentCount > 0 &&
// Continue if suggested or, for ease of testing, max history reached
(Workflow.ContinueAsNewSuggested || Workflow.CurrentHistoryLength > maxHistoryLength);

private async Task PerformHealthChecksAsync()
{
await nodesLock.WaitAsync();
try
{
// Find bad nodes from the set of non-bad ones. This await would be dangerous without
// nodesLock because it yields control and allows interleaving.
var assignedNodes = CurrentState.Nodes.
Where(kvp => kvp.Value is { } val && val != "BAD!").
Select(kvp => kvp.Value!).
ToList();
var badNodes = await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.FindBadNodesAsync(new(assignedNodes)),
new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(10),
// This health check is optional, and our lock would block the whole workflow if
// we let it retry forever
RetryPolicy = new() { MaximumAttempts = 1 },
});
foreach (var node in badNodes)
{
CurrentState.Nodes[node] = "BAD!";
}
}
finally
{
nodesLock.Release();
}
}
}
Loading

0 comments on commit 175feb5

Please sign in to comment.