diff --git a/pyproject.toml b/pyproject.toml index ffc9104..9023747 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,10 +53,11 @@ dynamic = ["version"] requires-python = ">=3.8" dependencies = [ "pyserial >=3,<4", - "april_vision >=2.1,<3", + "april_vision==2.2.0", "paho-mqtt >=2,<3", "pydantic >=1.9.1,<2", "typing-extensions; python_version<'3.10'", + "tomli >=2.0.1,<3; python_version<'3.11'", ] classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", diff --git a/sr/robot3/arduino.py b/sr/robot3/arduino.py index c3d86c6..f628bbd 100644 --- a/sr/robot3/arduino.py +++ b/sr/robot3/arduino.py @@ -11,10 +11,18 @@ from .exceptions import IncorrectBoardError from .logging import log_to_debug from .serial_wrapper import SerialWrapper -from .utils import Board, BoardIdentity, get_USB_identity, map_to_float +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, + get_simulator_boards, get_USB_identity, map_to_float, +) logger = logging.getLogger(__name__) BAUDRATE = 115200 +if IN_SIMULATOR: + # Place each command on a new line in the simulator to simplify the implementation + ENDLINE = '\n' +else: + ENDLINE = '' SUPPORTED_VID_PIDS = { (0x2341, 0x0043), # Arduino Uno rev 3 @@ -58,7 +66,7 @@ class Arduino(Board): """ The Arduino board interface. - This is intended to be used with Arduino Uno boards running the sbot firmware. + This is intended to be used with Arduino Uno boards running the SR firmware. :param serial_port: The serial port to connect to. :param initial_identity: The identity of the board, as reported by the USB descriptor. @@ -127,17 +135,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified Arduino at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found Arduino-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified Arduino at port {serial_port!r} " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified arduino at port {serial_port!r} " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found Arduino-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, Arduino]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to Arduinos + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('Arduino'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, @@ -152,6 +189,9 @@ def _get_supported_boards( :param ignored_serials: A list of serial number to ignore during board discovery :return: A mapping of board serial numbers to Arduinos """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} if ignored_serials is None: ignored_serials = [] @@ -192,7 +232,7 @@ def identify(self) -> BoardIdentity: :return: The identity of the board. """ - response = self._serial.query('v', endl='') + response = self._serial.query('v', endl=ENDLINE) response_fields = response.split(':') # The arduino firmware cannot access the serial number reported in the USB descriptor @@ -224,7 +264,9 @@ def command(self, command: str) -> str: :param command: The command to send to the board. :return: The response from the board. """ - return self._serial.query(command, endl='') + if IN_SIMULATOR: + logger.warning("The command method is not fully supported in the simulator") + return self._serial.query(command, endl=ENDLINE) def map_pin_number(self, pin_number: int) -> str: """ @@ -241,6 +283,34 @@ def map_pin_number(self, pin_number: int) -> str: raise ValueError("Invalid pin provided") from None return chr(pin_number + ord('a')) + @log_to_debug + def ultrasound_measure( + self, + pulse_pin: int, + echo_pin: int, + ) -> int: + """ + Measure the distance to an object using an ultrasound sensor. + + The sensor can only measure distances up to 4m. + + :param pulse_pin: The pin to send the ultrasound pulse from. + :param echo_pin: The pin to read the ultrasound echo from. + :raises ValueError: If either of the pins are invalid + :return: The distance measured by the ultrasound sensor in mm. + """ + try: # bounds check + pulse_id = self.map_pin_number(pulse_pin) + except ValueError: + raise ValueError("Invalid pulse pin provided") from None + try: + echo_id = self.map_pin_number(echo_pin) + except ValueError: + raise ValueError("Invalid echo pin provided") from None + + response = self._serial.query(f'u{pulse_id}{echo_id}', endl=ENDLINE) + return int(response) + def __repr__(self) -> str: return f"<{self.__class__.__qualname__}: {self._serial}>" @@ -303,7 +373,7 @@ def mode(self, value: GPIOPinMode) -> None: mode_char = MODE_CHAR_MAP.get(value) if mode_char is None: raise IOError(f'Pin mode {value} is not supported') - self._serial.write(self._build_command(mode_char), endl='') + self._serial.write(self._build_command(mode_char), endl=ENDLINE) self._mode = value @log_to_debug @@ -318,7 +388,7 @@ def digital_read(self) -> bool: self._check_if_disabled() if self.mode not in DIGITAL_READ_MODES: raise IOError(f'Digital read is not supported in {self.mode}') - response = self._serial.query(self._build_command('r'), endl='') + response = self._serial.query(self._build_command('r'), endl=ENDLINE) return response == 'h' @log_to_debug @@ -334,9 +404,9 @@ def digital_write(self, value: bool) -> None: if self.mode not in DIGITAL_WRITE_MODES: raise IOError(f'Digital write is not supported in {self.mode}') if value: - self._serial.write(self._build_command('h'), endl='') + self._serial.write(self._build_command('h'), endl=ENDLINE) else: - self._serial.write(self._build_command('l'), endl='') + self._serial.write(self._build_command('l'), endl=ENDLINE) @log_to_debug def analog_read(self) -> float: @@ -357,7 +427,7 @@ def analog_read(self) -> float: raise IOError(f'Analog read is not supported in {self.mode}') if not self._supports_analog: raise IOError('Pin does not support analog read') - response = self._serial.query(self._build_command('a'), endl='') + response = self._serial.query(self._build_command('a'), endl=ENDLINE) # map the response from the ADC range to the voltage range return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0) diff --git a/sr/robot3/camera.py b/sr/robot3/camera.py index db659fa..f8f0280 100644 --- a/sr/robot3/camera.py +++ b/sr/robot3/camera.py @@ -1,9 +1,11 @@ """An implementation of a camera board using the april_vision library.""" +from __future__ import annotations + import logging from pathlib import Path from typing import Callable, Dict, Iterable, List, Optional, Union -from april_vision import CalibratedCamera, Frame +from april_vision import CalibratedCamera, Frame, FrameSource from april_vision import Marker as AprilMarker from april_vision import ( Processor, USBCamera, __version__, calibrations, @@ -13,8 +15,11 @@ from numpy.typing import NDArray from .marker import Marker -from .utils import Board, BoardIdentity +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, BoardInfo, get_simulator_boards, +) +PathLike = Union[Path, str] LOGGER = logging.getLogger(__name__) robot_calibrations = calibrations.copy() @@ -30,9 +35,11 @@ class AprilCamera(Board): in order to determine the spatial positon and orientation of the markers that it has detected. - :param camera_id: The index of the camera to use. - :param camera_data: The calibration data for the camera. + :param camera_source: The source of the camera frames. + :param calibration: The intrinsic calibration of the camera. :param serial_num: The serial number of the camera. + :param name: The name of the camera. + :param vidpid: The VID:PID of the camera. """ __slots__ = ('_serial_num', '_cam') @@ -56,28 +63,82 @@ def _discover(cls) -> Dict[str, 'AprilCamera']: :return: A dict of cameras, keyed by their name and index. """ + if IN_SIMULATOR: + return { + camera_info.serial_number: cls.from_webots_camera(camera_info) + for camera_info in get_simulator_boards('CameraBoard') + } + return { (serial := f"{camera_data.name} - {camera_data.index}"): - cls(camera_data.index, camera_data=camera_data, serial_num=serial) + cls.from_id(camera_data.index, camera_data=camera_data, serial_num=serial) for camera_data in find_cameras(robot_calibrations) } - def __init__(self, camera_id: int, camera_data: CalibratedCamera, serial_num: str) -> None: + def __init__( + self, camera_source: FrameSource, + calibration: tuple[float, float, float, float] | None, + serial_num: str, + name: str, + vidpid: str = "", + ) -> None: + # The processor handles the detection and pose estimation + self._cam = Processor( + camera_source, + calibration=calibration, + name=name, + vidpid=vidpid, + mask_unknown_size_tags=True, + ) + self._serial_num = serial_num + + @classmethod + def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera': + """ + Create a camera from a webots camera. + + :param camera_info: The information about the virtual camera, + including the url to connect to. + :return: The camera object. + """ + from .simulator.camera import WebotsRemoteCameraSource + + camera_source = WebotsRemoteCameraSource(camera_info) + return cls( + camera_source, + calibration=camera_source.calibration, + serial_num=camera_info.serial_number, + name=camera_info.serial_number, + ) + + @classmethod + def from_id( + cls, + camera_id: int, + camera_data: CalibratedCamera, + serial_num: str, + ) -> 'AprilCamera': + """ + Create a camera from an ID. + + :param camera_id: The ID of the camera to create. + :param camera_data: The calibration data for the camera. + :param serial_num: The serial number of the camera. + :return: The camera object. + """ # The camera source handles the connection between the camera and the processor camera_source = USBCamera.from_calibration_file( camera_id, calibration_file=camera_data.calibration, vidpid=camera_data.vidpid, ) - # The processor handles the detection and pose estimation - self._cam = Processor( + return cls( camera_source, calibration=camera_source.calibration, + serial_num=serial_num, name=camera_data.name, vidpid=camera_data.vidpid, - mask_unknown_size_tags=True, ) - self._serial_num = serial_num def identify(self) -> BoardIdentity: """ @@ -103,34 +164,41 @@ def close(self) -> None: """ self._cam.close() - def see(self, *, frame: Optional[NDArray] = None) -> List[Marker]: + def see( + self, + *, + frame: Optional[NDArray] = None, + save: Optional[PathLike] = None, + ) -> List[Marker]: """ Capture an image and identify fiducial markers. :param frame: An image to detect markers in, instead of capturing a new one, + :param save: If given, save the annotated frame to the path. + This is given a JPEG extension if none is provided. :returns: list of markers that the camera could see. """ + if frame is None: + frame = self._cam.capture() + markers = self._cam.see(frame=frame) + + if save: + self._cam.save(save, frame=frame, detections=markers) return [Marker.from_april_vision_marker(marker) for marker in markers] - def capture(self) -> NDArray: + def capture(self, *, save: Optional[PathLike] = None) -> NDArray: """ Get the raw image data from the camera. + :param save: If given, save the annotated frame to the path. + This is given a JPEG extension if none is provided. :returns: Camera pixel data """ - return self._cam.capture() - - def save(self, path: Union[Path, str], *, frame: Optional[NDArray] = None) -> None: - """ - Save an annotated image to a path. - - :param path: The path to save the image to, - this is given a JPEG extension if none is provided. - :param frame: An image to annotate and save, instead of capturing a new one, - defaults to None - """ - self._cam.save(path, frame=frame) + raw_frame = self._cam.capture() + if save: + self._cam.save(save, frame=raw_frame, annotated=False) + return raw_frame def _set_marker_sizes( self, diff --git a/sr/robot3/kch.py b/sr/robot3/kch.py index 1503312..a786d73 100644 --- a/sr/robot3/kch.py +++ b/sr/robot3/kch.py @@ -2,17 +2,28 @@ from __future__ import annotations import atexit +import logging import warnings from enum import IntEnum, unique from typing import Optional +from .exceptions import BoardDisconnectionError, IncorrectBoardError +from .serial_wrapper import SerialWrapper +from .utils import IN_SIMULATOR, Board, BoardIdentity, get_simulator_boards + try: import RPi.GPIO as GPIO # isort: ignore - HAS_HAT = True + HAS_HAT = True if not IN_SIMULATOR else False except ImportError: HAS_HAT = False +logger = logging.getLogger(__name__) + +# Only used in the simulator +BAUDRATE = 115200 + + @unique class RobotLEDs(IntEnum): """Mapping of LEDs to GPIO Pins.""" @@ -71,6 +82,8 @@ class KCH: __slots__ = ('_leds', '_pwm') def __init__(self) -> None: + self._leds: tuple[LED, ...] + if HAS_HAT: GPIO.setmode(GPIO.BCM) with warnings.catch_warnings(): @@ -81,11 +94,7 @@ def __init__(self) -> None: # suppress this as we know the reason behind the warning GPIO.setup(RobotLEDs.all_leds(), GPIO.OUT, initial=GPIO.LOW) self._pwm: Optional[GPIO.PWM] = None - self._leds = tuple( - LED(rgb_led) for rgb_led in RobotLEDs.user_leds() - ) - if HAS_HAT: # We are not running cleanup so the LED state persists after the code completes, # this will cause a warning with `GPIO.setup()` which we can suppress # atexit.register(GPIO.cleanup) @@ -94,6 +103,25 @@ def __init__(self) -> None: # Mypy isn't aware of the version of atexit.register(func, *args) atexit.register(GPIO.cleanup, RobotLEDs.START) # type: ignore[call-arg] + self._leds = tuple( + PhysicalLED(rgb_led) for rgb_led in RobotLEDs.user_leds() + ) + elif IN_SIMULATOR: + led_server = LedServer.initialise() + if led_server is not None: + self._leds = tuple( + SimulationLED(v, led_server) + for v in range(len(RobotLEDs.user_leds())) + ) + else: + self._leds = tuple( + LED() for _ in RobotLEDs.user_leds() + ) + else: + self._leds = tuple( + LED() for _ in RobotLEDs.user_leds() + ) + @property def _start(self) -> bool: """Get the state of the start LED.""" @@ -122,7 +150,60 @@ def leds(self) -> tuple['LED', ...]: class LED: - """User programmable LED.""" + """ + User programmable LED. + + This is a dummy class to handle the case where this is run on neither the + Raspberry Pi nor the simulator. + As such, this class does nothing. + """ + __slots__ = ('_led',) + + @property + def r(self) -> bool: + """Get the state of the Red LED segment.""" + return False + + @r.setter + def r(self, value: bool) -> None: + """Set the state of the Red LED segment.""" + + @property + def g(self) -> bool: + """Get the state of the Green LED segment.""" + return False + + @g.setter + def g(self, value: bool) -> None: + """Set the state of the Green LED segment.""" + + @property + def b(self) -> bool: + """Get the state of the Blue LED segment.""" + return False + + @b.setter + def b(self, value: bool) -> None: + """Set the state of the Blue LED segment.""" + + @property + def colour(self) -> tuple[bool, bool, bool]: + """Get the colour of the user LED.""" + return False, False, False + + @colour.setter + def colour(self, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the user LED.""" + if not isinstance(value, (tuple, list)) or len(value) != 3: + raise ValueError("The LED requires 3 values for it's colour") + + +class PhysicalLED(LED): + """ + User programmable LED. + + Used when running on the Raspberry Pi to control the actual LEDs. + """ __slots__ = ('_led',) def __init__(self, led: tuple[int, int, int]): @@ -131,47 +212,41 @@ def __init__(self, led: tuple[int, int, int]): @property def r(self) -> bool: """Get the state of the Red LED segment.""" - return GPIO.input(self._led[0]) if HAS_HAT else False + return GPIO.input(self._led[0]) @r.setter def r(self, value: bool) -> None: """Set the state of the Red LED segment.""" - if HAS_HAT: - GPIO.output(self._led[0], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[0], GPIO.HIGH if value else GPIO.LOW) @property def g(self) -> bool: """Get the state of the Green LED segment.""" - return GPIO.input(self._led[1]) if HAS_HAT else False + return GPIO.input(self._led[1]) @g.setter def g(self, value: bool) -> None: """Set the state of the Green LED segment.""" - if HAS_HAT: - GPIO.output(self._led[1], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[1], GPIO.HIGH if value else GPIO.LOW) @property def b(self) -> bool: """Get the state of the Blue LED segment.""" - return GPIO.input(self._led[2]) if HAS_HAT else False + return GPIO.input(self._led[2]) @b.setter def b(self, value: bool) -> None: """Set the state of the Blue LED segment.""" - if HAS_HAT: - GPIO.output(self._led[2], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[2], GPIO.HIGH if value else GPIO.LOW) @property def colour(self) -> tuple[bool, bool, bool]: """Get the colour of the user LED.""" - if HAS_HAT: - return ( - GPIO.input(self._led[0]), - GPIO.input(self._led[1]), - GPIO.input(self._led[2]), - ) - else: - return False, False, False + return ( + GPIO.input(self._led[0]), + GPIO.input(self._led[1]), + GPIO.input(self._led[2]), + ) @colour.setter def colour(self, value: tuple[bool, bool, bool]) -> None: @@ -179,10 +254,161 @@ def colour(self, value: tuple[bool, bool, bool]) -> None: if not isinstance(value, (tuple, list)) or len(value) != 3: raise ValueError("The LED requires 3 values for it's colour") - if HAS_HAT: - GPIO.output( - self._led, - tuple( - GPIO.HIGH if v else GPIO.LOW for v in value - ), - ) + GPIO.output( + self._led, + tuple( + GPIO.HIGH if v else GPIO.LOW for v in value + ), + ) + + +class LedServer(Board): + """ + LED control over a socket. + + Used when running in the simulator to control the simulated LEDs. + """ + + @staticmethod + def get_board_type() -> str: + """ + Return the type of the board. + + :return: The literal string 'KCHv1B'. + """ + return 'KCHv1B' + + def __init__( + self, + serial_port: str, + initial_identity: BoardIdentity | None = None, + ) -> None: + if initial_identity is None: + initial_identity = BoardIdentity() + self._serial = SerialWrapper( + serial_port, + BAUDRATE, + identity=initial_identity, + ) + + self._identity = self.identify() + if self._identity.board_type != self.get_board_type(): + raise IncorrectBoardError(self._identity.board_type, self.get_board_type()) + self._serial.set_identity(self._identity) + + # Reset the board to a known state + self._serial.write('*RESET') + + @classmethod + def initialise(cls) -> 'LedServer' | None: + """Initialise the LED server using simulator discovery.""" + # The filter here is the name of the emulated board in the simulator + boards = get_simulator_boards('LedBoard') + + if not boards: + return None + + board_info = boards[0] + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + + try: + board = cls(board_info.url, initial_identity) + except BoardDisconnectionError: + logger.warning( + f"Simulator specified LED board at port {board_info.url!r}, " + "could not be identified. Ignoring this device") + return None + except IncorrectBoardError as err: + logger.warning( + f"Board returned type {err.returned_type!r}, " + f"expected {err.expected_type!r}. Ignoring this device") + return None + + return board + + def identify(self) -> BoardIdentity: + """ + Get the identity of the board. + + :return: The identity of the board. + """ + response = self._serial.query('*IDN?') + return BoardIdentity(*response.split(':')) + + def set_leds(self, led_num: int, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the LED.""" + self._serial.write(f'LED:{led_num}:SET:{value[0]:d}:{value[1]:d}:{value[2]:d}') + + def get_leds(self, led_num: int) -> tuple[bool, bool, bool]: + """Get the colour of the LED.""" + response = self._serial.query(f'LED:{led_num}:GET?') + red, green, blue = response.split(':') + return bool(int(red)), bool(int(green)), bool(int(blue)) + + +class SimulationLED(LED): + """ + User programmable LED. + + Used when running in the simulator to control the simulated LEDs. + """ + __slots__ = ('_led_num', '_server') + + def __init__(self, led_num: int, server: LedServer) -> None: + self._led_num = led_num + self._server = server + + @property + def r(self) -> bool: + """Get the state of the Red LED segment.""" + return self._server.get_leds(self._led_num)[0] + + @r.setter + def r(self, value: bool) -> None: + """Set the state of the Red LED segment.""" + # Fetch the current state of the LED so we can update only the red value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (value, current[1], current[2])) + + @property + def g(self) -> bool: + """Get the state of the Green LED segment.""" + return self._server.get_leds(self._led_num)[1] + + @g.setter + def g(self, value: bool) -> None: + """Set the state of the Green LED segment.""" + # Fetch the current state of the LED so we can update only the green value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (current[0], value, current[2])) + + @property + def b(self) -> bool: + """Get the state of the Blue LED segment.""" + return self._server.get_leds(self._led_num)[2] + + @b.setter + def b(self, value: bool) -> None: + """Set the state of the Blue LED segment.""" + # Fetch the current state of the LED so we can update only the blue value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (current[0], current[1], value)) + + @property + def colour(self) -> tuple[bool, bool, bool]: + """Get the colour of the user LED.""" + return self._server.get_leds(self._led_num) + + @colour.setter + def colour(self, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the user LED.""" + if not isinstance(value, (tuple, list)) or len(value) != 3: + raise ValueError("The LED requires 3 values for it's colour") + + self._server.set_leds(self._led_num, value) diff --git a/sr/robot3/motor_board.py b/sr/robot3/motor_board.py index db561af..3b1af7d 100644 --- a/sr/robot3/motor_board.py +++ b/sr/robot3/motor_board.py @@ -13,8 +13,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) logger = logging.getLogger(__name__) @@ -113,17 +113,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified motor board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found motor board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified motor board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified motor board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found motor board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, MotorBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('MotorBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, manual_boards: Optional[list[str]] = None, @@ -137,6 +166,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to motor boards. """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: diff --git a/sr/robot3/power_board.py b/sr/robot3/power_board.py index 6978822..f3d31bd 100644 --- a/sr/robot3/power_board.py +++ b/sr/robot3/power_board.py @@ -6,17 +6,21 @@ from enum import IntEnum from time import sleep from types import MappingProxyType -from typing import NamedTuple, Optional +from typing import Callable, NamedTuple, Optional from serial.tools.list_ports import comports from .exceptions import IncorrectBoardError from .logging import log_to_debug from .serial_wrapper import SerialWrapper -from .utils import Board, BoardIdentity, float_bounds_check, get_USB_identity +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, +) logger = logging.getLogger(__name__) BAUDRATE = 115200 # Since the power board is a USB device, this is ignored +_SLEEP_FN = sleep # For use in the simulator class PowerOutputPosition(IntEnum): @@ -132,20 +136,50 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified power board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found power board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified power board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified power board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + + logger.warning( + f"Found power board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, PowerBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('PowerBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( - cls, manual_boards: Optional[list[str]] = None, + cls, + manual_boards: Optional[list[str]] = None, + sleep_fn: Optional[Callable[[float], None]] = None, ) -> MappingProxyType[str, PowerBoard]: """ Find all connected power boards. @@ -156,6 +190,12 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to power boards. """ + if sleep_fn is not None: + global _SLEEP_FN + _SLEEP_FN = sleep_fn + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: @@ -507,7 +547,7 @@ def buzz(self, frequency: float, duration: float, *, blocking: bool = False) -> self._serial.write(cmd) if blocking: - sleep(duration) + _SLEEP_FN(duration) def __repr__(self) -> str: return f"<{self.__class__.__qualname__}: {self._serial}>" diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index 577cd53..0a76871 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -3,8 +3,10 @@ import itertools import logging +import os import time from pathlib import Path +from socket import socket from types import MappingProxyType from typing import Mapping, Optional @@ -20,7 +22,8 @@ from .power_board import Note, PowerBoard from .raw_serial import RawSerial from .servo_board import ServoBoard -from .utils import ensure_atexit_on_term, obtain_lock, singular +from .simulator.time_server import TimeServer +from .utils import IN_SIMULATOR, ensure_atexit_on_term, obtain_lock, singular logger = logging.getLogger(__name__) @@ -44,6 +47,7 @@ class Robot: __slots__ = ( '_lock', '_metadata', '_power_board', '_motor_boards', '_servo_boards', '_arduinos', '_cameras', '_mqtt', '_astoria', '_kch', '_raw_ports', + '_time_server', ) def __init__( @@ -56,7 +60,15 @@ def __init__( manual_boards: Optional[dict[str, list[str]]] = None, raw_ports: Optional[list[tuple[str, int]]] = None, ) -> None: - self._lock = obtain_lock() + self._lock: TimeServer | socket | None + if IN_SIMULATOR: + self._lock = TimeServer.initialise() + if self._lock is None: + raise OSError( + 'Unable to obtain lock, Is another robot instance already running?' + ) + else: + self._lock = obtain_lock() self._metadata: Optional[Metadata] = None setup_logging(debug, trace_logging) @@ -64,7 +76,11 @@ def __init__( logger.info(f"sr.robot3 version {__version__}") - self._mqtt, self._astoria = init_astoria_mqtt() + if IN_SIMULATOR: + self._mqtt = None + self._astoria = None + else: + self._mqtt, self._astoria = init_astoria_mqtt() if manual_boards: self._init_power_board(manual_boards.get(PowerBoard.get_board_type(), [])) @@ -88,7 +104,7 @@ def _init_power_board(self, manual_boards: Optional[list[str]] = None) -> None: defaults to None :raises RuntimeError: If exactly one PowerBoard is not found """ - power_boards = PowerBoard._get_supported_boards(manual_boards) + power_boards = PowerBoard._get_supported_boards(manual_boards, sleep_fn=self.sleep) self._power_board = singular(power_boards) # Enable all the outputs, so that we can find other boards. @@ -125,7 +141,10 @@ def _init_aux_boards( self._servo_boards = ServoBoard._get_supported_boards(manual_servoboards) self._arduinos = Arduino._get_supported_boards(manual_arduinos, ignored_arduinos) if raw_ports: - self._raw_ports = RawSerial._get_supported_boards(raw_ports) + if not IN_SIMULATOR: + self._raw_ports = RawSerial._get_supported_boards(raw_ports) + else: + logger.warning("Raw ports are not available in the simulator.") def _init_camera(self) -> None: """ @@ -134,9 +153,15 @@ def _init_camera(self) -> None: These cameras are used for AprilTag detection and provide location data of markers in its field of view. """ + if IN_SIMULATOR: + publish_fn = None + else: + assert self._mqtt is not None + publish_fn = self._mqtt.wrapped_publish + self._cameras = MappingProxyType(_setup_cameras( game_specific.MARKER_SIZES, - self._mqtt.wrapped_publish, + publish_fn, )) def _log_connected_boards(self) -> None: @@ -279,27 +304,35 @@ def sleep(self, secs: float) -> None: """ Sleep for a number of seconds. - This is a convenience method that can be used instead of time.sleep(). - This exists for compatibility with the simulator API only. + This is a convenience method that should be used instead of time.sleep() + to make your code compatible with the simulator. :param secs: The number of seconds to sleep for """ - time.sleep(secs) + if IN_SIMULATOR: + assert isinstance(self._lock, TimeServer) + self._lock.sleep(secs) + else: + time.sleep(secs) @log_to_debug def time(self) -> float: """ Get the number of seconds since the Unix Epoch. - This is a convenience method that can be used instead of time.time(). - This exists for compatibility with the simulator API only. + This is a convenience method that should be used instead of time.time() + to make your code compatible with the simulator. NOTE: The robot's clock resets each time the robot is restarted, so this will not be the correct actual time but can be used to measure elapsed time. :returns: the number of seconds since the Unix Epoch. """ - return time.time() + if IN_SIMULATOR: + assert isinstance(self._lock, TimeServer) + return self._lock.get_time() + else: + return time.time() @property @log_to_debug @@ -350,7 +383,11 @@ def usbkey(self) -> Path: :returns: path to the mountpoint of the USB code drive. """ - return self._astoria.fetch_mount_path() + if IN_SIMULATOR: + return Path(os.environ['SBOT_USBKEY_PATH']) + else: + assert self._astoria is not None + return self._astoria.fetch_mount_path() @property def is_simulated(self) -> bool: @@ -359,7 +396,7 @@ def is_simulated(self) -> bool: :returns: True if the robot is simulated. False otherwise. """ - return False + return IN_SIMULATOR @log_to_debug def wait_start(self) -> None: @@ -371,25 +408,39 @@ def wait_start(self) -> None: Once the start button is pressed, the metadata will be loaded and the timeout will start if in competition mode. """ + def null_button_pressed() -> bool: + return False + + if IN_SIMULATOR: + remote_start_pressed = null_button_pressed + else: + assert self._astoria is not None + remote_start_pressed = self._astoria.get_start_button_pressed + # ignore previous button presses _ = self.power_board._start_button() - _ = self._astoria.get_start_button_pressed() + _ = remote_start_pressed() logger.info('Waiting for start button.') self.power_board.piezo.buzz(Note.A6, 0.1) self.power_board._run_led.flash() self.kch._flash_start() - while not ( - self.power_board._start_button() or self._astoria.get_start_button_pressed() - ): - time.sleep(0.1) + while not self.power_board._start_button() or remote_start_pressed(): + self.sleep(0.1) logger.info("Start signal received; continuing.") self.power_board._run_led.on() self.kch._start = False - # Load the latest metadata that we have received over MQTT - self._metadata = self._astoria.fetch_current_metadata() + if IN_SIMULATOR: + self._metadata = Metadata.parse_file( + Path(os.environ['SBOT_METADATA_PATH']) / 'astoria.json' + ) + else: + assert self._astoria is not None + # Load the latest metadata that we have received over MQTT + self._metadata = self._astoria.fetch_current_metadata() - if self._metadata.game_timeout is not None: + # Simulator timeout is handled by the simulator supervisor + if self._metadata.game_timeout and not IN_SIMULATOR: timeout.kill_after_delay(self._metadata.game_timeout) diff --git a/sr/robot3/serial_wrapper.py b/sr/robot3/serial_wrapper.py index 8276842..c0a1724 100644 --- a/sr/robot3/serial_wrapper.py +++ b/sr/robot3/serial_wrapper.py @@ -17,7 +17,7 @@ from .exceptions import BoardDisconnectionError from .logging import TRACE -from .utils import BoardIdentity +from .utils import IN_SIMULATOR, BoardIdentity logger = logging.getLogger(__name__) @@ -29,7 +29,12 @@ RetType = TypeVar("RetType") E = TypeVar("E", bound=BaseException) -BASE_TIMEOUT: float | None = 0.5 +BASE_TIMEOUT: float | None + +if IN_SIMULATOR: + BASE_TIMEOUT = None # Disable timeouts while in the simulator to allow for pausing +else: + BASE_TIMEOUT = 0.5 def retry( @@ -208,9 +213,10 @@ def _connect(self) -> bool: """ try: self.serial.open() - # Wait for the board to be ready to receive data - # Certain boards will reset when the serial port is opened - time.sleep(self.delay_after_connect) + if not IN_SIMULATOR: + # Wait for the board to be ready to receive data + # Certain boards will reset when the serial port is opened + time.sleep(self.delay_after_connect) self.serial.reset_input_buffer() except serial.SerialException: logger.error( diff --git a/sr/robot3/servo_board.py b/sr/robot3/servo_board.py index 4aaa95a..3ce28da 100644 --- a/sr/robot3/servo_board.py +++ b/sr/robot3/servo_board.py @@ -12,8 +12,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) DUTY_MIN = 500 @@ -110,17 +110,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified servo board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found servo board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified servo board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified servo board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found servo board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, ServoBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('ServoBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, manual_boards: Optional[list[str]] = None, @@ -134,6 +163,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to servo boards. """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: diff --git a/sr/robot3/simulator/__init__.py b/sr/robot3/simulator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sr/robot3/simulator/camera.py b/sr/robot3/simulator/camera.py new file mode 100644 index 0000000..e5bd134 --- /dev/null +++ b/sr/robot3/simulator/camera.py @@ -0,0 +1,75 @@ +import struct + +import cv2 +import numpy as np +from april_vision import FrameSource +from numpy.typing import NDArray +from serial import serial_for_url + +from ..utils import BoardInfo + +HEADER_SIZE = 5 # 1 byte for the type, 4 bytes for the length +IMAGE_TAG_ID = 0 + + +class WebotsRemoteCameraSource(FrameSource): + # Webots cameras include an alpha channel, this informs april_vision of how to handle it + COLOURSPACE = cv2.COLOR_BGRA2GRAY + + def __init__(self, camera_info: BoardInfo) -> None: + self.calibration = (0.0, 0.0, 0.0, 0.0) + # Use pyserial to give a nicer interface for connecting to the camera socket + self._serial = serial_for_url(camera_info.url, baudrate=115200, timeout=None) + + # Check the camera is connected + response = self._make_request("*IDN?") + if not response.split(b":")[1].lower().startswith(b"cam"): + raise RuntimeError(f"Camera not connected to a camera, returned: {response!r}") + + # Get the calibration data for this camera + response = self._make_request("CAM:CALIBRATION?") + + # The calibration data is returned as a string of floats separated by colons + new_calibration = tuple(map(float, response.split(b":"))) + assert len(new_calibration) == 4, f"Invalid calibration data: {new_calibration}" + self.calibration = new_calibration + + # Get the image size for this camera + response = self._make_request("CAM:RESOLUTION?") + self.image_size = tuple(map(int, response.split(b":"))) + assert len(self.image_size) == 2, f"Invalid image dimensions: {self.image_size}" + + def read(self, fresh: bool = True) -> NDArray: + """ + The method for getting a new frame. + + :param fresh: Whether to flush the device's buffer before capturing + the frame, unused. + """ + self._serial.write(b"CAM:FRAME!\n") + # The image is encoded as a TLV (Type, Length, Value) packet + # so we need to read the header to get the type and length of the image + header = self._serial.read(HEADER_SIZE) + assert len(header) == HEADER_SIZE, f"Invalid header length: {len(header)}" + img_tag, img_len = struct.unpack('>BI', header) + assert img_tag == IMAGE_TAG_ID, f"Invalid image tag: {img_tag}" + + # Get the image data now we know the length + img_data = self._serial.read(img_len) + assert len(img_data) == img_len, f"Invalid image data length: {len(img_data)}" + + rgb_frame_raw: NDArray[np.uint8] = np.frombuffer(img_data, np.uint8) + + # Height is first, then width, then channels + return rgb_frame_raw.reshape((self.image_size[1], self.image_size[0], 4)) + + def close(self) -> None: + """Close the underlying socket on exit.""" + self._serial.close() + + def _make_request(self, command: str) -> bytes: + self._serial.write(command.encode() + b"\n") + response = self._serial.readline() + if not response.endswith(b"\n") or response.startswith(b"NACK:"): + raise RuntimeError(f"Failed to communicate with camera, returned: {response!r}") + return response diff --git a/sr/robot3/simulator/time_server.py b/sr/robot3/simulator/time_server.py new file mode 100644 index 0000000..26a8be1 --- /dev/null +++ b/sr/robot3/simulator/time_server.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +from ..exceptions import BoardDisconnectionError, IncorrectBoardError +from ..serial_wrapper import SerialWrapper +from ..utils import BoardIdentity, get_simulator_boards + +logger = logging.getLogger(__name__) + +BAUDRATE = 115200 + + +class TimeServer: + @staticmethod + def get_board_type() -> str: + """ + Return the type of the board. + + :return: The literal string 'TimeServer'. + """ + return 'TimeServer' + + def __init__( + self, + serial_port: str, + initial_identity: BoardIdentity | None = None, + ) -> None: + if initial_identity is None: + initial_identity = BoardIdentity() + self._serial = SerialWrapper( + serial_port, + BAUDRATE, + identity=initial_identity, + # Disable the timeout so sleep works properly + timeout=None, + ) + + self._identity = self.identify() + if self._identity.board_type != self.get_board_type(): + raise IncorrectBoardError(self._identity.board_type, self.get_board_type()) + self._serial.set_identity(self._identity) + + @classmethod + def initialise(cls) -> 'TimeServer' | None: + # The filter here is the name of the emulated board in the simulator + boards = get_simulator_boards('TimeServer') + + if not boards: + return None + + board_info = boards[0] + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + + try: + board = cls(board_info.url, initial_identity) + except BoardDisconnectionError: + logger.warning( + f"Simulator specified time server at port {board_info.url!r}, " + "could not be identified. Ignoring this device") + return None + except IncorrectBoardError as err: + logger.warning( + f"Board returned type {err.returned_type!r}, " + f"expected {err.expected_type!r}. Ignoring this device") + return None + + return board + + def identify(self) -> BoardIdentity: + """ + Get the identity of the board. + + :return: The identity of the board. + """ + response = self._serial.query('*IDN?') + return BoardIdentity(*response.split(':')) + + def get_time(self) -> float: + time_str = self._serial.query('TIME?') + return datetime.fromisoformat(time_str).timestamp() + + def sleep(self, duration: float) -> None: + if duration < 0: + raise ValueError("sleep length must be non-negative") + + self._serial.query(f'SLEEP:{int(duration * 1000)}') diff --git a/sr/robot3/utils.py b/sr/robot3/utils.py index 8d14e19..87ee6cd 100644 --- a/sr/robot3/utils.py +++ b/sr/robot3/utils.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging +import os import signal import socket from abc import ABC, abstractmethod @@ -14,6 +15,8 @@ T = TypeVar('T') logger = logging.getLogger(__name__) +IN_SIMULATOR = os.environ.get('WEBOTS_SIMULATOR', '') == '1' + class BoardIdentity(NamedTuple): """ @@ -34,6 +37,13 @@ class BoardIdentity(NamedTuple): sw_version: str = "" +class BoardInfo(NamedTuple): + """A container for the information about a board connection.""" + url: str + serial_number: str + type_str: str + + class Board(ABC): """ This is the base class for all boards. @@ -228,6 +238,36 @@ def handle_signal(handled_signum: int, frame: Optional[FrameType]) -> None: signal.signal(signal.SIGTERM, handle_signal) +def get_simulator_boards(board_filter: str = '') -> list[BoardInfo]: + """ + Get a list of all boards configured in the simulator. + + This is used to support discovery of boards in the simulator environment. + + :param board_filter: A filter to only return boards of a certain type + :return: A list of board connection information + """ + if 'WEBOTS_ROBOT' not in os.environ: + return [] + + simulator_data = os.environ['WEBOTS_ROBOT'].splitlines() + simulator_boards = [] + + for board_data in simulator_data: + board_data = board_data.rstrip('/') + board_fragment, serial_number = board_data.rsplit('/', 1) + board_url, board_type = board_fragment.rsplit('/', 1) + + board_info = BoardInfo(url=board_url, serial_number=serial_number, type_str=board_type) + + if board_filter and board_info.type_str != board_filter: + continue + + simulator_boards.append(board_info) + + return simulator_boards + + def list_ports() -> None: """ Print a list of all connected USB serial ports.