diff --git a/src/ToMqttNet/MqttConnectionService.cs b/src/ToMqttNet/MqttConnectionService.cs index a8dbdf9..67967d8 100644 --- a/src/ToMqttNet/MqttConnectionService.cs +++ b/src/ToMqttNet/MqttConnectionService.cs @@ -37,12 +37,13 @@ public MqttConnectionService( protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + _logger.LogInformation("Executing {backgroundService}", GetType().FullName); var options = MqttOptions.ClientOptions; if(string.IsNullOrEmpty(options.ClientId)) - { - options.ClientId = MqttOptions.NodeId + "-" + _instanceId; - } + { + options.ClientId = MqttOptions.NodeId + "-" + _instanceId; + } options.WillPayload = Encoding.UTF8.GetBytes("0"); options.WillTopic = $"{MqttOptions.NodeId}/connected"; @@ -50,20 +51,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if(options.ChannelOptions == null) { options.ChannelOptions = new MqttClientTcpOptions - { - Server = "mosquitto", + { + Server = "mosquitto", Port = 1883 - }; + }; } var optionsBuilder = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientOptions(options); + _counters.SetPendingMessages(() => _mqttClient.PendingApplicationMessagesCount); + + _mqttClient.ConnectionStateChangedAsync += (evnt) => { + _counters.SetConnections(_mqttClient.IsConnected ? 1 : 0); + return Task.CompletedTask; + }; + + _mqttClient.ConnectingFailedAsync += (evnt) => { + _logger.LogWarning(evnt.Exception, "Connection to mqtt failed"); + + _counters.SetConnections(0); + return Task.CompletedTask; + }; + _mqttClient.ConnectedAsync += async (evnt) => { _logger.LogInformation("Connected to mqtt: {reason}", evnt.ConnectResult.ReasonString); - await _mqttClient.EnqueueAsync( new MqttApplicationMessageBuilder() .WithPayload("2") @@ -71,14 +85,12 @@ await _mqttClient.EnqueueAsync( .WithRetainFlag() .Build()); - _counters.SetConnections(1); OnConnect?.Invoke(this, new EventArgs()); }; _mqttClient.DisconnectedAsync += (evnt) => { _logger.LogInformation(evnt.Exception, "Disconnected from mqtt: {reason}", evnt.Reason); - _counters.SetConnections(0); OnDisconnect?.Invoke(this, new EventArgs()); return Task.CompletedTask; }; @@ -91,18 +103,18 @@ await _mqttClient.EnqueueAsync( { _logger.LogTrace("{topic}: {message}", evnt.ApplicationMessage.Topic, evnt.ApplicationMessage.ConvertPayloadToString()); } - OnApplicationMessageReceived?.Invoke(this, evnt); }catch(Exception e) { _logger.LogWarning(e, "Failed to handle message to topic {topic}", evnt.ApplicationMessage.Topic); - _counters.IncreaseMessagesHandled(false); + _counters.IncreaseMessagesHandled(success: false); return Task.CompletedTask; } - _counters.IncreaseMessagesHandled(true); + _counters.IncreaseMessagesHandled(success: true); return Task.CompletedTask; }; + _logger.LogInformation("Starting mqttclient"); await _mqttClient.StartAsync(optionsBuilder.Build()); } diff --git a/src/ToMqttNet/MqttCounters.cs b/src/ToMqttNet/MqttCounters.cs index c5c1f95..1529c14 100644 --- a/src/ToMqttNet/MqttCounters.cs +++ b/src/ToMqttNet/MqttCounters.cs @@ -8,7 +8,8 @@ public class MqttCounters private readonly Counter _messagesSent; private readonly Counter _messagesHandled; - private int _connections; + private Func _pendingMessages = () => 0; + private int _connections = 0; public MqttCounters(IMeterFactory meterFactory) { @@ -17,6 +18,7 @@ public MqttCounters(IMeterFactory meterFactory) _messagesHandled = _scope.CreateCounter("mqtt_client.messages_handled_total", "messages", description: "Amount of MQTT packages received"); _messagesSent = _scope.CreateCounter("mqtt_client.messages_sent_total", "messages", description: "Amount of MQTT packages sent"); _scope.CreateObservableGauge("mqtt_client.connections", () => _connections, unit: "connections", description: "Amount of MQTT connections created"); + _scope.CreateObservableGauge("mqtt_client.pending_messages", _pendingMessages, unit: "messages", description: "Amount of ingoing MQTT messages pending to be processed by the client"); } public void IncreaseMessagesSent() @@ -33,4 +35,9 @@ public void SetConnections(int amount) { _connections = amount; } + + public void SetPendingMessages(Func amount) + { + _pendingMessages = amount; + } } \ No newline at end of file