From be233c97302cb101c17382119bd924e13d76ca47 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 24 Jun 2024 14:16:55 -0500 Subject: [PATCH] Atomic message handler sample --- .editorconfig | 1 + README.md | 1 + TemporalioSamples.sln | 7 + .../ClusterManagerActivities.cs | 44 ++++ .../ClusterManagerWorkflow.workflow.cs | 211 ++++++++++++++++++ src/AtomicMessageHandlers/Program.cs | 101 +++++++++ src/AtomicMessageHandlers/README.md | 17 ++ ...oralioSamples.AtomicMessageHandlers.csproj | 7 + .../ClusterManagerWorkflowTests.cs | 65 ++++++ .../SayHelloWorkflowTests.cs | 27 ++- tests/TemporalioSamples.Tests.csproj | 1 + tests/WorkflowEnvironment.cs | 36 +++ tests/WorkflowEnvironmentCollection.cs | 10 + tests/WorkflowEnvironmentTestBase.cs | 23 ++ 14 files changed, 539 insertions(+), 12 deletions(-) create mode 100644 src/AtomicMessageHandlers/ClusterManagerActivities.cs create mode 100644 src/AtomicMessageHandlers/ClusterManagerWorkflow.workflow.cs create mode 100644 src/AtomicMessageHandlers/Program.cs create mode 100644 src/AtomicMessageHandlers/README.md create mode 100644 src/AtomicMessageHandlers/TemporalioSamples.AtomicMessageHandlers.csproj create mode 100644 tests/AtomicMessageHandlers/ClusterManagerWorkflowTests.cs create mode 100644 tests/WorkflowEnvironment.cs create mode 100644 tests/WorkflowEnvironmentCollection.cs create mode 100644 tests/WorkflowEnvironmentTestBase.cs diff --git a/.editorconfig b/.editorconfig index 3182e29..d80c50e 100644 --- a/.editorconfig +++ b/.editorconfig @@ -96,6 +96,7 @@ dotnet_diagnostic.SA1515.severity = none # Do not require XML doc in samples dotnet_diagnostic.SA1600.severity = none +dotnet_diagnostic.SA1602.severity = none # Do not require file header dotnet_diagnostic.SA1633.severity = none diff --git a/README.md b/README.md index cd6234d..f233dfa 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Prerequisites: * [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity. * [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities. * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. +* [AtomicMessageHandlers](src/AtomicMessageHandlers) - Use `SemaphoreSlim` to ensure operations are atomically processed in a workflow. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index b45058f..c59810c 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.AtomicMessageHandlers", "src\AtomicMessageHandlers\TemporalioSamples.AtomicMessageHandlers.csproj", "{5E497499-F87C-4DC6-B5DC-F508F31EA172}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -149,6 +151,10 @@ Global {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU + {5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -178,5 +184,6 @@ Global {B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {5E497499-F87C-4DC6-B5DC-F508F31EA172} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/AtomicMessageHandlers/ClusterManagerActivities.cs b/src/AtomicMessageHandlers/ClusterManagerActivities.cs new file mode 100644 index 0000000..2de96f3 --- /dev/null +++ b/src/AtomicMessageHandlers/ClusterManagerActivities.cs @@ -0,0 +1,44 @@ +namespace TemporalioSamples.AtomicMessageHandlers; + +using Microsoft.Extensions.Logging; +using Temporalio.Activities; + +public class ClusterManagerActivities +{ + public record AllocateNodesToJobInput( + IList Nodes, + string JobName); + + [Activity] + public async Task AllocateNodesToJobAsync(AllocateNodesToJobInput input) + { + ActivityExecutionContext.Current.Logger.LogInformation( + "Assigning nodes {Nodes} to job {TaskName}", input.Nodes, input.JobName); + await Task.Delay(100); + } + + public record DeallocateNodesFromJobInput( + IList Nodes, + string JobName); + + [Activity] + public async Task DeallocateNodesFromJobAsync(DeallocateNodesFromJobInput input) + { + ActivityExecutionContext.Current.Logger.LogInformation( + "Deallocating nodes {Nodes} from job {TaskName}", input.Nodes, input.JobName); + await Task.Delay(100); + } + + public record FindBadNodesInput( + IList Nodes); + + [Activity] + public async Task> FindBadNodesAsync(FindBadNodesInput input) + { + await Task.Delay(100); + return input.Nodes. + Select((node, index) => index % 5 == 0 ? null : node). + OfType(). + ToList(); + } +} \ No newline at end of file diff --git a/src/AtomicMessageHandlers/ClusterManagerWorkflow.workflow.cs b/src/AtomicMessageHandlers/ClusterManagerWorkflow.workflow.cs new file mode 100644 index 0000000..1f1fc28 --- /dev/null +++ b/src/AtomicMessageHandlers/ClusterManagerWorkflow.workflow.cs @@ -0,0 +1,211 @@ +namespace TemporalioSamples.AtomicMessageHandlers; + +using Microsoft.Extensions.Logging; +using Temporalio.Exceptions; +using Temporalio.Workflows; + +[Workflow] +public sealed class ClusterManagerWorkflow : IDisposable +{ + public record State + { + public bool ClusterStarted { get; set; } + + public bool ClusterShutdown { get; set; } + + public IDictionary Nodes { get; init; } = new Dictionary(); + + public int MaxAssignedNodes { get; set; } + } + + public record Input + { + public State State { get; init; } = new(); + + public bool TestContinueAsNew { get; init; } + } + + public record Result( + int MaxAssignedNodes, + int NumAssignedNodes); + + private readonly SemaphoreSlim nodesLock = new(1, 1); + private readonly int maxHistoryLength; + private readonly TimeSpan sleepInterval; + + [WorkflowInit] + public ClusterManagerWorkflow(Input input) + { + CurrentState = input.State; + maxHistoryLength = input.TestContinueAsNew ? 120 : int.MaxValue; + sleepInterval = TimeSpan.FromSeconds(input.TestContinueAsNew ? 1 : 600); + } + + [WorkflowQuery] + public State CurrentState { get; init; } + + [WorkflowRun] + public async Task RunAsync(Input input) + { + await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted); + + // Perform health checks at intervals + do + { + await PerformHealthChecksAsync(); + await Workflow.WaitConditionAsync( + () => CurrentState.ClusterShutdown || ShouldContinueAsNew, + sleepInterval); + + // Continue as new if needed + if (ShouldContinueAsNew) + { + Workflow.Logger.LogInformation("Continuing as new"); + throw Workflow.CreateContinueAsNewException((ClusterManagerWorkflow wf) => wf.RunAsync(new() + { + State = CurrentState, + TestContinueAsNew = input.TestContinueAsNew, + })); + } + } + while (!CurrentState.ClusterShutdown); + return new(CurrentState.MaxAssignedNodes, NumAssignedNodes); + } + + [WorkflowSignal] + public async Task StartClusterAsync() + { + CurrentState.ClusterStarted = true; + foreach (var node in Enumerable.Range(0, 25)) + { + CurrentState.Nodes[$"{node}"] = null; + } + Workflow.Logger.LogInformation("Cluster started"); + } + + [WorkflowSignal] + public async Task ShutdownClusterAsync() + { + await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted); + CurrentState.ClusterShutdown = true; + Workflow.Logger.LogInformation("Cluster shut down"); + } + + public record AllocateNodesToJobInput(int NumNodes, string JobName); + + [WorkflowUpdate] + public async Task> AllocateNodesToJobAsync(AllocateNodesToJobInput input) + { + await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted); + if (CurrentState.ClusterShutdown) + { + throw new ApplicationFailureException( + "Cannot allocate nodes to a job, cluster is already shut down"); + } + await nodesLock.WaitAsync(); + try + { + var unassignedNodes = CurrentState.Nodes. + Where(kvp => kvp.Value == null). + Select(kvp => kvp.Key). + ToList(); + if (unassignedNodes.Count < input.NumNodes) + { + throw new ApplicationFailureException( + $"Cannot allocate {input.NumNodes} nodes, have only {unassignedNodes.Count} available"); + } + var assignedNodes = unassignedNodes[..input.NumNodes]; + // This await would be dangerous without nodesLock because it yields control and allows + // interleaving + await Workflow.ExecuteActivityAsync( + (ClusterManagerActivities acts) => acts.AllocateNodesToJobAsync(new(assignedNodes, input.JobName)), + new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) }); + foreach (var node in assignedNodes) + { + CurrentState.Nodes[node] = input.JobName; + } + CurrentState.MaxAssignedNodes = int.Max(CurrentState.MaxAssignedNodes, NumAssignedNodes); + return assignedNodes; + } + finally + { + nodesLock.Release(); + } + } + + public record DeleteJobInput(string JobName); + + [WorkflowUpdate] + public async Task DeleteJobAsync(DeleteJobInput input) + { + await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted); + if (CurrentState.ClusterShutdown) + { + throw new ApplicationFailureException( + "Cannot delete job, cluster is already shut down"); + } + await nodesLock.WaitAsync(); + try + { + var toUnassign = CurrentState.Nodes. + Where(kvp => kvp.Value == input.JobName). + Select(kvp => kvp.Key). + ToList(); + // This await would be dangerous without nodesLock because it yields control and allows + // interleaving + await Workflow.ExecuteActivityAsync( + (ClusterManagerActivities acts) => acts.DeallocateNodesFromJobAsync(new(toUnassign, input.JobName)), + new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) }); + foreach (var node in toUnassign) + { + CurrentState.Nodes[node] = null; + } + } + finally + { + nodesLock.Release(); + } + } + + public void Dispose() => nodesLock.Dispose(); + + private int NumAssignedNodes => + CurrentState.Nodes.Count(kvp => kvp.Value is { } val && val != "BAD!"); + + private bool ShouldContinueAsNew => + // Don't continue as new while update running + nodesLock.CurrentCount > 0 && + // Continue if suggested or, for ease of testing, max history reached + (Workflow.ContinueAsNewSuggested || Workflow.CurrentHistoryLength > maxHistoryLength); + + private async Task PerformHealthChecksAsync() + { + await nodesLock.WaitAsync(); + try + { + // Find bad nodes from the set of non-bad ones. This await would be dangerous without + // nodesLock because it yields control and allows interleaving. + var assignedNodes = CurrentState.Nodes. + Where(kvp => kvp.Value is { } val && val != "BAD!"). + Select(kvp => kvp.Value!). + ToList(); + var badNodes = await Workflow.ExecuteActivityAsync( + (ClusterManagerActivities acts) => acts.FindBadNodesAsync(new(assignedNodes)), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(10), + // This health check is optional, and our lock would block the whole workflow if + // we let it retry forever + RetryPolicy = new() { MaximumAttempts = 1 }, + }); + foreach (var node in badNodes) + { + CurrentState.Nodes[node] = "BAD!"; + } + } + finally + { + nodesLock.Release(); + } + } +} \ No newline at end of file diff --git a/src/AtomicMessageHandlers/Program.cs b/src/AtomicMessageHandlers/Program.cs new file mode 100644 index 0000000..4fa488f --- /dev/null +++ b/src/AtomicMessageHandlers/Program.cs @@ -0,0 +1,101 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Api.Enums.V1; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.AtomicMessageHandlers; + +// Create a client to localhost on default namespace +using var loggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)); +var client = await TemporalClient.ConnectAsync(new("localhost:7233") +{ + LoggerFactory = loggerFactory, +}); +var logger = loggerFactory.CreateLogger(); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Run worker until cancelled + logger.LogInformation("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "atomic-message-handlers-sample"). + AddAllActivities(new ClusterManagerActivities()). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + logger.LogInformation("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync(bool testContinueAsNew) +{ + // Start workflow + var workflowOptions = new WorkflowOptions( + id: "atomic-message-handlers-workflow-id", + taskQueue: "atomic-message-handlers-sample") + { + IdReusePolicy = WorkflowIdReusePolicy.TerminateIfRunning, + }; + workflowOptions.SignalWithStart((ClusterManagerWorkflow wf) => wf.StartClusterAsync()); + var handle = await client.StartWorkflowAsync( + (ClusterManagerWorkflow wf) => wf.RunAsync(new() { TestContinueAsNew = testContinueAsNew }), + workflowOptions); + + // Allocate 2 nodes each to 6 jobs + await Task.WhenAll(Enumerable.Range(0, 6).Select(i => + handle.ExecuteUpdateAsync(wf => wf.AllocateNodesToJobAsync( + new(2, $"job-{i}"))))); + + // Wait a bit + await Task.Delay(testContinueAsNew ? 10000 : 1000); + + // Delete the jobs + await Task.WhenAll(Enumerable.Range(0, 6).Select(i => + handle.ExecuteUpdateAsync(wf => wf.DeleteJobAsync(new($"job-{i}"))))); + + // Shutdown cluster + await handle.SignalAsync(wf => wf.ShutdownClusterAsync()); + var result = await handle.GetResultAsync(); + + logger.LogInformation( + "Cluster shut down successfully. " + + "It peaked at {MaxAssignedNodes} assigned nodes. " + + "It had {NumAssignedNodes} nodes assigned at the end.", + result.MaxAssignedNodes, + result.NumAssignedNodes); +} + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + if (args.Length > 1) + { + throw new ArgumentException("No extra options allowed for 'worker'"); + } + await RunWorkerAsync(); + break; + case "workflow": + if (args.Length > 2 || (args.Length == 2 && args[1] != "--test-continue-as-new")) + { + throw new ArgumentException("Only '--test-continue-as-new' option allowed for 'worker'"); + } + await ExecuteWorkflowAsync(args.ElementAtOrDefault(1) == "--test-continue-as-new"); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/AtomicMessageHandlers/README.md b/src/AtomicMessageHandlers/README.md new file mode 100644 index 0000000..3efafe3 --- /dev/null +++ b/src/AtomicMessageHandlers/README.md @@ -0,0 +1,17 @@ +# Atomic Message Handlers + +This sample shows a workflow using `SemaphoreSlim` to atomically process certain blocks of workflow code to prevent data +race issues. The sample code demonstrates assigning cluster nodes to jobs atomically. + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run the workflow from this directory: + + dotnet run workflow + +This will show logs in the worker window of the workflow running and assigning nodes to jobs. To see what this looks +like with a continue-as-new operation to relieve history pressure, pass `--test-continue-as-new` to +`dotnet run workflow`. \ No newline at end of file diff --git a/src/AtomicMessageHandlers/TemporalioSamples.AtomicMessageHandlers.csproj b/src/AtomicMessageHandlers/TemporalioSamples.AtomicMessageHandlers.csproj new file mode 100644 index 0000000..e3b6154 --- /dev/null +++ b/src/AtomicMessageHandlers/TemporalioSamples.AtomicMessageHandlers.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + \ No newline at end of file diff --git a/tests/AtomicMessageHandlers/ClusterManagerWorkflowTests.cs b/tests/AtomicMessageHandlers/ClusterManagerWorkflowTests.cs new file mode 100644 index 0000000..94a2301 --- /dev/null +++ b/tests/AtomicMessageHandlers/ClusterManagerWorkflowTests.cs @@ -0,0 +1,65 @@ +namespace TemporalioSamples.AtomicMessageHandlers; + +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.Tests; +using Xunit; +using Xunit.Abstractions; + +public class ClusterManagerWorkflowTests : WorkflowEnvironmentTestBase +{ + public ClusterManagerWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task StartWorkflowAsync_SimpleJobSet_Succeeds(bool testContinueAsNew) + { + // Run inside worker + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddAllActivities(new ClusterManagerActivities()). + AddWorkflow()); + await worker.ExecuteAsync(async () => + { + // Start workflow + var workflowOptions = new WorkflowOptions( + id: $"wf-{Guid.NewGuid()}", + taskQueue: worker.Options.TaskQueue!); + workflowOptions.SignalWithStart((ClusterManagerWorkflow wf) => wf.StartClusterAsync()); + var handle = await Client.StartWorkflowAsync( + (ClusterManagerWorkflow wf) => wf.RunAsync(new() { TestContinueAsNew = testContinueAsNew }), + workflowOptions); + + // Allocate 2 nodes each to 6 jobs + var nodeSets = await Task.WhenAll(Enumerable.Range(0, 6).Select(i => + handle.ExecuteUpdateAsync(wf => wf.AllocateNodesToJobAsync(new(2, $"job-{i}"))))); + Assert.Equal(6, nodeSets.Length); + Assert.All(nodeSets, nodes => Assert.Equal(2, nodes.Count)); + + // Confirm that some jobs are assigned + var state = await handle.QueryAsync(wf => wf.CurrentState); + Assert.True(state.ClusterStarted); + Assert.False(state.ClusterShutdown); + Assert.Equal(2, state.Nodes.Count(kvp => kvp.Value == "job-0")); + + // Delete all the jobs, shutdown the cluster, and confirm result + await Task.WhenAll(Enumerable.Range(0, 6).Select(i => + handle.ExecuteUpdateAsync(wf => wf.DeleteJobAsync(new($"job-{i}"))))); + await handle.SignalAsync(wf => wf.ShutdownClusterAsync()); + var result = await handle.GetResultAsync(); + Assert.Equal(12, result.MaxAssignedNodes); + Assert.Equal(0, result.NumAssignedNodes); + + // Check whether the workflow continued as new + var firstRunHistory = await (handle with { RunId = handle.ResultRunId }).FetchHistoryAsync(); + bool continued = firstRunHistory.Events.Any( + evt => evt.WorkflowExecutionContinuedAsNewEventAttributes != null); + Assert.Equal(testContinueAsNew, continued); + }); + } +} \ No newline at end of file diff --git a/tests/ContextPropagation/SayHelloWorkflowTests.cs b/tests/ContextPropagation/SayHelloWorkflowTests.cs index 0a620af..5d70e3e 100644 --- a/tests/ContextPropagation/SayHelloWorkflowTests.cs +++ b/tests/ContextPropagation/SayHelloWorkflowTests.cs @@ -1,30 +1,33 @@ +namespace TemporalioSamples.Tests.ContextPropagation; + using Temporalio.Activities; using Temporalio.Client; using Temporalio.Converters; using Temporalio.Exceptions; -using Temporalio.Testing; using Temporalio.Worker; using TemporalioSamples.ContextPropagation; using Xunit; +using Xunit.Abstractions; -namespace TemporalioSamples.Tests.ContextPropagation; - -public class SayHelloWorkflowTests +public class SayHelloWorkflowTests : WorkflowEnvironmentTestBase { + public SayHelloWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + [Fact] public async Task RunAsync_ContextPropagation_ReachesActivity() { - await using var env = await WorkflowEnvironment.StartLocalAsync(); - // Update the client to use the interceptor - var clientOptions = (TemporalClientOptions)env.Client.Options.Clone(); - clientOptions.Interceptors = new[] - { + var clientOptions = (TemporalClientOptions)Client.Options.Clone(); + clientOptions.Interceptors = + [ new ContextPropagationInterceptor( MyContext.UserId, DataConverter.Default.PayloadConverter), - }; - var client = new TemporalClient(env.Client.Connection, clientOptions); + ]; + var client = new TemporalClient(Client.Connection, clientOptions); // Mock out the activity to assert the context value [Activity] @@ -44,7 +47,7 @@ static string SayHello(string name) // Run worker using var worker = new TemporalWorker( client, - new TemporalWorkerOptions("my-task-queue"). + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). AddActivity(SayHello). AddWorkflow()); await worker.ExecuteAsync(async () => diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index f4a0ed8..e19a86b 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -21,6 +21,7 @@ + diff --git a/tests/WorkflowEnvironment.cs b/tests/WorkflowEnvironment.cs new file mode 100644 index 0000000..2d181b6 --- /dev/null +++ b/tests/WorkflowEnvironment.cs @@ -0,0 +1,36 @@ +namespace TemporalioSamples.Tests; + +using System; +using Temporalio.Client; +using Xunit; + +public class WorkflowEnvironment : IAsyncLifetime +{ + private Temporalio.Testing.WorkflowEnvironment? env; + + public ITemporalClient Client => + env?.Client ?? throw new InvalidOperationException("Environment not created"); + + public async Task InitializeAsync() + { + env = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync(new() + { + DevServerOptions = new() + { + ExtraArgs = + [ + "--dynamic-config-value", + "frontend.enableUpdateWorkflowExecution=true", + ], + }, + }); + } + + public async Task DisposeAsync() + { + if (env != null) + { + await env.ShutdownAsync(); + } + } +} diff --git a/tests/WorkflowEnvironmentCollection.cs b/tests/WorkflowEnvironmentCollection.cs new file mode 100644 index 0000000..76e1bb7 --- /dev/null +++ b/tests/WorkflowEnvironmentCollection.cs @@ -0,0 +1,10 @@ +#pragma warning disable CA1711 // We can suffix "Collection" in xUnit + +namespace TemporalioSamples.Tests; + +using Xunit; + +[CollectionDefinition("Environment")] +public class WorkflowEnvironmentCollection : ICollectionFixture +{ +} diff --git a/tests/WorkflowEnvironmentTestBase.cs b/tests/WorkflowEnvironmentTestBase.cs new file mode 100644 index 0000000..7f53085 --- /dev/null +++ b/tests/WorkflowEnvironmentTestBase.cs @@ -0,0 +1,23 @@ +namespace TemporalioSamples.Tests; + +using Temporalio.Client; +using Xunit; +using Xunit.Abstractions; + +[Collection("Environment")] +public abstract class WorkflowEnvironmentTestBase : TestBase +{ + protected WorkflowEnvironmentTestBase(ITestOutputHelper output, WorkflowEnvironment env) + : base(output) + { + Env = env; + // We need to update the client logger with our factory + var newOptions = (TemporalClientOptions)env.Client.Options.Clone(); + newOptions.LoggerFactory = LoggerFactory; + Client = new TemporalClient(env.Client.Connection, newOptions); + } + + protected WorkflowEnvironment Env { get; private init; } + + protected ITemporalClient Client { get; private init; } +}