diff --git a/VERSION.south.mqtt-readings b/VERSION.south.mqtt-readings index d660c9f..b5dd426 100644 --- a/VERSION.south.mqtt-readings +++ b/VERSION.south.mqtt-readings @@ -1,2 +1,2 @@ -fledge_south_mqtt-readings_version=2.3.0 -fledge_version>=2.3 +fledge_south_mqtt-readings_version=2.4.0 +fledge_version>=2.4 diff --git a/docs/images/mqtt-sub.png b/docs/images/mqtt-sub.png index 2cac45a..068a89d 100644 Binary files a/docs/images/mqtt-sub.png and b/docs/images/mqtt-sub.png differ diff --git a/docs/index.rst b/docs/index.rst index 80a7eb3..ebdc284 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -41,6 +41,8 @@ To create a south service you, as with any other south plugin - **Asset Name**: Name of Asset. + - **Datapoint Name**: Datapoint name to be used in the reading object only for a primitive value, published to the topic + - Click *Next* - Enable your service and click *Done* @@ -50,13 +52,16 @@ Message Payload --------------- The content of the message payload published to the topic, to which the service is configured to subscribe, -should be parsable to a JSON object. +should be parsable to a JSON object, Integer, Float or String. -e.g. `'{"humidity": 93.29, "temp": 16.82}'` +e.g. `'{"humidity": 93.29, "temp": 16.82}'`, `'100'`, `'90.9'`, `'STRING'` .. code-block:: console $ mosquitto_pub -h localhost -t "Room1/conditions" -m '{"humidity": 93.29, "temp": 16.82}' + $ mosquitto_pub -h localhost -t "Room1/conditions" -m '100' + $ mosquitto_pub -h localhost -t "Room1/conditions" -m '90.9' + $ mosquitto_pub -h localhost -t "Room1/conditions" -m 'STRING' The mosquitto_pub client utility comes with the mosquitto package, and a great tool for conducting quick tests and troubleshooting. https://mosquitto.org/man/mosquitto_pub-1.html diff --git a/docs/keywords b/docs/keywords new file mode 100644 index 0000000..17ce7a6 --- /dev/null +++ b/docs/keywords @@ -0,0 +1,2 @@ +MQTT + diff --git a/mqtt-pub/__main__.py b/mqtt-pub/__main__.py index dafff5c..235b32a 100644 --- a/mqtt-pub/__main__.py +++ b/mqtt-pub/__main__.py @@ -34,6 +34,10 @@ def on_disconnect(client, userdata, rc): # faking def prepare_data(): + # return "100" + # return "90.9" + # return "STRING" + # return '{"abc": 1}' h = float("{0:.2f}".format(random.uniform(50, 100))) t = float("{0:.2f}".format(random.uniform(10, 40))) data = dict() diff --git a/python/fledge/plugins/south/mqtt-readings/mqtt-readings.py b/python/fledge/plugins/south/mqtt-readings/mqtt-readings.py index 010a8f5..ad1af42 100644 --- a/python/fledge/plugins/south/mqtt-readings/mqtt-readings.py +++ b/python/fledge/plugins/south/mqtt-readings/mqtt-readings.py @@ -47,7 +47,6 @@ import copy import json import logging -import uuid import paho.mqtt.client as mqtt @@ -57,12 +56,12 @@ from fledge.services.south.ingest import Ingest import async_ingest -__author__ = "Praveen Garg" -__copyright__ = "Copyright (c) 2020 Dianomic Systems, Inc." +__author__ = "Praveen Garg, Oskar Gert" +__copyright__ = "Copyright (c) 2024 Dianomic Systems, Inc." __license__ = "Apache 2.0" __version__ = "${VERSION}" -_LOGGER = logger.setup(__name__, level=logging.INFO) +_LOGGER = logger.setup(__name__, level=logging.WARNING) c_callback = None c_ingest_ref = None @@ -100,7 +99,7 @@ }, 'password': { 'description': 'Password for broker authentication', - 'type': 'string', + 'type': 'password', 'default': '', 'order': '4', 'displayName': 'Password' @@ -136,7 +135,16 @@ 'default': 'mqtt-', 'order': '8', 'displayName': 'Asset Name', - 'mandatory': 'true' + 'mandatory': 'true', + 'group': 'Reading' + }, + 'reading_datapoint_name_for_primitive_value': { + 'description': 'Datapoint name to be used in the reading object only for a primitive value, published to the topic', + 'type': 'string', + 'default': 'datapoint', + 'order': '9', + 'displayName': 'Datapoint Name', + 'group': 'Reading' } } @@ -144,7 +152,7 @@ def plugin_info(): return { 'name': 'MQTT Subscriber', - 'version': '2.3.0', + 'version': '2.4.0', 'mode': 'async', 'type': 'south', 'interface': '1.0', @@ -242,7 +250,7 @@ def plugin_register_ingest(handle, callback, ingest_ref): class MqttSubscriberClient(object): """ mqtt listener class""" - __slots__ = ['mqtt_client', 'broker_host', 'broker_port', 'username', 'password', 'topic', 'qos', 'keep_alive_interval', 'asset', 'loop'] + __slots__ = ['mqtt_client', 'broker_host', 'broker_port', 'username', 'password', 'topic', 'qos', 'keep_alive_interval', 'asset', 'reading_datapoint_name_for_primitive_value', 'loop'] def __init__(self, config): self.mqtt_client = mqtt.Client() @@ -253,7 +261,9 @@ def __init__(self, config): self.topic = config['topic']['value'] self.qos = int(config['qos']['value']) self.keep_alive_interval = int(config['keepAliveInterval']['value']) + self.asset = config['assetName']['value'] + self.reading_datapoint_name_for_primitive_value = config['reading_datapoint_name_for_primitive_value']['value'] def on_connect(self, client, userdata, flags, rc): """ The callback for when the client receives a CONNACK response from the server @@ -301,10 +311,24 @@ def stop(self): self.mqtt_client.disconnect() self.mqtt_client.loop_stop() + def convert(self, msg): + constructors = [json.loads, int, float, str] + for constructor in constructors: + try: + # Only convert if type is string + converted_msg = constructor(msg) if type(msg) == str else msg + # Create dict if converted msg isn't already a dict + if not isinstance(converted_msg, dict): + converted_msg = {self.reading_datapoint_name_for_primitive_value: converted_msg} + except (ValueError, TypeError) as error: + pass + else: + return converted_msg + _LOGGER.exception("Unable to convert payload '%s' to a suitable type", str(msg)) + async def save(self, msg): """Store msg content to Fledge """ - # TODO: string and other types? - payload_json = json.loads(msg.payload.decode('utf-8')) + payload_json = self.convert(msg.payload.decode('utf-8')) _LOGGER.debug("Ingesting %s on topic %s", payload_json, str(msg.topic)) data = { 'asset': self.asset,