Skip to content

Commit

Permalink
Merge pull request #30 from fledge-iot/2.4.0RC
Browse files Browse the repository at this point in the history
2.4.0RC
  • Loading branch information
dianomicbot authored Apr 17, 2024
2 parents 9216f9d + 59c0e23 commit e8b419b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 14 deletions.
4 changes: 2 additions & 2 deletions VERSION.south.mqtt-readings
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fledge_south_mqtt-readings_version=2.3.0
fledge_version>=2.3
fledge_south_mqtt-readings_version=2.4.0
fledge_version>=2.4
Binary file modified docs/images/mqtt-sub.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 7 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ To create a south service you, as with any other south plugin

- **Asset Name**: Name of Asset.

- **Datapoint Name**: Datapoint name to be used in the reading object only for a primitive value, published to the topic

- Click *Next*

- Enable your service and click *Done*
Expand All @@ -50,13 +52,16 @@ Message Payload
---------------

The content of the message payload published to the topic, to which the service is configured to subscribe,
should be parsable to a JSON object.
should be parsable to a JSON object, Integer, Float or String.

e.g. `'{"humidity": 93.29, "temp": 16.82}'`
e.g. `'{"humidity": 93.29, "temp": 16.82}'`, `'100'`, `'90.9'`, `'STRING'`

.. code-block:: console
$ mosquitto_pub -h localhost -t "Room1/conditions" -m '{"humidity": 93.29, "temp": 16.82}'
$ mosquitto_pub -h localhost -t "Room1/conditions" -m '100'
$ mosquitto_pub -h localhost -t "Room1/conditions" -m '90.9'
$ mosquitto_pub -h localhost -t "Room1/conditions" -m 'STRING'
The mosquitto_pub client utility comes with the mosquitto package, and a great tool for conducting quick tests and troubleshooting.
https://mosquitto.org/man/mosquitto_pub-1.html
2 changes: 2 additions & 0 deletions docs/keywords
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
MQTT

4 changes: 4 additions & 0 deletions mqtt-pub/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def on_disconnect(client, userdata, rc):

# faking
def prepare_data():
# return "100"
# return "90.9"
# return "STRING"
# return '{"abc": 1}'
h = float("{0:.2f}".format(random.uniform(50, 100)))
t = float("{0:.2f}".format(random.uniform(10, 40)))
data = dict()
Expand Down
44 changes: 34 additions & 10 deletions python/fledge/plugins/south/mqtt-readings/mqtt-readings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import copy
import json
import logging
import uuid

import paho.mqtt.client as mqtt

Expand All @@ -57,12 +56,12 @@
from fledge.services.south.ingest import Ingest
import async_ingest

__author__ = "Praveen Garg"
__copyright__ = "Copyright (c) 2020 Dianomic Systems, Inc."
__author__ = "Praveen Garg, Oskar Gert"
__copyright__ = "Copyright (c) 2024 Dianomic Systems, Inc."
__license__ = "Apache 2.0"
__version__ = "${VERSION}"

_LOGGER = logger.setup(__name__, level=logging.INFO)
_LOGGER = logger.setup(__name__, level=logging.WARNING)

c_callback = None
c_ingest_ref = None
Expand Down Expand Up @@ -100,7 +99,7 @@
},
'password': {
'description': 'Password for broker authentication',
'type': 'string',
'type': 'password',
'default': '',
'order': '4',
'displayName': 'Password'
Expand Down Expand Up @@ -136,15 +135,24 @@
'default': 'mqtt-',
'order': '8',
'displayName': 'Asset Name',
'mandatory': 'true'
'mandatory': 'true',
'group': 'Reading'
},
'reading_datapoint_name_for_primitive_value': {
'description': 'Datapoint name to be used in the reading object only for a primitive value, published to the topic',
'type': 'string',
'default': 'datapoint',
'order': '9',
'displayName': 'Datapoint Name',
'group': 'Reading'
}
}


def plugin_info():
return {
'name': 'MQTT Subscriber',
'version': '2.3.0',
'version': '2.4.0',
'mode': 'async',
'type': 'south',
'interface': '1.0',
Expand Down Expand Up @@ -242,7 +250,7 @@ def plugin_register_ingest(handle, callback, ingest_ref):
class MqttSubscriberClient(object):
""" mqtt listener class"""

__slots__ = ['mqtt_client', 'broker_host', 'broker_port', 'username', 'password', 'topic', 'qos', 'keep_alive_interval', 'asset', 'loop']
__slots__ = ['mqtt_client', 'broker_host', 'broker_port', 'username', 'password', 'topic', 'qos', 'keep_alive_interval', 'asset', 'reading_datapoint_name_for_primitive_value', 'loop']

def __init__(self, config):
self.mqtt_client = mqtt.Client()
Expand All @@ -253,7 +261,9 @@ def __init__(self, config):
self.topic = config['topic']['value']
self.qos = int(config['qos']['value'])
self.keep_alive_interval = int(config['keepAliveInterval']['value'])

self.asset = config['assetName']['value']
self.reading_datapoint_name_for_primitive_value = config['reading_datapoint_name_for_primitive_value']['value']

def on_connect(self, client, userdata, flags, rc):
""" The callback for when the client receives a CONNACK response from the server
Expand Down Expand Up @@ -301,10 +311,24 @@ def stop(self):
self.mqtt_client.disconnect()
self.mqtt_client.loop_stop()

def convert(self, msg):
constructors = [json.loads, int, float, str]
for constructor in constructors:
try:
# Only convert if type is string
converted_msg = constructor(msg) if type(msg) == str else msg
# Create dict if converted msg isn't already a dict
if not isinstance(converted_msg, dict):
converted_msg = {self.reading_datapoint_name_for_primitive_value: converted_msg}
except (ValueError, TypeError) as error:
pass
else:
return converted_msg
_LOGGER.exception("Unable to convert payload '%s' to a suitable type", str(msg))

async def save(self, msg):
"""Store msg content to Fledge """
# TODO: string and other types?
payload_json = json.loads(msg.payload.decode('utf-8'))
payload_json = self.convert(msg.payload.decode('utf-8'))
_LOGGER.debug("Ingesting %s on topic %s", payload_json, str(msg.topic))
data = {
'asset': self.asset,
Expand Down

0 comments on commit e8b419b

Please sign in to comment.