diff --git a/client/Cargo.toml b/client/Cargo.toml index 63eae12..e054eb2 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -46,3 +46,15 @@ required-features = ["gpio"] [[example]] name = "controller" required-features = ["roland", "gpio", "camloc"] + +[[example]] +name = "sensor" +required-features = ["roland"] + +[[example]] +name = "circle" +required-features = ["roland"] + +[[example]] +name = "tcp" +required-features = ["roland"] diff --git a/client/examples/simple.rs b/client/examples/simple.rs index cf32d3c..a5b234e 100644 --- a/client/examples/simple.rs +++ b/client/examples/simple.rs @@ -1,5 +1,8 @@ use roblib::roland::{LedColor, Roland}; -use roblib_client::{transports::udp::Udp, Result, Robot}; +use roblib_client::{ + transports::{tcp::Tcp, udp::Udp}, + Result, Robot, +}; use std::{thread::sleep, time::Duration}; fn main() -> Result<()> { diff --git a/client/examples/tcp.rs b/client/examples/tcp.rs new file mode 100644 index 0000000..f6ba020 --- /dev/null +++ b/client/examples/tcp.rs @@ -0,0 +1,16 @@ +use roblib::roland::Roland; +use roblib_client::{transports::tcp::Tcp, Result, Robot}; +use std::{thread::sleep, time::Duration}; + +fn main() -> Result<()> { + let ip = std::env::args() + .nth(1) + .unwrap_or_else(|| "localhost:1110".into()); + + let robot = Robot::new(Tcp::connect(ip)?); + + let up = robot.drive(0., 0.)?; + dbg!(up); + + Ok(()) +} diff --git a/client/src/transports/mod.rs b/client/src/transports/mod.rs index fa90631..41cebcb 100644 --- a/client/src/transports/mod.rs +++ b/client/src/transports/mod.rs @@ -1,11 +1,13 @@ use anyhow::Result; use roblib::{cmd::Command, event::Event}; -pub mod http; +// pub mod http; pub mod tcp; pub mod udp; // pub mod ws; +pub(self) const ID_START: u32 = 1; + pub trait Transport { fn cmd(&self, cmd: C) -> Result where diff --git a/client/src/transports/tcp.rs b/client/src/transports/tcp.rs index 86a50e7..4d6ec13 100644 --- a/client/src/transports/tcp.rs +++ b/client/src/transports/tcp.rs @@ -1,8 +1,11 @@ use super::{Subscribable, Transport}; use anyhow::Result; -use roblib::{cmd, event}; +use roblib::{ + cmd::{self, has_return, Command}, + event, +}; use serde::Deserialize; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, io::Write, sync::Arc}; type D<'a> = bincode::Deserializer< bincode::de::read::IoReader<&'a std::net::TcpStream>, @@ -38,12 +41,13 @@ impl Tcp { Ok(Self { inner, - id: 0.into(), + id: super::ID_START.into(), socket, }) } fn listen(inner: Arc, mut socket: std::net::TcpStream) -> Result<()> { + let bin = bincode::options(); loop { let running = inner.running.read().unwrap(); if !*running { @@ -51,35 +55,60 @@ impl Tcp { } drop(running); - let id: u32 = bincode::deserialize_from(&mut socket)?; + let id: u32 = bincode::Options::deserialize_from(bin, &mut socket)?; let mut handlers = inner.handlers.lock().unwrap(); - let Some(entry) = handlers.get_mut(&id) else { + let Some(handler) = handlers.get_mut(&id) else { return Err(anyhow::Error::msg("received response for unknown id")); }; - entry(bincode::Deserializer::with_reader( - &socket, - bincode::DefaultOptions::new(), - ))?; + handler(bincode::Deserializer::with_reader(&socket, bin))?; } } + + fn cmd_id(&self, cmd: C, id: u32) -> Result + where + C: Command, + C::Return: Send + 'static, + { + let concrete: cmd::Concrete = cmd.into(); + let buf = bincode::Options::serialize(bincode::options(), &(id, concrete))?; + (&self.socket).write_all(&(buf.len() as u32).to_be_bytes())?; + (&self.socket).write_all(&buf)?; + dbg!(&buf); + + Ok(if has_return::() { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + + let a: Handler = Box::new(move |mut des: D| { + let r = C::Return::deserialize(&mut des)?; + tx.send(r).unwrap(); + Ok::<(), anyhow::Error>(()) + }); + + self.inner.handlers.lock().unwrap().insert(id, a); + + rx.recv()? + } else { + unsafe { std::mem::zeroed() } + }) + } } impl Transport for Tcp { fn cmd(&self, cmd: C) -> anyhow::Result where - C: roblib::cmd::Command, + C: Command, C::Return: Send + 'static, { - let concrete: cmd::Concrete = cmd.into(); - let mut id_handle = self.id.lock().unwrap(); - bincode::serialize_into(&self.socket, &(*id_handle, concrete))?; - - *id_handle += 1; + let id = *id_handle; + *id_handle = id + 1; + drop(id_handle); - todo!() + let res = self.cmd_id(cmd, id); + self.inner.handlers.lock().unwrap().remove(&id); + res } } @@ -107,7 +136,7 @@ impl Subscribable for Tcp { return Err(anyhow::Error::msg("already subscribed to this event")); } - bincode::serialize_into(&self.socket, &(id, cmd))?; + bincode::Options::serialize_into(bincode::options(), &self.socket, &(id, cmd))?; *id_handle += 1; diff --git a/client/src/transports/udp.rs b/client/src/transports/udp.rs index 5f5ef09..8db10dc 100644 --- a/client/src/transports/udp.rs +++ b/client/src/transports/udp.rs @@ -42,7 +42,7 @@ impl Udp { std::thread::spawn(move || Self::recieve(i2, sock2)); Ok(Self { - id: 0.into(), + id: super::ID_START.into(), inner, sock, }) @@ -61,7 +61,7 @@ impl Udp { let buf = &buf[..len]; let mut curs = Cursor::new(buf); - let id: u32 = bincode::deserialize_from(&mut curs)?; + let id: u32 = bincode::Options::deserialize_from(bincode::options(), &mut curs)?; if let Some(h) = inner.handlers.lock().unwrap().get_mut(&id) { let pos = curs.position() as usize; let rest = &curs.into_inner()[pos..]; @@ -77,7 +77,10 @@ impl Udp { C::Return: Send + 'static, { let concrete: cmd::Concrete = cmd.into(); - self.sock.send(&bincode::serialize(&(id, concrete))?)?; + self.sock.send(&bincode::Options::serialize( + bincode::options(), + &(id, concrete), + )?)?; Ok(if has_return::() { let (tx, rx) = std::sync::mpsc::sync_channel(1); @@ -109,9 +112,7 @@ impl Transport for Udp { drop(id_handle); let res = self.cmd_id(cmd, id); - self.inner.handlers.lock().unwrap().remove(&id); - res } } @@ -141,7 +142,8 @@ impl Subscribable for Udp { let ev = ev.into(); let cmd: cmd::Concrete = cmd::Unsubscribe(ev).into(); - self.sock.send(&bincode::serialize(&cmd)?)?; + self.sock + .send(&bincode::Options::serialize(bincode::options(), &cmd)?)?; let id = self.inner.events.lock().unwrap().remove(&ev).unwrap(); diff --git a/roblib/src/camloc/cmd.rs b/roblib/src/camloc/cmd.rs index 680ae7e..037f3ea 100644 --- a/roblib/src/camloc/cmd.rs +++ b/roblib/src/camloc/cmd.rs @@ -1,7 +1,7 @@ use crate::cmd::Command; use roblib_macro::Command; -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct GetPosition; impl Command for GetPosition { const PREFIX: char = 'P'; diff --git a/roblib/src/cmd/concrete.rs b/roblib/src/cmd/concrete.rs index 9651252..1ce7fc6 100644 --- a/roblib/src/cmd/concrete.rs +++ b/roblib/src/cmd/concrete.rs @@ -1,13 +1,10 @@ -use std::fmt::Display; - +use crate::cmd::{self, Command}; use serde::{ de::{self, SeqAccess, Visitor}, ser::SerializeStruct, Deserialize, Serialize, }; -use crate::cmd::{self, Command}; - pub enum Concrete { #[cfg(feature = "roland")] MoveRobot(cmd::MoveRobot), @@ -46,6 +43,44 @@ pub enum Concrete { Nop(cmd::Nop), GetUptime(cmd::GetUptime), } +impl std::fmt::Debug for Concrete { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + #[cfg(feature = "roland")] + Self::MoveRobot(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::MoveRobotByAngle(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::StopRobot(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::Led(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::RolandServo(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::Buzzer(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::TrackSensor(v) => v.fmt(f), + #[cfg(feature = "roland")] + Self::UltraSensor(v) => v.fmt(f), + #[cfg(feature = "gpio")] + Self::PinMode(v) => v.fmt(f), + #[cfg(feature = "gpio")] + Self::ReadPin(v) => v.fmt(f), + #[cfg(feature = "gpio")] + Self::WritePin(v) => v.fmt(f), + #[cfg(feature = "gpio")] + Self::Pwm(v) => v.fmt(f), + #[cfg(feature = "gpio")] + Self::Servo(v) => v.fmt(f), + #[cfg(feature = "camloc")] + Self::GetPosition(v) => v.fmt(f), + Self::Subscribe(v) => v.fmt(f), + Self::Unsubscribe(v) => v.fmt(f), + Self::Nop(v) => v.fmt(f), + Self::GetUptime(v) => v.fmt(f), + } + } +} // TODO: automatize Concrete impls impl Concrete { @@ -392,9 +427,3 @@ impl<'de> Deserialize<'de> for Concrete { deserializer.deserialize_struct("Concrete", &["prefix", "cmd"], ConcreteVisitor) } } - -impl Display for Concrete { - fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - todo!() - } -} diff --git a/roblib/src/cmd/mod.rs b/roblib/src/cmd/mod.rs index d6e87b0..13a7123 100644 --- a/roblib/src/cmd/mod.rs +++ b/roblib/src/cmd/mod.rs @@ -27,27 +27,27 @@ pub const fn has_return() -> bool { std::mem::size_of::() != 0 } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Subscribe(pub event::ConcreteType); impl Command for Subscribe { const PREFIX: char = '+'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Unsubscribe(pub event::ConcreteType); impl Command for Unsubscribe { const PREFIX: char = '-'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Nop; impl Command for Nop { const PREFIX: char = '0'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct GetUptime; impl Command for GetUptime { const PREFIX: char = 'U'; diff --git a/roblib/src/gpio/cmd.rs b/roblib/src/gpio/cmd.rs index b69dc3f..2e6908c 100644 --- a/roblib/src/gpio/cmd.rs +++ b/roblib/src/gpio/cmd.rs @@ -1,35 +1,35 @@ use crate::cmd::Command; use roblib_macro::Command; -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct PinMode(pub u8, pub super::Mode); impl Command for PinMode { const PREFIX: char = 'p'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct ReadPin(pub u8); impl Command for ReadPin { const PREFIX: char = 'r'; type Return = bool; } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct WritePin(pub u8, pub bool); impl Command for WritePin { const PREFIX: char = 'w'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Pwm(pub u8, pub f64, pub f64); impl Command for Pwm { const PREFIX: char = 'W'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Servo(pub u8, pub f64); impl Command for Servo { const PREFIX: char = 'V'; diff --git a/roblib/src/roland/cmd.rs b/roblib/src/roland/cmd.rs index b3909b8..d80cc2f 100644 --- a/roblib/src/roland/cmd.rs +++ b/roblib/src/roland/cmd.rs @@ -1,49 +1,49 @@ use crate::cmd::Command; use roblib_macro::Command; -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct MoveRobot(pub f64, pub f64); impl Command for MoveRobot { const PREFIX: char = 'm'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct MoveRobotByAngle(pub f64, pub f64); impl Command for MoveRobotByAngle { const PREFIX: char = 'M'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct StopRobot; impl Command for StopRobot { const PREFIX: char = 's'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Led(pub bool, pub bool, pub bool); impl Command for Led { const PREFIX: char = 'l'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct RolandServo(pub f64); impl Command for RolandServo { const PREFIX: char = 'a'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct Buzzer(pub f64); impl Command for Buzzer { const PREFIX: char = 'b'; type Return = (); } -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct TrackSensor; impl Command for TrackSensor { const PREFIX: char = 't'; @@ -51,7 +51,7 @@ impl Command for TrackSensor { } // TODO: return Option -#[derive(Command, serde::Serialize, serde::Deserialize)] +#[derive(Command, serde::Serialize, serde::Deserialize, Debug)] pub struct UltraSensor; impl Command for UltraSensor { const PREFIX: char = 'u'; diff --git a/server/src/event_bus.rs b/server/src/event_bus.rs index 92b69a7..15d9896 100644 --- a/server/src/event_bus.rs +++ b/server/src/event_bus.rs @@ -28,13 +28,19 @@ pub(crate) struct EventBus { pub(self) robot: Arc, pub clients: RwLock>>, + pub bus_tcp: transports::tcp::Tx, pub bus_udp: transports::udp::Tx, } impl EventBus { - pub fn new(robot: Arc, bus_udp: transports::udp::Tx) -> Self { + pub fn new( + robot: Arc, + bus_tcp: transports::tcp::Tx, + bus_udp: transports::udp::Tx, + ) -> Self { Self { robot, clients: RwLock::new(HashMap::new()), + bus_tcp, bus_udp, } } @@ -64,6 +70,9 @@ impl EventBus { client: &SubscriptionId, ) -> anyhow::Result<()> { match client { + SubscriptionId::Tcp(addr, id) => { + self.bus_tcp.send((event.1.clone(), (*addr, *id)))?; + } SubscriptionId::Udp(addr, id) => self.bus_udp.send((event.1.clone(), (*addr, *id)))?, } Ok(()) @@ -81,9 +90,13 @@ impl EventBus { } } -pub(crate) async fn init(robot: Arc, bus_udp: transports::udp::Tx) { +pub(crate) async fn init( + robot: Arc, + bus_tcp: transports::tcp::Tx, + bus_udp: transports::udp::Tx, +) -> anyhow::Result<()> { let token = robot.abort_token.clone(); - let event_bus = Arc::new(EventBus::new(robot, bus_udp)); + let event_bus = Arc::new(EventBus::new(robot, bus_tcp, bus_udp)); #[cfg(all(feature = "roland", feature = "backend"))] let h2 = if event_bus.robot.roland.is_some() { @@ -103,6 +116,8 @@ pub(crate) async fn init(robot: Arc, bus_udp: transports::udp:: if let Some(handle) = h2 { handle.abort(); } + + Ok(()) } /// hook up all the "inputs" (backends) to the event bus diff --git a/server/src/main.rs b/server/src/main.rs index 2d4ccbe..63c9865 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,14 +6,12 @@ mod event_bus; mod logger; mod transports; use anyhow::Result; +use futures_util::future::join_all; use serde::Deserialize; -use std::{ - sync::{atomic::AtomicBool, Arc}, - time::Instant, -}; +use std::{sync::Arc, time::Instant}; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; -use transports::udp; +use transports::{tcp, udp}; struct Backends { pub startup_time: Instant, @@ -48,7 +46,7 @@ fn def_web_port() -> u16 { #[derive(Debug, Deserialize)] struct Config { #[serde(default = "def_host")] - _tcp_host: String, + tcp_host: String, #[serde(default = "def_host")] udp_host: String, @@ -57,7 +55,7 @@ struct Config { _web_host: String, #[serde(default = "def_tcp_port")] - _tcp_port: u16, + tcp_port: u16, #[serde(default = "def_udp_port")] udp_port: u16, @@ -67,13 +65,11 @@ struct Config { } async fn try_main() -> Result<()> { - logger::init_log(Some("actix_web=info,roblib_server=debug,roblib=debug")); - let Config { - _tcp_host, + tcp_host, udp_host, _web_host, - _tcp_port, + tcp_port, udp_port, _web_port, } = match envy::from_env::() { @@ -95,6 +91,7 @@ async fn try_main() -> Result<()> { info!("Compiled with features: {features:?}"); // let event_bus = event_bus::init(); + let (tcp_tx, tcp_rx) = broadcast::channel(1024); let (udp_tx, udp_rx) = mpsc::unbounded_channel(); #[cfg(feature = "camloc")] @@ -202,8 +199,8 @@ async fn try_main() -> Result<()> { camloc, }); - // info!("TCP starting on {tcp_host}:{tcp_port}"); - // tcp::start((tcp_host, tcp_port), robot.clone(), tcp_rx).await?; + info!("TCP starting on {tcp_host}:{tcp_port}"); + let tcp_handle = tcp::start((tcp_host, tcp_port), robot.clone(), tcp_rx).await?; info!("UDP starting on {udp_host}:{udp_port}"); let (udp_handle, udp_event_handle) = @@ -232,32 +229,84 @@ async fn try_main() -> Result<()> { // udp_tx, // ))); - let ebus_handle = tokio::spawn(event_bus::init(robot.clone(), udp_tx)); + let ebus_handle = tokio::spawn(event_bus::init(robot.clone(), tcp_tx, udp_tx)); + + let mut sighandler = SigHandler::new(); tokio::select! { - _ = robot.abort_token.cancelled() => (), - _ = tokio::signal::ctrl_c() => { - log::debug!("SIGINT received"); + _ = robot.abort_token.cancelled() => { + log::error!("Abort requested internally, cleaning up..."); + }, + s = sighandler.wait() => { + log::error!("{s} received, cleaning up..."); robot.abort_token.cancel(); } }; log::debug!("abort: main"); + let force_stop = tokio::spawn(async move { + sighandler.wait().await; + log::warn!("Press ^C again to force exit (THE ROBOT WILL ESCAPE)"); + sighandler.wait().await; + log::error!("Bye! (Force shutdown)"); + std::process::exit(1); + }); + udp_handle.abort(); udp_event_handle.abort(); - let _ = ebus_handle.await; + let mut futures = Vec::new(); + futures.push(ebus_handle); + if let Ok(mut tcp_handles) = tcp_handle.await { + dbg!(); + futures.append(&mut tcp_handles); + } + log::debug!("Waiting on {} tasks", futures.len()); + join_all(futures).await; + + force_stop.abort(); + let _ = force_stop.await; Ok(()) } #[actix_web::main] async fn main() { + logger::init_log(Some("actix_web=info,roblib_server=debug,roblib=debug")); + match try_main().await { - Ok(_) => eprintln!("Bye!"), + Ok(_) => log::info!("Bye!"), Err(e) => { - eprintln!("ERROR: {e}"); + log::error!("ERROR: {e}"); std::process::exit(1); } } } + +struct SigHandler { + #[cfg(unix)] + sigterm: tokio::signal::unix::Signal, +} +impl SigHandler { + pub fn new() -> Self { + Self { + sigterm: tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .unwrap(), + } + } + pub async fn wait(&mut self) -> &str { + #[cfg(unix)] + tokio::select! { + _ = self.sigterm.recv() => "SIGTERM", + r = tokio::signal::ctrl_c() => { r.expect("failed to listen to ctrl-c"); "SIGINT" }, + } + + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("failed to listen to ctrl-c"); + "SIGINT" + } + } +} diff --git a/server/src/transports/mod.rs b/server/src/transports/mod.rs index 8bbb199..69cee2c 100644 --- a/server/src/transports/mod.rs +++ b/server/src/transports/mod.rs @@ -1,20 +1,22 @@ // pub mod http; -// pub mod tcp; +pub mod tcp; pub mod udp; // pub mod ws; #[derive(Debug, PartialEq, Eq, Clone)] pub enum SubscriptionId { + Tcp(tcp::Id, tcp::SubId), Udp(udp::Id, udp::SubId), } impl SubscriptionId { pub fn same_client(&self, other: &Self) -> bool { match (self, other) { - (SubscriptionId::Udp(addr1, _), SubscriptionId::Udp(addr2, _)) => addr1 == addr2, - (SubscriptionId::Udp(_, _), _) => false, + (SubscriptionId::Tcp(addr1, _), SubscriptionId::Tcp(addr2, _)) => *addr1 == *addr2, + (SubscriptionId::Tcp(_, _), _) => false, - _ => false, + (SubscriptionId::Udp(addr1, _), SubscriptionId::Udp(addr2, _)) => *addr1 == *addr2, + (SubscriptionId::Udp(_, _), _) => false, } } } diff --git a/server/src/transports/tcp.rs b/server/src/transports/tcp.rs index 987b289..90bc9c2 100644 --- a/server/src/transports/tcp.rs +++ b/server/src/transports/tcp.rs @@ -1,65 +1,176 @@ -use std::sync::Arc; +use std::{io::Cursor, net::SocketAddr, sync::Arc, time::Duration}; -use crate::{cmd::execute_concrete, Backends, RUNNING}; -use actix::spawn; -use actix_web::rt::net::{TcpListener, TcpStream}; -use anyhow::Result; -use roblib::cmd::Concrete; +use crate::{cmd::execute_concrete, Backends}; +use roblib::{cmd, event::ConcreteValue}; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::ToSocketAddrs, + io::{AsyncReadExt, AsyncWriteExt, Interest}, + net::{TcpListener, TcpStream, ToSocketAddrs}, + spawn, + sync::broadcast::{Receiver, Sender}, + task::JoinHandle, }; -pub(crate) async fn start(addr: impl ToSocketAddrs, robot: Arc) -> Result<()> { +pub type Id = SocketAddr; +pub type SubId = u32; +pub type Item = (Id, SubId); +pub type Tx = Sender<(ConcreteValue, Item)>; +pub type Rx = Receiver<(ConcreteValue, Item)>; + +type Ret = Vec>>; + +pub(crate) async fn start( + addr: impl ToSocketAddrs, + robot: Arc, + rx: Rx, +) -> anyhow::Result> { let server = TcpListener::bind(addr).await?; - spawn(run(server, robot)); - Ok(()) + Ok(spawn(run(server, robot, rx))) } -async fn run(server: TcpListener, robot: Arc) -> Result<()> { - while RUNNING.load(std::sync::atomic::Ordering::SeqCst) { - let (stream, _addr) = server.accept().await?; - spawn(handle_client(robot.clone(), stream)); +async fn run(server: TcpListener, robot: Arc, rx: Rx) -> Ret { + let mut handles = Vec::new(); + loop { + let conn = tokio::select! { + biased; + _ = robot.abort_token.cancelled() => return handles, + Ok(conn) = server.accept() => conn, + }; + let h = spawn(handle_client(robot.clone(), conn, rx.resubscribe())); + handles.push(h); } } -async fn handle_client(robot: Arc, mut stream: TcpStream) -> Result<()> { +enum Thing { + ClientMessage(usize), + Event(ConcreteValue, Item), + Disconnect, + ServerAbort, +} + +async fn handle_client( + robot: Arc, + (mut stream, addr): (TcpStream, SocketAddr), + mut rx: Rx, +) -> anyhow::Result<()> { + let bin = bincode::options(); + const HEADER: usize = std::mem::size_of::(); + let mut buf = vec![0; 512]; + let mut len = 0; // no. of bytes read for the current command we're attempting to parse + let mut maybe_cmd_len = None; - while RUNNING.load(std::sync::atomic::Ordering::SeqCst) { - buf.resize(stream.read_u32().await? as usize, 0); + loop { + let thing = tokio::select! { + _ = robot.abort_token.cancelled() => Thing::ServerAbort, + Ok(n) = stream.read(&mut buf[len..( HEADER + maybe_cmd_len.unwrap_or(0) )]) => Thing::ClientMessage(n), + Ok(msg) = rx.recv() => Thing::Event(msg.0, msg.1), + _ = tokio::time::sleep(Duration::from_secs(5)) => { + let r = stream.ready(Interest::READABLE | Interest::WRITABLE).await; + if r.map_or(true, |r| r.is_read_closed() || r.is_write_closed()) { + Thing::Disconnect + } else { continue; } + } + }; - stream.read_exact(&mut buf).await?; + match thing { + Thing::ClientMessage(n) => { + if n == 0 { + log::debug!("received 0 sized msg, investigating disconnect"); + // give the socket some time to fully realize disconnect + tokio::time::sleep(Duration::from_millis(50)).await; + let r = stream.ready(Interest::READABLE | Interest::WRITABLE).await; + if r.map_or(true, |r| r.is_read_closed() || r.is_write_closed()) { + log::debug!("tcp client disconnected: {addr}"); + return Ok(()); + } + } - let res = match postcard::from_bytes::(&buf) { - Ok(c) => { - let mut serializer = postcard::Serializer { - output: postcard::ser_flavors::StdVec::new(), - }; + len += n; + log::debug!( + "Thing::ClientMessage - n: {n}, len: {len}, mblen: {maybe_cmd_len:?}, buflen: {}", + buf.len() + ); + // not enough bytes to get command length + if len < HEADER { + log::debug!("read more header"); + continue; + } - match execute_concrete(c, robot.clone(), &mut serializer).await { - Ok(r) => { - if let Some(()) = r { - postcard::ser_flavors::Flavor::finalize(serializer.output) - .map(Some) - .map_err(anyhow::Error::new) - } else { - Ok(None) - } + let cmd_len = match maybe_cmd_len { + Some(n) => n, + None => { + let cmd = u32::from_be_bytes((&buf[..HEADER]).try_into().unwrap()) as usize; + // buf.resize(HEADER + cmd, 0); + maybe_cmd_len = Some(cmd); + log::debug!("header received, cmdlen: {cmd}"); + cmd } - Err(e) => Err(e), + }; + + // not enough bytes to parse command, get some more + if len < HEADER + cmd_len { + log::debug!("read more command"); + continue; } + + let (id, cmd): (u32, cmd::Concrete) = + bincode::Options::deserialize(bin, &buf[HEADER..len])?; + + let mut c = Cursor::new(&mut buf[..]); + bincode::Options::serialize_into(bin, &mut c, &id)?; + + let res = execute_concrete( + cmd, + robot.clone(), + &mut bincode::Serializer::new(&mut c, bin), + ) + .await?; + + if res.is_some() { + stream.write_all(&buf).await?; + } + + // reset + len = 0; + maybe_cmd_len = None; + } + + Thing::Event(ev, (ev_addr, id)) => { + if addr != ev_addr { + continue; + } + let data = match ev { + #[cfg(feature = "roland")] + ConcreteValue::TrackSensor(v) => bincode::Options::serialize(bin, &(id, v)), + #[cfg(feature = "roland")] + ConcreteValue::UltraSensor(v) => bincode::Options::serialize(bin, &(id, v)), + #[cfg(feature = "gpio")] + ConcreteValue::GpioPin(v) => bincode::Options::serialize(bin, &(id, v)), + #[cfg(feature = "camloc")] + ConcreteValue::CamlocConnect(v) => bincode::Options::serialize(bin, &(id, v)), + #[cfg(feature = "camloc")] + ConcreteValue::CamlocDisconnect(v) => { + bincode::Options::serialize(bin, &(id, v)) + } + #[cfg(feature = "camloc")] + ConcreteValue::CamlocPosition(v) => bincode::Options::serialize(bin, &(id, v)), + #[cfg(feature = "camloc")] + ConcreteValue::CamlocInfoUpdate(v) => { + bincode::Options::serialize(bin, &(id, v)) + } + ConcreteValue::None => continue, + }?; + stream.write_all(&(data.len() as u32).to_be_bytes()).await?; + stream.write_all(&data).await?; } - Err(e) => Err(e.into()), - }; - match res { - Ok(Some(b)) => { - stream.write_all(&b).await?; + Thing::Disconnect => { + log::debug!("tcp client disconnected: {addr}"); + return Ok(()); } - Ok(None) => (), - Err(e) => { - stream.write_all(e.to_string().as_bytes()).await?; + Thing::ServerAbort => { + log::debug!("abort: tcp {addr}"); + return Ok(()); } } } diff --git a/server/src/transports/udp.rs b/server/src/transports/udp.rs index 2e44981..8145b74 100644 --- a/server/src/transports/udp.rs +++ b/server/src/transports/udp.rs @@ -1,17 +1,16 @@ +use super::SubscriptionId; use crate::{cmd::execute_concrete, event_bus::sub::SubStatus, Backends}; -use actix::spawn; use actix_web::rt::net::UdpSocket; use anyhow::Result; use roblib::{cmd, event::ConcreteValue}; use std::{io::Cursor, net::SocketAddr, sync::Arc}; use tokio::{ net::ToSocketAddrs, + spawn, sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; -use super::SubscriptionId; - pub type Id = SocketAddr; pub type SubId = u32; pub type Item = (Id, SubId); @@ -32,12 +31,13 @@ pub(crate) async fn start( } async fn run(server: Arc, robot: Arc) -> Result<()> { + let bin = bincode::options(); let mut buf = [0u8; 1024]; loop { let (len, addr) = server.recv_from(&mut buf).await?; - let (id, cmd): (u32, cmd::Concrete) = bincode::deserialize(&buf[..len])?; + let (id, cmd): (u32, cmd::Concrete) = bincode::Options::deserialize(bin, &buf[..len])?; match cmd { cmd::Concrete::Subscribe(c) => { @@ -59,12 +59,12 @@ async fn run(server: Arc, robot: Arc) -> Result<()> { } let mut c = Cursor::new(&mut buf[..]); - bincode::serialize_into(&mut c, &id)?; + bincode::Options::serialize_into(bin, &mut c, &id)?; let res = execute_concrete( cmd, robot.clone(), - &mut bincode::Serializer::new(&mut c, bincode::DefaultOptions::new()), + &mut bincode::Serializer::new(&mut c, bin), ) .await?; @@ -75,24 +75,39 @@ async fn run(server: Arc, robot: Arc) -> Result<()> { } async fn handle_event(mut event_bus: Rx, event_send: Arc) -> Result<()> { + let bin = bincode::options(); while let Some((ev, (addr, id))) = event_bus.recv().await { let val: Vec = match ev { #[cfg(feature = "roland")] - roblib::event::ConcreteValue::TrackSensor(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::TrackSensor(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "roland")] - roblib::event::ConcreteValue::UltraSensor(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::UltraSensor(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "gpio")] - roblib::event::ConcreteValue::GpioPin(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::GpioPin(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "camloc")] - roblib::event::ConcreteValue::CamlocConnect(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::CamlocConnect(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "camloc")] - roblib::event::ConcreteValue::CamlocDisconnect(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::CamlocDisconnect(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "camloc")] - roblib::event::ConcreteValue::CamlocPosition(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::CamlocPosition(val) => { + bincode::Options::serialize(bin, &(id, val))? + } #[cfg(feature = "camloc")] - roblib::event::ConcreteValue::CamlocInfoUpdate(val) => bincode::serialize(&(id, val))?, + roblib::event::ConcreteValue::CamlocInfoUpdate(val) => { + bincode::Options::serialize(bin, &(id, val))? + } roblib::event::ConcreteValue::None => continue, };