From d2d49103c69e87dbfeab6f5d59791a0c46da654a Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sat, 17 Aug 2024 17:36:37 +0100 Subject: [PATCH 01/12] Initial implementation for simulator integration Currently excludes support for KCH, astoria & arduino --- sr/robot3/arduino.py | 53 ++++++++++++++--- sr/robot3/camera.py | 81 +++++++++++++++++++++---- sr/robot3/motor_board.py | 52 +++++++++++++---- sr/robot3/power_board.py | 52 ++++++++++++++--- sr/robot3/robot.py | 32 ++++++++-- sr/robot3/serial_wrapper.py | 15 +++-- sr/robot3/servo_board.py | 52 +++++++++++++---- sr/robot3/simulator/__init__.py | 0 sr/robot3/simulator/camera.py | 75 ++++++++++++++++++++++++ sr/robot3/simulator/time_server.py | 94 ++++++++++++++++++++++++++++++ sr/robot3/utils.py | 40 +++++++++++++ 11 files changed, 487 insertions(+), 59 deletions(-) create mode 100644 sr/robot3/simulator/__init__.py create mode 100644 sr/robot3/simulator/camera.py create mode 100644 sr/robot3/simulator/time_server.py diff --git a/sr/robot3/arduino.py b/sr/robot3/arduino.py index c3d86c68..1d38d424 100644 --- a/sr/robot3/arduino.py +++ b/sr/robot3/arduino.py @@ -11,7 +11,10 @@ from .exceptions import IncorrectBoardError from .logging import log_to_debug from .serial_wrapper import SerialWrapper -from .utils import Board, BoardIdentity, get_USB_identity, map_to_float +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, + get_simulator_boards, get_USB_identity, map_to_float, +) logger = logging.getLogger(__name__) BAUDRATE = 115200 @@ -127,17 +130,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified Arduino at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found Arduino-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified Arduino at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified arduino at port {serial_port!r}, " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found Arduino-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, Arduino]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to Arduinos + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('Arduino'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, @@ -152,6 +184,9 @@ def _get_supported_boards( :param ignored_serials: A list of serial number to ignore during board discovery :return: A mapping of board serial numbers to Arduinos """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} if ignored_serials is None: ignored_serials = [] diff --git a/sr/robot3/camera.py b/sr/robot3/camera.py index db659fab..2734ef8a 100644 --- a/sr/robot3/camera.py +++ b/sr/robot3/camera.py @@ -1,9 +1,11 @@ """An implementation of a camera board using the april_vision library.""" +from __future__ import annotations + import logging from pathlib import Path from typing import Callable, Dict, Iterable, List, Optional, Union -from april_vision import CalibratedCamera, Frame +from april_vision import CalibratedCamera, Frame, FrameSource from april_vision import Marker as AprilMarker from april_vision import ( Processor, USBCamera, __version__, calibrations, @@ -13,8 +15,11 @@ from numpy.typing import NDArray from .marker import Marker -from .utils import Board, BoardIdentity +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, BoardInfo, get_simulator_boards, +) +PathLike = Union[Path, str] LOGGER = logging.getLogger(__name__) robot_calibrations = calibrations.copy() @@ -30,9 +35,11 @@ class AprilCamera(Board): in order to determine the spatial positon and orientation of the markers that it has detected. - :param camera_id: The index of the camera to use. - :param camera_data: The calibration data for the camera. + :param camera_source: The source of the camera frames. + :param calibration: The intrinsic calibration of the camera. :param serial_num: The serial number of the camera. + :param name: The name of the camera. + :param vidpid: The VID:PID of the camera. """ __slots__ = ('_serial_num', '_cam') @@ -56,28 +63,82 @@ def _discover(cls) -> Dict[str, 'AprilCamera']: :return: A dict of cameras, keyed by their name and index. """ + if IN_SIMULATOR: + return { + camera_info.serial_number: cls.from_webots_camera(camera_info) + for camera_info in get_simulator_boards('CameraBoard') + } + return { (serial := f"{camera_data.name} - {camera_data.index}"): - cls(camera_data.index, camera_data=camera_data, serial_num=serial) + cls.from_id(camera_data.index, camera_data=camera_data, serial_num=serial) for camera_data in find_cameras(robot_calibrations) } - def __init__(self, camera_id: int, camera_data: CalibratedCamera, serial_num: str) -> None: + def __init__( + self, camera_source: FrameSource, + calibration: tuple[float, float, float, float] | None, + serial_num: str, + name: str, + vidpid: str = "", + ) -> None: + # The processor handles the detection and pose estimation + self._cam = Processor( + camera_source, + calibration=calibration, + name=name, + vidpid=vidpid, + mask_unknown_size_tags=True, + ) + self._serial_num = serial_num + + @classmethod + def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera': + """ + Create a camera from a webots camera. + + :param camera_info: The information about the virtual camera, + including the url to connect to. + :return: The camera object. + """ + from .simulator.camera import WebotsRemoteCameraSource + + camera_source = WebotsRemoteCameraSource(camera_info) + return cls( + camera_source, + calibration=camera_source.calibration, + serial_num=camera_info.serial_number, + name=camera_info.serial_number, + ) + + @classmethod + def from_id( + cls, + camera_id: int, + camera_data: CalibratedCamera, + serial_num: str, + ) -> 'AprilCamera': + """ + Create a camera from an ID. + + :param camera_id: The ID of the camera to create. + :param camera_data: The calibration data for the camera. + :param serial_num: The serial number of the camera. + :return: The camera object. + """ # The camera source handles the connection between the camera and the processor camera_source = USBCamera.from_calibration_file( camera_id, calibration_file=camera_data.calibration, vidpid=camera_data.vidpid, ) - # The processor handles the detection and pose estimation - self._cam = Processor( + return cls( camera_source, calibration=camera_source.calibration, + serial_num=serial_num, name=camera_data.name, vidpid=camera_data.vidpid, - mask_unknown_size_tags=True, ) - self._serial_num = serial_num def identify(self) -> BoardIdentity: """ diff --git a/sr/robot3/motor_board.py b/sr/robot3/motor_board.py index db561af7..3b1af7dc 100644 --- a/sr/robot3/motor_board.py +++ b/sr/robot3/motor_board.py @@ -13,8 +13,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) logger = logging.getLogger(__name__) @@ -113,17 +113,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified motor board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found motor board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified motor board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified motor board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found motor board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, MotorBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('MotorBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, manual_boards: Optional[list[str]] = None, @@ -137,6 +166,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to motor boards. """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: diff --git a/sr/robot3/power_board.py b/sr/robot3/power_board.py index 6978822e..21fb8e24 100644 --- a/sr/robot3/power_board.py +++ b/sr/robot3/power_board.py @@ -13,7 +13,10 @@ from .exceptions import IncorrectBoardError from .logging import log_to_debug from .serial_wrapper import SerialWrapper -from .utils import Board, BoardIdentity, float_bounds_check, get_USB_identity +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, +) logger = logging.getLogger(__name__) BAUDRATE = 115200 # Since the power board is a USB device, this is ignored @@ -132,17 +135,45 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified power board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found power board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified power board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified power board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + + logger.warning( + f"Found power board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, PowerBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('PowerBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, manual_boards: Optional[list[str]] = None, @@ -156,6 +187,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to power boards. """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index 577cd530..063b434e 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -5,6 +5,7 @@ import logging import time from pathlib import Path +from socket import socket from types import MappingProxyType from typing import Mapping, Optional @@ -20,7 +21,8 @@ from .power_board import Note, PowerBoard from .raw_serial import RawSerial from .servo_board import ServoBoard -from .utils import ensure_atexit_on_term, obtain_lock, singular +from .simulator.time_server import TimeServer +from .utils import IN_SIMULATOR, ensure_atexit_on_term, obtain_lock, singular logger = logging.getLogger(__name__) @@ -44,6 +46,7 @@ class Robot: __slots__ = ( '_lock', '_metadata', '_power_board', '_motor_boards', '_servo_boards', '_arduinos', '_cameras', '_mqtt', '_astoria', '_kch', '_raw_ports', + '_time_server', ) def __init__( @@ -56,7 +59,15 @@ def __init__( manual_boards: Optional[dict[str, list[str]]] = None, raw_ports: Optional[list[tuple[str, int]]] = None, ) -> None: - self._lock = obtain_lock() + self._lock: TimeServer | socket | None + if IN_SIMULATOR: + self._lock = TimeServer.initialise() + if self._lock is None: + raise OSError( + 'Unable to obtain lock, Is another robot instance already running?' + ) + else: + self._lock = obtain_lock() self._metadata: Optional[Metadata] = None setup_logging(debug, trace_logging) @@ -284,7 +295,11 @@ def sleep(self, secs: float) -> None: :param secs: The number of seconds to sleep for """ - time.sleep(secs) + if IN_SIMULATOR: + assert isinstance(self._lock, TimeServer) + self._lock.sleep(secs) + else: + time.sleep(secs) @log_to_debug def time(self) -> float: @@ -299,7 +314,11 @@ def time(self) -> float: :returns: the number of seconds since the Unix Epoch. """ - return time.time() + if IN_SIMULATOR: + assert isinstance(self._lock, TimeServer) + self._lock.get_time() + else: + return time.time() @property @log_to_debug @@ -359,7 +378,7 @@ def is_simulated(self) -> bool: :returns: True if the robot is simulated. False otherwise. """ - return False + return IN_SIMULATOR @log_to_debug def wait_start(self) -> None: @@ -391,5 +410,6 @@ def wait_start(self) -> None: # Load the latest metadata that we have received over MQTT self._metadata = self._astoria.fetch_current_metadata() - if self._metadata.game_timeout is not None: + # Simulator timeout is handled by the simulator supervisor + if self._metadata.game_timeout and not IN_SIMULATOR: timeout.kill_after_delay(self._metadata.game_timeout) diff --git a/sr/robot3/serial_wrapper.py b/sr/robot3/serial_wrapper.py index 8276842f..87b9932e 100644 --- a/sr/robot3/serial_wrapper.py +++ b/sr/robot3/serial_wrapper.py @@ -17,7 +17,7 @@ from .exceptions import BoardDisconnectionError from .logging import TRACE -from .utils import BoardIdentity +from .utils import IN_SIMULATOR, BoardIdentity logger = logging.getLogger(__name__) @@ -29,8 +29,12 @@ RetType = TypeVar("RetType") E = TypeVar("E", bound=BaseException) -BASE_TIMEOUT: float | None = 0.5 +BASE_TIMEOUT: float | None +if IN_SIMULATOR: + BASE_TIMEOUT = None # Disable timeouts while in the simulator to allow for pausing +else: + BASE_TIMEOUT = 0.5 def retry( times: int, exceptions: Union[type[E], tuple[type[E], ...]] @@ -208,9 +212,10 @@ def _connect(self) -> bool: """ try: self.serial.open() - # Wait for the board to be ready to receive data - # Certain boards will reset when the serial port is opened - time.sleep(self.delay_after_connect) + if not IN_SIMULATOR: + # Wait for the board to be ready to receive data + # Certain boards will reset when the serial port is opened + time.sleep(self.delay_after_connect) self.serial.reset_input_buffer() except serial.SerialException: logger.error( diff --git a/sr/robot3/servo_board.py b/sr/robot3/servo_board.py index 4aaa95af..3ce28da7 100644 --- a/sr/robot3/servo_board.py +++ b/sr/robot3/servo_board.py @@ -12,8 +12,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) DUTY_MIN = 500 @@ -110,17 +110,46 @@ def _get_valid_board( f"expected {err.expected_type!r}. Ignoring this device") return None except Exception: - if initial_identity is not None and initial_identity.board_type == 'manual': - logger.warning( - f"Manually specified servo board at port {serial_port!r}, " - "could not be identified. Ignoring this device") - else: - logger.warning( - f"Found servo board-like serial port at {serial_port!r}, " - "but it could not be identified. Ignoring this device") + if initial_identity is not None: + if initial_identity.board_type == 'manual': + logger.warning( + f"Manually specified servo board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + elif initial_identity.manufacturer == 'sbot_simulator': + logger.warning( + f"Simulator specified servo board at port {serial_port!r}, " + "could not be identified. Ignoring this device") + return None + + logger.warning( + f"Found servo board-like serial port at {serial_port!r}, " + "but it could not be identified. Ignoring this device") return None return board + @classmethod + def _get_simulator_boards(cls) -> MappingProxyType[str, ServoBoard]: + """ + Get the simulator boards. + + :return: A mapping of board serial numbers to boards. + """ + boards = {} + # The filter here is the name of the emulated board in the simulator + for board_info in get_simulator_boards('ServoBoard'): + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + if (board := cls._get_valid_board(board_info.url, initial_identity)) is None: + continue + + boards[board._identity.asset_tag] = board + return MappingProxyType(boards) + @classmethod def _get_supported_boards( cls, manual_boards: Optional[list[str]] = None, @@ -134,6 +163,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to servo boards. """ + if IN_SIMULATOR: + return cls._get_simulator_boards() + boards = {} serial_ports = comports() for port in serial_ports: diff --git a/sr/robot3/simulator/__init__.py b/sr/robot3/simulator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sr/robot3/simulator/camera.py b/sr/robot3/simulator/camera.py new file mode 100644 index 00000000..e5bd134e --- /dev/null +++ b/sr/robot3/simulator/camera.py @@ -0,0 +1,75 @@ +import struct + +import cv2 +import numpy as np +from april_vision import FrameSource +from numpy.typing import NDArray +from serial import serial_for_url + +from ..utils import BoardInfo + +HEADER_SIZE = 5 # 1 byte for the type, 4 bytes for the length +IMAGE_TAG_ID = 0 + + +class WebotsRemoteCameraSource(FrameSource): + # Webots cameras include an alpha channel, this informs april_vision of how to handle it + COLOURSPACE = cv2.COLOR_BGRA2GRAY + + def __init__(self, camera_info: BoardInfo) -> None: + self.calibration = (0.0, 0.0, 0.0, 0.0) + # Use pyserial to give a nicer interface for connecting to the camera socket + self._serial = serial_for_url(camera_info.url, baudrate=115200, timeout=None) + + # Check the camera is connected + response = self._make_request("*IDN?") + if not response.split(b":")[1].lower().startswith(b"cam"): + raise RuntimeError(f"Camera not connected to a camera, returned: {response!r}") + + # Get the calibration data for this camera + response = self._make_request("CAM:CALIBRATION?") + + # The calibration data is returned as a string of floats separated by colons + new_calibration = tuple(map(float, response.split(b":"))) + assert len(new_calibration) == 4, f"Invalid calibration data: {new_calibration}" + self.calibration = new_calibration + + # Get the image size for this camera + response = self._make_request("CAM:RESOLUTION?") + self.image_size = tuple(map(int, response.split(b":"))) + assert len(self.image_size) == 2, f"Invalid image dimensions: {self.image_size}" + + def read(self, fresh: bool = True) -> NDArray: + """ + The method for getting a new frame. + + :param fresh: Whether to flush the device's buffer before capturing + the frame, unused. + """ + self._serial.write(b"CAM:FRAME!\n") + # The image is encoded as a TLV (Type, Length, Value) packet + # so we need to read the header to get the type and length of the image + header = self._serial.read(HEADER_SIZE) + assert len(header) == HEADER_SIZE, f"Invalid header length: {len(header)}" + img_tag, img_len = struct.unpack('>BI', header) + assert img_tag == IMAGE_TAG_ID, f"Invalid image tag: {img_tag}" + + # Get the image data now we know the length + img_data = self._serial.read(img_len) + assert len(img_data) == img_len, f"Invalid image data length: {len(img_data)}" + + rgb_frame_raw: NDArray[np.uint8] = np.frombuffer(img_data, np.uint8) + + # Height is first, then width, then channels + return rgb_frame_raw.reshape((self.image_size[1], self.image_size[0], 4)) + + def close(self) -> None: + """Close the underlying socket on exit.""" + self._serial.close() + + def _make_request(self, command: str) -> bytes: + self._serial.write(command.encode() + b"\n") + response = self._serial.readline() + if not response.endswith(b"\n") or response.startswith(b"NACK:"): + raise RuntimeError(f"Failed to communicate with camera, returned: {response!r}") + return response diff --git a/sr/robot3/simulator/time_server.py b/sr/robot3/simulator/time_server.py new file mode 100644 index 00000000..26a8be1b --- /dev/null +++ b/sr/robot3/simulator/time_server.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import logging +from datetime import datetime + +from ..exceptions import BoardDisconnectionError, IncorrectBoardError +from ..serial_wrapper import SerialWrapper +from ..utils import BoardIdentity, get_simulator_boards + +logger = logging.getLogger(__name__) + +BAUDRATE = 115200 + + +class TimeServer: + @staticmethod + def get_board_type() -> str: + """ + Return the type of the board. + + :return: The literal string 'TimeServer'. + """ + return 'TimeServer' + + def __init__( + self, + serial_port: str, + initial_identity: BoardIdentity | None = None, + ) -> None: + if initial_identity is None: + initial_identity = BoardIdentity() + self._serial = SerialWrapper( + serial_port, + BAUDRATE, + identity=initial_identity, + # Disable the timeout so sleep works properly + timeout=None, + ) + + self._identity = self.identify() + if self._identity.board_type != self.get_board_type(): + raise IncorrectBoardError(self._identity.board_type, self.get_board_type()) + self._serial.set_identity(self._identity) + + @classmethod + def initialise(cls) -> 'TimeServer' | None: + # The filter here is the name of the emulated board in the simulator + boards = get_simulator_boards('TimeServer') + + if not boards: + return None + + board_info = boards[0] + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + + try: + board = cls(board_info.url, initial_identity) + except BoardDisconnectionError: + logger.warning( + f"Simulator specified time server at port {board_info.url!r}, " + "could not be identified. Ignoring this device") + return None + except IncorrectBoardError as err: + logger.warning( + f"Board returned type {err.returned_type!r}, " + f"expected {err.expected_type!r}. Ignoring this device") + return None + + return board + + def identify(self) -> BoardIdentity: + """ + Get the identity of the board. + + :return: The identity of the board. + """ + response = self._serial.query('*IDN?') + return BoardIdentity(*response.split(':')) + + def get_time(self) -> float: + time_str = self._serial.query('TIME?') + return datetime.fromisoformat(time_str).timestamp() + + def sleep(self, duration: float) -> None: + if duration < 0: + raise ValueError("sleep length must be non-negative") + + self._serial.query(f'SLEEP:{int(duration * 1000)}') diff --git a/sr/robot3/utils.py b/sr/robot3/utils.py index 8d14e190..87ee6cda 100644 --- a/sr/robot3/utils.py +++ b/sr/robot3/utils.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging +import os import signal import socket from abc import ABC, abstractmethod @@ -14,6 +15,8 @@ T = TypeVar('T') logger = logging.getLogger(__name__) +IN_SIMULATOR = os.environ.get('WEBOTS_SIMULATOR', '') == '1' + class BoardIdentity(NamedTuple): """ @@ -34,6 +37,13 @@ class BoardIdentity(NamedTuple): sw_version: str = "" +class BoardInfo(NamedTuple): + """A container for the information about a board connection.""" + url: str + serial_number: str + type_str: str + + class Board(ABC): """ This is the base class for all boards. @@ -228,6 +238,36 @@ def handle_signal(handled_signum: int, frame: Optional[FrameType]) -> None: signal.signal(signal.SIGTERM, handle_signal) +def get_simulator_boards(board_filter: str = '') -> list[BoardInfo]: + """ + Get a list of all boards configured in the simulator. + + This is used to support discovery of boards in the simulator environment. + + :param board_filter: A filter to only return boards of a certain type + :return: A list of board connection information + """ + if 'WEBOTS_ROBOT' not in os.environ: + return [] + + simulator_data = os.environ['WEBOTS_ROBOT'].splitlines() + simulator_boards = [] + + for board_data in simulator_data: + board_data = board_data.rstrip('/') + board_fragment, serial_number = board_data.rsplit('/', 1) + board_url, board_type = board_fragment.rsplit('/', 1) + + board_info = BoardInfo(url=board_url, serial_number=serial_number, type_str=board_type) + + if board_filter and board_info.type_str != board_filter: + continue + + simulator_boards.append(board_info) + + return simulator_boards + + def list_ports() -> None: """ Print a list of all connected USB serial ports. From fb31775a057be5c5b89a58500c02129b77da0ace Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sat, 17 Aug 2024 19:45:06 +0100 Subject: [PATCH 02/12] Add astoria and metadata simulator support --- sr/robot3/robot.py | 49 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index 063b434e..ffe3f14a 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -3,6 +3,7 @@ import itertools import logging +import os import time from pathlib import Path from socket import socket @@ -75,7 +76,11 @@ def __init__( logger.info(f"sr.robot3 version {__version__}") - self._mqtt, self._astoria = init_astoria_mqtt() + if IN_SIMULATOR: + self._mqtt = None + self._astoria = None + else: + self._mqtt, self._astoria = init_astoria_mqtt() if manual_boards: self._init_power_board(manual_boards.get(PowerBoard.get_board_type(), [])) @@ -145,9 +150,16 @@ def _init_camera(self) -> None: These cameras are used for AprilTag detection and provide location data of markers in its field of view. """ + + if IN_SIMULATOR: + publish_fn = None + else: + assert self._mqtt is not None + publish_fn = self._mqtt.wrapped_publish + self._cameras = MappingProxyType(_setup_cameras( game_specific.MARKER_SIZES, - self._mqtt.wrapped_publish, + publish_fn, )) def _log_connected_boards(self) -> None: @@ -316,7 +328,7 @@ def time(self) -> float: """ if IN_SIMULATOR: assert isinstance(self._lock, TimeServer) - self._lock.get_time() + return self._lock.get_time() else: return time.time() @@ -369,7 +381,11 @@ def usbkey(self) -> Path: :returns: path to the mountpoint of the USB code drive. """ - return self._astoria.fetch_mount_path() + if IN_SIMULATOR: + return Path(os.environ['SBOT_USBKEY_PATH']) + else: + assert self._astoria is not None + return self._astoria.fetch_mount_path() @property def is_simulated(self) -> bool: @@ -390,25 +406,38 @@ def wait_start(self) -> None: Once the start button is pressed, the metadata will be loaded and the timeout will start if in competition mode. """ + def null_button_pressed() -> bool: + return False + + if IN_SIMULATOR: + remote_start_pressed = null_button_pressed + else: + assert self._astoria is not None + remote_start_pressed = self._astoria.get_start_button_pressed + # ignore previous button presses _ = self.power_board._start_button() - _ = self._astoria.get_start_button_pressed() + _ = remote_start_pressed() logger.info('Waiting for start button.') self.power_board.piezo.buzz(Note.A6, 0.1) self.power_board._run_led.flash() self.kch._flash_start() - while not ( - self.power_board._start_button() or self._astoria.get_start_button_pressed() - ): + while not self.power_board._start_button() or remote_start_pressed(): time.sleep(0.1) logger.info("Start signal received; continuing.") self.power_board._run_led.on() self.kch._start = False - # Load the latest metadata that we have received over MQTT - self._metadata = self._astoria.fetch_current_metadata() + if IN_SIMULATOR: + self._metadata = Metadata.parse_file( + Path(os.environ['SBOT_METADATA_PATH']) / 'astoria.json' + ) + else: + assert self._astoria is not None + # Load the latest metadata that we have received over MQTT + self._metadata = self._astoria.fetch_current_metadata() # Simulator timeout is handled by the simulator supervisor if self._metadata.game_timeout and not IN_SIMULATOR: From 8273daa27637be8982d05ab996f27e89eb9a4d1f Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sat, 17 Aug 2024 20:27:13 +0100 Subject: [PATCH 03/12] Add kch simulator support --- sr/robot3/kch.py | 286 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 256 insertions(+), 30 deletions(-) diff --git a/sr/robot3/kch.py b/sr/robot3/kch.py index 15033122..a786d732 100644 --- a/sr/robot3/kch.py +++ b/sr/robot3/kch.py @@ -2,17 +2,28 @@ from __future__ import annotations import atexit +import logging import warnings from enum import IntEnum, unique from typing import Optional +from .exceptions import BoardDisconnectionError, IncorrectBoardError +from .serial_wrapper import SerialWrapper +from .utils import IN_SIMULATOR, Board, BoardIdentity, get_simulator_boards + try: import RPi.GPIO as GPIO # isort: ignore - HAS_HAT = True + HAS_HAT = True if not IN_SIMULATOR else False except ImportError: HAS_HAT = False +logger = logging.getLogger(__name__) + +# Only used in the simulator +BAUDRATE = 115200 + + @unique class RobotLEDs(IntEnum): """Mapping of LEDs to GPIO Pins.""" @@ -71,6 +82,8 @@ class KCH: __slots__ = ('_leds', '_pwm') def __init__(self) -> None: + self._leds: tuple[LED, ...] + if HAS_HAT: GPIO.setmode(GPIO.BCM) with warnings.catch_warnings(): @@ -81,11 +94,7 @@ def __init__(self) -> None: # suppress this as we know the reason behind the warning GPIO.setup(RobotLEDs.all_leds(), GPIO.OUT, initial=GPIO.LOW) self._pwm: Optional[GPIO.PWM] = None - self._leds = tuple( - LED(rgb_led) for rgb_led in RobotLEDs.user_leds() - ) - if HAS_HAT: # We are not running cleanup so the LED state persists after the code completes, # this will cause a warning with `GPIO.setup()` which we can suppress # atexit.register(GPIO.cleanup) @@ -94,6 +103,25 @@ def __init__(self) -> None: # Mypy isn't aware of the version of atexit.register(func, *args) atexit.register(GPIO.cleanup, RobotLEDs.START) # type: ignore[call-arg] + self._leds = tuple( + PhysicalLED(rgb_led) for rgb_led in RobotLEDs.user_leds() + ) + elif IN_SIMULATOR: + led_server = LedServer.initialise() + if led_server is not None: + self._leds = tuple( + SimulationLED(v, led_server) + for v in range(len(RobotLEDs.user_leds())) + ) + else: + self._leds = tuple( + LED() for _ in RobotLEDs.user_leds() + ) + else: + self._leds = tuple( + LED() for _ in RobotLEDs.user_leds() + ) + @property def _start(self) -> bool: """Get the state of the start LED.""" @@ -122,7 +150,60 @@ def leds(self) -> tuple['LED', ...]: class LED: - """User programmable LED.""" + """ + User programmable LED. + + This is a dummy class to handle the case where this is run on neither the + Raspberry Pi nor the simulator. + As such, this class does nothing. + """ + __slots__ = ('_led',) + + @property + def r(self) -> bool: + """Get the state of the Red LED segment.""" + return False + + @r.setter + def r(self, value: bool) -> None: + """Set the state of the Red LED segment.""" + + @property + def g(self) -> bool: + """Get the state of the Green LED segment.""" + return False + + @g.setter + def g(self, value: bool) -> None: + """Set the state of the Green LED segment.""" + + @property + def b(self) -> bool: + """Get the state of the Blue LED segment.""" + return False + + @b.setter + def b(self, value: bool) -> None: + """Set the state of the Blue LED segment.""" + + @property + def colour(self) -> tuple[bool, bool, bool]: + """Get the colour of the user LED.""" + return False, False, False + + @colour.setter + def colour(self, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the user LED.""" + if not isinstance(value, (tuple, list)) or len(value) != 3: + raise ValueError("The LED requires 3 values for it's colour") + + +class PhysicalLED(LED): + """ + User programmable LED. + + Used when running on the Raspberry Pi to control the actual LEDs. + """ __slots__ = ('_led',) def __init__(self, led: tuple[int, int, int]): @@ -131,47 +212,41 @@ def __init__(self, led: tuple[int, int, int]): @property def r(self) -> bool: """Get the state of the Red LED segment.""" - return GPIO.input(self._led[0]) if HAS_HAT else False + return GPIO.input(self._led[0]) @r.setter def r(self, value: bool) -> None: """Set the state of the Red LED segment.""" - if HAS_HAT: - GPIO.output(self._led[0], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[0], GPIO.HIGH if value else GPIO.LOW) @property def g(self) -> bool: """Get the state of the Green LED segment.""" - return GPIO.input(self._led[1]) if HAS_HAT else False + return GPIO.input(self._led[1]) @g.setter def g(self, value: bool) -> None: """Set the state of the Green LED segment.""" - if HAS_HAT: - GPIO.output(self._led[1], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[1], GPIO.HIGH if value else GPIO.LOW) @property def b(self) -> bool: """Get the state of the Blue LED segment.""" - return GPIO.input(self._led[2]) if HAS_HAT else False + return GPIO.input(self._led[2]) @b.setter def b(self, value: bool) -> None: """Set the state of the Blue LED segment.""" - if HAS_HAT: - GPIO.output(self._led[2], GPIO.HIGH if value else GPIO.LOW) + GPIO.output(self._led[2], GPIO.HIGH if value else GPIO.LOW) @property def colour(self) -> tuple[bool, bool, bool]: """Get the colour of the user LED.""" - if HAS_HAT: - return ( - GPIO.input(self._led[0]), - GPIO.input(self._led[1]), - GPIO.input(self._led[2]), - ) - else: - return False, False, False + return ( + GPIO.input(self._led[0]), + GPIO.input(self._led[1]), + GPIO.input(self._led[2]), + ) @colour.setter def colour(self, value: tuple[bool, bool, bool]) -> None: @@ -179,10 +254,161 @@ def colour(self, value: tuple[bool, bool, bool]) -> None: if not isinstance(value, (tuple, list)) or len(value) != 3: raise ValueError("The LED requires 3 values for it's colour") - if HAS_HAT: - GPIO.output( - self._led, - tuple( - GPIO.HIGH if v else GPIO.LOW for v in value - ), - ) + GPIO.output( + self._led, + tuple( + GPIO.HIGH if v else GPIO.LOW for v in value + ), + ) + + +class LedServer(Board): + """ + LED control over a socket. + + Used when running in the simulator to control the simulated LEDs. + """ + + @staticmethod + def get_board_type() -> str: + """ + Return the type of the board. + + :return: The literal string 'KCHv1B'. + """ + return 'KCHv1B' + + def __init__( + self, + serial_port: str, + initial_identity: BoardIdentity | None = None, + ) -> None: + if initial_identity is None: + initial_identity = BoardIdentity() + self._serial = SerialWrapper( + serial_port, + BAUDRATE, + identity=initial_identity, + ) + + self._identity = self.identify() + if self._identity.board_type != self.get_board_type(): + raise IncorrectBoardError(self._identity.board_type, self.get_board_type()) + self._serial.set_identity(self._identity) + + # Reset the board to a known state + self._serial.write('*RESET') + + @classmethod + def initialise(cls) -> 'LedServer' | None: + """Initialise the LED server using simulator discovery.""" + # The filter here is the name of the emulated board in the simulator + boards = get_simulator_boards('LedBoard') + + if not boards: + return None + + board_info = boards[0] + + # Create board identity from the info given + initial_identity = BoardIdentity( + manufacturer='sbot_simulator', + board_type=board_info.type_str, + asset_tag=board_info.serial_number, + ) + + try: + board = cls(board_info.url, initial_identity) + except BoardDisconnectionError: + logger.warning( + f"Simulator specified LED board at port {board_info.url!r}, " + "could not be identified. Ignoring this device") + return None + except IncorrectBoardError as err: + logger.warning( + f"Board returned type {err.returned_type!r}, " + f"expected {err.expected_type!r}. Ignoring this device") + return None + + return board + + def identify(self) -> BoardIdentity: + """ + Get the identity of the board. + + :return: The identity of the board. + """ + response = self._serial.query('*IDN?') + return BoardIdentity(*response.split(':')) + + def set_leds(self, led_num: int, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the LED.""" + self._serial.write(f'LED:{led_num}:SET:{value[0]:d}:{value[1]:d}:{value[2]:d}') + + def get_leds(self, led_num: int) -> tuple[bool, bool, bool]: + """Get the colour of the LED.""" + response = self._serial.query(f'LED:{led_num}:GET?') + red, green, blue = response.split(':') + return bool(int(red)), bool(int(green)), bool(int(blue)) + + +class SimulationLED(LED): + """ + User programmable LED. + + Used when running in the simulator to control the simulated LEDs. + """ + __slots__ = ('_led_num', '_server') + + def __init__(self, led_num: int, server: LedServer) -> None: + self._led_num = led_num + self._server = server + + @property + def r(self) -> bool: + """Get the state of the Red LED segment.""" + return self._server.get_leds(self._led_num)[0] + + @r.setter + def r(self, value: bool) -> None: + """Set the state of the Red LED segment.""" + # Fetch the current state of the LED so we can update only the red value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (value, current[1], current[2])) + + @property + def g(self) -> bool: + """Get the state of the Green LED segment.""" + return self._server.get_leds(self._led_num)[1] + + @g.setter + def g(self, value: bool) -> None: + """Set the state of the Green LED segment.""" + # Fetch the current state of the LED so we can update only the green value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (current[0], value, current[2])) + + @property + def b(self) -> bool: + """Get the state of the Blue LED segment.""" + return self._server.get_leds(self._led_num)[2] + + @b.setter + def b(self, value: bool) -> None: + """Set the state of the Blue LED segment.""" + # Fetch the current state of the LED so we can update only the blue value + current = self._server.get_leds(self._led_num) + self._server.set_leds(self._led_num, (current[0], current[1], value)) + + @property + def colour(self) -> tuple[bool, bool, bool]: + """Get the colour of the user LED.""" + return self._server.get_leds(self._led_num) + + @colour.setter + def colour(self, value: tuple[bool, bool, bool]) -> None: + """Set the colour of the user LED.""" + if not isinstance(value, (tuple, list)) or len(value) != 3: + raise ValueError("The LED requires 3 values for it's colour") + + self._server.set_leds(self._led_num, value) From c98cf80fa43cdf2cbc33ca96ce56e979897e360b Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sat, 17 Aug 2024 20:28:35 +0100 Subject: [PATCH 04/12] Linting --- sr/robot3/serial_wrapper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sr/robot3/serial_wrapper.py b/sr/robot3/serial_wrapper.py index 87b9932e..c0a17245 100644 --- a/sr/robot3/serial_wrapper.py +++ b/sr/robot3/serial_wrapper.py @@ -36,6 +36,7 @@ else: BASE_TIMEOUT = 0.5 + def retry( times: int, exceptions: Union[type[E], tuple[type[E], ...]] ) -> Callable[[Callable[Param, RetType]], Callable[Param, RetType]]: From 3978af60aed447c80c3ce6d5ae1128d32226e155 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sat, 17 Aug 2024 20:30:00 +0100 Subject: [PATCH 05/12] Bump april_vision to properly support the virtual camera --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ffc9104b..b282e72f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ dynamic = ["version"] requires-python = ">=3.8" dependencies = [ "pyserial >=3,<4", - "april_vision >=2.1,<3", + "april_vision==2.2.0", "paho-mqtt >=2,<3", "pydantic >=1.9.1,<2", "typing-extensions; python_version<'3.10'", From 08f3717370faa75dc33b8d8cfe523b8c7da1e496 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 18 Aug 2024 12:13:09 +0100 Subject: [PATCH 06/12] Update arduino interface to support ultrasound and simulator --- sr/robot3/arduino.py | 51 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/sr/robot3/arduino.py b/sr/robot3/arduino.py index 1d38d424..74ab9653 100644 --- a/sr/robot3/arduino.py +++ b/sr/robot3/arduino.py @@ -18,6 +18,11 @@ logger = logging.getLogger(__name__) BAUDRATE = 115200 +if IN_SIMULATOR: + # Place each command on a new line in the simulator to simplify the implementation + ENDLINE = '\n' +else: + ENDLINE = '' SUPPORTED_VID_PIDS = { (0x2341, 0x0043), # Arduino Uno rev 3 @@ -61,7 +66,7 @@ class Arduino(Board): """ The Arduino board interface. - This is intended to be used with Arduino Uno boards running the sbot firmware. + This is intended to be used with Arduino Uno boards running the SR firmware. :param serial_port: The serial port to connect to. :param initial_identity: The identity of the board, as reported by the USB descriptor. @@ -227,7 +232,7 @@ def identify(self) -> BoardIdentity: :return: The identity of the board. """ - response = self._serial.query('v', endl='') + response = self._serial.query('v', endl=ENDLINE) response_fields = response.split(':') # The arduino firmware cannot access the serial number reported in the USB descriptor @@ -259,7 +264,9 @@ def command(self, command: str) -> str: :param command: The command to send to the board. :return: The response from the board. """ - return self._serial.query(command, endl='') + if IN_SIMULATOR: + logger.warning("The command method is not fully supported in the simulator") + return self._serial.query(command, endl=ENDLINE) def map_pin_number(self, pin_number: int) -> str: """ @@ -276,6 +283,34 @@ def map_pin_number(self, pin_number: int) -> str: raise ValueError("Invalid pin provided") from None return chr(pin_number + ord('a')) + @log_to_debug + def ultrasound_measure( + self, + pulse_pin: int, + echo_pin: int, + ) -> int: + """ + Measure the distance to an object using an ultrasound sensor. + + The sensor can only measure distances up to 4m. + + :param pulse_pin: The pin to send the ultrasound pulse from. + :param echo_pin: The pin to read the ultrasound echo from. + :raises ValueError: If either of the pins are invalid + :return: The distance measured by the ultrasound sensor in mm. + """ + try: # bounds check + pulse_id = self.map_pin_number(pulse_pin) + except ValueError: + raise ValueError("Invalid pulse pin provided") from None + try: + echo_id = self.map_pin_number(echo_pin) + except ValueError: + raise ValueError("Invalid echo pin provided") from None + + response = self._serial.query(f'u{pulse_id}{echo_id}', endl=ENDLINE) + return int(response) + def __repr__(self) -> str: return f"<{self.__class__.__qualname__}: {self._serial}>" @@ -338,7 +373,7 @@ def mode(self, value: GPIOPinMode) -> None: mode_char = MODE_CHAR_MAP.get(value) if mode_char is None: raise IOError(f'Pin mode {value} is not supported') - self._serial.write(self._build_command(mode_char), endl='') + self._serial.write(self._build_command(mode_char), endl=ENDLINE) self._mode = value @log_to_debug @@ -353,7 +388,7 @@ def digital_read(self) -> bool: self._check_if_disabled() if self.mode not in DIGITAL_READ_MODES: raise IOError(f'Digital read is not supported in {self.mode}') - response = self._serial.query(self._build_command('r'), endl='') + response = self._serial.query(self._build_command('r'), endl=ENDLINE) return response == 'h' @log_to_debug @@ -369,9 +404,9 @@ def digital_write(self, value: bool) -> None: if self.mode not in DIGITAL_WRITE_MODES: raise IOError(f'Digital write is not supported in {self.mode}') if value: - self._serial.write(self._build_command('h'), endl='') + self._serial.write(self._build_command('h'), endl=ENDLINE) else: - self._serial.write(self._build_command('l'), endl='') + self._serial.write(self._build_command('l'), endl=ENDLINE) @log_to_debug def analog_read(self) -> float: @@ -392,7 +427,7 @@ def analog_read(self) -> float: raise IOError(f'Analog read is not supported in {self.mode}') if not self._supports_analog: raise IOError('Pin does not support analog read') - response = self._serial.query(self._build_command('a'), endl='') + response = self._serial.query(self._build_command('a'), endl=ENDLINE) # map the response from the ADC range to the voltage range return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0) From c30ba1ef501162d68f132e0cec758d7554bdfe1d Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 18 Aug 2024 12:13:44 +0100 Subject: [PATCH 07/12] Add warning about raw ports not functioning in the simulator --- sr/robot3/robot.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index ffe3f14a..a55f2b76 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -141,7 +141,10 @@ def _init_aux_boards( self._servo_boards = ServoBoard._get_supported_boards(manual_servoboards) self._arduinos = Arduino._get_supported_boards(manual_arduinos, ignored_arduinos) if raw_ports: - self._raw_ports = RawSerial._get_supported_boards(raw_ports) + if not IN_SIMULATOR: + self._raw_ports = RawSerial._get_supported_boards(raw_ports) + else: + logger.warning("Raw ports are not available in the simulator.") def _init_camera(self) -> None: """ @@ -150,7 +153,6 @@ def _init_camera(self) -> None: These cameras are used for AprilTag detection and provide location data of markers in its field of view. """ - if IN_SIMULATOR: publish_fn = None else: From f8643de49bea5f99e9278b3487f0e04e45a02db9 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 18 Aug 2024 13:03:15 +0100 Subject: [PATCH 08/12] Further fixes --- sr/robot3/robot.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index a55f2b76..75ade4fd 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -304,8 +304,8 @@ def sleep(self, secs: float) -> None: """ Sleep for a number of seconds. - This is a convenience method that can be used instead of time.sleep(). - This exists for compatibility with the simulator API only. + This is a convenience method that should be used instead of time.sleep() + to make your code compatible with the simulator. :param secs: The number of seconds to sleep for """ @@ -320,8 +320,8 @@ def time(self) -> float: """ Get the number of seconds since the Unix Epoch. - This is a convenience method that can be used instead of time.time(). - This exists for compatibility with the simulator API only. + This is a convenience method that should be used instead of time.time() + to make your code compatible with the simulator. NOTE: The robot's clock resets each time the robot is restarted, so this will not be the correct actual time but can be used to measure elapsed time. @@ -427,7 +427,7 @@ def null_button_pressed() -> bool: self.kch._flash_start() while not self.power_board._start_button() or remote_start_pressed(): - time.sleep(0.1) + self.sleep(0.1) logger.info("Start signal received; continuing.") self.power_board._run_led.on() self.kch._start = False From f924c8cd61daaa955090f9db1370fb428c8e139a Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 18 Aug 2024 13:16:37 +0100 Subject: [PATCH 09/12] Fix blocking buzz using correct sleep function --- sr/robot3/power_board.py | 12 +++++++++--- sr/robot3/robot.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sr/robot3/power_board.py b/sr/robot3/power_board.py index 21fb8e24..f3d31bdf 100644 --- a/sr/robot3/power_board.py +++ b/sr/robot3/power_board.py @@ -6,7 +6,7 @@ from enum import IntEnum from time import sleep from types import MappingProxyType -from typing import NamedTuple, Optional +from typing import Callable, NamedTuple, Optional from serial.tools.list_ports import comports @@ -20,6 +20,7 @@ logger = logging.getLogger(__name__) BAUDRATE = 115200 # Since the power board is a USB device, this is ignored +_SLEEP_FN = sleep # For use in the simulator class PowerOutputPosition(IntEnum): @@ -176,7 +177,9 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, PowerBoard]: @classmethod def _get_supported_boards( - cls, manual_boards: Optional[list[str]] = None, + cls, + manual_boards: Optional[list[str]] = None, + sleep_fn: Optional[Callable[[float], None]] = None, ) -> MappingProxyType[str, PowerBoard]: """ Find all connected power boards. @@ -187,6 +190,9 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to power boards. """ + if sleep_fn is not None: + global _SLEEP_FN + _SLEEP_FN = sleep_fn if IN_SIMULATOR: return cls._get_simulator_boards() @@ -541,7 +547,7 @@ def buzz(self, frequency: float, duration: float, *, blocking: bool = False) -> self._serial.write(cmd) if blocking: - sleep(duration) + _SLEEP_FN(duration) def __repr__(self) -> str: return f"<{self.__class__.__qualname__}: {self._serial}>" diff --git a/sr/robot3/robot.py b/sr/robot3/robot.py index 75ade4fd..0a768710 100644 --- a/sr/robot3/robot.py +++ b/sr/robot3/robot.py @@ -104,7 +104,7 @@ def _init_power_board(self, manual_boards: Optional[list[str]] = None) -> None: defaults to None :raises RuntimeError: If exactly one PowerBoard is not found """ - power_boards = PowerBoard._get_supported_boards(manual_boards) + power_boards = PowerBoard._get_supported_boards(manual_boards, sleep_fn=self.sleep) self._power_board = singular(power_boards) # Enable all the outputs, so that we can find other boards. From 67f832ea8f9c682ce3fb72a39d501fc8127df7b0 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 18 Aug 2024 14:01:53 +0100 Subject: [PATCH 10/12] Rework vision API to add save arguments directly to functions --- sr/robot3/camera.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/sr/robot3/camera.py b/sr/robot3/camera.py index 2734ef8a..f8f0280f 100644 --- a/sr/robot3/camera.py +++ b/sr/robot3/camera.py @@ -164,34 +164,41 @@ def close(self) -> None: """ self._cam.close() - def see(self, *, frame: Optional[NDArray] = None) -> List[Marker]: + def see( + self, + *, + frame: Optional[NDArray] = None, + save: Optional[PathLike] = None, + ) -> List[Marker]: """ Capture an image and identify fiducial markers. :param frame: An image to detect markers in, instead of capturing a new one, + :param save: If given, save the annotated frame to the path. + This is given a JPEG extension if none is provided. :returns: list of markers that the camera could see. """ + if frame is None: + frame = self._cam.capture() + markers = self._cam.see(frame=frame) + + if save: + self._cam.save(save, frame=frame, detections=markers) return [Marker.from_april_vision_marker(marker) for marker in markers] - def capture(self) -> NDArray: + def capture(self, *, save: Optional[PathLike] = None) -> NDArray: """ Get the raw image data from the camera. + :param save: If given, save the annotated frame to the path. + This is given a JPEG extension if none is provided. :returns: Camera pixel data """ - return self._cam.capture() - - def save(self, path: Union[Path, str], *, frame: Optional[NDArray] = None) -> None: - """ - Save an annotated image to a path. - - :param path: The path to save the image to, - this is given a JPEG extension if none is provided. - :param frame: An image to annotate and save, instead of capturing a new one, - defaults to None - """ - self._cam.save(path, frame=frame) + raw_frame = self._cam.capture() + if save: + self._cam.save(save, frame=raw_frame, annotated=False) + return raw_frame def _set_marker_sizes( self, From 8b738b3a33316a574db0c108d2344536fa4d5e5b Mon Sep 17 00:00:00 2001 From: WillB97 Date: Sun, 25 Aug 2024 10:39:19 +0100 Subject: [PATCH 11/12] Add missing dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b282e72f..90237470 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ dependencies = [ "paho-mqtt >=2,<3", "pydantic >=1.9.1,<2", "typing-extensions; python_version<'3.10'", + "tomli >=2.0.1,<3; python_version<'3.11'", ] classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", From d9d0afb6f8c0719df727f8ee87c67631ad117dce Mon Sep 17 00:00:00 2001 From: Will Barber Date: Sat, 31 Aug 2024 22:25:41 +0100 Subject: [PATCH 12/12] Correct grammar Co-authored-by: Jake Howard --- sr/robot3/arduino.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sr/robot3/arduino.py b/sr/robot3/arduino.py index 74ab9653..f628bbd4 100644 --- a/sr/robot3/arduino.py +++ b/sr/robot3/arduino.py @@ -138,11 +138,11 @@ def _get_valid_board( if initial_identity is not None: if initial_identity.board_type == 'manual': logger.warning( - f"Manually specified Arduino at port {serial_port!r}, " + f"Manually specified Arduino at port {serial_port!r} " "could not be identified. Ignoring this device") elif initial_identity.manufacturer == 'sbot_simulator': logger.warning( - f"Simulator specified arduino at port {serial_port!r}, " + f"Simulator specified arduino at port {serial_port!r} " "could not be identified. Ignoring this device") return None