diff --git a/build.sh b/build.sh index ffd4c36..6ee96e1 100755 --- a/build.sh +++ b/build.sh @@ -34,4 +34,5 @@ cmd="cargo $command --release --target=$target --features=$f" echo $cmd -podman run --rm -it --init -v $(pwd):/src docker.io/vbeni/rust-arm-musl $cmd +$docker run --rm -it --init -v $(pwd):/src docker.io/vbeni/rust-arm-musl $cmd + diff --git a/client/examples/controller.rs b/client/examples/controller.rs index d52acc5..8fc43c9 100644 --- a/client/examples/controller.rs +++ b/client/examples/controller.rs @@ -47,8 +47,13 @@ fn main() -> Result<()> { Concrete::Pwm(c) => robot.cmd(c)?, Concrete::Servo(c) => robot.cmd(c)?, - Concrete::Subscribe(c) => robot.cmd(c)?, - Concrete::Unsubscribe(c) => robot.cmd(c)?, + Concrete::Subscribe(_) => { + println!("Subscribe no supported"); + } + Concrete::Unsubscribe(_) => { + println!("Unsubscribe no supported"); + } + Concrete::Nop(c) => robot.cmd(c)?, Concrete::GetUptime(c) => println!("{:?}", robot.cmd(c)?), diff --git a/client/src/transports/tcp.rs b/client/src/transports/tcp.rs index 1af4dfd..86a50e7 100644 --- a/client/src/transports/tcp.rs +++ b/client/src/transports/tcp.rs @@ -94,7 +94,7 @@ impl Subscribable for Tcp { let id = *id_handle; let ev = Into::::into(ev); - let cmd: cmd::Concrete = cmd::Subscribe(ev.clone()).into(); + let cmd: cmd::Concrete = cmd::Subscribe(ev).into(); let already_contains = handlers .insert( @@ -118,7 +118,7 @@ impl Subscribable for Tcp { fn unsubscribe(&self, ev: E) -> anyhow::Result<()> { let concrete_event = ev.into(); - let cmd: cmd::Concrete = cmd::Unsubscribe(concrete_event.clone()).into(); + let cmd: cmd::Concrete = cmd::Unsubscribe(concrete_event).into(); bincode::serialize_into(&self.socket, &cmd)?; diff --git a/client/src/transports/udp.rs b/client/src/transports/udp.rs index bb21770..dce6de6 100644 --- a/client/src/transports/udp.rs +++ b/client/src/transports/udp.rs @@ -2,7 +2,7 @@ use crate::Transport; use anyhow::Result; use roblib::{ cmd::{self, has_return, Command}, - event::{self, Event}, + event::Event, }; use serde::Deserialize; use std::{collections::HashMap, io::Cursor, sync::Arc}; @@ -70,18 +70,12 @@ impl Udp { } Ok(()) } -} -impl Transport for Udp { - fn cmd(&self, cmd: C) -> Result + fn cmd_id(&self, cmd: C, id: u32) -> Result where C: Command, C::Return: Send + 'static, { - let mut id_handle = self.id.lock().unwrap(); - let id = *id_handle; - *id_handle = id + 1; - let concrete: cmd::Concrete = cmd.into(); self.sock.send(&bincode::serialize(&(id, concrete))?)?; @@ -107,6 +101,21 @@ impl Transport for Udp { } } +impl Transport for Udp { + fn cmd(&self, cmd: C) -> Result + where + C: Command, + C::Return: Send + 'static, + { + let mut id_handle = self.id.lock().unwrap(); + let id = *id_handle; + *id_handle = id + 1; + drop(id_handle); + + self.cmd_id(cmd, id) + } +} + impl Subscribable for Udp { fn subscribe(&self, ev: E, mut handler: F) -> Result<()> where @@ -115,13 +124,11 @@ impl Subscribable for Udp { { let mut id_handle = self.id.lock().unwrap(); let id = *id_handle; - *id_handle += id + 1; - - let ev = Into::::into(ev); - let cmd: cmd::Concrete = cmd::Subscribe(ev).into(); + *id_handle = id + 1; + drop(id_handle); - let buf = bincode::serialize(&(id, cmd))?; - self.sock.send(&buf)?; + self.cmd_id(cmd::Subscribe(ev.into()), id)? + .map_err(anyhow::Error::msg)?; self.inner.handlers.lock().unwrap().insert( id, @@ -133,7 +140,7 @@ impl Subscribable for Udp { fn unsubscribe(&self, ev: E) -> Result<()> { let ev = ev.into(); - let cmd: cmd::Concrete = cmd::Unsubscribe(ev.clone()).into(); + let cmd: cmd::Concrete = cmd::Unsubscribe(ev).into(); self.sock.send(&bincode::serialize(&cmd)?)?; diff --git a/roblib/src/camloc/event.rs b/roblib/src/camloc/event.rs index 1b36585..a7ce365 100644 --- a/roblib/src/camloc/event.rs +++ b/roblib/src/camloc/event.rs @@ -4,25 +4,25 @@ use roblib_macro::Event; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct CamlocConnect; impl crate::event::Event for CamlocConnect { type Item = (SocketAddr, PlacedCamera); } -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct CamlocDisconnect; impl crate::event::Event for CamlocDisconnect { type Item = SocketAddr; } -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct CamlocPosition; impl crate::event::Event for CamlocPosition { type Item = Position; } -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct CamlocInfoUpdate; impl crate::event::Event for CamlocInfoUpdate { type Item = (SocketAddr, PlacedCamera); diff --git a/roblib/src/cmd/mod.rs b/roblib/src/cmd/mod.rs index b453920..638bb2c 100644 --- a/roblib/src/cmd/mod.rs +++ b/roblib/src/cmd/mod.rs @@ -4,6 +4,7 @@ use serde::{de::DeserializeOwned, Serialize}; pub mod concrete; use crate::event; + #[cfg(feature = "roland")] pub use crate::roland::cmd::*; @@ -30,13 +31,13 @@ pub const fn has_return() -> bool { pub struct Subscribe(pub event::ConcreteType); impl Command for Subscribe { const PREFIX: char = '+'; - type Return = (); + type Return = Result<(), String>; } #[derive(Command, serde::Serialize, serde::Deserialize)] pub struct Unsubscribe(pub event::ConcreteType); impl Command for Unsubscribe { const PREFIX: char = '-'; - type Return = (); + type Return = Result<(), String>; } #[derive(Command, serde::Serialize, serde::Deserialize)] diff --git a/roblib/src/event.rs b/roblib/src/event.rs index 32528ae..2e49047 100644 --- a/roblib/src/event.rs +++ b/roblib/src/event.rs @@ -1,10 +1,19 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; +#[cfg(feature = "roland")] +pub use crate::roland::event::*; + +#[cfg(feature = "gpio")] +pub use crate::gpio::event::*; + +#[cfg(feature = "camloc")] +pub use crate::camloc::event::*; + pub trait Event: Serialize + DeserializeOwned + Into + From { type Item: Serialize + DeserializeOwned; } -#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum ConcreteType { #[cfg(feature = "roland")] TrackSensor(crate::roland::event::TrackSensor), diff --git a/roblib/src/gpio/backend/simple.rs b/roblib/src/gpio/backend/simple.rs index 8ad3cd1..2dca8ac 100644 --- a/roblib/src/gpio/backend/simple.rs +++ b/roblib/src/gpio/backend/simple.rs @@ -1,9 +1,6 @@ use crate::{ get_servo_pwm_durations, - gpio::{ - event::{Event, Subscriber}, - Mode, - }, + gpio::{event::Event, Mode}, }; use rppal::gpio::{InputPin, OutputPin, Trigger}; use std::{ @@ -14,6 +11,10 @@ use std::{ }, }; +pub trait Subscriber: Send + Sync { + fn handle(&self, event: Event); +} + enum Pin { Input(InputPin), Output(OutputPin), @@ -98,14 +99,15 @@ impl SimpleGpioBackend { return log::error!("Handlers removed without clearing interrupt!"); }; // rising/falling edge thing seems to be backwards - let value = !(l as u8 != 0); - if let Ok(_) = handle.last_value.fetch_update(SeqCst, SeqCst, |last| { + let value = l as u8 == 0; + let res = &handle.last_value.fetch_update(SeqCst, SeqCst, |last| { log::debug!("({last}) {value}"); if value != last { return Some(value); } None - }) { + }); + if res.is_ok() { let ev = Event::PinChanged(pin, value); handle.handler.handle(ev) } diff --git a/roblib/src/gpio/event.rs b/roblib/src/gpio/event.rs index 290b546..f5ac19a 100644 --- a/roblib/src/gpio/event.rs +++ b/roblib/src/gpio/event.rs @@ -6,11 +6,7 @@ pub enum Event { PinChanged(u8, bool), } -pub trait Subscriber: Send + Sync { - fn handle(&self, event: Event); -} - -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct GpioPin(pub u8); impl crate::event::Event for GpioPin { type Item = bool; diff --git a/roblib/src/roland/backend.rs b/roblib/src/roland/backend.rs index 118d8cf..3f565a2 100644 --- a/roblib/src/roland/backend.rs +++ b/roblib/src/roland/backend.rs @@ -2,7 +2,10 @@ use super::Roland; use crate::get_servo_pwm_durations; use anyhow::Result; use rppal::gpio::{Gpio, InputPin, OutputPin}; -use std::{sync::Mutex, time::Instant}; +use std::{ + sync::Mutex, + time::{Duration, Instant}, +}; pub mod constants { pub mod motors { @@ -147,6 +150,8 @@ pub struct RolandBackend { servo: Mutex, motor: Mutex, leds: Mutex, + + gpio: Gpio, } impl Drop for RolandBackend { @@ -168,6 +173,8 @@ impl RolandBackend { ultra_sensor: UltraSensor::new(&gpio)?.into(), track_sensor: TrackSensor::new(&gpio)?.into(), + + gpio, }; // ran here as well to reset servo to center @@ -175,6 +182,40 @@ impl RolandBackend { Ok(roland) } + + pub fn setup_tracksensor_interrupts(&self) -> Result<()> { + let mut s = self.track_sensor.lock().unwrap(); + + s.l1.set_interrupt(rppal::gpio::Trigger::Both)?; + s.l2.set_interrupt(rppal::gpio::Trigger::Both)?; + s.r1.set_interrupt(rppal::gpio::Trigger::Both)?; + s.r2.set_interrupt(rppal::gpio::Trigger::Both)?; + + Ok(()) + } + + pub fn poll_tracksensor(&self, timeout: Option) -> Result> { + let s = self.track_sensor.lock().unwrap(); + + let ps = [&s.l1, &s.l2, &s.r1, &s.r2]; + let res = self.gpio.poll_interrupts(&ps, false, timeout)?; + + let Some((p, v)) = res else { return Ok(None) }; + + Ok(Some(( + ps.iter().position(|a| *a == p).unwrap(), + v as u8 != 0, + ))) + } + + pub fn clear_tracksensor_interrupts(&self) -> Result<()> { + let mut s = self.track_sensor.lock().unwrap(); + s.l1.clear_interrupt()?; + s.l2.clear_interrupt()?; + s.r1.clear_interrupt()?; + s.r2.clear_interrupt()?; + Ok(()) + } } impl Roland for RolandBackend { diff --git a/roblib/src/roland/event.rs b/roblib/src/roland/event.rs index 023ede7..56f29b4 100644 --- a/roblib/src/roland/event.rs +++ b/roblib/src/roland/event.rs @@ -3,13 +3,13 @@ use std::time::Duration; use roblib_macro::Event; use serde::{Deserialize, Serialize}; -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct TrackSensor; impl crate::event::Event for TrackSensor { type Item = [bool; 4]; } -#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)] +#[derive(Event, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct UltraSensor(pub Duration); impl crate::event::Event for UltraSensor { type Item = f64; diff --git a/server/src/event_bus.rs b/server/src/event_bus.rs index fc17b6c..6169aa0 100644 --- a/server/src/event_bus.rs +++ b/server/src/event_bus.rs @@ -1,23 +1,25 @@ use roblib::event::{ConcreteType, ConcreteValue}; use std::{collections::HashMap, sync::Arc}; -use sub::EventSub; +use sub::SubStatus; use tokio::sync::RwLock; use crate::transports::{self, SubscriptionId}; /// another channel to handle changes to subscriptions /// sent by the transport layer, received by the event bus sender workers +#[allow(unused)] pub mod sub { use crate::transports::SubscriptionId; use tokio::sync::broadcast::{Receiver, Sender}; #[derive(Debug, Clone)] - pub enum EventSub { + pub enum SubStatus { Subscribe, Unsubscribe, + Disconnect, } /// (event, true for sub, false for unsub) - pub type Item = (roblib::event::ConcreteType, SubscriptionId, EventSub); + pub type Item = (roblib::event::ConcreteType, SubscriptionId, SubStatus); pub type Tx = Sender; pub type Rx = Receiver; } @@ -37,9 +39,10 @@ impl EventBus { } } + #[allow(unused)] pub async fn resolve_send(&self, event: (ConcreteType, ConcreteValue)) -> anyhow::Result<()> { let clients = self.clients.read().await; - let Some(v) = clients.get(&event.0) else{ + let Some(v) = clients.get(&event.0) else { return Ok(()); }; self.send_all(event, v) @@ -50,24 +53,30 @@ impl EventBus { event: (ConcreteType, ConcreteValue), ) -> anyhow::Result<()> { let clients = self.clients.blocking_read(); - let Some(v) = clients.get(&event.0) else{ + let Some(v) = clients.get(&event.0) else { return Ok(()); }; self.send_all(event, v) } + fn send( + &self, + event: (ConcreteType, ConcreteValue), + client: &SubscriptionId, + ) -> anyhow::Result<()> { + match client { + SubscriptionId::Udp(addr, id) => self.bus_udp.send((event.1.clone(), (*addr, *id)))?, + } + Ok(()) + } + fn send_all( &self, event: (ConcreteType, ConcreteValue), clients: &Vec, ) -> anyhow::Result<()> { for client in clients { - match client { - SubscriptionId::Udp(addr, id) => { - self.bus_udp - .send((event.0.clone(), event.1.clone(), (*addr, *id)))? - } - }; + self.send(event.clone(), client)?; } Ok(()) } @@ -79,117 +88,223 @@ pub(super) async fn connect(event_bus: EventBus) { #[cfg(all(feature = "roland", feature = "backend"))] if event_bus.robot.roland.is_some() { - let robot = event_bus.robot.clone(); - std::thread::spawn(move || connect_roland(robot)); + let event_bus = event_bus.clone(); + std::thread::spawn(move || connect_roland(event_bus)); } // handle client subscription state changes // TODO: on client disconnect, unsubscribe them from all events - while let Ok(sub) = event_bus.robot.sub.subscribe().recv().await { + while let Ok((ty, id, sub)) = event_bus.robot.sub.subscribe().recv().await { let mut clients = event_bus.clients.write().await; - let v = clients.entry(sub.0.clone()).or_default(); - match sub.2 { - EventSub::Subscribe => { - if v.len() == 0 { - // setup resource - match sub.0 { - #[cfg(all(feature = "roland", feature = "backend"))] - ConcreteType::TrackSensor(_) => todo!(), - #[cfg(all(feature = "roland", feature = "backend"))] - ConcreteType::UltraSensor(_) => todo!(), - #[cfg(all(feature = "gpio", feature = "backend"))] - ConcreteType::GpioPin(p) => { - struct Sub(Arc); - impl roblib::gpio::event::Subscriber for Sub { - fn handle(&self, event: roblib::gpio::event::Event) { - let msg = match event { - roblib::gpio::event::Event::PinChanged(pin, value) => ( - ConcreteType::GpioPin(roblib::gpio::event::GpioPin( - pin, - )), - ConcreteValue::GpioPin(value), - ), - }; - if let Err(e) = self.0.resolve_send_blocking(msg) { - log::error!("event_bus dropped: {e}"); - } - } - } - if let Some(r) = &event_bus.robot.raw_gpio { - if let Err(e) = r.subscribe(p.0, Box::new(Sub(event_bus.clone()))) { - log::error!("Failed to subscribe to gpio pin: {e}"); - } - } - } - _ => (), - } + + if let SubStatus::Disconnect = sub { + for (ty, v) in clients.iter_mut() { + v.retain(|s| !s.same_client(&id)); + if v.is_empty() { + cleanup_resource(&event_bus, *ty); } - v.push(sub.1); } - EventSub::Unsubscribe => { - if v.len() == 0 { - log::warn!("Tried to unsubscribe from empty event"); + continue; + } + + let ids = clients.entry(ty).or_default(); + + match sub { + SubStatus::Disconnect => unreachable!(), + + SubStatus::Subscribe => { + if ids.is_empty() { + create_resource(&event_bus, ty); + } else if ids.contains(&id) { + log::error!("Attempted double subscription on event {ty:?}"); continue; } - let Some(i) = v.iter().position(|x| x == &sub.1) else { - log::warn!("Tried to unsubscribe but was never subscribed"); + ids.push(id); + } + SubStatus::Unsubscribe => { + if ids.is_empty() { + log::error!("Tried to unsubscribe from empty event"); + continue; + } + + let Some(i) = ids.iter().position(|x| x == &id) else { + log::error!("Tried to unsubscribe but was never subscribed"); continue; }; - v.remove(i); - - if v.len() == 0 { - // cleanup resource - match sub.0 { - #[cfg(all(feature = "roland", feature = "backend"))] - ConcreteType::TrackSensor(_) => todo!(), - #[cfg(all(feature = "roland", feature = "backend"))] - ConcreteType::UltraSensor(_) => todo!(), - #[cfg(all(feature = "gpio", feature = "backend"))] - ConcreteType::GpioPin(p) => { - if let Some(r) = &event_bus.robot.raw_gpio { - if let Err(e) = r.unsubscribe(p.0) { - log::error!("Failed to unsubscribe from gpio pin: {e}"); - } - } - } - _ => (), - } + + ids.remove(i); + + if ids.is_empty() { + cleanup_resource(&event_bus, ty); } } } } + log::error!("event_bus_sub dropped"); } -#[cfg(all(feature = "roland", feature = "backend"))] -fn connect_roland(robot: Arc) { - let mut track_subs = 0u32; - let mut ultra_subs = 0u32; - let mut rx = robot.sub.subscribe(); - loop { - if track_subs + ultra_subs == 0 { - // ch.recv() ... - } else { - while track_subs + ultra_subs > 0 { - while let Ok(sub) = rx.try_recv() { - // update subs - match sub.0 { - ConcreteType::TrackSensor(_) => todo!(), - ConcreteType::UltraSensor(_) => todo!(), - _ => (), +fn create_resource(event_bus: &Arc, ty: ConcreteType) { + match ty { + #[cfg(all(feature = "gpio", feature = "backend"))] + ConcreteType::GpioPin(p) => { + struct Sub(Arc); + impl roblib::gpio::backend::simple::Subscriber for Sub { + fn handle(&self, event: roblib::gpio::event::Event) { + let msg = match event { + roblib::gpio::event::Event::PinChanged(pin, value) => ( + ConcreteType::GpioPin(roblib::gpio::event::GpioPin(pin)), + ConcreteValue::GpioPin(value), + ), + }; + + if let Err(e) = self.0.resolve_send_blocking(msg) { + log::error!("event_bus dropped: {e}"); } } + } + + if let Some(r) = &event_bus.robot.raw_gpio { + if let Err(e) = r.subscribe(p.0, Box::new(Sub(event_bus.clone()))) { + log::error!("Failed to subscribe to gpio pin: {e}"); + } + } + } + + _ => todo!(), + } +} + +fn cleanup_resource(event_bus: &Arc, ty: ConcreteType) { + match ty { + #[cfg(all(feature = "roland", feature = "backend"))] + ConcreteType::TrackSensor(_) | ConcreteType::UltraSensor(_) => (), + + #[cfg(all(feature = "gpio", feature = "backend"))] + ConcreteType::GpioPin(p) => { + if let Some(r) = &event_bus.robot.raw_gpio { + if let Err(e) = r.unsubscribe(p.0) { + log::error!("Failed to unsubscribe from gpio pin: {e}"); + } + } + } + + _ => todo!(), + } +} + +#[cfg(all(feature = "roland", feature = "backend"))] +fn connect_roland(event_bus: Arc) -> anyhow::Result<()> { + use std::time::{Duration, Instant}; + + use roblib::{event, roland::Roland}; + use tokio::sync::broadcast::error::RecvError; + use SubStatus::*; + + let mut rx = event_bus.robot.sub.subscribe(); - track_subs += 1; + let roland = event_bus.robot.roland.as_ref().unwrap(); + let mut track_sensor_state = roland.track_sensor()?; + roland.setup_tracksensor_interrupts()?; - if track_subs > 0 { - // track_sensor + let mut track_subs = 0; + + struct UltraScheduleData { + id: SubscriptionId, + next: Instant, + interval: Duration, + } + let mut ultra = vec![]; + + loop { + if track_subs + ultra.len() == 0 { + let (ty, id, sub) = match rx.blocking_recv() { + Ok(v) => v, + Err(RecvError::Closed) => return Err(anyhow::anyhow!("sub channel closed")), + Err(RecvError::Lagged(n)) => { + error!("sub channel skipping {n}"); + continue; } + }; - if ultra_subs > 0 { - // ultra_sensor + if let SubStatus::Subscribe = sub { + match ty { + ConcreteType::TrackSensor(_) => track_subs += 1, + ConcreteType::UltraSensor(event::UltraSensor(interval)) => { + ultra.push(UltraScheduleData { + id, + interval, + next: Instant::now() + interval, + }); + } + _ => continue, } } } + + while let Ok((ty, id, sub)) = rx.try_recv() { + match ty { + ConcreteType::TrackSensor(_) => match sub { + Subscribe => track_subs += 1, + Unsubscribe => track_subs = track_subs.saturating_sub(1), + Disconnect => unreachable!(), + }, + + ConcreteType::UltraSensor(event::UltraSensor(interval)) => match sub { + Subscribe => { + ultra.push(UltraScheduleData { + id, + interval, + next: Instant::now() + interval, + }); + } + Unsubscribe => { + // TODO: + //ultra_subs = track_subs.saturating_sub(1) + } + Disconnect => unreachable!(), + }, + + _ => (), + } + } + + const MAX_WAIT: Duration = Duration::from_millis(200); + + let now = Instant::now(); + + let next_ultra = ultra.iter_mut().min_by_key(|d| d.next); + + let poll_dur = if let Some(next_ultra) = &next_ultra { + next_ultra.next - now - roblib::roland::backend::constants::ultra_sensor::BLAST_DURATION + } else { + MAX_WAIT + }; + + if track_subs > 0 { + if let Some((track_index, val)) = roland.poll_tracksensor(Some(poll_dur))? { + track_sensor_state[track_index] = val; + event_bus.resolve_send_blocking(( + ConcreteType::TrackSensor(event::TrackSensor), + ConcreteValue::TrackSensor(track_sensor_state), + ))?; + } + } else { + std::thread::sleep(poll_dur); + } + + if let Some(next_ultra) = next_ultra { + let v = roland.ultra_sensor()?; + event_bus.send( + ( + ConcreteType::UltraSensor(event::UltraSensor(next_ultra.interval)), + ConcreteValue::UltraSensor(v), + ), + &next_ultra.id, + )?; + next_ultra.next += next_ultra.interval; + } + + // TODO: batching } } diff --git a/server/src/main.rs b/server/src/main.rs index 24b79db..0f12d21 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -93,10 +93,7 @@ async fn try_main() -> Result<()> { #[cfg(feature = "camloc")] let camloc = { - use roblib::camloc::{ - event, - server::{extrapolations::LinearExtrapolation, service}, - }; + use roblib::camloc::server::{extrapolations::LinearExtrapolation, service}; // TODO: config let serv = service::start( @@ -197,10 +194,10 @@ async fn try_main() -> Result<()> { camloc, }); - // info!("TCP starting on port {tcp_port}"); - // tcp::start((tcp_host, tcp_port), robot.clone()).await?; + // info!("TCP starting on {tcp_host}:{tcp_port}"); + // tcp::start((tcp_host, tcp_port), robot.clone(), tcp_rx).await?; - info!("UDP starting on port {udp_port}"); + info!("UDP starting on {udp_host}:{udp_port}"); udp::start((udp_host, udp_port), robot.clone(), udp_rx).await?; // info!("Webserver starting on port {web_port}"); diff --git a/server/src/transports/mod.rs b/server/src/transports/mod.rs index 5e6c928..8bbb199 100644 --- a/server/src/transports/mod.rs +++ b/server/src/transports/mod.rs @@ -1,5 +1,3 @@ -use std::net::SocketAddr; - // pub mod http; // pub mod tcp; pub mod udp; @@ -7,5 +5,29 @@ pub mod udp; #[derive(Debug, PartialEq, Eq, Clone)] pub enum SubscriptionId { - Udp(SocketAddr, u32), + Udp(udp::Id, udp::SubId), +} + +impl SubscriptionId { + pub fn same_client(&self, other: &Self) -> bool { + match (self, other) { + (SubscriptionId::Udp(addr1, _), SubscriptionId::Udp(addr2, _)) => addr1 == addr2, + (SubscriptionId::Udp(_, _), _) => false, + + _ => false, + } + } } + +// #[derive(Debug, PartialEq, Eq, Clone)] +// pub enum ClientId { +// Udp(udp::Id), +// } + +// impl From for ClientId { +// fn from(value: SubscriptionId) -> Self { +// match value { +// SubscriptionId::Udp(id, _) => Self::Udp(id), +// } +// } +// } diff --git a/server/src/transports/udp.rs b/server/src/transports/udp.rs index ef67a15..efa19b4 100644 --- a/server/src/transports/udp.rs +++ b/server/src/transports/udp.rs @@ -1,8 +1,8 @@ -use crate::{cmd::execute_concrete, event_bus::sub::EventSub, Backends}; +use crate::{cmd::execute_concrete, event_bus::sub::SubStatus, Backends}; use actix::spawn; use actix_web::rt::net::UdpSocket; use anyhow::Result; -use roblib::cmd; +use roblib::{cmd, event::ConcreteValue}; use std::{io::Cursor, net::SocketAddr, sync::Arc}; use tokio::{ net::ToSocketAddrs, @@ -11,13 +11,11 @@ use tokio::{ use super::SubscriptionId; -pub type Item = ( - roblib::event::ConcreteType, - roblib::event::ConcreteValue, - (SocketAddr, u32), -); -pub type Tx = UnboundedSender; -pub type Rx = UnboundedReceiver; +pub type Id = SocketAddr; +pub type SubId = u32; +pub type Item = (Id, SubId); +pub type Tx = UnboundedSender<(ConcreteValue, Item)>; +pub type Rx = UnboundedReceiver<(ConcreteValue, Item)>; pub(crate) async fn start(addr: impl ToSocketAddrs, robot: Arc, rx: Rx) -> Result<()> { let server = UdpSocket::bind(addr).await?; @@ -40,14 +38,14 @@ async fn run(server: UdpSocket, robot: Arc, rx: Rx) -> Result<()> { match cmd { cmd::Concrete::Subscribe(c) => { let sub = SubscriptionId::Udp(addr, id); - if let Err(e) = robot.sub.send((c.0, sub, EventSub::Subscribe)) { + if let Err(e) = robot.sub.send((c.0, sub, SubStatus::Subscribe)) { log::error!("event bus sub error: {e}"); }; continue; } cmd::Concrete::Unsubscribe(c) => { let sub = SubscriptionId::Udp(addr, id); - if let Err(e) = robot.sub.send((c.0, sub, EventSub::Unsubscribe)) { + if let Err(e) = robot.sub.send((c.0, sub, SubStatus::Unsubscribe)) { log::error!("event bus sub error: {e}"); }; continue; @@ -73,7 +71,7 @@ async fn run(server: UdpSocket, robot: Arc, rx: Rx) -> Result<()> { } async fn handle_event(mut event_bus: Rx, event_send: Arc) -> Result<()> { - while let Some((ty, ev, (addr, id))) = event_bus.recv().await { + while let Some((ev, (addr, id))) = event_bus.recv().await { let val: Vec = match ev { #[cfg(feature = "roland")] roblib::event::ConcreteValue::TrackSensor(val) => bincode::serialize(&(id, val))?,