From 4ed416d63f8e2cfb254e9cba691171f0b0cb1a4f Mon Sep 17 00:00:00 2001 From: Paolo Possanzini Date: Tue, 17 Dec 2024 13:00:49 +0100 Subject: [PATCH] watchguard --- Arbitrer.RabbitMQ/RequestsManager.cs | 90 ++++++++++++++++++++++------ 1 file changed, 72 insertions(+), 18 deletions(-) diff --git a/Arbitrer.RabbitMQ/RequestsManager.cs b/Arbitrer.RabbitMQ/RequestsManager.cs index 90b8ed3..625d344 100644 --- a/Arbitrer.RabbitMQ/RequestsManager.cs +++ b/Arbitrer.RabbitMQ/RequestsManager.cs @@ -51,6 +51,9 @@ public class RequestsManager : IHostedService /// private IChannel _channel = null; + private Thread _watchguard = null; + + private Dictionary _consumers = new Dictionary(); /// /// Represents a SHA256 hash algorithm instance used for hashing data. @@ -86,29 +89,47 @@ public RequestsManager(IOptions options, ILoggerA representing the asynchronous operation. public async Task StartAsync(CancellationToken cancellationToken) { - if (_connection == null) + await CheckConnection(cancellationToken); + + await CheckRequestsConsumers(cancellationToken); + + await ValidateConnectionQos(cancellationToken); + + _watchguard = new Thread(() => Watchguard()) { IsBackground = true }; + _watchguard.Start(); + } + + private async void Watchguard() + { + await Task.Delay(TimeSpan.FromMinutes(2)); + while (true) { - _logger.LogInformation($"ARBITRER: Creating RabbitMQ Conection to '{_options.HostName}'..."); - var factory = new ConnectionFactory - { - HostName = _options.HostName, - UserName = _options.UserName, - Password = _options.Password, - VirtualHost = _options.VirtualHost, - Port = _options.Port, + var toremove = new HashSet(); - ClientProvidedName = _options.ClientName - }; + foreach (var k in _consumers) + { + if (!k.Value.IsRunning) + { + _logger.LogError($"Stopping consumer for {k.Key}: The consumer is stopped for {k.Value.ShutdownReason?.Exception?.Message ?? "unknown reason"}"); + toremove.Add(k.Key); + } + } - _connection = await factory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + foreach (var t in toremove) + { + if (_consumers.ContainsKey(t)) + _consumers.Remove(t); + } - await _channel.ExchangeDeclareAsync(Constants.ArbitrerExchangeName, ExchangeType.Topic, cancellationToken: cancellationToken); + await CheckRequestsConsumers(CancellationToken.None); - _logger.LogInformation("ARBITRER: ready !"); + toremove.Clear(); + await Task.Delay(TimeSpan.FromMinutes(2)); } + } - + private async Task CheckRequestsConsumers(CancellationToken cancellationToken) + { foreach (var t in _arbitrer.GetLocalRequestsTypes()) { if (t is null) continue; @@ -131,6 +152,7 @@ await _channel.QueueDeclareAsync(queue: queueName, durable: _options.Durable, var consumer = new AsyncEventingBasicConsumer(_channel); + _consumers.Add(t, consumer); var consumerMethod = typeof(RequestsManager) .GetMethod(isNotification ? nameof(ConsumeChannelNotification) : nameof(ConsumeChannelMessage), BindingFlags.Instance | BindingFlags.NonPublic)? @@ -152,7 +174,10 @@ await _channel.QueueDeclareAsync(queue: queueName, durable: _options.Durable, await _channel.BasicConsumeAsync(queue: queueName, autoAck: isNotification, consumer: consumer, cancellationToken: cancellationToken); } + } + private async Task ValidateConnectionQos(CancellationToken cancellationToken) + { try { if (_options.PerChannelQos == 0) @@ -186,9 +211,39 @@ await _channel.BasicQosAsync(0, _options.PerChannelQos > ushort.MaxValue ? ushor _logger.LogError(ex.Message); _logger.LogError(ex.StackTrace); } - } + private async Task CheckConnection(CancellationToken cancellationToken) + { + if (_connection != null && _connection.IsOpen) + { + _connection = null; + } + + if (_connection == null) + { + _logger.LogInformation($"ARBITRER: Creating RabbitMQ Connection to '{_options.HostName}'..."); + var factory = new ConnectionFactory + { + HostName = _options.HostName, + UserName = _options.UserName, + Password = _options.Password, + VirtualHost = _options.VirtualHost, + Port = _options.Port, + + ClientProvidedName = _options.ClientName + }; + + _connection = await factory.CreateConnectionAsync(cancellationToken); + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + await _channel.ExchangeDeclareAsync(Constants.ArbitrerExchangeName, ExchangeType.Topic, cancellationToken: cancellationToken); + + _logger.LogInformation("ARBITRER: ready !"); + } + } + + /// /// ConsumeChannelNotification is a private asynchronous method that handles the consumption of channel notifications. /// The type of messages to be consumed The object that triggered the event The event arguments containing the consumed message @@ -286,7 +341,6 @@ public async Task StopAsync(CancellationToken cancellationToken) catch { } - } } } \ No newline at end of file