diff --git a/lib/pysquared/cdh.py b/lib/pysquared/cdh.py index 4aab42f1..4a854fbf 100644 --- a/lib/pysquared/cdh.py +++ b/lib/pysquared/cdh.py @@ -1,168 +1,166 @@ -import json import random import time -import lib.pysquared.commandsConfig as commandsConfig - -commands = commandsConfig.commands -# parses json & assigns data to variables -with open("config.json", "r") as f: - json_data = f.read() -config = json.loads(json_data) -jokereply = config["jokereply"] -super_secret_code = config["super_secret_code"].encode("utf-8") -repeat_code = config["repeat_code"].encode("utf-8") -print(f"Super secret code is: {super_secret_code}") - - -############### hot start helper ############### -def hotstart_handler(cubesat, msg): - # try - try: - cubesat.radio1.node = cubesat.cfg["id"] # this sat's radiohead ID - cubesat.radio1.destination = cubesat.cfg["gs"] # target gs radiohead ID - except Exception: - pass - # check that message is for me - if msg[0] == cubesat.radio1.node: - # TODO check for optional radio config - - # manually send ACK - cubesat.radio1.send("!", identifier=msg[2], flags=0x80) - # TODO remove this delay. for testing only! - time.sleep(0.5) - message_handler(cubesat, msg) - else: - print( - f"not for me? target id: {hex(msg[0])}, my id: {hex(cubesat.radio1.node)}" +from lib.pysquared.config import Config + + +class CommandDataHandler: + """ + Constructor + """ + + def __init__(self, config: Config) -> None: + self._commands: dict = { + b"\x8eb": "noop", + b"\xd4\x9f": "hreset", + b"\x12\x06": "shutdown", + b"8\x93": "query", + b"\x96\xa2": "exec_cmd", + b"\xa5\xb4": "joke_reply", + b"\x56\xc4": "FSK", + } + self._jokereply: list[str] = config.getList("jokereply") + self._super_secret_code: str = config.getStr("super_secret_code").encode( + "utf-8" ) - - -############### message handler ############### -def message_handler(cubesat, msg): - multi_msg = False - if len(msg) >= 10: # [RH header 4 bytes] [pass-code(4 bytes)] [cmd 2 bytes] - if bytes(msg[4:8]) == super_secret_code: - # check if multi-message flag is set - if msg[3] & 0x08: - multi_msg = True - # strip off RH header - msg = bytes(msg[4:]) - cmd = msg[4:6] # [pass-code(4 bytes)] [cmd 2 bytes] [args] - cmd_args = None - if len(msg) > 6: - print("command with args") - try: - cmd_args = msg[6:] # arguments are everything after - print("cmd args: {}".format(cmd_args)) - except Exception as e: - print("arg decoding error: {}".format(e)) - if cmd in commands: + self._repeat_code: str = config.getStr("repeat_code").encode("utf-8") + print(f"Super secret code is: {self._super_secret_code}") + + ############### hot start helper ############### + def hotstart_handler(self, cubesat, msg) -> None: + # check that message is for me + if msg[0] == cubesat.radio1.node: + # TODO check for optional radio config + + # manually send ACK + cubesat.radio1.send("!", identifier=msg[2], flags=0x80) + # TODO remove this delay. for testing only! + time.sleep(0.5) + self.message_handler(cubesat, msg) + else: + print( + f"not for me? target id: {hex(msg[0])}, my id: {hex(cubesat.radio1.node)}" + ) + + ############### message handler ############### + def message_handler(self, cubesat, msg) -> None: + multi_msg: bool = False + if len(msg) >= 10: # [RH header 4 bytes] [pass-code(4 bytes)] [cmd 2 bytes] + if bytes(msg[4:8]) == self._super_secret_code: + # check if multi-message flag is set + if msg[3] & 0x08: + multi_msg = True + # strip off RH header + msg = bytes(msg[4:]) + cmd = msg[4:6] # [pass-code(4 bytes)] [cmd 2 bytes] [args] + cmd_args = None + if len(msg) > 6: + print("command with args") + try: + cmd_args = msg[6:] # arguments are everything after + print("cmd args: {}".format(cmd_args)) + except Exception as e: + print("arg decoding error: {}".format(e)) + if cmd in self._commands: + try: + if cmd_args is None: + print("running {} (no args)".format(self._commands[cmd])) + # eval a string turns it into a func name + eval(self._commands[cmd])(cubesat) + else: + print( + "running {} (with args: {})".format( + self._commands[cmd], cmd_args + ) + ) + eval(self._commands[cmd])(cubesat, cmd_args) + except Exception as e: + print("something went wrong: {}".format(e)) + cubesat.radio1.send(str(e).encode()) + else: + print("invalid command!") + cubesat.radio1.send(b"invalid cmd" + msg[4:]) + # check for multi-message mode + if multi_msg: + # TODO check for optional radio config + print("multi-message mode enabled") + response = cubesat.radio1.receive( + keep_listening=True, + with_ack=True, + with_header=True, + view=True, + timeout=10, + ) + if response is not None: + cubesat.c_gs_resp += 1 + self.message_handler(cubesat, response) + elif bytes(msg[4:6]) == self._repeat_code: + print("Repeating last message!") try: - if cmd_args is None: - print("running {} (no args)".format(commands[cmd])) - # eval a string turns it into a func name - eval(commands[cmd])(cubesat) - else: - print( - "running {} (with args: {})".format(commands[cmd], cmd_args) - ) - eval(commands[cmd])(cubesat, cmd_args) + cubesat.radio1.send(msg[6:]) except Exception as e: - print("something went wrong: {}".format(e)) - cubesat.radio1.send(str(e).encode()) + print("error repeating message: {}".format(e)) else: - print("invalid command!") - cubesat.radio1.send(b"invalid cmd" + msg[4:]) - # check for multi-message mode - if multi_msg: - # TODO check for optional radio config - print("multi-message mode enabled") - response = cubesat.radio1.receive( - keep_listening=True, - with_ack=True, - with_header=True, - view=True, - timeout=10, - ) - if response is not None: - cubesat.c_gs_resp += 1 - message_handler(cubesat, response) - elif bytes(msg[4:6]) == repeat_code: - print("Repeating last message!") - try: - cubesat.radio1.send(msg[6:]) - except Exception as e: - print("error repeating message: {}".format(e)) - else: - print("bad code?") + print("bad code?") - -########### commands without arguments ########### -def noop(cubesat): - print("no-op") - pass - - -def hreset(cubesat): - print("Resetting") - try: - cubesat.radio1.send(data=b"resetting") - cubesat.micro.on_next_reset(cubesat.micro.RunMode.NORMAL) - cubesat.micro.reset() - except Exception: + ########### commands without arguments ########### + def noop(cubesat) -> None: + print("no-op") pass - -def FSK(cubesat): - cubesat.f_fsk = True - - -def joke_reply(cubesat): - joke = random.choice(jokereply) - print(joke) - cubesat.radio1.send(joke) - - -########### commands with arguments ########### - - -def shutdown(cubesat, args): - # make shutdown require yet another pass-code - if args == b"\x0b\xfdI\xec": - print("valid shutdown command received") - # set shutdown NVM bit flag - cubesat.f_shtdwn = True - - """ - Exercise for the user: - Implement a means of waking up from shutdown - See beep-sat guide for more details - https://pycubed.org/resources - """ - - # deep sleep + listen - # TODO config radio - cubesat.radio1.listen() - if "st" in cubesat.radio_cfg: - _t = cubesat.radio_cfg["st"] - else: - _t = 5 - import alarm - - time_alarm = alarm.time.TimeAlarm( - monotonic_time=time.monotonic() + eval("1e" + str(_t)) - ) # default 1 day - # set hot start flag right before sleeping - cubesat.f_hotstrt = True - alarm.exit_and_deep_sleep_until_alarms(time_alarm) - - -def query(cubesat, args): - print(f"query: {args}") - print(cubesat.radio1.send(data=str(eval(args)))) - - -def exec_cmd(cubesat, args): - print(f"exec: {args}") - exec(args) + def hreset(cubesat) -> None: + print("Resetting") + try: + cubesat.radio1.send(data=b"resetting") + cubesat.micro.on_next_reset(cubesat.micro.RunMode.NORMAL) + cubesat.micro.reset() + except Exception: + pass + + def FSK(cubesat) -> None: + cubesat.f_fsk: bool = True + + def joke_reply(self, cubesat) -> None: + joke: str = random.choice(self._jokereply) + print(joke) + cubesat.radio1.send(joke) + + ########### commands with arguments ########### + + def shutdown(cubesat, args) -> None: + # make shutdown require yet another pass-code + if args == b"\x0b\xfdI\xec": + print("valid shutdown command received") + # set shutdown NVM bit flag + cubesat.f_shtdwn: bool = True + + """ + Exercise for the user: + Implement a means of waking up from shutdown + See beep-sat guide for more details + https://pycubed.org/resources + """ + + # deep sleep + listen + # TODO config radio + cubesat.radio1.listen() + if "st" in cubesat.radio_cfg: + _t = cubesat.radio_cfg["st"] + else: + _t = 5 + import alarm + + time_alarm = alarm.time.TimeAlarm( + monotonic_time=time.monotonic() + eval("1e" + str(_t)) + ) # default 1 day + # set hot start flag right before sleeping + cubesat.f_hotstrt: bool = True + alarm.exit_and_deep_sleep_until_alarms(time_alarm) + + def query(cubesat, args) -> None: + print(f"query: {args}") + print(cubesat.radio1.send(data=str(eval(args)))) + + def exec_cmd(cubesat, args) -> None: + print(f"exec: {args}") + exec(args) diff --git a/lib/pysquared/commandsConfig.py b/lib/pysquared/commandsConfig.py deleted file mode 100644 index 3578b83f..00000000 --- a/lib/pysquared/commandsConfig.py +++ /dev/null @@ -1,10 +0,0 @@ -# file for configuring commands -commands = { - b"\x8eb": "noop", - b"\xd4\x9f": "hreset", - b"\x12\x06": "shutdown", - b"8\x93": "query", - b"\x96\xa2": "exec_cmd", - b"\xa5\xb4": "joke_reply", - b"\x56\xc4": "FSK", -} diff --git a/lib/pysquared/config.py b/lib/pysquared/config.py new file mode 100644 index 00000000..f3a20e73 --- /dev/null +++ b/lib/pysquared/config.py @@ -0,0 +1,120 @@ +""" +Class for encapsulating config.json. The goal is to +distribute these values across the files & variables +that use them. Instantiation happens in main. + +Also it allow values to be set temporarily using setter +function, and values can be set permanently using the +saver functions. Following the FPrime model. +""" + +import json + + +class Config: + """ + Constructor + """ + + def __init__(self) -> None: + # parses json & assigns data to variables + with open("config.json", "r") as f: + json_data = f.read() + self._config: dict = json.loads(json_data) + + """ + Categorized getter functions + """ + + def getStr(self, key: str) -> str: + """Gets a string value from the config dictionary""" + return self._config[key] + + def getInt(self, key: str) -> int: + """Gets an int value from the config dictionary""" + return self._config[key] + + def getFloat(self, key: str) -> float: + """Gets a float value from the config dictionary""" + return self._config[key] + + def getBool(self, key: str) -> bool: + """Gets a bool value from the config dictionary""" + return self._config[key] + + def getList(self, key: str) -> list[str]: + """Gets a list value from the config dictionary""" + return self._config[key] + + """ + Categorized setter functions + """ + + def setStr(self, key: str, value: str) -> None: + """Sets the string value in the config dictionary + Does not save value to disk, will not persist through reboots + """ + self._config[key] = value + + def setInt(self, key: str, value: int) -> None: + """Sets the string value in the config dictionary + Does not save value to disk, will not persist through reboots + """ + self._config[key] = value + + def setFloat(self, key: str, value: float) -> None: + """Sets the string value in the config dictionary + Does not save value to disk, will not persist through reboots + """ + self._config[key] = value + + def setBool(self, key: str, value: bool) -> None: + """Sets the string value in the config dictionary + Does not save value to disk, will not persist through reboots + """ + self._config[key] = value + + def setList(self, key: str, value: str) -> None: + """Sets the string value from a list inside of the config dictionary + Does not save value to disk, will not persist through reboots + """ + self._config[key].append(value) + + """ + Categorized saver functions + """ + + def saveStr(self, key: str, strValue: str) -> None: + """Saves the string value to config.json + Saves value to disk, will persist through reboots + """ + # add logic to write back to config + pass + + def saveInt(self, key: str, intValue: int) -> None: + """Saves the int value to config.json + Saves value to disk, will persist through reboots + """ + # add logic to write back to config + pass + + def saveFloat(self, key: str, floatValue: float) -> None: + """Saves the float value to config.json + Saves value to disk, will persist through reboots + """ + # add logic to write back to config + pass + + def saveBool(self, key: str, boolValue: bool) -> None: + """Saves the bool value to config.json + Saves value to disk, will persist through reboots + """ + # add logic to write back to config + pass + + def saveList(self, key: str, listValue: str) -> None: + """Saves the bool value to config.json + Saves value to disk, will persist through reboots + """ + # add logic to write back to config + pass diff --git a/lib/pysquared/functions.py b/lib/pysquared/functions.py index 9e406c9a..6a216ba0 100755 --- a/lib/pysquared/functions.py +++ b/lib/pysquared/functions.py @@ -6,7 +6,6 @@ """ import gc -import json import random import time import traceback @@ -14,6 +13,7 @@ import alarm from lib.pysquared.battery_helper import BatteryHelper +from lib.pysquared.config import Config from lib.pysquared.debugcolor import co from lib.pysquared.packet_manager import PacketManager from lib.pysquared.packet_sender import PacketSender @@ -32,7 +32,7 @@ def debug_print(self, statement: Any) -> None: if self.debug: print(co("[Functions]" + str(statement), "green", "bold")) - def __init__(self, cubesat: Satellite) -> None: + def __init__(self, cubesat: Satellite, config: Config) -> None: self.cubesat: Satellite = cubesat self.battery: BatteryHelper = BatteryHelper(cubesat) self.debug: bool = cubesat.debug @@ -41,23 +41,19 @@ def __init__(self, cubesat: Satellite) -> None: self.pm: PacketManager = PacketManager(max_packet_size=128) self.ps: PacketSender = PacketSender(cubesat.radio1, self.pm, max_retries=3) - # parses json & assigns data to variables - with open("config.json", "r") as f: - json_data = f.read() - config = json.loads(json_data) - - self.cubesatName: str = config["cubesatName"] + self.config: Config = config + self.cubesatName: str = config.getStr("cubesatName") self.Errorcount: int = 0 self.facestring: list = [None, None, None, None, None] - self.jokes: list[str] = config["jokes"] - self.last_battery_temp: float = config["last_battery_temp"] - self.sleep_duration: int = config["sleep_duration"] - self.callsign: str = config["callsign"] + self.jokes: list[str] = config.getList("jokes") + self.last_battery_temp: float = config.getFloat("last_battery_temp") + self.sleep_duration: int = config.getInt("sleep_duration") + self.callsign: str = config.getStr("callsign") self.state_bool: bool = False self.face_data_baton: bool = False - self.detumble_enable_z: bool = config["detumble_enable_z"] - self.detumble_enable_x: bool = config["detumble_enable_x"] - self.detumble_enable_y: bool = config["detumble_enable_y"] + self.detumble_enable_z: bool = config.getBool("detumble_enable_z") + self.detumble_enable_x: bool = config.getBool("detumble_enable_x") + self.detumble_enable_y: bool = config.getBool("detumble_enable_y") """ Satellite Management Functions @@ -239,7 +235,11 @@ def send_face(self) -> None: del Field def listen(self) -> bool: - import lib.pysquared.cdh as cdh + # need to instanciate cdh to feed it the config var + # assigned from the Config object + from lib.pysquared.cdh import CommandDataHandler + + cdh = CommandDataHandler(self.config) # This just passes the message through. Maybe add more functionality later. try: diff --git a/lib/pysquared/pysquared.py b/lib/pysquared/pysquared.py index 8a45b590..15db43b7 100755 --- a/lib/pysquared/pysquared.py +++ b/lib/pysquared/pysquared.py @@ -8,7 +8,6 @@ """ # Common CircuitPython Libs -import json import sys import time import traceback @@ -33,6 +32,7 @@ # Hardware Specific Libs from lib.adafruit_rfm import rfm9x, rfm9xfsk # Radio from lib.pysquared.bitflags import bitFlag, multiBitFlag +from lib.pysquared.config import Config # Configs from lib.pysquared.debugcolor import co # Importing typing libraries @@ -92,45 +92,33 @@ def error_print(self, statement: Any) -> None: if self.debug: print(co("[pysquared]" + str(statement), "red", "bold")) - def __init__(self) -> None: - # parses json & assigns data to variables - with open("config.json", "r") as f: - json_data = f.read() - config = json.loads(json_data) - + def __init__(self, config: Config) -> None: + self.cubesatName: str = config.getStr("cubesatName") """ Big init routine as the whole board is brought up. Starting with config variables. """ - self.debug: bool = config["debug"] # Define verbose output here. True or False - self.legacy: bool = config[ - "legacy" - ] # Define if the board is used with legacy or not - self.heating: bool = config["heating"] # Currently not used - self.orpheus: bool = config[ - "orpheus" - ] # Define if the board is used with Orpheus or not - self.is_licensed: bool = config["is_licensed"] + self.debug: bool = config.getBool("debug") + self.legacy: bool = config.getBool("legacy") + self.heating: bool = config.getBool("heating") + self.orpheus: bool = config.getBool("orpheus") # maybe change var name + self.is_licensed: bool = config.getBool("is_licensed") """ Define the normal power modes """ - self.NORMAL_TEMP: int = config["NORMAL_TEMP"] - self.NORMAL_BATT_TEMP: int = config[ - "NORMAL_BATT_TEMP" - ] # Set to 0 BEFORE FLIGHT!!!!! - self.NORMAL_MICRO_TEMP: int = config["NORMAL_MICRO_TEMP"] - self.NORMAL_CHARGE_CURRENT: float = config["NORMAL_CHARGE_CURRENT"] - self.NORMAL_BATTERY_VOLTAGE: float = config["NORMAL_BATTERY_VOLTAGE"] # 6.9 - self.CRITICAL_BATTERY_VOLTAGE: float = config["CRITICAL_BATTERY_VOLTAGE"] # 6.6 - self.vlowbatt: float = config["vlowbatt"] - self.battery_voltage: float = config[ - "battery_voltage" - ] # default value for testing REPLACE WITH REAL VALUE - self.current_draw: float = config[ - "current_draw" - ] # default value for testing REPLACE WITH REAL VALUE - self.REBOOT_TIME: int = config["REBOOT_TIME"] # 1 hour - self.turbo_clock: bool = config["turbo_clock"] + self.NORMAL_TEMP: int = config.getInt("NORMAL_TEMP") + self.NORMAL_BATT_TEMP: int = config.getInt("NORMAL_BATT_TEMP") + self.NORMAL_MICRO_TEMP: int = config.getInt("NORMAL_MICRO_TEMP") + self.NORMAL_CHARGE_CURRENT: float = config.getFloat("NORMAL_CHARGE_CURRENT") + self.NORMAL_BATTERY_VOLTAGE: float = config.getFloat("NORMAL_BATTERY_VOLTAGE") + self.CRITICAL_BATTERY_VOLTAGE: float = config.getFloat( + "CRITICAL_BATTERY_VOLTAGE" + ) + self.vlowbatt: float = config.getFloat("vlowbatt") + self.battery_voltage: float = config.getFloat("battery_voltage") + self.current_draw: float = config.getFloat("current_draw") + self.REBOOT_TIME: int = config.getInt("REBOOT_TIME") + self.turbo_clock: bool = config.getBool("turbo_clock") """ Setting up data buffers @@ -229,7 +217,7 @@ def __init__(self) -> None: self.i2c0: busio.I2C = busio.I2C(board.I2C0_SCL, board.I2C0_SDA) self.hardware["I2C0"] = True else: - self.debug_print("[Orpheus] I2C0 not initialized") + self.debug_print(f"{self.cubesatName} I2C0 not initialized") except Exception as e: self.error_print( diff --git a/main.py b/main.py index 76c3ca22..09aee9e5 100644 --- a/main.py +++ b/main.py @@ -12,7 +12,8 @@ import microcontroller -import lib.pysquared.pysquared as pysquared +from lib.pysquared.config import Config +from lib.pysquared.pysquared import Satellite print("=" * 70) print("Hello World!") @@ -27,8 +28,10 @@ print(f"Code Starting in {loiter_time-i} seconds") time.sleep(1) - print("Initializing CubeSat") - c = pysquared.Satellite() + print("Initializing Config") + config = Config() + print("Initializing Cubesat") + c = Satellite(config) c.watchdog_pet() import gc # Garbage collection @@ -41,7 +44,7 @@ def debug_print(statement): if c.debug: print(co(str(c.uptime) + "[MAIN]" + str(statement), "blue", "bold")) - f = functions.functions(c) + f = functions.functions(c, config) def initial_boot(): c.watchdog_pet() diff --git a/repl.py b/repl.py index f52b1ac7..04b86fb8 100644 --- a/repl.py +++ b/repl.py @@ -1,4 +1,7 @@ -import lib.pysquared.pysquared as pysquared +# from lib.pysquared.pysquared import cubesat as c +import lib.pysquared.config as Config +import lib.pysquared.pysquared as Satellite print("Initializing a cubesat object as `c` in the REPL...") -c = pysquared.Satellite() +config = Config.Config() +c = Satellite.Satellite(config)