Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BLE support using DuplexStream #33

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ edition = "2021"
doctest = false

[features]
default = ["serde"]
default = ["serde", "bluetooth-le"]

serde = ["dep:serde", "dep:serde_json"]
ts-gen = ["serde", "dep:specta"]
bluetooth-le = ["dep:uuid","dep:btleplug"]
bluetooth-le = ["dep:uuid", "dep:btleplug", "dep:futures"]

[[example]]
name = "basic_serial"
Expand Down Expand Up @@ -56,6 +56,7 @@ serde_json = { version = "1.0", optional = true }
thiserror = "1.0.48"
uuid = { version = "1.6.1", optional = true }
btleplug = { version = "0.11.5", optional = true }
futures = { version = "0.3.31", optional = true }

[dev-dependencies]
fern = { version = "0.6.2", features = ["colored"] }
Expand Down
133 changes: 96 additions & 37 deletions src/connections/ble_handler.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use btleplug::api::{
Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter,
BDAddr, Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter,
ValueNotification, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::stream::StreamExt;
use futures_util::stream::BoxStream;
use log::error;
use std::fmt::Display;
use std::future;
use std::str::FromStr;
use uuid::Uuid;

use crate::errors_internal::{BleConnectionError, Error, InternalStreamError};
use crate::types::EncodedToRadioPacketWithHeader;
use crate::utils::format_data_packet;

const MSH_SERVICE: Uuid = Uuid::from_u128(0x6ba1b218_15a8_461f_9fa8_5dcae273eafd);
const FROMRADIO: Uuid = Uuid::from_u128(0x2c55e69e_4993_11ed_b878_0242ac120002);
Expand All @@ -22,13 +28,47 @@ pub struct BleHandler {
fromnum_char: Characteristic,
}

#[derive(PartialEq)]
pub enum AdapterEvent {
Disconnected,
}

pub enum RadioMessage {
Eof,
Packet(EncodedToRadioPacketWithHeader),
}

pub enum BleId {
Name(String),
MacAddress(BDAddr),
}

impl BleId {
pub fn from_mac_address(mac: &str) -> Result<BleId, Error> {
let bdaddr = BDAddr::from_str(mac).map_err(|e| Error::InvalidParameter {
source: Box::new(e),
description: "Error while parsing a MAC address".to_owned(),
})?;
Ok(BleId::MacAddress(bdaddr))
}
}

impl Display for BleId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BleId::Name(name) => write!(f, "name={name}"),
BleId::MacAddress(mac) => write!(f, "MAC={mac}"),
}
}
}

