Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding database name config #17

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions couchdb-saver/couchdb_saver_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import couchdb
import schedule
import paho.mqtt.client as mqtt
import logging

from base_mqtt_pub_sub import BaseMQTTPubSub

Expand All @@ -27,11 +28,13 @@ def __init__(
self: Any,
sensor_save_topic: str,
telemetry_save_topic: str,
audio_save_topic: str,
audio_base64_topic: str,
image_base64_topic: str,
couchdb_error_topic: str,
couchdb_user: str,
couchdb_password: str,
couchdb_server_ip: str,
couchdb_database_name: str,
device_ip: str,
debug: bool = False,
**kwargs: Any,
Expand All @@ -44,7 +47,7 @@ def __init__(
data sent to the database
telemetry_save_topic (str): the MQTT telemetry topic to subscribe to and
write the data sent to the database
audio_save_topic (str): the MQTT audio file topic to subscribe to and write
audio_base64_topic (str): the MQTT audio file topic to subscribe to and write
the data sent to the database
couchdb_error_topic (str): the MQTT topic to broadcast any CouchDB errors onto
couchdb_user (str): the username for CouchDB authentication
Expand All @@ -58,11 +61,13 @@ def __init__(
# assign constructor parameters to class attributes
self.sensor_save_topic = sensor_save_topic
self.telemetry_save_topic = telemetry_save_topic
self.audio_save_topic = audio_save_topic
self.audio_base64_topic = audio_base64_topic
self.image_base64_topic = image_base64_topic
self.couchdb_error_topic = couchdb_error_topic
self.couchdb_user = couchdb_user
self.couchdb_password = couchdb_password
self.couchdb_server_ip = couchdb_server_ip
self.couchdb_database_name = couchdb_database_name
self.device_ip = device_ip
self.debug = debug

Expand Down Expand Up @@ -94,19 +99,23 @@ class constructor or set later through user_data_set().
# validate payload JSON against JSON schema
jsonschema.validate(instance=payload_json_str, schema=self.schema)
except jsonschema.exceptions.ValidationError as err:
if self.debug:
# send validation errors on CouchDB error topic
self.publish_to_topic(self.couchdb_error_topic, err)
logging.warn(f"JSON validation failed for {payload_json_str}")
# if self.debug:
# send validation errors on CouchDB error topic
self.publish_to_topic(self.couchdb_error_topic, str(err))

# connect to local CouchDB instance
couch = couchdb.Server(f"http://admin:PASSWORD@{self.device_ip}:5984/")
couch = couchdb.Server(
f"http://{self.couchdb_user}:{self.couchdb_password}@{self.device_ip}:5984/"
)
database = (
couch.create("aisonobuoy")
if "aisonobuoy" not in couch
else couch["aisonobuoy"]
couch.create(self.couchdb_database_name)
if self.couchdb_database_name not in couch
else couch[self.couchdb_database_name]
)
# write to DB
database.save(payload_json_str)
_id, _ = database.save(payload_json_str)
logging.info(f"Document inserted at {_id}")

def main(self: Any) -> None:
"""Main loop and function that setup the heartbeat to keep the TCP/IP
Expand All @@ -119,9 +128,19 @@ def main(self: Any) -> None:

# subscribe to topics for database writing — callbacks are all the same
self.add_subscribe_topics(
[self.sensor_save_topic, self.telemetry_save_topic, self.audio_save_topic],
[self._to_save_callback, self._to_save_callback, self._to_save_callback],
[2, 2, 2],
[
self.sensor_save_topic,
self.telemetry_save_topic,
self.audio_base64_topic,
self.image_base64_topic,
],
[
self._to_save_callback,
self._to_save_callback,
self._to_save_callback,
self._to_save_callback,
],
[2, 2, 2, 2],
)

while True:
Expand All @@ -139,11 +158,13 @@ def main(self: Any) -> None:
saver = CouchDBSaverPubSub(
sensor_save_topic=str(os.environ.get("SENSOR_TOPIC")),
telemetry_save_topic=str(os.environ.get("TELEMETRY_TOPIC")),
audio_save_topic=str(os.environ.get("AUDIO_TOPIC")),
audio_base64_topic=str(os.environ.get("AUDIO_TOPIC")),
image_base64_topic=str(os.environ.get("IMAGE_TOPIC")),
couchdb_error_topic=str(os.environ.get("COUCHDB_ERROR_TOPIC")),
couchdb_user=str(os.environ.get("COUCHDB_USER")),
couchdb_password=str(os.environ.get("COUCHDB_PASSWORD")),
couchdb_server_ip=str(os.environ.get("COUCHDB_SERVER_IP_ADDR")),
couchdb_database_name=str(os.environ.get("COUCHDB_DATABASE_NAME")),
device_ip=str(os.environ.get("DEVICE_IP")),
mqtt_ip=str(os.environ.get("MQTT_IP")),
)
Expand Down
10 changes: 6 additions & 4 deletions example.env
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
HOSTNAME=pinebuoy-001
SENSOR_TOPIC=/AISonobuoy/${HOSTNAME}/AIS/edgetech-daisy/bytestring
TELEMETRY_TOPIC=/AISonobuoy/${HOSTNAME}/telemetry/telemetry/string
AUDIO_TOPIC=/AISonobuoy/${HOSTNAME}/Audio/edgetech-audio-recorder/string
COUCHDB_ERROR_TOPIC=/AISonobuoy/${HOSTNAME}/couchdbsaver/couchdb/errstring
SENSOR_TOPIC=/Multimodal/${HOSTNAME}/AIS/edgetech-daisy/bytestring
TELEMETRY_TOPIC=/Multimodal/${HOSTNAME}/telemetry/telemetry/string
AUDIO_TOPIC=/Multimodal/${HOSTNAME}/AudioClip/edgetech-audio-recorder/string
IMAGE_TOPIC=/Multimodal/${HOSTNAME}ImageClip/edgetech-axis-ptz-controller/string
COUCHDB_ERROR_TOPIC=/Multimodal/${HOSTNAME}/couchdbsaver/couchdb/errstring
MQTT_IP=mqtt
COUCHDB_USER=<YOUR_COUCHDB_USER>
COUCHDB_PASSWORD=<YOUR_COUCHDB_PASSWORD>
COUCHDB_SERVER_IP_ADDR=<YOUR_COUCHDB_SERVER_IP_ADDR>
DEVICE_IP=<YOUR_DEVICE_IP>
COUCHDB_DATABASE_NAME=<YOUR_DATABASE_NAME>