diff --git a/Cargo.toml b/Cargo.toml index 26de931..352f351 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,22 +10,40 @@ categories = ["network-programming"] keywords = ["coap", "server", "async", "IoT"] edition = "2021" +[features] +default-features = ["tokio"] +tokio = ["std", "dep:tokio", "dep:tokio-stream", "dep:tokio-util"] +std = [] +observable = [] +embassy = ["dep:oneshot", "dep:embassy-util", "dep:embassy-executor", "dep:embassy-net", "dep:watch"] + [dependencies] -tokio = { version = "1.17.0", features = ["full"] } -tokio-stream = "0.1.8" -tokio-util = { version = "0.7.0", features = ["codec", "net"] } -bytes = "1.1.0" -futures = "0.3.21" +tokio = { version = "1.17.0", features = ["full"], optional = true } +tokio-stream = { version = "0.1.8", optional = true } +tokio-util = { version = "0.7.0", features = ["codec", "net"], optional = true } +thingbuf = { version = "0.1", default-features = false } +bytes = { version = "1.1.0", default-features = false } +futures = { version = "0.3.21", default-features = false } async-trait = "0.1.53" pin-project = "1.0.10" sync_wrapper = "0.1.1" dyn-clone = "1.0.5" -anyhow = "1.0.56" -thiserror = "1.0.30" -env_logger = "0.9.0" -log = "0.4.16" -coap-lite = "0.9.0" -rand = "0.8.5" +anyhow = { version = "1.0.56", default-features = false } +log = { version = "0.4.16", default-features = false } +coap-lite = { path = "../coap-lite", default-features = false } +rand = { version = "0.8", default-features = false } +hashbrown = "0.12" + +embassy-executor = { path = "../embassy/embassy-executor", default-features = false, optional = true } +embassy-net = { path = "../embassy/embassy-net", default-features = false, optional = true } +oneshot = { version = "0.1", default-features = false, features = ["async"], optional = true } +watch = { path = "../watch", default-features = false, optional = true } + +[target.'cfg(target_os = "none")'.dependencies] +embassy-util = { path = "../embassy/embassy-util", default-features = false, optional = true } + +[target.'cfg(not(target_os = "none"))'.dependencies] +embassy-util = { path = "../embassy/embassy-util", features = ["std"], optional = true } [dev-dependencies] async-stream = "0.3.3" diff --git a/src/app/app_builder.rs b/src/app/app_builder.rs index 6b39636..08eccb7 100644 --- a/src/app/app_builder.rs +++ b/src/app/app_builder.rs @@ -1,5 +1,8 @@ -use std::fmt::Debug; -use std::hash::Hash; +use core::fmt::Debug; +use core::hash::Hash; + +use alloc::vec::Vec; +use rand::Rng; use crate::app::app_handler::AppHandler; use crate::app::ResourceBuilder; @@ -92,10 +95,12 @@ impl AppBuilder { } } -impl - IntoHandler, Endpoint> for AppBuilder +impl< + Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static, + R: Rng + Send + Sync + Clone + 'static, + > IntoHandler, Endpoint, R> for AppBuilder { - fn into_handler(self, mtu: Option) -> AppHandler { - AppHandler::from_builder(self, mtu) + fn into_handler(self, mtu: Option, rng: R) -> AppHandler { + AppHandler::from_builder(self, mtu, rng) } } diff --git a/src/app/app_handler.rs b/src/app/app_handler.rs index 9bbc7dd..98060b3 100644 --- a/src/app/app_handler.rs +++ b/src/app/app_handler.rs @@ -1,15 +1,20 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::pin::Pin; -use std::sync::Arc; +use alloc::boxed::Box; +use alloc::fmt::Debug; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::pin::Pin; +use core::{hash::Hash, task::Poll}; +use hashbrown::HashMap; +use pin_project::pin_project; +use rand::Rng; use coap_lite::{BlockHandler, CoapRequest, MessageClass, MessageType, Packet}; -use futures::Stream; +#[cfg(feature = "embassy")] +use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex}; +use futures::{Future, Stream}; use log::{debug, warn}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; -use tokio_stream::wrappers::UnboundedReceiverStream; +#[cfg(feature = "tokio")] +use tokio::sync::{mpsc::UnboundedSender, Mutex}; use crate::app::app_builder::AppBuilder; use crate::app::block_handler_util::new_block_handler; @@ -29,55 +34,95 @@ pub(crate) const DEFAULT_BLOCK_TRANSFER: bool = true; /// Main PacketHandler for an application suite of handlers. Efficiency and concurrency are /// the primary goals of this implementation, but with the need to balance developer friendliness /// of the main API. -pub struct AppHandler { +pub struct AppHandler { + #[cfg(feature = "tokio")] retransmission_manager: Arc>>, + #[cfg(feature = "embassy")] + retransmission_manager: Arc>>, /// Special internal [`coap_lite::BlockHandler`] that we use only for formatting errors /// that might be larger than MTU. + #[cfg(feature = "tokio")] error_block_handler: Arc>>, + #[cfg(feature = "embassy")] + error_block_handler: Arc>>, /// Full set of handlers registered for this app, grouped by path but searchable using inexact /// matching. See [`PathMatcher`] for more. handlers_by_path: Arc>>, + rng: R, } -impl Clone for AppHandler { +impl Clone for AppHandler { fn clone(&self) -> Self { Self { retransmission_manager: self.retransmission_manager.clone(), error_block_handler: self.error_block_handler.clone(), handlers_by_path: self.handlers_by_path.clone(), + rng: self.rng.clone(), } } } -impl PacketHandler - for AppHandler +#[pin_project] +struct PacketStream { + #[pin] + fut: F, + response: Vec, + fut_complete: bool, +} + +impl> + Send> Stream for PacketStream { + type Item = Packet; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let this = self.project(); + + if !*this.fut_complete { + match this.fut.poll(cx) { + Poll::Ready(response) => { + *this.response = response; + *this.fut_complete = true; + // FIXME: should use some more efficient container + this.response.reverse(); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + + Poll::Ready(this.response.pop()) + } +} + +impl + PacketHandler for AppHandler { fn handle<'a>( &'a self, packet: Packet, peer: Endpoint, ) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - - // TODO: This spawn is technically unnecessary as we could implement a Stream ourselves - // similar to how async-stream crate does it, but the boiler plate doesn't really seem - // worth it for now. - tokio::spawn({ - let cloned_self = self.clone(); - async move { - cloned_self.handle_packet(tx, packet, peer).await; - } - }); - Box::pin(UnboundedReceiverStream::new(rx)) + let handler = self.handle_packet(packet, peer); + Box::pin(PacketStream { + fut: handler, + response: vec![], + fut_complete: false, + }) } } -impl AppHandler { - pub fn from_builder(builder: AppBuilder, mtu: Option) -> Self { +impl + AppHandler +{ + pub fn from_builder(builder: AppBuilder, mtu: Option, mut rng: R) -> Self { let retransmission_manager = Arc::new(Mutex::new(RetransmissionManager::new( TransmissionParameters::default(), + &mut rng, ))); let build_params = BuildParameters { @@ -110,16 +155,20 @@ impl AppHandler, packet: Packet, peer: Endpoint) { + async fn handle_packet(&self, packet: Packet, peer: Endpoint) -> Vec { match packet.header.code { MessageClass::Request(_) => { - self.handle_get(tx, packet, peer).await; + let mut packets = vec![]; + self.handle_get(&mut packets, packet, peer).await; + packets } MessageClass::Response(_) => { warn!("Spurious response message from {peer:?}, ignoring..."); + vec![] } MessageClass::Empty => { match packet.header.get_type() { @@ -133,26 +182,30 @@ impl AppHandler { // A common way in CoAP to trigger a cheap "ping" to make sure // the server is alive. - tx.send(new_pong_message(&packet)).unwrap(); + vec![new_pong_message(&packet)] } MessageType::NonConfirmable => { debug!("Ignoring Non-Confirmable Empty message from {peer:?}"); + vec![] } } } code => { warn!("Unhandled message code {code} from {peer:?}, ignoring..."); + vec![] } } } - async fn handle_get(&self, tx: UnboundedSender, packet: Packet, peer: Endpoint) { + async fn handle_get(&self, out: &mut Vec, packet: Packet, peer: Endpoint) { let mut request = CoapRequest::from_packet(packet, peer); - if let Err(e) = self.try_handle_get(&tx, &mut request).await { + if let Err(e) = self.try_handle_get(out, &mut request).await { if request.apply_from_error(e.into_handling_error()) { // If the error happens to need block2 handling, let's do that here... let _ = self @@ -160,14 +213,14 @@ impl AppHandler, + out: &mut Vec, request: &mut CoapRequest, ) -> Result<(), CoapError> { let paths = request.get_path_as_vec().map_err(CoapError::bad_request)?; @@ -193,7 +246,8 @@ impl AppHandler Err(CoapError::not_found()), } diff --git a/src/app/coap_utils.rs b/src/app/coap_utils.rs index 2499c20..ef4eb12 100644 --- a/src/app/coap_utils.rs +++ b/src/app/coap_utils.rs @@ -1,5 +1,7 @@ -use std::collections::HashMap; +use hashbrown::HashMap; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; use coap_lite::error::IncompatibleOptionValueFormat; use coap_lite::option_value::OptionValueType; use coap_lite::{CoapOption, CoapRequest, MessageClass, MessageType, Packet}; diff --git a/src/app/core_handler.rs b/src/app/core_handler.rs index 417f905..3e79ddb 100644 --- a/src/app/core_handler.rs +++ b/src/app/core_handler.rs @@ -1,7 +1,10 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; +use alloc::boxed::Box; +use alloc::string::String; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::fmt::Debug; +use core::hash::Hash; +use hashbrown::HashMap; use async_trait::async_trait; use coap_lite::{ContentFormat, ResponseType}; diff --git a/src/app/core_link.rs b/src/app/core_link.rs index c907cf0..800be19 100644 --- a/src/app/core_link.rs +++ b/src/app/core_link.rs @@ -1,9 +1,12 @@ use crate::app::resource_builder::DiscoverableResource; +use alloc::boxed::Box; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; use coap_lite::link_format::{LinkAttributeWrite, LinkFormatWrite}; use coap_lite::ContentFormat; +use core::fmt::{Debug, Error, Write}; use dyn_clone::DynClone; -use std::collections::HashMap; -use std::fmt::{Debug, Error, Write}; +use hashbrown::HashMap; #[derive(Default, Debug)] pub struct CoreLink { diff --git a/src/app/error.rs b/src/app/error.rs index 6fce9e0..1bcd090 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -1,6 +1,7 @@ -use std::fmt::Debug; -use std::{error, fmt}; +use core::fmt; +use core::fmt::Debug; +use alloc::string::{String, ToString}; use coap_lite::error::HandlingError; use coap_lite::ResponseType; @@ -51,7 +52,8 @@ impl fmt::Display for CoapError { } } -impl error::Error for CoapError {} +#[cfg(feature = "std")] +impl std::error::Error for CoapError {} impl From for CoapError { fn from(src: HandlingError) -> Self { diff --git a/src/app/mod.rs b/src/app/mod.rs index c8d97ab..3891fe4 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -1,13 +1,16 @@ pub use app_builder::AppBuilder; +use core::fmt::Debug; +use core::hash::Hash; pub use error::CoapError; +#[cfg(feature = "observable")] pub use observable_resource::ObservableResource; +#[cfg(feature = "observable")] pub use observers::Observers; +#[cfg(feature = "observable")] pub use observers::ObserversHolder; pub use request::Request; pub use resource_builder::ResourceBuilder; pub use response::Response; -use std::fmt::Debug; -use std::hash::Hash; pub mod app_builder; pub mod app_handler; @@ -16,8 +19,11 @@ mod coap_utils; mod core_handler; mod core_link; pub mod error; +#[cfg(feature = "observable")] pub mod observable_resource; +#[cfg(feature = "observable")] mod observe_handler; +#[cfg(feature = "observable")] mod observers; mod path_matcher; pub mod request; diff --git a/src/app/observable_resource.rs b/src/app/observable_resource.rs index ac5aa4e..90b65aa 100644 --- a/src/app/observable_resource.rs +++ b/src/app/observable_resource.rs @@ -1,3 +1,4 @@ +use alloc::boxed::Box; use async_trait::async_trait; use crate::app::observers::Observers; diff --git a/src/app/observe_handler.rs b/src/app/observe_handler.rs index 8ef69a3..282ccba 100644 --- a/src/app/observe_handler.rs +++ b/src/app/observe_handler.rs @@ -3,14 +3,21 @@ //! Supports handling incoming requests and manipulating outgoing responses transparently to //! semi-transparently handle Observe requests from clients in a reasonable and consistent way. -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; +use alloc::borrow::ToOwned; +use alloc::boxed::Box; +use alloc::string::String; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::fmt::Debug; +use core::hash::Hash; +use hashbrown::HashMap; use coap_lite::{CoapRequest, ObserveOption}; +#[cfg(feature = "embassy")] +use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex}; use log::{debug, warn}; -use rand::RngCore; +use rand::Rng; +#[cfg(feature = "tokio")] use tokio::sync::{oneshot, watch, Mutex}; use crate::app::observers::NotificationState; @@ -22,7 +29,11 @@ use crate::app::{CoapError, ObservableResource, Observers}; pub struct ObserveHandler { path_prefix: String, resource: Arc>, + #[cfg(feature = "tokio")] registrations_by_path: Arc>>>, + #[cfg(feature = "embassy")] + registrations_by_path: + Arc>>>, } #[derive(Debug)] @@ -57,9 +68,10 @@ impl ObserveHandler { } } - pub async fn maybe_process_registration( + pub async fn maybe_process_registration( &self, request_response_pair: &mut CoapRequest, + rng: &mut R, ) -> Result { let observe_flag_result = request_response_pair.get_observe_flag(); if observe_flag_result.is_none() { @@ -99,7 +111,7 @@ impl ObserveHandler { ObserveOption::Register => { let for_path = by_path.entry(path).or_insert_with_key(|path_key| { let mut u24_bytes = [0u8; 3]; - rand::thread_rng().fill_bytes(&mut u24_bytes); + rng.fill_bytes(&mut u24_bytes); let relative_path = path_key.replace(&self.path_prefix, ""); let observers = Observers::new( key_from_path(&relative_path), @@ -109,16 +121,17 @@ impl ObserveHandler { let self_clone = self.to_owned(); let path_key_clone = path_key.clone(); - tokio::spawn(async move { + /*tokio::spawn(async move { self_clone - .handle_on_active_lifecycle(path_key_clone, observers) + .handle_on_active_lifecycle(path_key_cflone, observers) .await; }); RegistrationsForPath { registrations: HashMap::new(), notify_change_tx, - } + }*/ + todo!() }); let notify_rx = for_path.notify_change_tx.subscribe(); diff --git a/src/app/observers.rs b/src/app/observers.rs index fee0cf6..c1647e5 100644 --- a/src/app/observers.rs +++ b/src/app/observers.rs @@ -1,6 +1,11 @@ use crate::app::path_matcher::{key_from_path, PathMatcher}; use crate::app::u24::u24; -use std::sync::Arc; +use alloc::string::String; +use alloc::sync::Arc; +use alloc::vec::Vec; +#[cfg(feature = "embassy")] +use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex}; +#[cfg(feature = "tokio")] use tokio::sync::{watch, Mutex, RwLock}; /// Optional convenience mechanism to aid in managing dynamic [`Observers`] instances. @@ -20,13 +25,16 @@ use tokio::sync::{watch, Mutex, RwLock}; /// } /// } /// ``` -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ObserversHolder { + #[cfg(feature = "tokio")] inner: Arc>>>, + // Embassy has no RwLock currently. + #[cfg(feature = "embassy")] + inner: Arc>>>, } /// Handle that can be used to inform the server when changes are detected. -#[derive(Debug)] pub struct Observers { relative_path_key: Vec, @@ -34,7 +42,10 @@ pub struct Observers { /// This will become the Observe value (i.e. the sequence number) if one is not provided /// by the handler directly. + #[cfg(feature = "tokio")] change_num: Arc>, + #[cfg(feature = "embassy")] + change_num: Arc>, } #[derive(Debug, Copy, Clone)] @@ -44,23 +55,38 @@ pub enum NotificationState { } impl ObserversHolder { + #[cfg(feature = "tokio")] pub fn new() -> Self { Self { inner: Arc::new(RwLock::new(PathMatcher::new_empty())), } } + #[cfg(feature = "embassy")] + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(PathMatcher::new_empty())), + } + } + /// Attach a new [`Observers`] instance which affects how [`notify_change`] behaves. pub async fn attach(&self, observers: Observers) -> Attached<'_> { let key = observers.relative_path_key.clone(); let observers_arc = Arc::new(observers); - self.inner.write().await.insert(key.clone(), observers_arc.clone()); - Attached { key, value: observers_arc, holder: self } + self.inner + .lock() + .await + .insert(key.clone(), observers_arc.clone()); + Attached { + key, + value: observers_arc, + holder: self, + } } /// Defers to [`Observers::notify_change`] when attached; does nothing otherwise. pub async fn notify_change(&self) { - for observers in self.inner.read().await.values() { + for observers in self.inner.lock().await.values() { observers.notify_change().await; } } @@ -80,7 +106,7 @@ impl ObserversHolder { pub async fn notify_change_for_path(&self, relative_path: &str) { for result in self .inner - .read() + .lock() .await .match_all(&key_from_path(relative_path)) { @@ -104,8 +130,11 @@ impl<'a> Attached<'a> { /// Detach and return the owned [`Observers`] instance, meant to be sent back to /// [`crate::app::ObservableResource::on_active`]. pub async fn detach(self) -> Observers { - self.holder.inner.write().await.remove(&self.key).unwrap(); - Arc::try_unwrap(self.value).unwrap() + self.holder.inner.lock().await.remove(&self.key).unwrap(); + match Arc::try_unwrap(self.value) { + Ok(v) => v, + Err(_) => panic!("detach error"), + } } } diff --git a/src/app/path_matcher.rs b/src/app/path_matcher.rs index f66d78e..5a863c7 100644 --- a/src/app/path_matcher.rs +++ b/src/app/path_matcher.rs @@ -1,5 +1,7 @@ -use std::collections::hash_map::Values; -use std::collections::HashMap; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; +use hashbrown::hash_map::Values; +use hashbrown::HashMap; /// Lookup mechanism that uses inexact matching of input paths by finding the most specific /// match and returning that instead. See [`PathMatcher::lookup`] for more information. @@ -16,6 +18,7 @@ impl FromIterator<(Vec, V)> for PathMatcher { } } +#[allow(dead_code)] impl PathMatcher { pub fn new_empty() -> Self { Self { diff --git a/src/app/request.rs b/src/app/request.rs index 2e165f0..6ae7421 100644 --- a/src/app/request.rs +++ b/src/app/request.rs @@ -1,3 +1,4 @@ +use alloc::{string::String, vec::Vec}; use coap_lite::{CoapRequest, CoapResponse, RequestType, ResponseType}; use crate::app::response::Response; diff --git a/src/app/request_handler.rs b/src/app/request_handler.rs index b24441f..295acd3 100644 --- a/src/app/request_handler.rs +++ b/src/app/request_handler.rs @@ -1,7 +1,8 @@ use crate::app::{CoapError, Request, Response}; +use alloc::boxed::Box; use async_trait::async_trait; +use core::future::Future; use dyn_clone::DynClone; -use std::future::Future; #[async_trait] pub trait RequestHandler: DynClone + 'static { diff --git a/src/app/resource_builder.rs b/src/app/resource_builder.rs index 281abf3..1830b05 100644 --- a/src/app/resource_builder.rs +++ b/src/app/resource_builder.rs @@ -1,21 +1,27 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; - +use alloc::boxed::Box; +use alloc::string::{String, ToString}; +use alloc::sync::Arc; +use core::fmt::Debug; +use core::hash::Hash; +use hashbrown::HashMap; + +#[cfg(feature = "observable")] use coap_lite::link_format::LINK_ATTR_OBSERVABLE; use coap_lite::RequestType; +#[cfg(feature = "embassy")] +use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex}; +#[cfg(feature = "tokio")] use tokio::sync::Mutex; use crate::app::app_builder::ConfigBuilder; use crate::app::block_handler_util::new_block_handler; use crate::app::core_link::{CoreLink, LinkAttributeValue}; -use crate::app::observable_resource::ObservableResource; -use crate::app::observe_handler::ObserveHandler; use crate::app::request_handler::RequestHandler; use crate::app::request_type_key::RequestTypeKey; use crate::app::resource_handler::ResourceHandler; use crate::app::retransmission_manager::RetransmissionManager; +#[cfg(feature = "observable")] +use crate::app::{observable_resource::ObservableResource, observe_handler::ObserveHandler}; /// Configure a specific resource handler, potentially with distinct per-method handlers. pub struct ResourceBuilder { @@ -23,6 +29,7 @@ pub struct ResourceBuilder { config: ConfigBuilder, attributes: CoreLink, handlers: HashMap + Send + Sync>>, + #[cfg(feature = "observable")] observable: Option>, } @@ -33,6 +40,7 @@ impl ResourceBuilder config: ConfigBuilder::default(), attributes: CoreLink::new(path), handlers: HashMap::new(), + #[cfg(feature = "observable")] observable: None, } } @@ -43,6 +51,12 @@ impl ResourceBuilder self } + /// See [`crate::app::AppBuilder::enable_block_transfer`]. + pub fn enable_block_transfer(mut self) -> Self { + self.config.block_transfer = Some(true); + self + } + /// See [`crate::app::AppBuilder::disable_block_transfer`]. pub fn disable_block_transfer(mut self) -> Self { self.config.block_transfer = Some(false); @@ -129,11 +143,13 @@ impl ResourceBuilder /// updates to be delivered to registered observers. /// /// For more information, see [RFC 7641](https://datatracker.ietf.org/doc/html/rfc7641) + #[cfg(feature = "observable")] pub fn observable(mut self, observable: impl ObservableResource + Send + Sync) -> Self { self.observable = Some(Box::new(observable)); self.link_attr(LINK_ATTR_OBSERVABLE, ()) } + #[cfg(feature = "observable")] pub(crate) fn build(self, params: BuildParameters) -> Resource { let discoverable = if self.config.discoverable.unwrap_or(true) { Some(DiscoverableResource::from(self.attributes)) @@ -167,12 +183,46 @@ impl ResourceBuilder handler, } } + + #[cfg(not(feature = "observable"))] + pub(crate) fn build(self, params: BuildParameters) -> Resource { + let discoverable = if self.config.discoverable.unwrap_or(true) { + Some(DiscoverableResource::from(self.attributes)) + } else { + None + }; + + let block_transfer = self + .config + .block_transfer + .unwrap_or(crate::app::app_handler::DEFAULT_BLOCK_TRANSFER); + let block_handler = if block_transfer { + Some(Arc::new(Mutex::new(new_block_handler(params.mtu)))) + } else { + None + }; + + let handler = ResourceHandler { + handlers: self.handlers, + block_handler, + retransmission_manager: params.retransmission_manager, + }; + Resource { + path: self.path, + discoverable, + handler, + } + } } #[derive(Clone)] pub(crate) struct BuildParameters { pub mtu: Option, + #[cfg(feature = "tokio")] pub retransmission_manager: Arc>>, + #[cfg(feature = "embassy")] + pub retransmission_manager: + Arc>>, } #[derive(Clone)] diff --git a/src/app/resource_handler.rs b/src/app/resource_handler.rs index 15b7de4..6763d37 100644 --- a/src/app/resource_handler.rs +++ b/src/app/resource_handler.rs @@ -1,15 +1,19 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; +use alloc::boxed::Box; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::fmt::Debug; +use core::hash::Hash; +use hashbrown::HashMap; +use rand::Rng; -use coap_lite::{BlockHandler, CoapOption, CoapRequest, MessageType, Packet}; -use log::debug; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; +use coap_lite::{BlockHandler, CoapRequest, Packet}; +#[cfg(feature = "embassy")] +use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex}; +#[cfg(feature = "tokio")] +use tokio::sync::{mpsc::UnboundedSender, Mutex}; +#[cfg(feature = "observable")] use crate::app::observe_handler::{ObserveHandler, RegistrationEvent}; -use crate::app::observers::NotificationState; use crate::app::request_handler::RequestHandler; use crate::app::request_type_key::RequestTypeKey; use crate::app::retransmission_manager::RetransmissionManager; @@ -17,9 +21,19 @@ use crate::app::{CoapError, Request}; pub struct ResourceHandler { pub handlers: HashMap + Send + Sync>>, + #[cfg(all(feature = "tokio", feature = "observable"))] pub observe_handler: Option>>>, + #[cfg(feature = "tokio")] pub block_handler: Option>>>, + #[cfg(feature = "tokio")] pub retransmission_manager: Arc>>, + #[cfg(all(feature = "embassy", feature = "observable"))] + pub observe_handler: Option>>>, + #[cfg(feature = "embassy")] + pub block_handler: Option>>>, + #[cfg(feature = "embassy")] + pub retransmission_manager: + Arc>>, } impl Clone for ResourceHandler { @@ -31,6 +45,7 @@ impl Clone for ResourceHandler Clone for ResourceHandler ResourceHandler { - pub async fn handle( + pub async fn handle( &self, - tx: &UnboundedSender, + out: &mut Vec, wrapped_request: Request, + rng: &mut R, ) -> Result<(), CoapError> { let method = *wrapped_request.original.get_method(); let method_handler = self @@ -53,7 +69,7 @@ impl ResourceHandler // Loop here so we can "park" to wait for notify_change calls from an Observers // instances. For non-observe cases, this loop breaks after its first iteration. match method_handler { - Some(handler) => self.do_handle(handler, tx, wrapped_request).await, + Some(handler) => self.do_handle(handler, out, wrapped_request, rng).await, None => Err(CoapError::method_not_allowed()), } } @@ -62,11 +78,13 @@ impl ResourceHandler // these parts are especially expensive as it contains the request/response payloads and this // can be avoided by rethinking the Request/Response type system a bit and divorcing ourselves // from CoapRequest/CoapResponse. - async fn do_handle( + #[cfg(feature = "observable")] + async fn do_handle( &self, handler: &Box + Send + Sync>, - tx: &UnboundedSender, + out: &mut Vec, wrapped_request: Request, + rng: &mut R, ) -> Result<(), CoapError> { let mut initial_pair = wrapped_request.original.clone(); if !self.maybe_handle_block_request(&mut initial_pair).await? { @@ -80,15 +98,14 @@ impl ResourceHandler fut.await? } let registration = self - .maybe_handle_observe_registration(&mut initial_pair) + .maybe_handle_observe_registration(&mut initial_pair, rng) .await?; - tx.send(initial_pair.response.as_ref().unwrap().message.clone()) - .unwrap(); + out.push(initial_pair.response.as_ref().unwrap().message.clone()); if let RegistrationEvent::Registered(mut receiver) = registration { debug!("Observe initiated by {:?}", initial_pair.source); loop { - tokio::select! { + futures_util::select! { _ = &mut receiver.termination_rx => { debug!("Observe terminated by peer: {:?}", initial_pair.source); break @@ -149,6 +166,30 @@ impl ResourceHandler Ok(()) } + #[cfg(not(feature = "observable"))] + async fn do_handle( + &self, + handler: &Box + Send + Sync>, + out: &mut Vec, + wrapped_request: Request, + _rng: &mut R, + ) -> Result<(), CoapError> { + let mut initial_pair = wrapped_request.original.clone(); + if !self.maybe_handle_block_request(&mut initial_pair).await? { + let fut = { + self.generate_and_assign_response( + handler, + &mut initial_pair, + wrapped_request.clone(), + ) + }; + fut.await? + } + out.push(initial_pair.response.as_ref().unwrap().message.clone()); + + Ok(()) + } + async fn generate_and_assign_response( &self, handler: &Box + Send + Sync>, @@ -185,15 +226,17 @@ impl ResourceHandler } } - async fn maybe_handle_observe_registration( + #[cfg(feature = "observable")] + async fn maybe_handle_observe_registration( &self, request: &mut CoapRequest, + rng: &mut R, ) -> Result { if let Some(observe_handler) = &self.observe_handler { observe_handler .lock() .await - .maybe_process_registration(request) + .maybe_process_registration(request, rng) .await } else { Ok(RegistrationEvent::NoChange) diff --git a/src/app/retransmission_manager.rs b/src/app/retransmission_manager.rs index 1c3d132..8d0c767 100644 --- a/src/app/retransmission_manager.rs +++ b/src/app/retransmission_manager.rs @@ -1,17 +1,23 @@ +#![allow(dead_code)] + +use alloc::string::String; use anyhow::anyhow; -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::ops::RangeInclusive; -use std::time::Duration; +use core::fmt::{self, Debug}; +use core::hash::Hash; +use core::ops::RangeInclusive; +use core::time::Duration; +use hashbrown::HashMap; use coap_lite::{MessageType, Packet}; -use log::debug; +#[cfg(feature = "embassy")] +use embassy_util::channel::mpmc::DynamicSender as UnboundedSender; use rand::Rng; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::watch; -use tokio::time; -use tokio::time::Instant; +#[cfg(feature = "tokio")] +use tokio::{ + sync::mpsc::UnboundedSender, + sync::watch, + time::{self, Instant}, +}; pub type MessageId = u16; @@ -48,9 +54,10 @@ enum ReplyEvent { } impl RetransmissionManager { - pub fn new(parameters: TransmissionParameters) -> Self { + pub fn new(parameters: TransmissionParameters, rng: &mut R) -> Self { + let next_message_id = rng.gen(); Self { - next_message_id: rand::thread_rng().gen(), + next_message_id, unacknowledged_messages: Default::default(), parameters, } @@ -81,12 +88,12 @@ impl RetransmissionManager { /// /// Note that this method mutates the packet that is to be sent to ensure it is Confirmable /// and has an appropriate message ID. This ensures that the method is infallible. - pub fn send_reliably( + pub fn send_reliably<'r>( &mut self, mut packet: Packet, peer: Endpoint, - packet_tx: UnboundedSender, - ) -> SendReliably { + packet_tx: UnboundedSender<'r, Packet>, + ) -> SendReliably<'r, Endpoint> { packet.header.message_id = self.next_message_id; self.next_message_id = self.next_message_id.wrapping_add(1); packet.header.set_type(MessageType::Confirmable); @@ -155,21 +162,22 @@ impl TransmissionParameters { } #[must_use = "don't forget to call into_future() and await it!"] -pub struct SendReliably { +pub struct SendReliably<'a, Endpoint> { packet: Packet, peer: Endpoint, - packet_tx: UnboundedSender, + packet_tx: UnboundedSender<'a, Packet>, parameters: TransmissionParameters, reply_rx: watch::Receiver, } -impl SendReliably { +impl SendReliably<'_, Endpoint> { pub fn get_message_id(&self) -> MessageId { self.packet.header.message_id } - pub async fn into_future(self) -> Result<(), SendFailed> { - let mut next_timeout = rand::thread_rng().gen_range(self.parameters.ack_timeout_range()); + #[cfg(feature = "observable")] + pub async fn into_future(self, mut rng: R) -> Result<(), SendFailed> { + let mut next_timeout = rng.gen_range(self.parameters.ack_timeout_range()); for attempt in 0..=self.parameters.max_retransmit { if attempt > 0 { let retransmits = attempt - 1; @@ -177,10 +185,11 @@ impl SendReliably { let peer = &self.peer; debug!("Attempting retransmission #{retransmits} of message ID {message_id} to {peer:?}"); } - self.packet_tx - .send(self.packet.clone()) - .map_err(anyhow::Error::msg)?; - let deadline = Instant::now() + next_timeout; + self.packet_tx.send(self.packet.clone()).await; + let deadline = Instant::now() + + embassy_executor::time::Duration::from_micros( + next_timeout.as_micros().try_into().unwrap(), + ); next_timeout *= 2; loop { let mut reply_rx = self.reply_rx.clone(); @@ -207,19 +216,29 @@ impl SendReliably { } } -#[derive(thiserror::Error, Debug)] +#[derive(Debug)] pub enum SendFailed { - #[error("no remote reply after {0} attempts")] NoReply(usize), - - #[error("reset message received")] Reset, + TransmissionError(anyhow::Error), + InternalError(String), +} - #[error(transparent)] - TransmissionError(#[from] anyhow::Error), +impl fmt::Display for SendFailed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoReply(attempts) => write!(f, "no remote reply after {} attempts", attempts), + Self::Reset => write!(f, "reset message received"), + Self::TransmissionError(err) => write!(f, "{}", err), + Self::InternalError(err) => write!(f, "internal error: {}", err), + } + } +} - #[error("internal error: {0}")] - InternalError(String), +impl From for SendFailed { + fn from(err: anyhow::Error) -> Self { + Self::TransmissionError(err) + } } impl MessageKey { @@ -258,7 +277,7 @@ mod tests { let result = { let handle = manager.send_reliably(sent_packet, &TestEndpoint(123), packet_tx); message_id = Some(handle.get_message_id()); - handle.into_future().await + handle.into_future(rand::thread_rng()).await }; if let Err(SendFailed::NoReply(2)) = result { @@ -292,7 +311,7 @@ mod tests { manager .maybe_handle_reply(ack_packet, &TestEndpoint(123)) .unwrap(); - handle.into_future().await + handle.into_future(rand::thread_rng()).await }; result.unwrap(); @@ -316,7 +335,7 @@ mod tests { manager .maybe_handle_reply(reset_packet, &TestEndpoint(123)) .unwrap(); - handle.into_future().await + handle.into_future(rand::thread_rng()).await }; if let Err(SendFailed::Reset) = result { diff --git a/src/app/u24.rs b/src/app/u24.rs index 2431043..b8523b9 100644 --- a/src/app/u24.rs +++ b/src/app/u24.rs @@ -1,7 +1,7 @@ //! Bit of a toy implementation of a "proper" u24 representation. Isn't learning Rust fun? :) -use std::fmt::{Debug, Display, Formatter}; -use std::ops::{Add, AddAssign}; +use core::fmt::{Debug, Display, Formatter}; +use core::ops::{Add, AddAssign}; #[derive(Copy, Clone, PartialEq, Eq)] #[allow(non_camel_case_types)] @@ -38,13 +38,13 @@ impl u24 { } impl Display for u24 { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { Display::fmt(&self.0, f) } } impl Debug for u24 { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { Debug::fmt(&self.0, f) } } @@ -127,7 +127,7 @@ impl From for u64 { pub struct TryFromCustomIntError; impl Display for TryFromCustomIntError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { f.write_str("out of range integral type conversion attempted") } } diff --git a/src/lib.rs b/src/lib.rs index 4fc4c44..494f850 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,14 +31,20 @@ //! //! See other [examples](https://github.com/jasta/coap-server-rs/tree/main/examples) for more information. +#![no_std] +#![feature(type_alias_impl_trait)] + +#[macro_use] +extern crate alloc; +#[macro_use] extern crate core; pub use server::CoapServer; pub use server::FatalServerError; -pub use udp::UdpTransport; +// pub use udp::UdpTransport; pub mod app; pub mod packet_handler; pub mod server; pub mod transport; -pub mod udp; +// pub mod udp; diff --git a/src/packet_handler.rs b/src/packet_handler.rs index ed6f64a..8fffd98 100644 --- a/src/packet_handler.rs +++ b/src/packet_handler.rs @@ -1,7 +1,9 @@ -use std::pin::Pin; +use core::pin::Pin; +use alloc::boxed::Box; use coap_lite::Packet; use futures::Stream; +use rand::Rng; /// "Low-level" raw packet handler intended to support the full range of CoAP features. This /// is little more than a callback informing the user that a packet has arrived, allowing for @@ -17,18 +19,20 @@ pub trait PacketHandler: Clone { ) -> Pin + Send + 'a>>; } -pub trait IntoHandler +pub trait IntoHandler where Handler: PacketHandler + Send + 'static, + R: Rng + Send + Clone, { - fn into_handler(self, mtu: Option) -> Handler; + fn into_handler(self, mtu: Option, rng: R) -> Handler; } -impl IntoHandler for Handler +impl IntoHandler for Handler where Handler: PacketHandler + Send + 'static, + R: Rng + Send + Clone, { - fn into_handler(self, _mtu: Option) -> Handler { + fn into_handler(self, _mtu: Option, _rng: R) -> Handler { self } } diff --git a/src/server.rs b/src/server.rs index 1875403..0b10ddd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,24 +1,35 @@ -use std::fmt::Debug; -use std::pin::Pin; +use core::fmt::{self, Debug}; +use core::pin::Pin; +use alloc::boxed::Box; +use alloc::string::{String, ToString}; use coap_lite::Packet; +use embassy_executor::executor::Spawner; +use embassy_util::Either; +#[cfg(feature = "embassy")] +use embassy_util::{ + blocking_mutex::raw::CriticalSectionRawMutex, + channel::mpmc::{DynamicReceiver, DynamicSender}, +}; use futures::stream::Fuse; use futures::{SinkExt, StreamExt}; use log::{error, trace, warn}; +use rand::Rng; +#[cfg(feature = "tokio")] use tokio::sync::mpsc::{Receiver, Sender}; use crate::packet_handler::{IntoHandler, PacketHandler}; use crate::transport::{FramedBinding, FramedItem, FramedReadError, Transport, TransportError}; /// Primary server API to configure, bind, and ultimately run the CoAP server. -pub struct CoapServer { +pub struct CoapServer<'a, Handler, Endpoint> { binding: Fuse>>>, - packet_relay_rx: Receiver>, - packet_relay_tx: Sender>, + packet_relay_rx: DynamicReceiver<'a, FramedItem>, + packet_relay_tx: DynamicSender<'a, FramedItem>, handler: Option, } -impl CoapServer +impl<'a, Handler, Endpoint: Debug + Send + Clone + 'static> CoapServer<'a, Handler, Endpoint> where Handler: PacketHandler + Send + 'static, { @@ -26,9 +37,17 @@ where /// customers will wish to use [`crate::udp::UdpTransport`]. pub async fn bind>( transport: T, - ) -> Result { + ) -> Result, TransportError> { let binding = transport.bind().await?; - let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(32); + let channel = Box::new(embassy_util::channel::mpmc::Channel::< + CriticalSectionRawMutex, + _, + 32, + >::new()); + // FIXME: avoid memory leak + let channel = Box::leak(channel); + let packet_tx = channel.sender().into(); + let packet_rx = channel.receiver().into(); Ok(Self { binding: binding.fuse(), packet_relay_rx: packet_rx, @@ -41,33 +60,34 @@ where /// encounters unrecoverable issues, typically due to programmer error in this crate itself /// or transport errors not related to a specific peer. The intention is that this crate /// should be highly reliable and run indefinitely for properly configured use cases. - pub async fn serve( + pub async fn serve( mut self, - handler: impl IntoHandler, + handler: impl IntoHandler, + rng: R, ) -> Result<(), FatalServerError> { let mtu = self.binding.get_ref().mtu(); - self.handler = Some(handler.into_handler(mtu)); + self.handler = Some(handler.into_handler(mtu, rng)); + let spawner = embassy_executor::executor::Spawner::for_current_executor().await; loop { - tokio::select! { - event = self.binding.select_next_some() => { - self.handle_rx_event(event)?; - } - Some(item) = self.packet_relay_rx.recv() => { - self.handle_packet_relay(item).await; - } + match embassy_util::select(self.binding.select_next_some(), self.packet_relay_rx.recv()) + .await + { + Either::First(event) => self.handle_rx_event(event, spawner).await?, + Either::Second(item) => self.handle_packet_relay(item).await, } } } - fn handle_rx_event( + async fn handle_rx_event( &self, result: Result, FramedReadError>, + spawner: Spawner, ) -> Result<(), FatalServerError> { match result { Ok((packet, peer)) => { trace!("Incoming packet from {peer:?}: {packet:?}"); - self.do_handle_request(packet, peer)? + self.do_handle_request(packet, peer, spawner).await? } Err((transport_err, peer)) => { warn!("Error from {peer:?}: {transport_err}"); @@ -80,7 +100,12 @@ where Ok(()) } - fn do_handle_request(&self, packet: Packet, peer: Endpoint) -> Result<(), FatalServerError> { + async fn do_handle_request( + &self, + packet: Packet, + peer: Endpoint, + _spawner: Spawner, + ) -> Result<(), FatalServerError> { let handler = self .handler .as_ref() @@ -91,23 +116,22 @@ where packet, peer, ); - tokio::spawn(reply_stream); + // FIXME: should spawn the task onto executor but embassy can spawn only + // static futures. + reply_stream.await; Ok(()) } async fn gen_and_send_responses( handler: Handler, - packet_tx: Sender>, + packet_tx: DynamicSender<'_, FramedItem>, packet: Packet, peer: Endpoint, ) { let mut stream = handler.handle(packet, peer.clone()); while let Some(response) = stream.next().await { let cloned_peer = peer.clone(); - packet_tx - .send((response, cloned_peer)) - .await - .expect("packet_rx closed?"); + packet_tx.send((response, cloned_peer)).await; } } @@ -122,14 +146,27 @@ where /// Fatal error preventing the server from starting or continuing. Typically the result of /// programmer error or misconfiguration. -#[derive(thiserror::Error, Debug)] +#[derive(Debug)] pub enum FatalServerError { /// Programmer error within this crate, file a bug! - #[error("internal error: {0}")] InternalError(String), /// Transport error that is not related to any individual peer but would prevent any future /// packet exchanges on the transport. Must abort the server. - #[error("fatal transport error: {0}")] - Transport(#[from] TransportError), + Transport(TransportError), +} + +impl fmt::Display for FatalServerError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InternalError(err) => write!(f, "internal error: {}", err), + Self::Transport(err) => write!(f, "fatal transport error: {}", err), + } + } +} + +impl From for FatalServerError { + fn from(err: TransportError) -> Self { + Self::Transport(err) + } } diff --git a/src/transport.rs b/src/transport.rs index 31eb71c..652f89c 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,10 +1,10 @@ +use alloc::{boxed::Box, string::String}; use async_trait::async_trait; use coap_lite::error::MessageError; use coap_lite::Packet; +use core::fmt::{self, Debug}; +use core::pin::Pin; use futures::{Sink, Stream}; -use std::fmt::Debug; -use std::io; -use std::pin::Pin; /// Generalization of the underlying CoAP transport, intended primarily to make it easy to support a /// wide range of protocols (TCP, DTLS, websockets, BLE, etc) but also to eventually support @@ -60,14 +60,34 @@ pub type FramedWriteError = TransportError; /// Generalized errors indicating a range of transport-related issues such as being unable to bind, /// disconnections from remote peers, malformed input, etc. Most of these errors are non-fatal /// and the server can happily continue serving other customers. -#[derive(thiserror::Error, Debug)] +#[derive(Debug)] pub enum TransportError { - #[error("generic I/O error")] - IoError(#[from] Option), + #[cfg(feature = "std")] + IoError(Option), + MalformedPacket(MessageError), + Unspecified(String), +} + +impl fmt::Display for TransportError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + #[cfg(feature = "std")] + Self::IoError(_) => write!(f, "generic I/O error"), + Self::MalformedPacket(_) => write!(f, "packet was malformed"), + Self::Unspecified(err) => write!(f, "unspecified: {}", err), + } + } +} - #[error("packet was malformed")] - MalformedPacket(#[from] MessageError), +#[cfg(feature = "std")] +impl From> for TransportError { + fn from(x: Option) -> Self { + Self::IoError(x) + } +} - #[error("unspecified: {0}")] - Unspecified(String), +impl From for TransportError { + fn from(x: MessageError) -> Self { + Self::MalformedPacket(x) + } }