Skip to content

Commit

Permalink
Added Counter Interceptor Sample (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
rross authored Aug 20, 2024
1 parent 175feb5 commit 0cacb5d
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Prerequisites:
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPr
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterInterceptor", "src\CounterInterceptor\TemporalioSamples.CounterInterceptor.csproj", "{F9C44936-8BF9-4919-BB66-8F1888E22AEB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -155,6 +157,10 @@ Global
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F9C44936-8BF9-4919-BB66-8F1888E22AEB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -185,5 +191,6 @@ Global
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{F9C44936-8BF9-4919-BB66-8F1888E22AEB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
43 changes: 43 additions & 0 deletions src/CounterInterceptor/Counts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace TemporalioSamples.CounterInterceptor;
public class Counts
{
private uint clientExecutions;
private uint clientQueries;
private uint clientSignals;
private uint workflowReplays;
private uint workflowSignals;
private uint workflowQueries;
private uint workflowChildExecutions;
private uint workflowActivityExecutions;

public ref uint ClientExecutions => ref clientExecutions;

public ref uint ClientSignals => ref clientSignals;

public ref uint ClientQueries => ref clientQueries;

public string ClientInfo() =>
$"\n\tTotal Number of Workflow Exec: {ClientExecutions}\n\t" +
$"Total Number of Signals: {ClientSignals}\n\t" +
$"Total Number of Queries: {ClientQueries}";

public ref uint WorkflowReplays => ref workflowReplays;

public ref uint WorkflowSignals => ref workflowSignals;

public ref uint WorkflowQueries => ref workflowQueries;

public ref uint WorkflowChildExecutions => ref workflowChildExecutions;

public ref uint WorkflowActivityExecutions => ref workflowActivityExecutions;

public string WorkflowInfo() =>
$"\n\tTotal Number of Workflow Replays: {WorkflowReplays}\n\t" +
$"Total Number of Child Workflow Exec: {WorkflowChildExecutions}\n\t" +
$"Total Number of Activity Exec: {WorkflowActivityExecutions}\n\t" +
$"Total Number of Signals: {WorkflowSignals}\n\t" +
$"Total Number of Queries: {WorkflowQueries}";

public override string ToString() =>
ClientInfo() + WorkflowInfo();
}
15 changes: 15 additions & 0 deletions src/CounterInterceptor/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace TemporalioSamples.CounterInterceptor;

using System.Diagnostics;
using Temporalio.Activities;

public class MyActivities
{
[Activity]
public string SayHello(string name, string title) =>
$"Hello {title} {name}";

[Activity]
public string SayGoodBye(string name, string title) =>
$"Goodby {title} {name}";
}
17 changes: 17 additions & 0 deletions src/CounterInterceptor/MyChildWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyChildWorkflow
{
private readonly ActivityOptions activityOptions = new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(10),
};

[WorkflowRun]
public async Task<string> RunAsync(string name, string title) =>
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayHello(name, title), activityOptions) +
await Workflow.ExecuteActivityAsync((MyActivities act) => act.SayGoodBye(name, title), activityOptions);
}
144 changes: 144 additions & 0 deletions src/CounterInterceptor/MyCounterInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
namespace TemporalioSamples.CounterInterceptor;

using System.Collections.Concurrent;
using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Client.Interceptors;
using Temporalio.Worker.Interceptors;
using Temporalio.Workflows;

public class MyCounterInterceptor : IClientInterceptor, IWorkerInterceptor
{
public ConcurrentDictionary<string, Counts> Counts { get; } = new();

public string WorkerInfo() =>
string.Join(
"\n",
Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.WorkflowInfo()}"));

public string ClientInfo() =>
string.Join(
"\n",
Counts.Select(kvp => $"** Workflow ID: {kvp.Key} {kvp.Value.ClientInfo()}"));

public ClientOutboundInterceptor InterceptClient(ClientOutboundInterceptor nextInterceptor) =>
new ClientOutbound(this, nextInterceptor);

public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor nextInterceptor) =>
new WorkflowInbound(this, nextInterceptor);

public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) =>
new ActivityInbound(this, nextInterceptor);

private void Increment(string id, Action<Counts> increment) =>
increment(Counts.GetOrAdd(id, _ => new()));

private sealed class ClientOutbound : ClientOutboundInterceptor
{
private MyCounterInterceptor root;

public ClientOutbound(MyCounterInterceptor root, ClientOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowAsync<TWorkflow, TResult>(
StartWorkflowInput input)
{
var id = input.Options.Id ?? "None";
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientExecutions));
return base.StartWorkflowAsync<TWorkflow, TResult>(input);
}

public override Task SignalWorkflowAsync(SignalWorkflowInput input)
{
var id = input.Id;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientSignals));
return base.SignalWorkflowAsync(input);
}

public override Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInput input)
{
var id = input.Id;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].ClientQueries));
return base.QueryWorkflowAsync<TResult>(input);
}
}

private sealed class WorkflowInbound : WorkflowInboundInterceptor
{
private readonly MyCounterInterceptor root;

internal WorkflowInbound(MyCounterInterceptor root, WorkflowInboundInterceptor next)
: base(next) => this.root = root;

public override void Init(WorkflowOutboundInterceptor outbound) =>
base.Init(new WorkflowOutbound(root, outbound));

public override Task<object?> ExecuteWorkflowAsync(ExecuteWorkflowInput input)
{
// Count only if we're not replaying
if (!Workflow.Unsafe.IsReplaying)
{
var id = Workflow.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowReplays));
}
return base.ExecuteWorkflowAsync(input);
}

