Skip to content

Commit

Permalink
watchguard
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Dec 17, 2024
1 parent 7df3426 commit 4ed416d
Showing 1 changed file with 72 additions and 18 deletions.
90 changes: 72 additions & 18 deletions Arbitrer.RabbitMQ/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class RequestsManager : IHostedService
/// </summary>
private IChannel _channel = null;

private Thread _watchguard = null;

private Dictionary<Type, AsyncEventingBasicConsumer> _consumers = new Dictionary<Type, AsyncEventingBasicConsumer>();

/// <summary>
/// Represents a SHA256 hash algorithm instance used for hashing data.
Expand Down Expand Up @@ -86,29 +89,47 @@ public RequestsManager(IOptions<MessageDispatcherOptions> options, ILogger<Reque
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
if (_connection == null)
await CheckConnection(cancellationToken);

await CheckRequestsConsumers(cancellationToken);

await ValidateConnectionQos(cancellationToken);

_watchguard = new Thread(() => Watchguard()) { IsBackground = true };
_watchguard.Start();
}

private async void Watchguard()
{
await Task.Delay(TimeSpan.FromMinutes(2));
while (true)
{
_logger.LogInformation($"ARBITRER: Creating RabbitMQ Conection to '{_options.HostName}'...");
var factory = new ConnectionFactory
{
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost,
Port = _options.Port,
var toremove = new HashSet<Type>();

ClientProvidedName = _options.ClientName
};
foreach (var k in _consumers)
{
if (!k.Value.IsRunning)
{
_logger.LogError($"Stopping consumer for {k.Key}: The consumer is stopped for {k.Value.ShutdownReason?.Exception?.Message ?? "unknown reason"}");
toremove.Add(k.Key);
}
}

_connection = await factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
foreach (var t in toremove)
{
if (_consumers.ContainsKey(t))
_consumers.Remove(t);
}

await _channel.ExchangeDeclareAsync(Constants.ArbitrerExchangeName, ExchangeType.Topic, cancellationToken: cancellationToken);
await CheckRequestsConsumers(CancellationToken.None);

_logger.LogInformation("ARBITRER: ready !");
toremove.Clear();
await Task.Delay(TimeSpan.FromMinutes(2));
}
}


private async Task CheckRequestsConsumers(CancellationToken cancellationToken)
{
foreach (var t in _arbitrer.GetLocalRequestsTypes())
{
if (t is null) continue;
Expand All @@ -131,6 +152,7 @@ await _channel.QueueDeclareAsync(queue: queueName, durable: _options.Durable,


var consumer = new AsyncEventingBasicConsumer(_channel);
_consumers.Add(t, consumer);

var consumerMethod = typeof(RequestsManager)
.GetMethod(isNotification ? nameof(ConsumeChannelNotification) : nameof(ConsumeChannelMessage), BindingFlags.Instance | BindingFlags.NonPublic)?
Expand All @@ -152,7 +174,10 @@ await _channel.QueueDeclareAsync(queue: queueName, durable: _options.Durable,

await _channel.BasicConsumeAsync(queue: queueName, autoAck: isNotification, consumer: consumer, cancellationToken: cancellationToken);
}
}

private async Task ValidateConnectionQos(CancellationToken cancellationToken)
{
try
{
if (_options.PerChannelQos == 0)
Expand Down Expand Up @@ -186,9 +211,39 @@ await _channel.BasicQosAsync(0, _options.PerChannelQos > ushort.MaxValue ? ushor
_logger.LogError(ex.Message);
_logger.LogError(ex.StackTrace);
}

}

private async Task CheckConnection(CancellationToken cancellationToken)
{
if (_connection != null && _connection.IsOpen)
{
_connection = null;
}

if (_connection == null)
{
_logger.LogInformation($"ARBITRER: Creating RabbitMQ Connection to '{_options.HostName}'...");
var factory = new ConnectionFactory
{
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost,
Port = _options.Port,

ClientProvidedName = _options.ClientName
};

_connection = await factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

await _channel.ExchangeDeclareAsync(Constants.ArbitrerExchangeName, ExchangeType.Topic, cancellationToken: cancellationToken);

_logger.LogInformation("ARBITRER: ready !");
}
}


/// <summary>
/// ConsumeChannelNotification is a private asynchronous method that handles the consumption of channel notifications. </summary>
/// <typeparam name="T">The type of messages to be consumed</typeparam> <param name="sender">The object that triggered the event</param> <param name="ea">The event arguments containing the consumed message</param>
Expand Down Expand Up @@ -286,7 +341,6 @@ public async Task StopAsync(CancellationToken cancellationToken)
catch
{
}

}
}
}

0 comments on commit 4ed416d

Please sign in to comment.