diff --git a/.github/workflows/build_tests.yml b/.github/workflows/build_tests.yml index b857af1..37fc583 100644 --- a/.github/workflows/build_tests.yml +++ b/.github/workflows/build_tests.yml @@ -37,13 +37,3 @@ jobs: - name: Install repo run: | pip install . - - uses: pypa/gh-action-pip-audit@v1.0.8 - with: - # Ignore irrelevant Mercurial vulnerability - # Ignore `requests` and `urllib3` vulnerabilities as they are not used in this package - # Ignore `setuptools` and `pip` vulnerabilities I don't think they apply here - ignore-vulns: | - PYSEC-2023-228 - GHSA-9wx4-h78v-vm56 - GHSA-34jh-p97f-mpxf - PYSEC-2022-43012 diff --git a/ovos_PHAL_plugin_mk1/__init__.py b/ovos_PHAL_plugin_mk1/__init__.py index 9e892ef..28836b7 100644 --- a/ovos_PHAL_plugin_mk1/__init__.py +++ b/ovos_PHAL_plugin_mk1/__init__.py @@ -1,17 +1,18 @@ import time from threading import Event from time import sleep +from typing import Optional import serial from ovos_bus_client.message import Message +from ovos_mark1.faceplate.icons import MusicIcon, WarningIcon, SnowIcon, StormIcon, SunnyIcon, \ + CloudyIcon, PartlyCloudyIcon, WindIcon, RainIcon, LightRainIcon +from ovos_plugin_manager.phal import PHALPlugin from ovos_utils import create_daemon from ovos_utils.log import LOG from ovos_utils.network_utils import is_connected -from ovos_mark1.faceplate.icons import MusicIcon, WarningIcon, SnowIcon, StormIcon, SunnyIcon, \ - CloudyIcon, PartlyCloudyIcon, WindIcon, RainIcon, LightRainIcon from ovos_PHAL_plugin_mk1.arduino import EnclosureReader, EnclosureWriter -from ovos_plugin_manager.phal import PHALPlugin # The Mark 1 hardware consists of a Raspberry Pi main CPU which is connected @@ -85,6 +86,9 @@ def __init__(self, bus=None, config=None): self.bus.on("mycroft.audio.service.play", self.on_music) self.bus.on("mycroft.audio.service.stop", self.on_display_reset) + self.bus.on("ovos.mk1.display_date", self.on_display_date) + self.bus.on("ovos.mk1.display_time", self.on_display_time) + self.bus.emit(Message("system.factory.reset.register", {"skill_id": "ovos-phal-plugin-mk1"})) @@ -144,7 +148,7 @@ def __init_serial(self): LOG.exception(f"Impossible to connect to serial: {self.port}") raise - def __reset(self, message=None): + def __reset(self, message: Optional[Message] = None): self.writer.write("eyes.reset") self.writer.write("mouth.reset") @@ -154,10 +158,10 @@ def handle_button_press(self): else: self.bus.emit(Message("mycroft.mic.listen")) - def on_music(self, message=None): + def on_music(self, message: Optional[Message] = None): MusicIcon(bus=self.bus).display() - def handle_get_color(self, message): + def handle_get_color(self, message: Message): """Get the eye RGB color for all pixels Returns: (list) list of (r,g,b) tuples for each eye pixel @@ -165,36 +169,36 @@ def handle_get_color(self, message): self.bus.emit(message.reply("enclosure.eyes.rgb", {"pixels": self._current_rgb})) - def handle_factory_reset(self, message): + def handle_factory_reset(self, message: Optional[Message] = None): self.writer.write("eyes.spin") self.writer.write("mouth.reset") # TODO re-flash firmware to faceplate - def handle_register_factory_reset_handler(self, message): + def handle_register_factory_reset_handler(self, message: Message): self.bus.emit(message.reply("system.factory.reset.register", {"skill_id": "ovos-phal-plugin-mk1"})) # Audio Events - def on_record_begin(self, message=None): + def on_record_begin(self, message: Optional[Message] = None): # NOTE: ignore self._mouth_events, listening should ALWAYS be obvious self.listening = True self.on_listen(message) - def on_record_end(self, message=None): + def on_record_end(self, message: Optional[Message] = None): self.listening = False self.on_display_reset(message) - def on_audio_output_start(self, message=None): + def on_audio_output_start(self, message: Optional[Message] = None): self.speaking = True if self._mouth_events: self.on_talk(message) - def on_audio_output_end(self, message=None): + def on_audio_output_end(self, message: Optional[Message] = None): self.speaking = False if self._mouth_events: self.on_display_reset(message) - def on_awake(self, message=None): + def on_awake(self, message: Optional[Message] = None): ''' on wakeup animation triggered by "mycroft.awoken" ''' @@ -205,7 +209,7 @@ def on_awake(self, message=None): # brighten the rest of the way self.writer.write("eyes.level=" + str(self.old_brightness)) - def on_sleep(self, message=None): + def on_sleep(self, message: Optional[Message] = None): ''' on naptime animation triggered by "recognizer_loop:sleep" ''' @@ -218,7 +222,7 @@ def on_sleep(self, message=None): time.sleep(0.15) self.writer.write("eyes.look=d") - def on_reset(self, message=None): + def on_reset(self, message: Optional[Message] = None): """The enclosure should restore itself to a started state. Typically this would be represented by the eyes being 'open' and the mouth reset to its default (smile or blank). @@ -228,31 +232,31 @@ def on_reset(self, message=None): self.writer.write("mouth.reset") # System Events - def on_no_internet(self, message=None): + def on_no_internet(self, message: Optional[Message] = None): """ triggered by "enclosure.notify.no_internet" """ WarningIcon(bus=self.bus).display() - def on_system_reset(self, message=None): + def on_system_reset(self, message: Optional[Message] = None): """The enclosure hardware should reset any CPUs, etc. triggered by "enclosure.system.reset" """ self.writer.write("system.reset") - def on_system_mute(self, message=None): + def on_system_mute(self, message: Optional[Message] = None): """Mute (turn off) the system speaker. triggered by "enclosure.system.mute" """ self.writer.write("system.mute") - def on_system_unmute(self, message=None): + def on_system_unmute(self, message: Optional[Message] = None): """Unmute (turn on) the system speaker. triggered by "enclosure.system.unmute" """ self.writer.write("system.unmute") - def on_system_blink(self, message=None): + def on_system_blink(self, message: Optional[Message] = None): """The 'eyes' should blink the given number of times. triggered by "enclosure.system.blink" @@ -260,49 +264,46 @@ def on_system_blink(self, message=None): times (int): number of times to blink """ times = 1 - if message and message.data: + if message: times = message.data.get("times", times) self.writer.write("system.blink=" + str(times)) # Eyes messages - def on_eyes_on(self, message=None): + def on_eyes_on(self, message: Optional[Message] = None): """Illuminate or show the eyes. triggered by "enclosure.eyes.on" """ self.writer.write("eyes.on") - def on_eyes_off(self, message=None): + def on_eyes_off(self, message: Optional[Message] = None): """Turn off or hide the eyes. triggered by "enclosure.eyes.off" """ self.writer.write("eyes.off") - def on_eyes_fill(self, message=None): + def on_eyes_fill(self, message: Message): """triggered by "enclosure.eyes.fill" """ - amount = 0 - if message and message.data: - percent = int(message.data.get("percentage", 0)) - amount = int(round(23.0 * percent / 100.0)) + percent = int(message.data.get("percentage", 0)) + amount = int(round(23.0 * percent / 100.0)) self.writer.write("eyes.fill=" + str(amount)) - def on_eyes_blink(self, message=None): + def on_eyes_blink(self, message: Optional[Message] = None): """Make the eyes blink triggered by "enclosure.eyes.blink" Args: side (str): 'r', 'l', or 'b' for 'right', 'left' or 'both' """ - side = "b" - if message and message.data: - side = message.data.get("side", side) + if message: + side = message.data.get("side", "b") self.writer.write("eyes.blink=" + side) - def on_eyes_narrow(self, message=None): + def on_eyes_narrow(self, message: Optional[Message] = None): """Make the eyes look narrow, like a squint triggered by "enclosure.eyes.narrow" """ self.writer.write("eyes.narrow") - def on_eyes_look(self, message=None): + def on_eyes_look(self, message: Message): """Make the eyes look to the given side triggered by "enclosure.eyes.look" Args: @@ -312,11 +313,10 @@ def on_eyes_look(self, message=None): 'd' for down 'c' for crossed """ - if message and message.data: - side = message.data.get("side", "") - self.writer.write("eyes.look=" + side) + side = message.data.get("side", "") + self.writer.write("eyes.look=" + side) - def on_eyes_color(self, message=None): + def on_eyes_color(self, message: Message): """Change the eye color to the given RGB color triggered by "enclosure.eyes.color" Args: @@ -324,107 +324,96 @@ def on_eyes_color(self, message=None): g (int): 0-255, green value b (int): 0-255, blue value """ - r, g, b = 255, 255, 255 - if message and message.data: - r = int(message.data.get("r", r)) - g = int(message.data.get("g", g)) - b = int(message.data.get("b", b)) + r = int(message.data.get("r", 255)) + g = int(message.data.get("g", 255)) + b = int(message.data.get("b", 255)) color = (r * 65536) + (g * 256) + b self._current_rgb = [(r, g, b) for i in range(self._num_pixels)] self.writer.write("eyes.color=" + str(color)) - def on_eyes_brightness(self, message=None): + def on_eyes_brightness(self, message: Message): """Set the brightness of the eyes in the display. triggered by "enclosure.eyes.brightness" Args: level (int): 1-30, bigger numbers being brighter """ - level = 30 - if message and message.data: - level = message.data.get("level", level) + level = message.data.get("level", 30) self.writer.write("eyes.level=" + str(level)) - def on_eyes_reset(self, message=None): + def on_eyes_reset(self, message: Optional[Message] = None): """Restore the eyes to their default (ready) state triggered by "enclosure.eyes.reset". """ self.writer.write("eyes.reset") - def on_eyes_timed_spin(self, message=None): + def on_eyes_timed_spin(self, message: Message): """Make the eyes 'roll' for the given time. triggered by "enclosure.eyes.timedspin" Args: length (int): duration in milliseconds of roll, None = forever """ - length = 5000 - if message and message.data: - length = message.data.get("length", length) + length = message.data.get("length", 5000) self.writer.write("eyes.spin=" + str(length)) - def on_eyes_volume(self, message=None): + def on_eyes_volume(self, message: Message): """Indicate the volume using the eyes triggered by "enclosure.eyes.volume" Args: volume (int): 0 to 11 """ - volume = 4 - if message and message.data: - volume = message.data.get("volume", volume) + volume = message.data.get("volume", 4) self.writer.write("eyes.volume=" + str(volume)) - def on_eyes_spin(self, message=None): + def on_eyes_spin(self, message: Optional[Message] = None): """ triggered by "enclosure.eyes.spin" """ self.writer.write("eyes.spin") - def on_eyes_set_pixel(self, message=None): + def on_eyes_set_pixel(self, message: Message): """ triggered by "enclosure.eyes.set_pixel" """ - idx = 0 - r, g, b = 255, 255, 255 - if message and message.data: - idx = int(message.data.get("idx", idx)) - r = int(message.data.get("r", r)) - g = int(message.data.get("g", g)) - b = int(message.data.get("b", b)) + idx = int(message.data.get("idx", 0)) + r = int(message.data.get("r", 255)) + g = int(message.data.get("g", 255)) + b = int(message.data.get("b", 255)) self._current_rgb[idx] = (r, g, b) color = (r * 65536) + (g * 256) + b self.writer.write("eyes.set=" + str(idx) + "," + str(color)) # Display (faceplate) messages - def on_display_reset(self, message=None): + def on_display_reset(self, message: Optional[Message] = None): """Restore the mouth display to normal (blank) triggered by "enclosure.mouth.reset" / "recognizer_loop:record_end" """ self.writer.write("mouth.reset") - def on_talk(self, message=None): + def on_talk(self, message: Optional[Message] = None): """Show a generic 'talking' animation for non-synched speech triggered by "enclosure.mouth.talk" """ self.writer.write("mouth.talk") - def on_think(self, message=None): + def on_think(self, message: Optional[Message] = None): """Show a 'thinking' image or animation triggered by "enclosure.mouth.think" """ self.writer.write("mouth.think") - def on_listen(self, message=None): + def on_listen(self, message: Optional[Message] = None): """Show a 'thinking' image or animation triggered by "enclosure.mouth.listen" / "recognizer_loop:record_begin" """ self.writer.write("mouth.listen") - def on_smile(self, message=None): + def on_smile(self, message: Optional[Message] = None): """Show a 'smile' image or animation triggered by "enclosure.mouth.smile" """ self.writer.write("mouth.smile") - def on_viseme(self, message=None): + def on_viseme(self, message: Message): """Display a viseme mouth shape for synced speech triggered by "enclosure.mouth.viseme" @@ -438,11 +427,10 @@ def on_viseme(self, message=None): 5 = shape for sounds like 'f' or 'v' 6 = shape for sounds like 'oy' or 'ao' """ - if message and message.data: - code = message.data["code"] - self.writer.write('mouth.viseme=' + code) + code = message.data["code"] + self.writer.write('mouth.viseme=' + code) - def on_viseme_list(self, message=None): + def on_viseme_list(self, message: Message): """ Send mouth visemes as a list in a single message. Args: @@ -459,30 +447,29 @@ def on_viseme_list(self, message=None): 5 = shape for sounds like 'f' or 'v' 6 = shape for sounds like 'oy' or 'ao' """ - if message and message.data: - start = message.data['start'] - visemes = message.data['visemes'] - - def animate_mouth(): - nonlocal start, visemes - self.showing_visemes = True - previous_end = -1 - for code, end in visemes: - if not self.showing_visemes: - break - if end < previous_end: - start = time.time() - previous_end = end - if time.time() < start + end: - self.writer.write('mouth.viseme=' + code) - sleep(start + end - time.time()) - self.writer.write("mouth.reset") - self.showing_visemes = False - - # use a thread to not block FakeBus (eg, voice sat) - create_daemon(animate_mouth) - - def on_text(self, message=None): + start = message.data['start'] + visemes = message.data['visemes'] + + def animate_mouth(): + nonlocal start, visemes + self.showing_visemes = True + previous_end = -1 + for code, end in visemes: + if not self.showing_visemes: + break + if end < previous_end: + start = time.time() + previous_end = end + if time.time() < start + end: + self.writer.write('mouth.viseme=' + code) + sleep(start + end - time.time()) + self.writer.write("mouth.reset") + self.showing_visemes = False + + # use a thread to not block FakeBus (eg, voice sat) + create_daemon(animate_mouth) + + def on_text(self, message: Message): """Display text (scrolling as needed) triggered by "enclosure.mouth.text" @@ -490,12 +477,10 @@ def on_text(self, message=None): Args: text (str): text string to display """ - text = "" - if message and message.data: - text = message.data.get("text", text) + text = message.data.get("text", "") self.writer.write("mouth.text=" + text) - def on_display(self, message=None): + def on_display(self, message: Message): """Display images on faceplate. Currently supports images up to 16x8, or half the face. You can use the 'x' parameter to cover the other half of the faceplate. @@ -511,22 +496,35 @@ def on_display(self, message=None): Useful if you'd like to display muliple images on the faceplate at once. """ - code = "" - x_offset = "" - y_offset = "" - clear_previous = "" - if message and message.data: - code = message.data.get("img_code", code) - x_offset = int(message.data.get("xOffset", x_offset)) - y_offset = int(message.data.get("yOffset", y_offset)) - clear_previous = message.data.get("clearPrev", clear_previous) - - clear_previous = int(str(clear_previous) == "True") + code = message.data.get("img_code", "") + x_offset = int(message.data.get("xOffset", 0)) + y_offset = int(message.data.get("yOffset", 0)) + clear_previous = message.data.get("clearPrev", "") + self._do_display(code, int(x_offset), int(y_offset), clear_previous.lower() == "true") + + def _do_display(self, img_code: str, x_offset: int = 0, y_offset: int = 0, refresh: bool = False): + """Display images on faceplate. Currently supports images up to 16x8, + or half the face. You can use the 'x' parameter to cover the other + half of the faceplate. + + triggered by "enclosure.mouth.display" + + Args: + img_code (str): text string that encodes a black and white image + x (int): x offset for image + y (int): y offset for image + refresh (bool): specify whether to clear the faceplate before + displaying the new image or not. + Useful if you'd like to display muliple images + on the faceplate at once. + """ + + clear_previous = int(refresh) clear_previous = "cP=" + str(clear_previous) + "," x_offset = "x=" + str(x_offset) + "," y_offset = "y=" + str(y_offset) + "," - message = "mouth.icon=" + x_offset + y_offset + clear_previous + code + message = "mouth.icon=" + x_offset + y_offset + clear_previous + img_code # Check if message exceeds Arduino's serial buffer input limit 64 bytes if len(message) > 60: message1 = message[:31] + "$" @@ -538,7 +536,7 @@ def on_display(self, message=None): sleep(0.1) self.writer.write(message) - def on_weather_display(self, message=None): + def on_weather_display(self, message: Message): """Show a the temperature and a weather icon triggered by "enclosure.weather.display" @@ -555,37 +553,94 @@ def on_weather_display(self, message=None): 7 = wind/mist temp (int): the temperature (either C or F, not indicated) """ - if message and message.data: - # Convert img_code to icon - img_code = message.data.get("img_code", None) - icon = None - if img_code == 0: - # sunny - icon = SunnyIcon(bus=self.bus).encode() - elif img_code == 1: - # partly cloudy - icon = PartlyCloudyIcon(bus=self.bus).encode() - elif img_code == 2: - # cloudy - icon = CloudyIcon(bus=self.bus).encode() - elif img_code == 3: - # light rain - icon = LightRainIcon(bus=self.bus).encode() - elif img_code == 4: - # raining - icon = RainIcon(bus=self.bus).encode() - elif img_code == 5: - # storming - icon = StormIcon(bus=self.bus).encode() - elif img_code == 6: - # snowing - icon = SnowIcon(bus=self.bus).encode() - elif img_code == 7: - # wind/mist - icon = WindIcon(bus=self.bus).encode() - - temp = message.data.get("temp", None) - if icon is not None and temp is not None: - icon = "x=2," + icon - msg = "weather.display=" + str(temp) + "," + str(icon) - self.writer.write(msg) + # Convert img_code to icon + img_code = message.data.get("img_code", None) + icon = None + if img_code == 0: + # sunny + icon = SunnyIcon(bus=self.bus).encode() + elif img_code == 1: + # partly cloudy + icon = PartlyCloudyIcon(bus=self.bus).encode() + elif img_code == 2: + # cloudy + icon = CloudyIcon(bus=self.bus).encode() + elif img_code == 3: + # light rain + icon = LightRainIcon(bus=self.bus).encode() + elif img_code == 4: + # raining + icon = RainIcon(bus=self.bus).encode() + elif img_code == 5: + # storming + icon = StormIcon(bus=self.bus).encode() + elif img_code == 6: + # snowing + icon = SnowIcon(bus=self.bus).encode() + elif img_code == 7: + # wind/mist + icon = WindIcon(bus=self.bus).encode() + + temp = message.data.get("temp", None) + if icon is not None and temp is not None: + icon = "x=2," + icon + msg = "weather.display=" + str(temp) + "," + str(icon) + self.writer.write(msg) + + # date/time + def on_display_date(self, message: Message): + self._deactivate_mouth_events() + self.on_text(message) + sleep(10) + self.on_display_reset() + self._activate_mouth_events() + + def on_display_time(self, message: Message): + code_dict = { + ':': 'CIICAA', + '0': 'EIMHEEMHAA', + '1': 'EIIEMHAEAA', + '2': 'EIEHEFMFAA', + '3': 'EIEFEFMHAA', + '4': 'EIMBABMHAA', + '5': 'EIMFEFEHAA', + '6': 'EIMHEFEHAA', + '7': 'EIEAEAMHAA', + '8': 'EIMHEFMHAA', + '9': 'EIMBEBMHAA', + } + + self._deactivate_mouth_events() + display_time = message.data.get("text") + # clear screen (draw two blank sections, numbers cover rest) + if len(display_time) == 4: + # for 4-character times, 9x8 blank + self._do_display(img_code="JIAAAAAAAAAAAAAAAAAA", refresh=False) + # self.enclosure.mouth_display(img_code="JIAAAAAAAAAAAAAAAAAA", + # refresh=False) + self._do_display(img_code="JIAAAAAAAAAAAAAAAAAA", x_offset=22, refresh=False) + # self.enclosure.mouth_display(img_code="JIAAAAAAAAAAAAAAAAAA", + # x=22, refresh=False) + else: + # for 5-character times, 7x8 blank + self._do_display(img_code="HIAAAAAAAAAAAAAA", refresh=False) + # self.enclosure.mouth_display(img_code="HIAAAAAAAAAAAAAA", + # refresh=False) + self._do_display(img_code="HIAAAAAAAAAAAAAA", x_offset=24, refresh=False) + # self.enclosure.mouth_display(img_code="HIAAAAAAAAAAAAAA", + # x=24, refresh=False) + + # draw the time, centered on display + xoffset = int((32 - (4 * (len(display_time)) - 2)) / 2) + for c in display_time: + if c in code_dict: + self._do_display(code_dict[c], x_offset=xoffset, refresh=False) + if c == ":": + xoffset += 2 # colon is 1 pixels + a space + else: + xoffset += 4 # digits are 3 pixels + a space + + self._do_display("CIAAAA", x_offset=29, refresh=False) + sleep(5) + self.on_display_reset() + self._activate_mouth_events()