public override Task HandleSignalAsync(HandleSignalInput input)
{
// Count only if we're not replaying
if (!Workflow.Unsafe.IsReplaying)
{
var id = Workflow.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowSignals));
}
return base.HandleSignalAsync(input);
}

public override object? HandleQuery(HandleQueryInput input)
{
// Count only if we're not replaying
if (!Workflow.Unsafe.IsReplaying)
{
var id = Workflow.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowQueries));
}
return base.HandleQuery(input);
}
}

private sealed class WorkflowOutbound : WorkflowOutboundInterceptor
{
private readonly MyCounterInterceptor root;

internal WorkflowOutbound(MyCounterInterceptor root, WorkflowOutboundInterceptor next)
: base(next) => this.root = root;

public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflowAsync<TWorkflow, TResult>(
StartChildWorkflowInput input)
{
// Count only if we're not replaying
if (!Workflow.Unsafe.IsReplaying)
{
var id = Workflow.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowChildExecutions));
}
return base.StartChildWorkflowAsync<TWorkflow, TResult>(input);
}
}

private sealed class ActivityInbound : ActivityInboundInterceptor
{
private readonly MyCounterInterceptor root;

internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor next)
: base(next) => this.root = root;

public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input)
{
var id = ActivityExecutionContext.Current.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowActivityExecutions));
return base.ExecuteActivityAsync(input);
}
}
}
43 changes: 43 additions & 0 deletions src/CounterInterceptor/MyWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
private bool exit; // Automatically defaults to false

[WorkflowRun]
public async Task<string> RunAsync()
{
// Wait for greeting info
await Workflow.WaitConditionAsync(() =>
!string.IsNullOrEmpty(Name) && !string.IsNullOrEmpty(Title));

// Execute Child Workflow
var result = await Workflow.ExecuteChildWorkflowAsync(
(MyChildWorkflow wf) => wf.RunAsync(Name, Title),
new() { Id = "counter-interceptor-child" });

// Wait for exit signal
await Workflow.WaitConditionAsync(() => exit);

return result;
}

[WorkflowSignal]
public async Task SignalNameAndTitleAsync(string name, string title)
{
Name = name;
Title = title;
}

[WorkflowQuery]
public string Name { get; private set; } = string.Empty;

[WorkflowQuery]
public string Title { get; private set; } = string.Empty;

[WorkflowSignal]
public async Task ExitAsync() => exit = true;
}
73 changes: 73 additions & 0 deletions src/CounterInterceptor/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace TemporalioSamples.CounterInterceptor;

using Temporalio.Client;
using Temporalio.Worker;

internal class Program
{
private static async Task Main(string[] args)
{
var counterInterceptor = new MyCounterInterceptor();
var client = await TemporalClient.ConnectAsync(
options: new("localhost:7233")
{
Interceptors = new[]
{
counterInterceptor,
},
});

var activities = new MyActivities();

var taskQueue = "CounterInterceptorTaskQueue";

var workerOptions = new TemporalWorkerOptions(taskQueue).
AddAllActivities(activities).
AddWorkflow<MyWorkflow>().
AddWorkflow<MyChildWorkflow>();

// workerOptions.Interceptors = new[] { counterInterceptor };
using var worker = new TemporalWorker(
client,
workerOptions);

// Run worker until cancelled
Console.WriteLine("Running worker...");

// Start the workers
await worker.ExecuteAsync(async () =>
{
// Start the workflow
var handle = await client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: Guid.NewGuid().ToString(), taskQueue: taskQueue));
Console.WriteLine("Sending name and title to workflow");
await handle.SignalAsync(wf => wf.SignalNameAndTitleAsync("John", "Customer"));
var name = await handle.QueryAsync(wf => wf.Name);
var title = await handle.QueryAsync(wf => wf.Title);
// Send exit signal to workflow
await handle.SignalAsync(wf => wf.ExitAsync());
var result = await handle.GetResultAsync();
Console.WriteLine($"Workflow result is {result}");
Console.WriteLine("Query results: ");
Console.WriteLine($"\tName: {name}");
Console.WriteLine($"\tTitle: {title}");
// Print worker counter info
Console.WriteLine("\nCollected Worker Counter Info:\n");
Console.WriteLine(counterInterceptor.WorkerInfo());
Console.WriteLine($"Number of unique workflows: {counterInterceptor.Counts.Count}");
// Print client counter info
Console.WriteLine();
Console.WriteLine("Collected Client Counter Info:\n");
Console.WriteLine(counterInterceptor.ClientInfo());
});
}
}
13 changes: 13 additions & 0 deletions src/CounterInterceptor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# dotnet-counter-interceptor
The sample demonstrates:
- the use of a Worker Workflow Interceptor that counts the number of Workflow Executions, Child Workflow Executions, and Activity Executions and the number of Signals and Queries. It is based
off of the [Java sample](https://github.com/temporalio/samples-java/tree/main) located [here](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/countinterceptor)
- the use of a Client Workflow Interceptor that counts the number of Workflow Executions and the number of Signals and Queries.

To run, first see [README.md](https://github.com/temporalio/samples-dotnet/blob/main/README.md) for prerequisites

## Run Worker and Client
```bash
# make sure you have temporal server running
dotnet run
```
Loading

0 comments on commit 0cacb5d

Please sign in to comment.