Skip to content

Commit

Permalink
Finish implementation including initial manual testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits committed Feb 3, 2025
1 parent 72168b5 commit f87497b
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 45 deletions.
7 changes: 7 additions & 0 deletions dotnet/AutoGen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hello", "Hello", "{F42F9C8E
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.Core.Grpc", "src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj", "{3D83C6DB-ACEA-48F3-959F-145CCD2EE135}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GettingStartedGrpc", "samples\GettingStartedGrpc\GettingStartedGrpc.csproj", "{C3740DF1-18B1-4607-81E4-302F0308C848}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -306,6 +308,10 @@ Global
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D}.Release|Any CPU.Build.0 = Release|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C3740DF1-18B1-4607-81E4-302F0308C848}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -359,6 +365,7 @@ Global
{3D83C6DB-ACEA-48F3-959F-145CCD2EE135} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{AAD593FE-A49B-425E-A9FE-A0022CD25E3D} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1}
{F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6}
{C3740DF1-18B1-4607-81E4-302F0308C848} = {CE0AA8D5-12B8-4628-9589-DAD8CB0DDCF6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}
Expand Down
34 changes: 34 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Checker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Checker.cs

using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.Hosting;
using TerminationF = System.Func<int, bool>;

namespace GettingStartedGrpcSample;

[TypeSubscription("default")]
public class Checker(
AgentId id,
IAgentRuntime runtime,
IHostApplicationLifetime hostApplicationLifetime,
TerminationF runUntilFunc
) :
BaseAgent(id, runtime, "Modifier", null),
IHandle<Events.CountUpdate>
{
public async ValueTask HandleAsync(Events.CountUpdate item, MessageContext messageContext)
{
if (!runUntilFunc(item.NewCount))
{
Console.WriteLine($"\nChecker:\n{item.NewCount} passed the check, continue.");
await this.PublishMessageAsync(new Events.CountMessage { Content = item.NewCount }, new TopicId("default"));
}
else
{
Console.WriteLine($"\nChecker:\n{item.NewCount} failed the check, stopping.");
hostApplicationLifetime.StopApplication();
}
}
}
26 changes: 26 additions & 0 deletions dotnet/samples/GettingStartedGrpc/GettingStartedGrpc.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>getting_started</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Core\Microsoft.AutoGen.Core.csproj" />
<ProjectReference Include="..\..\src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj" />
</ItemGroup>


<ItemGroup>
<Protobuf Include="message.proto" GrpcServices="Client" Link="Protos\message.proto" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
</ItemGroup>

</Project>
29 changes: 29 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Modifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Modifier.cs

using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;

using ModifyF = System.Func<int, int>;

namespace GettingStartedGrpcSample;

[TypeSubscription("default")]
public class Modifier(
AgentId id,
IAgentRuntime runtime,
ModifyF modifyFunc
) :
BaseAgent(id, runtime, "Modifier", null),
IHandle<Events.CountMessage>
{

public async ValueTask HandleAsync(Events.CountMessage item, MessageContext messageContext)
{
int newValue = modifyFunc(item.Content);
Console.WriteLine($"\nModifier:\nModified {item.Content} to {newValue}");

var updateMessage = new Events.CountUpdate { NewCount = newValue };
await this.PublishMessageAsync(updateMessage, topic: new TopicId("default"));
}
}
36 changes: 36 additions & 0 deletions dotnet/samples/GettingStartedGrpc/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using GettingStartedGrpcSample;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.AutoGen.Core.Grpc;
using Microsoft.Extensions.DependencyInjection.Extensions;
using ModifyF = System.Func<int, int>;
using TerminationF = System.Func<int, bool>;

ModifyF modifyFunc = (int x) => x - 1;
TerminationF runUntilFunc = (int x) =>
{
return x <= 1;
};

AgentsAppBuilder appBuilder = new AgentsAppBuilder();
appBuilder.AddGrpcAgentWorker("http://localhost:50051");

appBuilder.Services.TryAddSingleton(modifyFunc);
appBuilder.Services.TryAddSingleton(runUntilFunc);

appBuilder.AddAgent<Checker>("Checker");
appBuilder.AddAgent<Modifier>("Modifier");

var app = await appBuilder.BuildAsync();
await app.StartAsync();

// Send the initial count to the agents app, running on the `local` runtime, and pass through the registered services via the application `builder`
await app.PublishMessageAsync(new GettingStartedGrpcSample.Events.CountMessage
{
Content = 10
}, new TopicId("default"));

