diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml index b72896794..252f9af46 100644 --- a/.github/workflows/test-and-release.yml +++ b/.github/workflows/test-and-release.yml @@ -9,7 +9,8 @@ concurrency: jobs: pycyphal-test: name: Test PyCyphal - if: (github.event_name == 'push') || contains(github.event.head_commit.message, '#test') + # Run on push OR on 3rd-party PR. + if: (github.event_name == 'push') || (github.event.pull_request.author_association == 'NONE') strategy: fail-fast: false matrix: @@ -38,6 +39,17 @@ jobs: python -m pip install --upgrade pip setuptools nox shell: bash + - name: Build and Install Socketcand + if: ${{ runner.os == 'Linux' }} + run: | + sudo apt-get install -y autoconf + git clone https://github.com/linux-can/socketcand.git + cd socketcand + ./autogen.sh + ./configure + make + sudo make install + - name: Collect Linux diagnostic data if: ${{ runner.os == 'Linux' }} run: ip link show diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3e16892a8..906f7a110 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,12 @@ Changelog ========= +v1.16 +----- + +- Added support for the Socketcand interface. + See (`#306 `_) for details on the changes. + v1.15 ----- diff --git a/pycyphal/_version.py b/pycyphal/_version.py index 874b0d098..638c1217d 100644 --- a/pycyphal/_version.py +++ b/pycyphal/_version.py @@ -1 +1 @@ -__version__ = "1.15.4" +__version__ = "1.16.0" diff --git a/pycyphal/application/_transport_factory.py b/pycyphal/application/_transport_factory.py index b55f833fa..0bf289b28 100644 --- a/pycyphal/application/_transport_factory.py +++ b/pycyphal/application/_transport_factory.py @@ -297,6 +297,17 @@ def init(name: str, default: RelaxedValue) -> ValueProxy: from pycyphal.transport.can.media.candump import CandumpMedia media = CandumpMedia(iface.split(":", 1)[-1]) + elif iface.lower().startswith("socketcand:"): + from pycyphal.transport.can.media.socketcand import SocketcandMedia + + params = iface.split(":") + channel = params[1] + host = params[2] + port = 29536 + if len(params) == 4: + port = int(params[3]) + + media = SocketcandMedia(channel, host, port) else: from pycyphal.transport.can.media.pythoncan import PythonCANMedia diff --git a/pycyphal/transport/can/media/socketcand/__init__.py b/pycyphal/transport/can/media/socketcand/__init__.py new file mode 100644 index 000000000..9e4e2fc75 --- /dev/null +++ b/pycyphal/transport/can/media/socketcand/__init__.py @@ -0,0 +1 @@ +from ._socketcand import SocketcandMedia as SocketcandMedia diff --git a/pycyphal/transport/can/media/socketcand/_socketcand.py b/pycyphal/transport/can/media/socketcand/_socketcand.py new file mode 100644 index 000000000..8c3027326 --- /dev/null +++ b/pycyphal/transport/can/media/socketcand/_socketcand.py @@ -0,0 +1,272 @@ +# Copyright (c) 2019 OpenCyphal +# This software is distributed under the terms of the MIT License. +# Author: Alex Kiselev , Pavel Kirienko + +from __future__ import annotations +import queue +import time +import typing +import asyncio +import logging +import threading +from functools import partial +import dataclasses + +import can +from pycyphal.transport import Timestamp, ResourceClosedError, InvalidMediaConfigurationError +from pycyphal.transport.can.media import Media, FilterConfiguration, Envelope, FrameFormat, DataFrame + + +_logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class _TxItem: + msg: can.Message + timeout: float + future: asyncio.Future[None] + loop: asyncio.AbstractEventLoop + + +class SocketcandMedia(Media): + """ + Media interface adapter for `Socketcand `_ using the + built-in interface from `Python-CAN `_. + Please refer to the Socketcand documentation for information about supported hardware, + configuration, and installation instructions. + + This media interface supports only Classic CAN. + + Here is a basic usage example based on the Yakut CLI tool. + Suppose you have two computers: + One connected to a CAN-capable device and that computer is able to connect and receive CAN data from the + CAN device. Using socketcand with a command such as ``socketcand -v -i can0 -l 123.123.1.123`` + on this first computer will bind it too a socket (default port for socketcand is 29536, so it is also default here). + + Then, on your second computer:: + + export UAVCAN__CAN__IFACE="socketcand:can0:123.123.1.123" + yakut sub 33:uavcan.si.unit.voltage.scalar + + This will allow you to remotely receive CAN data on computer two through the wired connection on computer 1. + """ + + _MAXIMAL_TIMEOUT_SEC = 0.1 + + def __init__(self, channel: str, host: str, port: int = 29536) -> None: + """ + :param channel: Name of the CAN channel/interface that your remote computer is connected to; + often ``can0`` or ``vcan0``. + Comes after the ``-i`` in the socketcand command. + + :param host: Name of the remote IP address of the computer running socketcand; + should be in the format ``123.123.1.123``. + In the socketcand command, this is the IP address after ``-l``. + + :param port: Name of the port the socket is bound too. + As per socketcand's default value, here, the default is also 29536. + """ + + self._iface = "socketcand" + self._host = host + self._port = port + self._can_channel = channel + + self._closed = False + self._maybe_thread: typing.Optional[threading.Thread] = None + self._rx_handler: typing.Optional[Media.ReceivedFramesHandler] = None + # This is for communication with a thread that handles the call to _bus.send + self._tx_queue: queue.Queue[_TxItem | None] = queue.Queue() + self._tx_thread = threading.Thread(target=self._transmit_thread_worker, daemon=True) + + try: + self._bus = can.ThreadSafeBus( + interface=self._iface, + host=self._host, + port=self._port, + channel=self._can_channel, + ) + except can.CanError as ex: + raise InvalidMediaConfigurationError(f"Could not initialize PythonCAN: {ex}") from ex + super().__init__() + + @property + def interface_name(self) -> str: + return f"{self._iface}:{self._can_channel}:{self._host}:{self._port}" + + @property + def channel_name(self) -> str: + return self._can_channel + + @property + def host_name(self) -> str: + return self._host + + @property + def port_name(self) -> int: + return self._port + + # Python-CAN's wrapper for socketcand does not support FD frames, so mtu will always be 8 for now + @property + def mtu(self) -> int: + return 8 + + @property + def number_of_acceptance_filters(self) -> int: + """ + The value is currently fixed at 1 for all interfaces. + TODO: obtain the number of acceptance filters from Python-CAN. + """ + return 1 + + def start(self, handler: Media.ReceivedFramesHandler, no_automatic_retransmission: bool) -> None: + self._tx_thread.start() + if self._maybe_thread is None: + self._rx_handler = handler + self._maybe_thread = threading.Thread( + target=self._thread_function, args=(asyncio.get_event_loop(),), name=str(self), daemon=True + ) + self._maybe_thread.start() + if no_automatic_retransmission: + _logger.info("%s non-automatic retransmission is not supported", self) + else: + raise RuntimeError("The RX frame handler is already set up") + + def configure_acceptance_filters(self, configuration: typing.Sequence[FilterConfiguration]) -> None: + if self._closed: + raise ResourceClosedError(repr(self)) + filters = [] + for f in configuration: + d = {"can_id": f.identifier, "can_mask": f.mask} + if f.format is not None: # Per Python-CAN docs, if "extended" is not set, both base/ext will be accepted. + d["extended"] = f.format == FrameFormat.EXTENDED + filters.append(d) + self._bus.set_filters(filters) + _logger.debug("%s: Acceptance filters activated: %s", self, ", ".join(map(str, configuration))) + + def _transmit_thread_worker(self) -> None: + try: + while not self._closed: + tx = self._tx_queue.get(block=True) + if self._closed or tx is None: + break + try: + self._bus.send(tx.msg, tx.timeout) + tx.loop.call_soon_threadsafe(partial(tx.future.set_result, None)) + except Exception as ex: + tx.loop.call_soon_threadsafe(partial(tx.future.set_exception, ex)) + except Exception as ex: + _logger.critical( + "Unhandled exception in transmit thread, transmission thread stopped and transmission is no longer possible: %s", + ex, + exc_info=True, + ) + + async def send(self, frames: typing.Iterable[Envelope], monotonic_deadline: float) -> int: + num_sent = 0 + loopback: typing.List[typing.Tuple[Timestamp, Envelope]] = [] + loop = asyncio.get_running_loop() + for f in frames: + if self._closed: + raise ResourceClosedError(repr(self)) + message = can.Message( + arbitration_id=f.frame.identifier, + is_extended_id=(f.frame.format == FrameFormat.EXTENDED), + data=f.frame.data, + ) + try: + desired_timeout = monotonic_deadline - loop.time() + received_future: asyncio.Future[None] = asyncio.Future() + self._tx_queue.put_nowait( + _TxItem( + message, + max(desired_timeout, 0), + received_future, + asyncio.get_running_loop(), + ) + ) + await received_future + except (asyncio.TimeoutError, can.CanError): # CanError is also used to report timeouts (weird). + break + else: + num_sent += 1 + if f.loopback: + loopback.append((Timestamp.now(), f)) + # Fake received frames if hardware does not support loopback + if loopback: + loop.call_soon(self._invoke_rx_handler, loopback) + return num_sent + + def close(self) -> None: + self._closed = True + try: + self._tx_queue.put(None) + self._tx_thread.join(timeout=self._MAXIMAL_TIMEOUT_SEC * 10) + if self._maybe_thread is not None: + self._maybe_thread.join(timeout=self._MAXIMAL_TIMEOUT_SEC * 10) + self._maybe_thread = None + finally: + try: + self._bus.shutdown() + except Exception as ex: + _logger.exception("%s: Bus closing error: %s", self, ex) + + @staticmethod + def list_available_interface_names() -> typing.Iterable[str]: + """ + Returns an empty list. TODO: provide minimally functional implementation. + """ + return [] + + def _invoke_rx_handler(self, frs: typing.List[typing.Tuple[Timestamp, Envelope]]) -> None: + try: + # Don't call after closure to prevent race conditions and use-after-close. + if not self._closed and self._rx_handler is not None: + self._rx_handler(frs) + except Exception as exc: + _logger.exception("%s unhandled exception in the receive handler: %s; lost frames: %s", self, exc, frs) + + def _thread_function(self, loop: asyncio.AbstractEventLoop) -> None: + while not self._closed: + try: + batch = self._read_batch() + if batch: + try: + loop.call_soon_threadsafe(self._invoke_rx_handler, batch) + except RuntimeError as ex: + _logger.debug("%s: Event loop is closed, exiting: %r", self, ex) + break + except OSError as ex: + if not self._closed: + _logger.exception("%s thread input/output error; stopping: %s", self, ex) + break + except Exception as ex: + _logger.exception("%s thread failure: %s", self, ex) + if not self._closed: + time.sleep(1) # Is this an adequate failure management strategy? + + self._closed = True + _logger.info("%s thread is about to exit", self) + + def _read_batch(self) -> typing.List[typing.Tuple[Timestamp, Envelope]]: + batch: typing.List[typing.Tuple[Timestamp, Envelope]] = [] + while not self._closed: + msg = self._bus.recv(0.0 if batch else self._MAXIMAL_TIMEOUT_SEC) + if msg is None: + break + + timestamp = Timestamp(system_ns=time.time_ns(), monotonic_ns=time.monotonic_ns()) + + frame = self._parse_native_frame(msg) + if frame is not None: + batch.append((timestamp, Envelope(frame, False))) + return batch + + @staticmethod + def _parse_native_frame(msg: can.Message) -> typing.Optional[DataFrame]: + if msg.is_error_frame: # error frame, ignore silently + _logger.debug("Error frame dropped: id_raw=%08x", msg.arbitration_id) + return None + frame_format = FrameFormat.EXTENDED if msg.is_extended_id else FrameFormat.BASE + data = msg.data + return DataFrame(frame_format, msg.arbitration_id, data) diff --git a/tests/transport/can/media/_socketcand.py b/tests/transport/can/media/_socketcand.py new file mode 100644 index 000000000..0fc222060 --- /dev/null +++ b/tests/transport/can/media/_socketcand.py @@ -0,0 +1,138 @@ +# Copyright (c) 2023 OpenCyphal +# This software is distributed under the terms of the MIT License. + +import sys +import typing +import asyncio +import logging +import subprocess +import pytest + +from pycyphal.transport import Timestamp +from pycyphal.transport.can.media import Envelope, DataFrame, FrameFormat, FilterConfiguration +from pycyphal.transport.can.media.socketcand import SocketcandMedia + +if sys.platform != "linux": # pragma: no cover + pytest.skip("Socketcand test skipped because the system is not GNU/Linux", allow_module_level=True) + +_logger = logging.getLogger(__name__) + + +@pytest.fixture() +def _start_socketcand() -> typing.Generator[None, None, None]: + # starting a socketcand daemon in background + cmd = ["socketcand", "-i", "vcan0", "-l", "lo", "-p", "29536"] + + socketcand = subprocess.Popen( + cmd, + encoding="utf8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + try: + stdout, stderr = socketcand.communicate(timeout=1) + except subprocess.TimeoutExpired: + pass # Successful liftoff + else: + _logger.debug("%s stdout:\n%s", cmd, stdout) + _logger.debug("%s stderr:\n%s", cmd, stderr) + raise subprocess.CalledProcessError(socketcand.returncode, cmd, stdout, stderr) + + yield None + socketcand.kill() + + +@pytest.mark.asyncio +async def _unittest_can_socketcand(_start_socketcand: None) -> None: + asyncio.get_running_loop().slow_callback_duration = 5.0 + + media_a = SocketcandMedia("vcan0", "127.0.0.1") + media_b = SocketcandMedia("vcan0", "127.0.0.1") + + assert media_a.mtu == 8 + assert media_b.mtu == 8 + assert media_a.interface_name == "socketcand:vcan0:127.0.0.1:29536" + assert media_b.interface_name == "socketcand:vcan0:127.0.0.1:29536" + assert media_a.channel_name == "vcan0" + assert media_b.channel_name == "vcan0" + assert media_a.host_name == "127.0.0.1" + assert media_b.host_name == "127.0.0.1" + assert media_a.port_name == 29536 + assert media_b.port_name == 29536 + assert media_a.number_of_acceptance_filters == media_b.number_of_acceptance_filters + assert media_a._maybe_thread is None + assert media_b._maybe_thread is None + + rx_a: typing.List[typing.Tuple[Timestamp, Envelope]] = [] + rx_b: typing.List[typing.Tuple[Timestamp, Envelope]] = [] + + def on_rx_a(frames: typing.Iterable[typing.Tuple[Timestamp, Envelope]]) -> None: + nonlocal rx_a + frames = list(frames) + print("RX A:", frames) + rx_a += frames + + def on_rx_b(frames: typing.Iterable[typing.Tuple[Timestamp, Envelope]]) -> None: + nonlocal rx_b + frames = list(frames) + print("RX B:", frames) + rx_b += frames + + media_a.start(on_rx_a, False) + media_b.start(on_rx_b, False) + + assert media_a._maybe_thread is not None + assert media_b._maybe_thread is not None + + await asyncio.sleep(2.0) # This wait is needed to ensure that the RX thread handles read timeout properly + + ts_begin = Timestamp.now() + await media_b.send( + [ + Envelope(DataFrame(FrameFormat.EXTENDED, 0xBADC0FE, bytearray(range(8))), loopback=True), + Envelope(DataFrame(FrameFormat.EXTENDED, 0x12345678, bytearray(range(0))), loopback=False), + Envelope(DataFrame(FrameFormat.BASE, 0x123, bytearray(range(6))), loopback=True), + ], + asyncio.get_event_loop().time() + 1.0, + ) + await asyncio.sleep(0.1) + ts_end = Timestamp.now() + + print("rx_a:", rx_a) + # Three received from another part + assert len(rx_a) == 3 + for ts, _f in rx_a: + assert ts_begin.monotonic_ns <= ts.monotonic_ns <= ts_end.monotonic_ns + assert ts_begin.system_ns <= ts.system_ns <= ts_end.system_ns + + rx_external = list(filter(lambda x: True, rx_a)) + + assert rx_external[0][1].frame.identifier == 0xBADC0FE + assert rx_external[0][1].frame.data == bytearray(range(8)) + assert rx_external[0][1].frame.format == FrameFormat.EXTENDED + + assert rx_external[1][1].frame.identifier == 0x12345678 + assert rx_external[1][1].frame.data == bytearray(range(0)) + assert rx_external[1][1].frame.format == FrameFormat.EXTENDED + + assert rx_external[2][1].frame.identifier == 0x123 + assert rx_external[2][1].frame.data == bytearray(range(6)) + assert rx_external[2][1].frame.format == FrameFormat.BASE + + print("rx_b:", rx_b) + # Two messages are loopback and were copied + assert len(rx_b) == 2 + + rx_loopback = list(filter(lambda x: True, rx_b)) + + assert rx_loopback[0][1].frame.identifier == 0xBADC0FE + assert rx_loopback[0][1].frame.data == bytearray(range(8)) + assert rx_loopback[0][1].frame.format == FrameFormat.EXTENDED + + assert rx_loopback[1][1].frame.identifier == 0x123 + assert rx_loopback[1][1].frame.data == bytearray(range(6)) + assert rx_loopback[1][1].frame.format == FrameFormat.BASE + + media_a.close() + media_b.close()