From cdbc8398577ce968b71e1b1b3f6cded738639c1c Mon Sep 17 00:00:00 2001 From: arichadda Date: Thu, 24 Aug 2023 10:21:18 -0400 Subject: [PATCH 1/2] adding database name config --- couchdb-saver/couchdb_saver_pub_sub.py | 40 +++++++++++++++++++------- example.env | 10 ++++--- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/couchdb-saver/couchdb_saver_pub_sub.py b/couchdb-saver/couchdb_saver_pub_sub.py index 7fb2425..77c5018 100644 --- a/couchdb-saver/couchdb_saver_pub_sub.py +++ b/couchdb-saver/couchdb_saver_pub_sub.py @@ -27,11 +27,13 @@ def __init__( self: Any, sensor_save_topic: str, telemetry_save_topic: str, - audio_save_topic: str, + audio_base64_topic: str, + image_base64_topic: str, couchdb_error_topic: str, couchdb_user: str, couchdb_password: str, couchdb_server_ip: str, + couchdb_database_name: str, device_ip: str, debug: bool = False, **kwargs: Any, @@ -44,7 +46,7 @@ def __init__( data sent to the database telemetry_save_topic (str): the MQTT telemetry topic to subscribe to and write the data sent to the database - audio_save_topic (str): the MQTT audio file topic to subscribe to and write + audio_base64_topic (str): the MQTT audio file topic to subscribe to and write the data sent to the database couchdb_error_topic (str): the MQTT topic to broadcast any CouchDB errors onto couchdb_user (str): the username for CouchDB authentication @@ -58,11 +60,13 @@ def __init__( # assign constructor parameters to class attributes self.sensor_save_topic = sensor_save_topic self.telemetry_save_topic = telemetry_save_topic - self.audio_save_topic = audio_save_topic + self.audio_base64_topic = audio_base64_topic + self.image_base64_topic = image_base64_topic self.couchdb_error_topic = couchdb_error_topic self.couchdb_user = couchdb_user self.couchdb_password = couchdb_password self.couchdb_server_ip = couchdb_server_ip + self.couchdb_database_name = couchdb_database_name self.device_ip = device_ip self.debug = debug @@ -99,11 +103,13 @@ class constructor or set later through user_data_set(). self.publish_to_topic(self.couchdb_error_topic, err) # connect to local CouchDB instance - couch = couchdb.Server(f"http://admin:PASSWORD@{self.device_ip}:5984/") + couch = couchdb.Server( + f"http://{self.couchdb_user}:{self.couchdb_password}@{self.device_ip}:5984/" + ) database = ( - couch.create("aisonobuoy") - if "aisonobuoy" not in couch - else couch["aisonobuoy"] + couch.create(self.couchdb_database_name) + if self.couchdb_database_name not in couch + else couch[self.couchdb_database_name] ) # write to DB database.save(payload_json_str) @@ -119,9 +125,19 @@ def main(self: Any) -> None: # subscribe to topics for database writing — callbacks are all the same self.add_subscribe_topics( - [self.sensor_save_topic, self.telemetry_save_topic, self.audio_save_topic], - [self._to_save_callback, self._to_save_callback, self._to_save_callback], - [2, 2, 2], + [ + self.sensor_save_topic, + self.telemetry_save_topic, + self.audio_base64_topic, + self.image_base64_topic, + ], + [ + self._to_save_callback, + self._to_save_callback, + self._to_save_callback, + self._to_save_callback, + ], + [2, 2, 2, 2], ) while True: @@ -139,11 +155,13 @@ def main(self: Any) -> None: saver = CouchDBSaverPubSub( sensor_save_topic=str(os.environ.get("SENSOR_TOPIC")), telemetry_save_topic=str(os.environ.get("TELEMETRY_TOPIC")), - audio_save_topic=str(os.environ.get("AUDIO_TOPIC")), + audio_base64_topic=str(os.environ.get("AUDIO_TOPIC")), + image_base64_topic=str(os.environ.get("IMAGE_TOPIC")), couchdb_error_topic=str(os.environ.get("COUCHDB_ERROR_TOPIC")), couchdb_user=str(os.environ.get("COUCHDB_USER")), couchdb_password=str(os.environ.get("COUCHDB_PASSWORD")), couchdb_server_ip=str(os.environ.get("COUCHDB_SERVER_IP_ADDR")), + couchdb_database_name=str(os.environ.get("COUCHDB_DATABASE_NAME")), device_ip=str(os.environ.get("DEVICE_IP")), mqtt_ip=str(os.environ.get("MQTT_IP")), ) diff --git a/example.env b/example.env index 1f53743..d755f0e 100644 --- a/example.env +++ b/example.env @@ -1,10 +1,12 @@ HOSTNAME=pinebuoy-001 -SENSOR_TOPIC=/AISonobuoy/${HOSTNAME}/AIS/edgetech-daisy/bytestring -TELEMETRY_TOPIC=/AISonobuoy/${HOSTNAME}/telemetry/telemetry/string -AUDIO_TOPIC=/AISonobuoy/${HOSTNAME}/Audio/edgetech-audio-recorder/string -COUCHDB_ERROR_TOPIC=/AISonobuoy/${HOSTNAME}/couchdbsaver/couchdb/errstring +SENSOR_TOPIC=/Multimodal/${HOSTNAME}/AIS/edgetech-daisy/bytestring +TELEMETRY_TOPIC=/Multimodal/${HOSTNAME}/telemetry/telemetry/string +AUDIO_TOPIC=/Multimodal/${HOSTNAME}/AudioClip/edgetech-audio-recorder/string +IMAGE_TOPIC=/Multimodal/${HOSTNAME}ImageClip/edgetech-axis-ptz-controller/string +COUCHDB_ERROR_TOPIC=/Multimodal/${HOSTNAME}/couchdbsaver/couchdb/errstring MQTT_IP=mqtt COUCHDB_USER= COUCHDB_PASSWORD= COUCHDB_SERVER_IP_ADDR= DEVICE_IP= +COUCHDB_DATABASE_NAME= \ No newline at end of file From 6b1f4f70860f1f992e04246f9a3a8a41ca148a6d Mon Sep 17 00:00:00 2001 From: arichadda Date: Thu, 31 Aug 2023 16:39:52 -0400 Subject: [PATCH 2/2] adding additional logging --- couchdb-saver/couchdb_saver_pub_sub.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/couchdb-saver/couchdb_saver_pub_sub.py b/couchdb-saver/couchdb_saver_pub_sub.py index 77c5018..564f004 100644 --- a/couchdb-saver/couchdb_saver_pub_sub.py +++ b/couchdb-saver/couchdb_saver_pub_sub.py @@ -11,6 +11,7 @@ import couchdb import schedule import paho.mqtt.client as mqtt +import logging from base_mqtt_pub_sub import BaseMQTTPubSub @@ -98,9 +99,10 @@ class constructor or set later through user_data_set(). # validate payload JSON against JSON schema jsonschema.validate(instance=payload_json_str, schema=self.schema) except jsonschema.exceptions.ValidationError as err: - if self.debug: - # send validation errors on CouchDB error topic - self.publish_to_topic(self.couchdb_error_topic, err) + logging.warn(f"JSON validation failed for {payload_json_str}") + # if self.debug: + # send validation errors on CouchDB error topic + self.publish_to_topic(self.couchdb_error_topic, str(err)) # connect to local CouchDB instance couch = couchdb.Server( @@ -112,7 +114,8 @@ class constructor or set later through user_data_set(). else couch[self.couchdb_database_name] ) # write to DB - database.save(payload_json_str) + _id, _ = database.save(payload_json_str) + logging.info(f"Document inserted at {_id}") def main(self: Any) -> None: """Main loop and function that setup the heartbeat to keep the TCP/IP