// Run until application shutdown
await app.WaitForShutdownAsync();
11 changes: 11 additions & 0 deletions dotnet/samples/GettingStartedGrpc/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

option csharp_namespace = "GettingStartedGrpcSample.Events";

message CountMessage {
int32 content = 1;
}

message CountUpdate {
int32 new_count = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.AutoGen.Core.Grpc;

public static class AgentsAppBuilderExtensions
{
private const string _defaultAgentServiceAddress = "https://localhost:53071";
private const string _defaultAgentServiceAddress = "http://localhost:53071";

// TODO: How do we ensure AddGrpcAgentWorker and UseInProcessRuntime are mutually exclusive?
public static AgentsAppBuilder AddGrpcAgentWorker(this AgentsAppBuilder builder, string? agentServiceAddress = null)
Expand Down
40 changes: 22 additions & 18 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/CloudEventExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,35 @@ namespace Microsoft.AutoGen.Core.Grpc;
internal static class CloudEventExtensions
{
// Convert an ISubscrptionDefinition to a Protobuf Subscription
internal static CloudEvent CreateCloudEvent(Google.Protobuf.WellKnownTypes.Any payload, TopicId topic, string dataType, AgentId sender, string messageId)
internal static CloudEvent CreateCloudEvent(Google.Protobuf.WellKnownTypes.Any payload, TopicId topic, string dataType, AgentId? sender, string messageId)
{
var attributes = new Dictionary<string, CloudEvent.Types.CloudEventAttributeValue>
{
{
Constants.DATA_CONTENT_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.DATA_CONTENT_TYPE_PROTOBUF_VALUE }
},
{
Constants.DATA_SCHEMA_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = dataType }
},
{
Constants.MESSAGE_KIND_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.MESSAGE_KIND_VALUE_PUBLISH }
}
};

if (sender != null)
{
var senderNonNull = (AgentId)sender;
attributes.Add(Constants.AGENT_SENDER_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Type });
attributes.Add(Constants.AGENT_SENDER_KEY_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = senderNonNull.Key });
}

return new CloudEvent
{
ProtoData = payload,
Type = topic.Type,
Source = topic.Source,
Id = messageId,
Attributes = {
{
Constants.DATA_CONTENT_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.DATA_CONTENT_TYPE_PROTOBUF_VALUE }
},
{
Constants.DATA_SCHEMA_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = dataType }
},
{
Constants.AGENT_SENDER_TYPE_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = sender.Type }
},
{
Constants.AGENT_SENDER_KEY_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = sender.Key }
},
{
Constants.MESSAGE_KIND_ATTR, new CloudEvent.Types.CloudEventAttributeValue { CeString = Constants.MESSAGE_KIND_VALUE_PUBLISH }
}
}
Attributes = { attributes }
};

}
Expand Down
58 changes: 40 additions & 18 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class AgentsContainer(IAgentRuntime hostingRuntime)
private readonly IAgentRuntime hostingRuntime = hostingRuntime;

private Dictionary<Contracts.AgentId, IHostableAgent> agentInstances = new();
private Dictionary<string, ISubscriptionDefinition> subscriptions = new();
public Dictionary<string, ISubscriptionDefinition> Subscriptions = new();
private Dictionary<AgentType, Func<Contracts.AgentId, IAgentRuntime, ValueTask<IHostableAgent>>> agentFactories = new();

