Skip to content

Commit

Permalink
Major surgery to add Observe support
Browse files Browse the repository at this point in the history
  • Loading branch information
jasta committed Apr 28, 2022
1 parent 5a4e0d6 commit 6f2137c
Show file tree
Hide file tree
Showing 19 changed files with 1,208 additions and 197 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ thiserror = "1.0.30"
env_logger = "0.9.0"
log = "0.4.16"
coap-lite = "0.9.0"
dashmap = "5.2.0"
rand = "0.8.5"

[dev-dependencies]
async-stream = "0.3.3"
async-stream = "0.3.3"
tokio = { version = "1.17.0", features = ["test-util"] }
64 changes: 39 additions & 25 deletions examples/app_observable.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use coap_lite::link_format::LINK_ATTR_RESOURCE_TYPE;
use tokio::sync::Mutex;
use log::info;
use tokio::sync::{oneshot, Mutex};
use tokio::time;

use coap_server::app::{AppBuilder, CoapError, Request, Response};
use coap_server::app::{ObservableResource, Observers};
use coap_server::app::ObservableResource;
use coap_server::app::{AppBuilder, CoapError, Observers, ObserversHolder, Request, Response};
use coap_server::FatalServerError;
use coap_server::{app, CoapServer, UdpTransport};

Expand All @@ -25,6 +28,9 @@ fn build_app() -> AppBuilder<SocketAddr> {
.resource(
app::resource("/counter")
.link_attr(LINK_ATTR_RESOURCE_TYPE, "counter")
// Try `coap-client -s 10 -m get coap://localhost/counter`. You can also
// in parallel run `coap-client -m put coap://localhost/counter/inc` to show the
// values increment in response to user behaviour.
.observable(counter_state)
.get(move |req| handle_get_counter(req, state_for_get.clone())),
)
Expand All @@ -34,28 +40,38 @@ fn build_app() -> AppBuilder<SocketAddr> {
)
}

#[derive(Clone)]
#[derive(Default, Clone)]
struct CounterState {
counter: Arc<Mutex<u32>>,
observers: Option<Observers>,
}

impl Default for CounterState {
fn default() -> Self {
Self {
counter: Arc::new(Mutex::new(0)),
observers: None,
}
}
observers: ObserversHolder,
}

#[async_trait]
impl ObservableResource for CounterState {
fn on_first_observer(&mut self, observers: Observers) {
self.observers = Some(observers);
}

fn on_last_observer(&mut self) -> Observers {
mem::take(&mut self.observers).unwrap()
async fn on_active(&self, observers: Observers) -> Observers {
info!("Observe active...");
self.observers.attach(observers).await;
let (tx, mut rx) = oneshot::channel();
let counter = self.counter.clone();
let observers = self.observers.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = &mut rx => {
return
}
_ = interval.tick() => {
*counter.lock().await += 1;
observers.notify_change().await;
}
}
}
});
self.observers.stay_active().await;
tx.send(()).unwrap();
info!("Observe no longer active!");
self.observers.detach().await
}
}

Expand All @@ -65,7 +81,7 @@ async fn handle_get_counter(
) -> Result<Response, CoapError> {
let count = *state.counter.lock().await;
let mut response = request.new_response();
response.message.payload = format!("{count}").into_bytes();
response.message.payload = format!("{count}\n").into_bytes();
Ok(response)
}

Expand All @@ -77,8 +93,6 @@ async fn handle_put_counter_inc(
let mut count = state.counter.lock().await;
*count += 1;
}
if let Some(observers) = state.observers {
observers.notify_change();
}
state.observers.notify_change().await;
Ok(request.new_response())
}
26 changes: 9 additions & 17 deletions src/app/app_builder.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;

use crate::app::app_handler::AppHandler;
use crate::app::resource_builder::DiscoverableResource;
use crate::app::resource_handler::ResourceHandler;
use crate::app::ResourceBuilder;
use crate::packet_handler::IntoHandler;