#[allow(dead_code)]
impl BleHandler {
pub async fn new(name: String) -> Result<Self, Error> {
let (radio, adapter) = Self::find_ble_radio(&name).await?;
pub async fn new(ble_id: &BleId) -> Result<Self, Error> {
let (radio, adapter) = Self::find_ble_radio(ble_id).await?;
radio.connect().await.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Failed to connect to the device {name}"),
description: format!("Failed to connect to the device {ble_id}"),
})?;
let [toradio_char, fromnum_char, fromradio_char] =
Self::find_characteristics(&radio).await?;
Expand All @@ -54,7 +94,7 @@ impl BleHandler {
/// It searches for the 'MSH_SERVICE' running on the device.
///
/// It also returns the associated adapter that can reach this radio.
async fn find_ble_radio(name: &str) -> Result<(Peripheral, Adapter), Error> {
async fn find_ble_radio(ble_id: &BleId) -> Result<(Peripheral, Adapter), Error> {
//TODO: support searching both by a name and by a MAC address
let scan_error_fn = |e: btleplug::Error| Error::StreamBuildError {
source: Box::new(e),
Expand All @@ -64,18 +104,23 @@ impl BleHandler {
let adapters = manager.adapters().await.map_err(scan_error_fn)?;

for adapter in &adapters {
let peripherals = Self::scan_peripherals(&adapter).await;
let peripherals = Self::scan_peripherals(adapter).await;
match peripherals {
Err(e) => {
error!("Error while scanning for meshtastic peripherals: {e:?}");
// We continue, as there can be another adapter that can work
continue;
}
Ok(peripherals) => {
let needle = Some(name.to_owned());
for peripheral in peripherals {
if let Ok(Some(peripheral_properties)) = peripheral.properties().await {
if peripheral_properties.local_name == needle {
let matches = match ble_id {
BleId::Name(name) => {
peripheral_properties.local_name.as_ref() == Some(name)
}
BleId::MacAddress(mac) => peripheral_properties.address == *mac,
};
if matches {
return Ok((peripheral, adapter.clone()));
}
}
Expand All @@ -86,8 +131,8 @@ impl BleHandler {
Err(Error::StreamBuildError {
source: Box::new(BleConnectionError()),
description: format!(
"Failed to find {name}, or meshtastic is not running on the device"
) + ", or it's already connected.",
"Failed to find {ble_id}, or meshtastic is not running on the device"
) + ", or it's already connected to a client.",
})
}

Expand Down Expand Up @@ -137,20 +182,27 @@ impl BleHandler {
})
}

pub async fn read_from_radio(&self) -> Result<Vec<u8>, Error> {
pub async fn read_from_radio(&self) -> Result<RadioMessage, Error> {
self.radio
.read(&self.fromradio_char)
.await
.map_err(Self::ble_read_error_fn)
.and_then(|data| {
if data.is_empty() {
Ok(RadioMessage::Eof)
} else {
format_data_packet(data.into()).map(RadioMessage::Packet)
}
})
}

fn parse_u32(data: Vec<u8>) -> Result<u32, Error> {
let parsed_value = u32::from_le_bytes(data.as_slice().try_into().map_err(|e| {
let data = data.as_slice().try_into().map_err(|e| {
Error::InternalStreamError(InternalStreamError::StreamReadError {
source: Box::new(e),
})
})?);
Ok(parsed_value)
})?;
Ok(u32::from_le_bytes(data))
}

pub async fn read_fromnum(&self) -> Result<u32, Error> {
Expand All @@ -159,44 +211,51 @@ impl BleHandler {
.read(&self.fromnum_char)
.await
.map_err(Self::ble_read_error_fn)?;
if data.is_empty() {
return Ok(0);
}
Self::parse_u32(data)
}

pub async fn notifications(&self) -> Result<BoxStream<ValueNotification>, Error> {
pub async fn notifications(&self) -> Result<BoxStream<u32>, Error> {
self.radio
.subscribe(&self.fromnum_char)
.await
.map_err(Self::ble_read_error_fn)?;
self.radio
let notification_stream = self
.radio
.notifications()
.await
.map_err(Self::ble_read_error_fn)
}
.map_err(Self::ble_read_error_fn)?;

pub async fn filter_map(notification: ValueNotification) -> Option<u32> {
match notification {
ValueNotification {
uuid: FROMNUM,
value,
} => Some(Self::parse_u32(value).unwrap()),
_ => None,
}
Ok(Box::pin(notification_stream.filter_map(
|notification| match notification {
ValueNotification {
uuid: FROMNUM,
value,
} => future::ready(Self::parse_u32(value).ok()),
_ => future::ready(None),
},
)))
}

pub async fn adapter_events(&self) -> Result<BoxStream<CentralEvent>, Error> {
self.adapter
pub async fn adapter_events(&self) -> Result<BoxStream<AdapterEvent>, Error> {
let stream = self
.adapter
.events()
.await
.map_err(|e| Error::StreamBuildError {
source: Box::new(e),
description: format!("Failed to listen to device events"),
})
}

pub fn is_disconnected_event(&self, event: Option<CentralEvent>) -> bool {
if let Some(CentralEvent::DeviceDisconnected(peripheral_id)) = event {
return self.radio.id() == peripheral_id;
}
return false;
description: "Failed to listen to device events".to_owned(),
})?;
let id = self.radio.id();
Ok(Box::pin(stream.filter_map(move |event| {
if let CentralEvent::DeviceDisconnected(peripheral_id) = event {
if id == peripheral_id {
return future::ready(Some(AdapterEvent::Disconnected));
}
}
future::ready(None)
})))
}
}
6 changes: 6 additions & 0 deletions src/errors_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub enum Error {
packet: EncodedToRadioPacketWithHeader,
},

#[error("Invalid function parameter: {source:?}")]
InvalidParameter {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
description: String,
},

/// An error indicating that the library failed when performing an operation on an internal data stream.
#[error(transparent)]
InternalStreamError(#[from] InternalStreamError),
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub mod utils {
/// can also be used to list all available serial ports on the host machine.
pub mod stream {
pub use crate::utils_internal::available_serial_ports;
#[cfg(feature = "bluetooth-le")]
pub use crate::utils_internal::build_ble_stream;
pub use crate::utils_internal::build_serial_stream;
pub use crate::utils_internal::build_tcp_stream;
}
Expand Down
113 changes: 113 additions & 0 deletions src/utils_internal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
#[cfg(feature = "bluetooth-le")]
use crate::connections::ble_handler::BleHandler;
use crate::errors_internal::Error;
#[cfg(feature = "bluetooth-le")]
use futures::stream::StreamExt;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use rand::{distributions::Standard, prelude::Distribution, Rng};
#[cfg(feature = "bluetooth-le")]
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use tokio_serial::{available_ports, SerialPort, SerialStream};

use crate::connections::stream_api::StreamHandle;
Expand Down Expand Up @@ -194,6 +200,113 @@ pub async fn build_tcp_stream(
Ok(StreamHandle::from_stream(stream))
}

/// A helper method that uses the `btleplug` and `tokio` crates to build a BLE stream
/// that is compatible with the `StreamApi` API. This requires that the stream
/// implements `AsyncReadExt + AsyncWriteExt` traits.
///
/// This method is intended to be used to create a `DuplexStream` instance, which is
/// then passed into the `StreamApi::connect` method.
///
/// # Arguments
///
/// * `ble_id` - Name or MAC address of a BLE device
///
/// # Returns
///
/// Returns a result that resolves to a `tokio::io::DuplexStream` instance, or
/// an error if the stream could not be created.
///
/// # Examples
///
/// ```
/// // Connect to a radio, identified by its MAC address
/// let duplex_stream = utils::build_ble_stream(BleId::from_mac_address("E3:44:4E:18:F7:A4").await?;
/// let decoded_listener = stream_api.connect(duplex_stream).await;
/// ```
///
/// # Errors
///
/// Will return an instance of `Error` in the event that the radio refuses the connection, it
/// cannot be found or if the specified address is invalid.
///
/// # Panics
///
/// None
///
#[cfg(feature = "bluetooth-le")]
pub async fn build_ble_stream(
ble_id: &crate::connections::ble_handler::BleId,
) -> Result<StreamHandle<DuplexStream>, Error> {
use crate::{
connections::ble_handler::{AdapterEvent, RadioMessage},
errors_internal::InternalStreamError,
};
let ble_handler = BleHandler::new(ble_id).await?;
// `client` will be returned to the user, server is the opposite end of the channel and it's
// directly connected to a `BleHandler`.
let (client, mut server) = tokio::io::duplex(1024);
let handle = tokio::spawn(async move {
let duplex_write_error_fn = |e| {
Error::InternalStreamError(InternalStreamError::StreamWriteError {
source: Box::new(e),
})
};
let mut read_messages_count = ble_handler.read_fromnum().await?;
let mut buf = [0u8; 1024];
if let Ok(len) = server.read(&mut buf).await {
ble_handler.write_to_radio(&buf[..len]).await?
}
loop {
match ble_handler.read_from_radio().await? {
RadioMessage::Eof => break,
RadioMessage::Packet(packet) => {
server
.write(packet.data())
.await
.map_err(duplex_write_error_fn)?;
}
}
}

let mut notification_stream = ble_handler.notifications().await?;
let mut adapter_events = ble_handler.adapter_events().await?;
loop {
// Note: the following `tokio::select` is only half-duplex on the BLE radio. While we
// are reading from the radio, we are not writing to it and vice versa. However, BLE is
// a half-duplex technology, so we wouldn't gain much with a full duplex solution
// anyway.
tokio::select!(
// Data from device, forward it to the user
notification = notification_stream.next() => {
let avail_msg_count = notification.ok_or(InternalStreamError::Eof)?;
for _ in read_messages_count..avail_msg_count {
if let RadioMessage::Packet(packet) = ble_handler.read_from_radio().await? {
server.write(packet.data()).await.map_err(duplex_write_error_fn)?;
}
}
read_messages_count = avail_msg_count;
},
// Data from user, forward it to the device
from_server = server.read(&mut buf) => {
let len = from_server.map_err(duplex_write_error_fn)?;
ble_handler.write_to_radio(&buf[..len]).await?;
},
event = adapter_events.next() => {
if Some(AdapterEvent::Disconnected) == event {
log::error!("BLE disconnected");
Err(InternalStreamError::ConnectionLost)?
}
}
);
}
});

Ok(StreamHandle {
stream: client,
join_handle: Some(handle),
})
}

/// A helper method to generate random numbers using the `rand` crate.
///
/// This method is intended to be used to generate random id values. This method
Expand Down