public async ValueTask<IHostableAgent> EnsureAgentAsync(Contracts.AgentId agentId)
Expand Down Expand Up @@ -57,22 +57,22 @@ public AgentType RegisterAgentFactory(AgentType type, Func<Contracts.AgentId, IA

public void AddSubscription(ISubscriptionDefinition subscription)
{
if (this.subscriptions.ContainsKey(subscription.Id))
if (this.Subscriptions.ContainsKey(subscription.Id))
{
throw new Exception($"Subscription with id {subscription.Id} already exists.");
}

this.subscriptions.Add(subscription.Id, subscription);
this.Subscriptions.Add(subscription.Id, subscription);
}

public bool RemoveSubscriptionAsync(string subscriptionId)
{
if (!this.subscriptions.ContainsKey(subscriptionId))
if (!this.Subscriptions.ContainsKey(subscriptionId))
{
throw new Exception($"Subscription with id {subscriptionId} does not exist.");
}

return this.subscriptions.Remove(subscriptionId);
return this.Subscriptions.Remove(subscriptionId);
}

public HashSet<AgentType> RegisteredAgentTypes => this.agentFactories.Keys.ToHashSet();
Expand All @@ -90,7 +90,7 @@ public GrpcAgentRuntime(AgentRpc.AgentRpcClient client,
this._logger = logger;
this._shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping);

this._messageRouter = new GrpcMessageRouter(client, this, logger, this._shutdownCts.Token);
this._messageRouter = new GrpcMessageRouter(client, this, _clientId, logger, this._shutdownCts.Token);
this._agentsContainer = new AgentsContainer(this);

this.ServiceProvider = serviceProvider;
Expand All @@ -109,14 +109,14 @@ public GrpcAgentRuntime(AgentRpc.AgentRpcClient client,

public IServiceProvider ServiceProvider { get; }

private string _clientId = Guid.NewGuid().ToString();
private Guid _clientId = Guid.NewGuid();
private CallOptions CallOptions
{
get
{
var metadata = new Metadata
{
{ "client-id", this._clientId }
{ "client-id", this._clientId.ToString() }
};
return new CallOptions(headers: metadata);
}
Expand Down Expand Up @@ -221,11 +221,15 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella
}

var topic = new TopicId(evt.Type, evt.Source);
var sender = new Contracts.AgentId
Contracts.AgentId? sender = null;
if (evt.Attributes.TryGetValue(Constants.AGENT_SENDER_TYPE_ATTR, out var typeValue) && evt.Attributes.TryGetValue(Constants.AGENT_SENDER_KEY_ATTR, out var keyValue))
{
Type = evt.Attributes[Constants.AGENT_SENDER_TYPE_ATTR].CeString,
Key = evt.Attributes[Constants.AGENT_SENDER_KEY_ATTR].CeString
};
sender = new Contracts.AgentId
{
Type = typeValue.CeString,
Key = keyValue.CeString
};
}

var messageId = evt.Id;
var typeName = evt.Attributes[Constants.DATA_SCHEMA_ATTR].CeString;
Expand All @@ -238,8 +242,17 @@ private async ValueTask HandlePublish(CloudEvent evt, CancellationToken cancella
Topic = topic,
IsRpc = false
};
var agent = await this._agentsContainer.EnsureAgentAsync(sender);
await agent.OnMessageAsync(message, messageContext);

// Iterate over subscriptions values to find receiving agents
foreach (var subscription in this._agentsContainer.Subscriptions.Values)
{
if (subscription.Matches(topic))
{
var recipient = subscription.MapToAgent(topic);
var agent = await this._agentsContainer.EnsureAgentAsync(recipient);
await agent.OnMessageAsync(message, messageContext);
}
}
}

public ValueTask StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -290,9 +303,9 @@ public async ValueTask PublishMessageAsync(object message, TopicId topic, Contra
SerializationRegistry.RegisterSerializer(message.GetType());
}
var protoAny = (SerializationRegistry.GetSerializer(message.GetType()) ?? throw new Exception()).Serialize(message);
var typeName = SerializationRegistry.TypeNameResolver.ResolveTypeName(message);
var typeName = SerializationRegistry.TypeNameResolver.ResolveTypeName(message.GetType());

var cloudEvent = CloudEventExtensions.CreateCloudEvent(protoAny, topic, typeName, sender ?? new Contracts.AgentId(), messageId ?? Guid.NewGuid().ToString());
var cloudEvent = CloudEventExtensions.CreateCloudEvent(protoAny, topic, typeName, sender, messageId ?? Guid.NewGuid().ToString());

Message msg = new()
{
Expand Down Expand Up @@ -342,8 +355,17 @@ await this._client.RemoveSubscriptionAsync(new RemoveSubscriptionRequest
}, this.CallOptions);
}

public ValueTask<AgentType> RegisterAgentFactoryAsync(AgentType type, Func<Contracts.AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
=> ValueTask.FromResult(this._agentsContainer.RegisterAgentFactory(type, factoryFunc));
public async ValueTask<AgentType> RegisterAgentFactoryAsync(AgentType type, Func<Contracts.AgentId, IAgentRuntime, ValueTask<IHostableAgent>> factoryFunc)
{
this._agentsContainer.RegisterAgentFactory(type, factoryFunc);

await this._client.RegisterAgentAsync(new RegisterAgentTypeRequest
{
Type = type,
}, this.CallOptions);

return type;
}

public ValueTask<AgentProxy> TryGetAgentProxyAsync(Contracts.AgentId agentId)
{
Expand Down
Loading

0 comments on commit f87497b

Please sign in to comment.