Skip to content

Commit

Permalink
fix message propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Nov 4, 2024
1 parent f46047a commit 27aa36c
Showing 1 changed file with 16 additions and 45 deletions.
61 changes: 16 additions & 45 deletions Arbitrer/pipelines/ArbitrerPipeline.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
Expand All @@ -13,7 +14,9 @@ namespace Arbitrer.Pipelines
/// </summary>
/// <typeparam name="TRequest">The type of the request.</typeparam>
/// <typeparam name="TResponse">The type of the response.</typeparam>
public class ArbitrerPipeline<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> //where TRequest : notnull
public class ArbitrerPipeline<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : class, IBaseRequest
//where TRequest : notnull
{
private readonly IArbitrer arbitrer;
private readonly ILogger<Arbitrer> _logger;
Expand All @@ -24,45 +27,7 @@ public ArbitrerPipeline(IArbitrer arbitrer, ILogger<Arbitrer> logger)
_logger = logger;
}

// Implementation for legacy version for .netstandard 2.0 compatibility
/// <summary>
/// Handles a request asynchronously.
/// </summary>
/// <typeparam name="TRequest">The type of the request.</typeparam>
/// <typeparam name="TResponse">The type of the response.</typeparam>
/// <param name="request">The request to be handled.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="next">The delegate to invoke the next handler in the pipeline.</param>
/// <returns>A task representing the asynchronous handling of the request.</returns>
/// <exception cref="InvalidHandlerException">Thrown when the handler location is invalid.</exception>
public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
object req = request;
string queueName = null;

if (typeof(IExplicitQueue).IsAssignableFrom(request.GetType()))
{
queueName = ((IExplicitQueue)request).QueueName;
req = ((IExplicitQueue)request).MessageObject;
}

try
{
switch (arbitrer.GetLocation(req.GetType()))
{
case HandlerLocation.Local: return await next().ConfigureAwait(false);
case HandlerLocation.Remote: return await arbitrer.InvokeRemoteHandler<TRequest, TResponse>(request, queueName);
default: throw new InvalidHandlerException();
}
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
throw;
}
}

// Implementation for version > 11
/// <summary>
/// Handles the request by invoking the appropriate handler based on the location of the request.
/// </summary>
Expand All @@ -74,21 +39,27 @@ public async Task<TResponse> Handle(TRequest request, CancellationToken cancella
/// <returns>The response data.</returns>
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
object req = request;
string queueName = null;

if (typeof(IExplicitQueue).IsAssignableFrom(request.GetType()))
{
queueName = ((IExplicitQueue)request).QueueName;
req = ((IExplicitQueue)request).MessageObject;
var queueName = ((IExplicitQueue)request).QueueName;
var req = ((IExplicitQueue)request).MessageObject;
var type = request.GetType().GetGenericArguments()[0];

return ((TResponse)this.GetType().GetMethod(nameof(InvokeHandler), BindingFlags.Instance | BindingFlags.NonPublic).MakeGenericMethod(type)
.Invoke(this, new object[] { next, req, queueName }));
}

return await InvokeHandler(next, request, null);
}

private async Task<TResponse> InvokeHandler<T>(RequestHandlerDelegate<TResponse> next, T req, string queueName)
{
try
{
switch (arbitrer.GetLocation(req.GetType()))
{
case HandlerLocation.Local: return await next().ConfigureAwait(false);
case HandlerLocation.Remote: return await arbitrer.InvokeRemoteHandler<TRequest, TResponse>(request, queueName);
case HandlerLocation.Remote: return await arbitrer.InvokeRemoteHandler<T, TResponse>(req, queueName);
default: throw new InvalidHandlerException();
}
}
Expand Down

0 comments on commit 27aa36c

Please sign in to comment.