Skip to content

Commit

Permalink
Update SDK and add context propagation sample (#68)
Browse files Browse the repository at this point in the history
Fixes #14
  • Loading branch information
cretz authored Jun 5, 2024
1 parent 54bf6f0 commit 49ced3d
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 5 deletions.
8 changes: 4 additions & 4 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="1.0.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.0.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.0.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.0.0" />
<PackageReference Include="Temporalio" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.1.2" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Prerequisites:
* [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language.
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
Expand Down
8 changes: 7 additions & 1 deletion TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.Saga", "s
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowUpdate", "src\WorkflowUpdate\TemporalioSamples.WorkflowUpdate.csproj", "{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -143,6 +145,10 @@ Global
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Release|Any CPU.Build.0 = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -171,6 +177,6 @@ Global
{3168FB2D-D821-433A-A761-309E0474DE48} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}

{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
169 changes: 169 additions & 0 deletions src/ContextPropagation/ContextPropagationInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
namespace TemporalioSamples.ContextPropagation;

using System.Threading.Tasks;
using Temporalio.Api.Common.V1;
using Temporalio.Client;
using Temporalio.Client.Interceptors;
using Temporalio.Converters;
using Temporalio.Worker.Interceptors;
using Temporalio.Workflows;

/// <summary>
/// General purpose interceptor that can be used to propagate async-local context through workflows
/// and activities. This must be set on the client used for interacting with workflows and used for
/// the worker.
/// </summary>
/// <typeparam name="T">Context data type.</typeparam>
public class ContextPropagationInterceptor<T> : IClientInterceptor, IWorkerInterceptor
{
private readonly AsyncLocal<T> context;
private readonly IPayloadConverter payloadConverter;
private readonly string headerKey;

public ContextPropagationInterceptor(
AsyncLocal<T> context,
IPayloadConverter payloadConverter,
string headerKey = "__my_context_key")
{
this.context = context;
this.payloadConverter = payloadConverter;
this.headerKey = headerKey;
}

public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) =>
new ContextPropagationClientOutboundInterceptor(this, nextInterceptor);

public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) =>
new ContextPropagationWorkflowInboundInterceptor(this, nextInterceptor);

public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) =>
new ContextPropagationActivityInboundInterceptor(this, nextInterceptor);

private Dictionary<string, Payload> HeaderFromContext(IDictionary<string, Payload>? existing)
{
var ret = existing != null ?
new Dictionary<string, Payload>(existing) : new Dictionary<string, Payload>(1);
ret[headerKey] = payloadConverter.ToPayload(context.Value);
return ret;
}

private void WithHeadersApplied(IReadOnlyDictionary<string, Payload>? headers, Action func) =>
WithHeadersApplied(
headers,
() =>
{
func();
return (object?)null;
});

private TResult WithHeadersApplied<TResult>(
IReadOnlyDictionary<string, Payload>? headers, Func<TResult> func)
{
if (headers?.TryGetValue(headerKey, out var payload) == true && payload != null)
{
context.Value = payloadConverter.ToValue<T>(payload);
}
// These are async local, no need to unapply afterwards
return func();
}

private class ContextPropagationClientOutboundInterceptor : ClientOutboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;

public ContextPropagationClientOutboundInterceptor(
ContextPropagationInterceptor<T> root, ClientOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>(
StartWorkflowInput input) =>
base.StartWorkflowAsync<TWorkflow, TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task SignalWorkflowAsync(SignalWorkflowInput input) =>
base.SignalWorkflowAsync(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input) =>
base.QueryWorkflowAsync<TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>(
StartWorkflowUpdateInput input) =>
base.StartWorkflowUpdateAsync<TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });
}

private class ContextPropagationWorkflowInboundInterceptor : WorkflowInboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;

public ContextPropagationWorkflowInboundInterceptor(
ContextPropagationInterceptor<T> root, WorkflowInboundInterceptor next)
: base(next) => this.root = root;

public override void Init(WorkflowOutboundInterceptor outbound) =>
base.Init(new ContextPropagationWorkflowOutboundInterceptor(root, outbound));

public override Task<object?> ExecuteWorkflowAsync(ExecuteWorkflowInput input) =>
root.WithHeadersApplied(Workflow.Info.Headers, () => Next.ExecuteWorkflowAsync(input));

public override Task HandleSignalAsync(HandleSignalInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.HandleSignalAsync(input));

public override object? HandleQuery(HandleQueryInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.HandleQuery(input));

public override void ValidateUpdate(HandleUpdateInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.ValidateUpdate(input));

public override Task<object?> HandleUpdateAsync(HandleUpdateInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.HandleUpdateAsync(input));
}

