Skip to content

Commit

Permalink
added more controlo to Queues for notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Nov 25, 2024
1 parent c9532e2 commit c5176e4
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Arbitrer.GRPC/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public RequestsManager(IOptions<MessageDispatcherOptions> options, ILogger<Reque

if (requestsManagerOptions.Value.AcceptMessageTypes.Count == 0)
{
foreach (var t in arbitrerOptions.Value.LocalRequests)
foreach (var t in arbitrerOptions.Value.LocalTypes)
requestsManagerOptions.Value.AcceptMessageTypes.Add(t);
}

Expand Down
22 changes: 18 additions & 4 deletions Arbitrer.RabbitMQ/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ public static IServiceCollection AddArbitrerRabbitMQMessageDispatcher(this IServ
public static string ArbitrerQueueName(this Type t, ArbitrerOptions options, StringBuilder sb = null)
{
if (options.QueueNames.TryGetValue(t, out string queueName)) return queueName;

sb = sb ?? new StringBuilder();
sb.Append(t.ArbitrerTypeName(options));

sb.Append("$");
if (t.IsNotification())
sb.Append(Guid.NewGuid().ToString());
return sb.ToString();
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
Expand All @@ -57,6 +57,20 @@ where typeof(IBaseRequest).IsAssignableFrom(t)
return options;
}

public static ArbitrerOptions NotificationsInASingleQueue(this ArbitrerOptions options, Func<IEnumerable<Type>, IEnumerable<Type>> notificationTypes = null)
{
var notifications = options.LocalTypes.Where(t => t.IsNotification());
if (notificationTypes != null)
notifications = notificationTypes(notifications);

foreach (var n in notifications.Where(t => t.IsNotification()))
{
options.SetTypeQueueName(n, $"{n.ArbitrerTypeName(options)}${Assembly.GetEntryAssembly()?.FullName}");
}

return options;
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
Expand Down Expand Up @@ -107,7 +121,7 @@ public static IServiceCollection AddRabbitMQRequestManager(this IServiceCollecti
services.AddHostedService<RequestsManager>();
return services;
}

/// <summary>
/// Computes the hash value of a string using the specified HashAlgorithm.
/// </summary>
Expand Down
8 changes: 4 additions & 4 deletions Arbitrer/Arbitrer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Arbitrer(IOptions<ArbitrerOptions> options, IServiceProvider serviceProvi
/// </summary>
/// <param name="t">The type to check.</param>
/// <returns>True if the type has a local handler registered, False otherwise.</returns>
public bool HasLocalHandler(Type t) => this._options.LocalRequests.Any(i => i == t);
public bool HasLocalHandler(Type t) => this._options.LocalTypes.Any(i => i == t);

/// <summary>
/// Determines if the specified type has a remote handler.
Expand All @@ -61,7 +61,7 @@ public Arbitrer(IOptions<ArbitrerOptions> options, IServiceProvider serviceProvi
/// </summary>
/// <param name="t">The type to check.</param>
/// <returns>True if the type has a remote handler; otherwise, false.</returns>
public bool HasRemoteHandler(Type t) => this._options.RemoteRequests.Any(i => i == t);
public bool HasRemoteHandler(Type t) => this._options.RemoteTypes.Any(i => i == t);


/// <summary>
Expand Down Expand Up @@ -145,15 +145,15 @@ public Task SendRemoteNotification<TRequest>(TRequest request, string queueName
/// <returns>
/// An IEnumerable of Type containing the local request types.
/// </returns>
public IEnumerable<Type> GetLocalRequestsTypes() => _options.LocalRequests;
public IEnumerable<Type> GetLocalRequestsTypes() => _options.LocalTypes;

/// <summary>
/// Retrieves the collection of remote request types.
/// </summary>
/// <returns>
/// Returns an enumerable collection of <see cref="Type"/> objects representing remote request types.
/// </returns>
public IEnumerable<Type> GetRemoteRequestsTypes() => _options.RemoteRequests;
public IEnumerable<Type> GetRemoteRequestsTypes() => _options.RemoteTypes;
}

public enum HandlerLocation
Expand Down
4 changes: 2 additions & 2 deletions Arbitrer/ArbitrerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public class ArbitrerOptions
/// Gets or sets the collection of local requests.
/// </summary>
/// <value>The local requests.</value>
public HashSet<Type> LocalRequests { get; private set; } = new HashSet<Type>();
public HashSet<Type> LocalTypes { get; private set; } = new HashSet<Type>();

/// <summary>
/// Gets the set of remote requests supported by the application.
/// </summary>
public HashSet<Type> RemoteRequests { get; private set; } = new HashSet<Type>();
public HashSet<Type> RemoteTypes { get; private set; } = new HashSet<Type>();

/// <summary>
/// Get the prefix of remote queue
Expand Down
14 changes: 7 additions & 7 deletions Arbitrer/extensions/ArbitrerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions optio
public static ArbitrerOptions SetAsLocalRequest<T>(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null)
where T : IBaseRequest
{
options.LocalRequests.Add(typeof(T));
options.LocalTypes.Add(typeof(T));

if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.TypePrefixes.ContainsKey(typeof(T).FullName))
{
Expand All @@ -135,7 +135,7 @@ public static ArbitrerOptions SetAsLocalRequest<T>(this ArbitrerOptions options,
public static ArbitrerOptions ListenForNotification<T>(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null)
where T : INotification
{
options.LocalRequests.Add(typeof(T));
options.LocalTypes.Add(typeof(T));

if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.TypePrefixes.ContainsKey(typeof(T).FullName))
{
Expand All @@ -157,7 +157,7 @@ public static ArbitrerOptions ListenForNotification<T>(this ArbitrerOptions opti
public static ArbitrerOptions SetAsRemoteRequest<T>(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null)
where T : IBaseRequest
{
options.RemoteRequests.Add(typeof(T));
options.RemoteTypes.Add(typeof(T));

if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.TypePrefixes.ContainsKey(typeof(T).FullName))
{
Expand Down Expand Up @@ -186,7 +186,7 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign
select t).AsEnumerable();

foreach (var t in types)
options.LocalRequests.Add(t);
options.LocalTypes.Add(t);

if (!string.IsNullOrWhiteSpace(queuePrefix))
foreach (var t in types)
Expand All @@ -211,7 +211,7 @@ public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, F
ILogger logger = null)
{
foreach (var t in typesSelect())
options.LocalRequests.Add(t);
options.LocalTypes.Add(t);

if (!string.IsNullOrWhiteSpace(queuePrefix))
foreach (var t in typesSelect())
Expand Down Expand Up @@ -240,7 +240,7 @@ from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssignableFrom(t)
select t).AsEnumerable();
foreach (var t in types)
options.RemoteRequests.Add(t);
options.RemoteTypes.Add(t);

if (!string.IsNullOrWhiteSpace(queuePrefix))
foreach (var t in types)
Expand Down Expand Up @@ -269,7 +269,7 @@ public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options,
logger?.LogWarning("SetAsRemoteRequests : No Requests classes found in assemblies");

foreach (var t in types)
options.RemoteRequests.Add(t);
options.RemoteTypes.Add(t);

if (!string.IsNullOrWhiteSpace(queuePrefix))
foreach (var t in types)
Expand Down

0 comments on commit c5176e4

Please sign in to comment.