/// Main builder API to configure how the CoAP server should respond to requests
pub struct AppBuilder<Endpoint> {
pub struct AppBuilder<Endpoint: Ord + Clone> {
pub(crate) config: ConfigBuilder,
pub(crate) discoverable_resources: Vec<DiscoverableResource>,
pub(crate) resources_by_path: HashMap<String, ResourceHandler<Endpoint>>,
pub(crate) resources: Vec<ResourceBuilder<Endpoint>>,
}

#[derive(Debug, Default, Clone)]
Expand All @@ -20,17 +17,16 @@ pub(crate) struct ConfigBuilder {
pub block_transfer: Option<bool>,
}

impl<Endpoint> Default for AppBuilder<Endpoint> {
impl<Endpoint: Ord + Clone> Default for AppBuilder<Endpoint> {
fn default() -> Self {
Self {
config: ConfigBuilder::default(),
discoverable_resources: Vec::new(),
resources_by_path: HashMap::new(),
resources: Vec::new(),
}
}
}

impl<Endpoint> AppBuilder<Endpoint> {
impl<Endpoint: Ord + Clone> AppBuilder<Endpoint> {
pub fn new() -> Self {
Default::default()
}
Expand Down Expand Up @@ -82,11 +78,7 @@ impl<Endpoint> AppBuilder<Endpoint> {

/// Add a resource handler to the app by the configured path.
pub fn resource(mut self, resource: ResourceBuilder<Endpoint>) -> Self {
let built = resource.build();
if let Some(discoverable) = built.discoverable {
self.discoverable_resources.push(discoverable);
}
self.resources_by_path.insert(built.path, built.handler);
self.resources.push(resource);
self
}

Expand All @@ -99,8 +91,8 @@ impl<Endpoint> AppBuilder<Endpoint> {
}
}

impl<Endpoint: Debug + Ord + Clone + Send + 'static> IntoHandler<AppHandler<Endpoint>, Endpoint>
for AppBuilder<Endpoint>
impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static>
IntoHandler<AppHandler<Endpoint>, Endpoint> for AppBuilder<Endpoint>
{
fn into_handler(self, mtu: Option<u32>) -> AppHandler<Endpoint> {
AppHandler::from_builder(self, mtu)
Expand Down
127 changes: 86 additions & 41 deletions src/app/app_handler.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,57 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;

use coap_lite::{BlockHandler, BlockHandlerConfig, CoapRequest, MessageClass, Packet};
use coap_lite::{BlockHandler, CoapRequest, MessageClass, MessageType, Packet};
use futures::Stream;
use log::{debug, warn};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::app::app_builder::AppBuilder;
use crate::app::block_handler_util::new_block_handler;
use crate::app::coap_utils::new_pong_message;
use crate::app::core_handler::CoreRequestHandler;
use crate::app::error::CoapError;
use crate::app::path_matcher::{MatchedResult, PathMatcher};
use crate::app::resource_builder::BuildParameters;
use crate::app::resource_handler::ResourceHandler;
use crate::app::retransmission_manager::{RetransmissionManager, TransmissionParameters};
use crate::app::Request;
use crate::packet_handler::PacketHandler;

const DEFAULT_DISCOVERABLE: bool = true;
const DEFAULT_BLOCK_TRANSFER: bool = true;
pub(crate) const DEFAULT_BLOCK_TRANSFER: bool = true;

pub struct AppHandler<Endpoint: Ord + Clone> {
block_handler: Option<Arc<Mutex<BlockHandler<Endpoint>>>>,
resources_by_path: Arc<PathMatcher<ResourceHandler<Endpoint>>>,
/// Main PacketHandler for an application suite of handlers. Efficiency and concurrency are
/// the primary goals of this implementation, but with the need to balance developer friendliness
/// of the main API.
pub struct AppHandler<Endpoint: Debug + Clone + Ord + Eq + Hash> {
retransmission_manager: Arc<Mutex<RetransmissionManager<Endpoint>>>,

/// Special internal [`coap_lite::BlockHandler`] that we use only for formatting errors
/// that might be larger than MTU.
error_block_handler: Arc<Mutex<BlockHandler<Endpoint>>>,

/// Full set of handlers registered for this app, grouped by path but searchable using inexact
/// matching. See [`PathMatcher`] for more.
handlers_by_path: Arc<PathMatcher<ResourceHandler<Endpoint>>>,
}

impl<Endpoint: Ord + Clone> Clone for AppHandler<Endpoint> {
impl<Endpoint: Debug + Clone + Ord + Eq + Hash> Clone for AppHandler<Endpoint> {
fn clone(&self) -> Self {
Self {
block_handler: self.block_handler.clone(),
resources_by_path: self.resources_by_path.clone(),
retransmission_manager: self.retransmission_manager.clone(),
error_block_handler: self.error_block_handler.clone(),
handlers_by_path: self.handlers_by_path.clone(),
}
}
}

impl<Endpoint: Debug + Ord + Send + Clone + 'static> PacketHandler<Endpoint>
impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> PacketHandler<Endpoint>
for AppHandler<Endpoint>
{
fn handle<'a>(
Expand All @@ -57,34 +74,42 @@ impl<Endpoint: Debug + Ord + Send + Clone + 'static> PacketHandler<Endpoint>
}
}

impl<Endpoint: Debug + Ord + Clone + Send + 'static> AppHandler<Endpoint> {
pub fn from_builder(mut builder: AppBuilder<Endpoint>, mtu: Option<u32>) -> Self {
impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endpoint> {
pub fn from_builder(builder: AppBuilder<Endpoint>, mtu: Option<u32>) -> Self {
let retransmission_manager = Arc::new(Mutex::new(RetransmissionManager::new(
TransmissionParameters::default(),
)));

let build_params = BuildParameters {
mtu,
retransmission_manager: retransmission_manager.clone(),
};

let error_block_handler = Arc::new(Mutex::new(new_block_handler(mtu)));

let mut discoverable_resources = Vec::new();
let mut handlers = HashMap::new();
for resource_builder in builder.resources {
let resource = resource_builder.build(build_params.clone());
if let Some(discoverable) = resource.discoverable {
discoverable_resources.push(discoverable);
}
handlers.insert(resource.path, resource.handler);
}

let discoverable = builder.config.discoverable.unwrap_or(DEFAULT_DISCOVERABLE);
if discoverable {
CoreRequestHandler::install(&mut builder);
let core_resource = CoreRequestHandler::new_resource_builder(discoverable_resources)
.build(build_params.clone());
handlers.insert(core_resource.path, core_resource.handler);
}

let resources_by_path = Arc::new(PathMatcher::from_path_strings(builder.resources_by_path));

let block_transfer = builder
.config
.block_transfer
.unwrap_or(DEFAULT_BLOCK_TRANSFER);
let block_handler = if block_transfer {
let mut block_config = BlockHandlerConfig::default();
if let Some(mtu) = mtu {
if let Ok(mtu) = usize::try_from(mtu) {
block_config.max_total_message_size = mtu;
}
}
Some(Arc::new(Mutex::new(BlockHandler::new(block_config))))
} else {
None
};
let handlers_by_path = Arc::new(PathMatcher::from_path_strings(handlers));

Self {
block_handler,
resources_by_path,
retransmission_manager,
error_block_handler,
handlers_by_path,
}
}

Expand All @@ -97,10 +122,30 @@ impl<Endpoint: Debug + Ord + Clone + Send + 'static> AppHandler<Endpoint> {
warn!("Spurious response message from {peer:?}, ignoring...");
}
MessageClass::Empty => {
debug!("Empty message from {peer:?} not handled: not implemented!");
match packet.header.get_type() {
t @ (MessageType::Acknowledgement | MessageType::Reset) => {
let mut retransmission_manager = self.retransmission_manager.lock().await;
if let Err(packet) =
retransmission_manager.maybe_handle_reply(packet, &peer)
{
let message_id = packet.header.message_id;
debug!(
"Got {t:?} from {peer:?} for unrecognized message ID {message_id}"
);
}
}
MessageType::Confirmable => {
// A common way in CoAP to trigger a cheap "ping" to make sure
// the server is alive.
tx.send(new_pong_message(&packet)).unwrap();
}
MessageType::NonConfirmable => {
debug!("Ignoring Non-Confirmable Empty message from {peer:?}");
}
}
}
n => {
warn!("Unhandled message code {n} from {peer:?}, ignoring...");
code => {
warn!("Unhandled message code {code} from {peer:?}, ignoring...");
}
}
}
Expand All @@ -110,9 +155,11 @@ impl<Endpoint: Debug + Ord + Clone + Send + 'static> AppHandler<Endpoint> {
if let Err(e) = self.try_handle_get(&tx, &mut request).await {
if request.apply_from_error(e.into_handling_error()) {
// If the error happens to need block2 handling, let's do that here...
if let Some(ref block_handler) = self.block_handler {
let _ = block_handler.lock().await.intercept_response(&mut request);
}
let _ = self
.error_block_handler
.lock()
.await
.intercept_response(&mut request);
tx.send(request.response.unwrap().message).unwrap();
}
}
Expand All @@ -125,7 +172,7 @@ impl<Endpoint: Debug + Ord + Clone + Send + 'static> AppHandler<Endpoint> {
) -> Result<(), CoapError> {
let paths = request.get_path_as_vec().map_err(CoapError::bad_request)?;

let resource = self.resources_by_path.lookup(&paths);
let resource = self.handlers_by_path.lookup(&paths);
if log::log_enabled!(log::Level::Debug) {
let peer = &request.source;
let method = request.get_method();
Expand All @@ -146,9 +193,7 @@ impl<Endpoint: Debug + Ord + Clone + Send + 'static> AppHandler<Endpoint> {
unmatched_path: Vec::from(&paths[matched_index..]),
};

value
.handle(tx, wrapped_request, self.block_handler.clone())
.await
value.handle(tx, wrapped_request).await
}
None => Err(CoapError::not_found()),
}
Expand Down
11 changes: 11 additions & 0 deletions src/app/block_handler_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use coap_lite::{BlockHandler, BlockHandlerConfig};

pub fn new_block_handler<Endpoint: Ord + Clone>(mtu: Option<u32>) -> BlockHandler<Endpoint> {
let mut config = BlockHandlerConfig::default();
if let Some(mtu) = mtu {
if let Ok(mtu) = usize::try_from(mtu) {
config.max_total_message_size = mtu;
}
}
BlockHandler::new(config)
}
Loading

0 comments on commit 6f2137c

Please sign in to comment.