private class ContextPropagationWorkflowOutboundInterceptor : WorkflowOutboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;

public ContextPropagationWorkflowOutboundInterceptor(
ContextPropagationInterceptor<T> root, WorkflowOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<TResult> ScheduleActivityAsync<TResult>(
ScheduleActivityInput input) =>
Next.ScheduleActivityAsync<TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task<TResult> ScheduleLocalActivityAsync<TResult>(
ScheduleLocalActivityInput input) =>
Next.ScheduleLocalActivityAsync<TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task SignalChildWorkflowAsync(
SignalChildWorkflowInput input) =>
Next.SignalChildWorkflowAsync(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task SignalExternalWorkflowAsync(
SignalExternalWorkflowInput input) =>
Next.SignalExternalWorkflowAsync(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflowAsync<TWorkflow, TResult>(
StartChildWorkflowInput input) =>
Next.StartChildWorkflowAsync<TWorkflow, TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });
}

private class ContextPropagationActivityInboundInterceptor : ActivityInboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;

public ContextPropagationActivityInboundInterceptor(
ContextPropagationInterceptor<T> root, ActivityInboundInterceptor next)
: base(next) => this.root = root;

public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.ExecuteActivityAsync(input));
}
}
6 changes: 6 additions & 0 deletions src/ContextPropagation/MyContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace TemporalioSamples.ContextPropagation;

public static class MyContext
{
public static readonly AsyncLocal<string> UserId = new();
}
79 changes: 79 additions & 0 deletions src/ContextPropagation/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Converters;
using Temporalio.Worker;
using TemporalioSamples.ContextPropagation;

using var loggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information));
var logger = loggerFactory.CreateLogger<Program>();

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = loggerFactory,
// This is where we set the interceptor to propagate context
Interceptors = new[]
{
new ContextPropagationInterceptor<string>(
MyContext.UserId,
DataConverter.Default.PayloadConverter),
},
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
logger.LogInformation("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "interceptors-sample").
AddAllActivities<SayHelloActivities>(new()).
AddWorkflow<SayHelloWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
logger.LogInformation("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync()
{
// Set our user ID that can be accessed in the workflow and activity
MyContext.UserId.Value = "some-user";

// Start workflow, send signal, wait for completion, issue query
logger.LogInformation("Executing workflow");
var handle = await client.StartWorkflowAsync(
(SayHelloWorkflow wf) => wf.RunAsync("Temporal"),
new(id: "interceptors-workflow-id", taskQueue: "interceptors-sample"));
await handle.SignalAsync(wf => wf.SignalCompleteAsync());
var result = await handle.GetResultAsync();
_ = await handle.QueryAsync(wf => wf.IsComplete());
logger.LogInformation("Workflow result: {Result}", result);
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
17 changes: 17 additions & 0 deletions src/ContextPropagation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Interceptors

This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout the
workflows and activities. While this demonstrates context propagation specifically, it can also be used to show how to
create interceptors for any other purpose.

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a
separate terminal to start the worker:

dotnet run worker

Then in another terminal, run the workflow from this directory:

dotnet run workflow

The workflow terminal will show the completed workflow result and the worker terminal will show the contextual user ID
is present in the workflow and activity.
16 changes: 16 additions & 0 deletions src/ContextPropagation/SayHelloActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace TemporalioSamples.ContextPropagation;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public class SayHelloActivities
{
[Activity]
public string SayHello(string name)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Activity called by user {UserId}",
MyContext.UserId.Value);
return $"Hello, {name}!";
}
}
42 changes: 42 additions & 0 deletions src/ContextPropagation/SayHelloWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace TemporalioSamples.ContextPropagation;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

[Workflow]
public class SayHelloWorkflow
{
private bool complete;

[WorkflowRun]
public async Task<string> RunAsync(string name)
{
Workflow.Logger.LogInformation(
"Workflow called by user {UserId}",
MyContext.UserId.Value);

// Wait for signal then run activity
await Workflow.WaitConditionAsync(() => complete);
return await Workflow.ExecuteActivityAsync(
(SayHelloActivities act) => act.SayHello(name),
new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
}

[WorkflowSignal]
public async Task SignalCompleteAsync()
{
Workflow.Logger.LogInformation(
"Signal called by user {UserId}",
MyContext.UserId.Value);
complete = true;
}

[WorkflowQuery]
public bool IsComplete()
{
Workflow.Logger.LogInformation(
"Query called by user {UserId}",
MyContext.UserId.Value);
return complete;
}
}
Loading

0 comments on commit 49ced3d

Please sign in to comment.