Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for being used with sbot_simulator #17

Merged
merged 13 commits into from
Sep 1, 2024
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ dynamic = ["version"]
requires-python = ">=3.8"
dependencies = [
"pyserial >=3,<4",
"april_vision >=2.1,<3",
"april_vision==2.2.0",
"paho-mqtt >=2,<3",
"pydantic >=1.9.1,<2",
"typing-extensions; python_version<'3.10'",
"tomli >=2.0.1,<3; python_version<'3.11'",
]
classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
Expand Down
104 changes: 87 additions & 17 deletions sr/robot3/arduino.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
from .exceptions import IncorrectBoardError
from .logging import log_to_debug
from .serial_wrapper import SerialWrapper
from .utils import Board, BoardIdentity, get_USB_identity, map_to_float
from .utils import (
IN_SIMULATOR, Board, BoardIdentity,
get_simulator_boards, get_USB_identity, map_to_float,
)

logger = logging.getLogger(__name__)
BAUDRATE = 115200
if IN_SIMULATOR:
# Place each command on a new line in the simulator to simplify the implementation
ENDLINE = '\n'
else:
ENDLINE = ''

SUPPORTED_VID_PIDS = {
(0x2341, 0x0043), # Arduino Uno rev 3
Expand Down Expand Up @@ -58,7 +66,7 @@ class Arduino(Board):
"""
The Arduino board interface.

This is intended to be used with Arduino Uno boards running the sbot firmware.
This is intended to be used with Arduino Uno boards running the SR firmware.

:param serial_port: The serial port to connect to.
:param initial_identity: The identity of the board, as reported by the USB descriptor.
Expand Down Expand Up @@ -127,17 +135,46 @@ def _get_valid_board(
f"expected {err.expected_type!r}. Ignoring this device")
return None
except Exception:
if initial_identity is not None and initial_identity.board_type == 'manual':
logger.warning(
f"Manually specified Arduino at port {serial_port!r}, "
"could not be identified. Ignoring this device")
else:
logger.warning(
f"Found Arduino-like serial port at {serial_port!r}, "
"but it could not be identified. Ignoring this device")
if initial_identity is not None:
if initial_identity.board_type == 'manual':
logger.warning(
f"Manually specified Arduino at port {serial_port!r}, "
WillB97 marked this conversation as resolved.
Show resolved Hide resolved
"could not be identified. Ignoring this device")
elif initial_identity.manufacturer == 'sbot_simulator':
logger.warning(
f"Simulator specified arduino at port {serial_port!r}, "
WillB97 marked this conversation as resolved.
Show resolved Hide resolved
"could not be identified. Ignoring this device")
return None

logger.warning(
f"Found Arduino-like serial port at {serial_port!r}, "
"but it could not be identified. Ignoring this device")
return None
return board

@classmethod
def _get_simulator_boards(cls) -> MappingProxyType[str, Arduino]:
"""
Get the simulator boards.

:return: A mapping of board serial numbers to Arduinos
"""
boards = {}
# The filter here is the name of the emulated board in the simulator
for board_info in get_simulator_boards('Arduino'):

# Create board identity from the info given
initial_identity = BoardIdentity(
manufacturer='sbot_simulator',
WillB97 marked this conversation as resolved.
Show resolved Hide resolved
board_type=board_info.type_str,
asset_tag=board_info.serial_number,
)
if (board := cls._get_valid_board(board_info.url, initial_identity)) is None:
continue

boards[board._identity.asset_tag] = board
return MappingProxyType(boards)

@classmethod
def _get_supported_boards(
cls,
Expand All @@ -152,6 +189,9 @@ def _get_supported_boards(
:param ignored_serials: A list of serial number to ignore during board discovery
:return: A mapping of board serial numbers to Arduinos
"""
if IN_SIMULATOR:
return cls._get_simulator_boards()

boards = {}
if ignored_serials is None:
ignored_serials = []
Expand Down Expand Up @@ -192,7 +232,7 @@ def identify(self) -> BoardIdentity:

:return: The identity of the board.
"""
response = self._serial.query('v', endl='')
response = self._serial.query('v', endl=ENDLINE)
response_fields = response.split(':')

# The arduino firmware cannot access the serial number reported in the USB descriptor
Expand Down Expand Up @@ -224,7 +264,9 @@ def command(self, command: str) -> str:
:param command: The command to send to the board.
:return: The response from the board.
"""
return self._serial.query(command, endl='')
if IN_SIMULATOR:
logger.warning("The command method is not fully supported in the simulator")
return self._serial.query(command, endl=ENDLINE)

def map_pin_number(self, pin_number: int) -> str:
"""
Expand All @@ -241,6 +283,34 @@ def map_pin_number(self, pin_number: int) -> str:
raise ValueError("Invalid pin provided") from None
return chr(pin_number + ord('a'))

@log_to_debug
def ultrasound_measure(
self,
pulse_pin: int,
echo_pin: int,
) -> int:
"""
Measure the distance to an object using an ultrasound sensor.

The sensor can only measure distances up to 4m.

:param pulse_pin: The pin to send the ultrasound pulse from.
:param echo_pin: The pin to read the ultrasound echo from.
:raises ValueError: If either of the pins are invalid
:return: The distance measured by the ultrasound sensor in mm.
"""
try: # bounds check
pulse_id = self.map_pin_number(pulse_pin)
except ValueError:
raise ValueError("Invalid pulse pin provided") from None
try:
echo_id = self.map_pin_number(echo_pin)
except ValueError:
raise ValueError("Invalid echo pin provided") from None

response = self._serial.query(f'u{pulse_id}{echo_id}', endl=ENDLINE)
return int(response)

def __repr__(self) -> str:
return f"<{self.__class__.__qualname__}: {self._serial}>"

Expand Down Expand Up @@ -303,7 +373,7 @@ def mode(self, value: GPIOPinMode) -> None:
mode_char = MODE_CHAR_MAP.get(value)
if mode_char is None:
raise IOError(f'Pin mode {value} is not supported')
self._serial.write(self._build_command(mode_char), endl='')
self._serial.write(self._build_command(mode_char), endl=ENDLINE)
self._mode = value

@log_to_debug
Expand All @@ -318,7 +388,7 @@ def digital_read(self) -> bool:
self._check_if_disabled()
if self.mode not in DIGITAL_READ_MODES:
raise IOError(f'Digital read is not supported in {self.mode}')
response = self._serial.query(self._build_command('r'), endl='')
response = self._serial.query(self._build_command('r'), endl=ENDLINE)
return response == 'h'

@log_to_debug
Expand All @@ -334,9 +404,9 @@ def digital_write(self, value: bool) -> None:
if self.mode not in DIGITAL_WRITE_MODES:
raise IOError(f'Digital write is not supported in {self.mode}')
if value:
self._serial.write(self._build_command('h'), endl='')
self._serial.write(self._build_command('h'), endl=ENDLINE)
else:
self._serial.write(self._build_command('l'), endl='')
self._serial.write(self._build_command('l'), endl=ENDLINE)

@log_to_debug
def analog_read(self) -> float:
Expand All @@ -357,7 +427,7 @@ def analog_read(self) -> float:
raise IOError(f'Analog read is not supported in {self.mode}')
if not self._supports_analog:
raise IOError('Pin does not support analog read')
response = self._serial.query(self._build_command('a'), endl='')
response = self._serial.query(self._build_command('a'), endl=ENDLINE)
# map the response from the ADC range to the voltage range
return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0)

Expand Down
116 changes: 92 additions & 24 deletions sr/robot3/camera.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""An implementation of a camera board using the april_vision library."""
from __future__ import annotations

import logging
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Union

from april_vision import CalibratedCamera, Frame
from april_vision import CalibratedCamera, Frame, FrameSource
from april_vision import Marker as AprilMarker
from april_vision import (
Processor, USBCamera, __version__, calibrations,
Expand All @@ -13,8 +15,11 @@
from numpy.typing import NDArray

from .marker import Marker
from .utils import Board, BoardIdentity
from .utils import (
IN_SIMULATOR, Board, BoardIdentity, BoardInfo, get_simulator_boards,
)

PathLike = Union[Path, str]
LOGGER = logging.getLogger(__name__)

robot_calibrations = calibrations.copy()
Expand All @@ -30,9 +35,11 @@ class AprilCamera(Board):
in order to determine the spatial positon and orientation of the markers
that it has detected.

:param camera_id: The index of the camera to use.
:param camera_data: The calibration data for the camera.
:param camera_source: The source of the camera frames.
:param calibration: The intrinsic calibration of the camera.
:param serial_num: The serial number of the camera.
:param name: The name of the camera.
:param vidpid: The VID:PID of the camera.
"""
__slots__ = ('_serial_num', '_cam')

Expand All @@ -56,28 +63,82 @@ def _discover(cls) -> Dict[str, 'AprilCamera']:

:return: A dict of cameras, keyed by their name and index.
"""
if IN_SIMULATOR:
return {
camera_info.serial_number: cls.from_webots_camera(camera_info)
for camera_info in get_simulator_boards('CameraBoard')
}

return {
(serial := f"{camera_data.name} - {camera_data.index}"):
cls(camera_data.index, camera_data=camera_data, serial_num=serial)
cls.from_id(camera_data.index, camera_data=camera_data, serial_num=serial)
for camera_data in find_cameras(robot_calibrations)
}

def __init__(self, camera_id: int, camera_data: CalibratedCamera, serial_num: str) -> None:
def __init__(
self, camera_source: FrameSource,
calibration: tuple[float, float, float, float] | None,
serial_num: str,
name: str,
vidpid: str = "",
) -> None:
# The processor handles the detection and pose estimation
self._cam = Processor(
camera_source,
calibration=calibration,
name=name,
vidpid=vidpid,
mask_unknown_size_tags=True,
)
self._serial_num = serial_num

@classmethod
def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera':
"""
Create a camera from a webots camera.

:param camera_info: The information about the virtual camera,
including the url to connect to.
:return: The camera object.
"""
from .simulator.camera import WebotsRemoteCameraSource

camera_source = WebotsRemoteCameraSource(camera_info)
return cls(
camera_source,
calibration=camera_source.calibration,
serial_num=camera_info.serial_number,
name=camera_info.serial_number,
)

@classmethod
def from_id(
cls,
camera_id: int,
camera_data: CalibratedCamera,
serial_num: str,
) -> 'AprilCamera':
"""
Create a camera from an ID.

:param camera_id: The ID of the camera to create.
:param camera_data: The calibration data for the camera.
:param serial_num: The serial number of the camera.
:return: The camera object.
"""
# The camera source handles the connection between the camera and the processor
camera_source = USBCamera.from_calibration_file(
camera_id,
calibration_file=camera_data.calibration,
vidpid=camera_data.vidpid,
)
# The processor handles the detection and pose estimation
self._cam = Processor(
return cls(
camera_source,
calibration=camera_source.calibration,
serial_num=serial_num,
name=camera_data.name,
vidpid=camera_data.vidpid,
mask_unknown_size_tags=True,
)
self._serial_num = serial_num

def identify(self) -> BoardIdentity:
"""
Expand All @@ -103,34 +164,41 @@ def close(self) -> None:
"""
self._cam.close()

def see(self, *, frame: Optional[NDArray] = None) -> List[Marker]:
def see(
self,
*,
frame: Optional[NDArray] = None,
save: Optional[PathLike] = None,
) -> List[Marker]:
"""
Capture an image and identify fiducial markers.

:param frame: An image to detect markers in, instead of capturing a new one,
:param save: If given, save the annotated frame to the path.
This is given a JPEG extension if none is provided.
:returns: list of markers that the camera could see.
"""
if frame is None:
frame = self._cam.capture()

markers = self._cam.see(frame=frame)

if save:
self._cam.save(save, frame=frame, detections=markers)
return [Marker.from_april_vision_marker(marker) for marker in markers]

def capture(self) -> NDArray:
def capture(self, *, save: Optional[PathLike] = None) -> NDArray:
"""
Get the raw image data from the camera.

:param save: If given, save the annotated frame to the path.
This is given a JPEG extension if none is provided.
:returns: Camera pixel data
"""
return self._cam.capture()

def save(self, path: Union[Path, str], *, frame: Optional[NDArray] = None) -> None:
"""
Save an annotated image to a path.

:param path: The path to save the image to,
this is given a JPEG extension if none is provided.
:param frame: An image to annotate and save, instead of capturing a new one,
defaults to None
"""
self._cam.save(path, frame=frame)
raw_frame = self._cam.capture()
if save:
self._cam.save(save, frame=raw_frame, annotated=False)
return raw_frame

def _set_marker_sizes(
self,
Expand Down
Loading
Loading