Skip to content

Commit

Permalink
Merge pull request #24 from JonasMH/develop
Browse files Browse the repository at this point in the history
Improve logging
  • Loading branch information
JonasMH authored Oct 10, 2023
2 parents a3365f4 + f63d52e commit f8e7931
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
36 changes: 24 additions & 12 deletions src/ToMqttNet/MqttConnectionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,48 +37,60 @@ public MqttConnectionService(

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Executing {backgroundService}", GetType().FullName);
var options = MqttOptions.ClientOptions;

if(string.IsNullOrEmpty(options.ClientId))
{
options.ClientId = MqttOptions.NodeId + "-" + _instanceId;
}
{
options.ClientId = MqttOptions.NodeId + "-" + _instanceId;
}

options.WillPayload = Encoding.UTF8.GetBytes("0");
options.WillTopic = $"{MqttOptions.NodeId}/connected";
options.WillRetain = true;

if(options.ChannelOptions == null) {
options.ChannelOptions = new MqttClientTcpOptions
{
Server = "mosquitto",
{
Server = "mosquitto",
Port = 1883
};
};
}

var optionsBuilder = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(options);

_counters.SetPendingMessages(() => _mqttClient.PendingApplicationMessagesCount);

_mqttClient.ConnectionStateChangedAsync += (evnt) => {
_counters.SetConnections(_mqttClient.IsConnected ? 1 : 0);
return Task.CompletedTask;
};

_mqttClient.ConnectingFailedAsync += (evnt) => {
_logger.LogWarning(evnt.Exception, "Connection to mqtt failed");
_counters.SetConnections(0);
return Task.CompletedTask;
};

_mqttClient.ConnectedAsync += async (evnt) =>
{
_logger.LogInformation("Connected to mqtt: {reason}", evnt.ConnectResult.ReasonString);
await _mqttClient.EnqueueAsync(
new MqttApplicationMessageBuilder()
.WithPayload("2")
.WithTopic($"{MqttOptions.NodeId}/connected")
.WithRetainFlag()
.Build());
_counters.SetConnections(1);
OnConnect?.Invoke(this, new EventArgs());
};

_mqttClient.DisconnectedAsync += (evnt) =>
{
_logger.LogInformation(evnt.Exception, "Disconnected from mqtt: {reason}", evnt.Reason);
_counters.SetConnections(0);
OnDisconnect?.Invoke(this, new EventArgs());
return Task.CompletedTask;
};
Expand All @@ -91,18 +103,18 @@ await _mqttClient.EnqueueAsync(
{
_logger.LogTrace("{topic}: {message}", evnt.ApplicationMessage.Topic, evnt.ApplicationMessage.ConvertPayloadToString());
}
OnApplicationMessageReceived?.Invoke(this, evnt);
}catch(Exception e)
{
_logger.LogWarning(e, "Failed to handle message to topic {topic}", evnt.ApplicationMessage.Topic);
_counters.IncreaseMessagesHandled(false);
_counters.IncreaseMessagesHandled(success: false);
return Task.CompletedTask;
}
_counters.IncreaseMessagesHandled(true);
_counters.IncreaseMessagesHandled(success: true);
return Task.CompletedTask;
};

_logger.LogInformation("Starting mqttclient");
await _mqttClient.StartAsync(optionsBuilder.Build());
}

Expand Down
9 changes: 8 additions & 1 deletion src/ToMqttNet/MqttCounters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ public class MqttCounters

private readonly Counter<long> _messagesSent;
private readonly Counter<long> _messagesHandled;
private int _connections;
private Func<int> _pendingMessages = () => 0;
private int _connections = 0;

public MqttCounters(IMeterFactory meterFactory)
{
Expand All @@ -17,6 +18,7 @@ public MqttCounters(IMeterFactory meterFactory)
_messagesHandled = _scope.CreateCounter<long>("mqtt_client.messages_handled_total", "messages", description: "Amount of MQTT packages received");
_messagesSent = _scope.CreateCounter<long>("mqtt_client.messages_sent_total", "messages", description: "Amount of MQTT packages sent");
_scope.CreateObservableGauge<int>("mqtt_client.connections", () => _connections, unit: "connections", description: "Amount of MQTT connections created");
_scope.CreateObservableGauge<int>("mqtt_client.pending_messages", _pendingMessages, unit: "messages", description: "Amount of ingoing MQTT messages pending to be processed by the client");
}

public void IncreaseMessagesSent()
Expand All @@ -33,4 +35,9 @@ public void SetConnections(int amount)
{
_connections = amount;
}

public void SetPendingMessages(Func<int> amount)
{
_pendingMessages = amount;
}
}

0 comments on commit f8e7931

Please sign in to comment.