From bfd9ee8c886945a6d02e15e5f166e61352a2a801 Mon Sep 17 00:00:00 2001 From: beni69 <73666440+beni69@users.noreply.github.com> Date: Tue, 1 Aug 2023 17:34:04 +0200 Subject: [PATCH] wip: tcp client event debugging --- client/examples/gpio_in.rs | 2 +- roblib/src/cmd/mod.rs | 9 ++++++++- server/src/transports/tcp.rs | 23 ++++++++++++----------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/client/examples/gpio_in.rs b/client/examples/gpio_in.rs index ee144d1..2c8b546 100644 --- a/client/examples/gpio_in.rs +++ b/client/examples/gpio_in.rs @@ -42,8 +42,8 @@ fn main() -> Result<()> { log::info!("subscribe"); inp.subscribe(move |b| { - dbg!(b); out.set(b)?; + dbg!(b); Ok(()) })?; diff --git a/roblib/src/cmd/mod.rs b/roblib/src/cmd/mod.rs index 12b487d..b255d1c 100644 --- a/roblib/src/cmd/mod.rs +++ b/roblib/src/cmd/mod.rs @@ -17,7 +17,14 @@ pub use crate::camloc::cmd::*; pub use self::concrete::Concrete; pub trait Command: - Serialize + DeserializeOwned + Into + From + Send + Sync + 'static + Serialize + + DeserializeOwned + + Into + + From + + Send + + Sync + + 'static + + std::fmt::Debug { const PREFIX: char; type Return: Serialize + DeserializeOwned + Send + Sync + 'static; diff --git a/server/src/transports/tcp.rs b/server/src/transports/tcp.rs index 11f07db..71fa20f 100644 --- a/server/src/transports/tcp.rs +++ b/server/src/transports/tcp.rs @@ -1,7 +1,7 @@ //! TCP wire format: //! -> u32: message length, (u32: id, roblib::cmd::Concrete) //! <- u32: message length, (u32: id, roblib::cmd::Concrete::Return) -//! <- u32: message length, (u32: id, roblib::event::ConcreteValue) +//! <- u32: message length, (u32: id, roblib::event::Event::Item) use crate::{ cmd::execute_concrete, event_bus::sub::SubStatus, transports::SubscriptionId, Backends, }; @@ -80,7 +80,7 @@ async fn handle_client( match action { Action::ClientMessage(n) => { if n == 0 { - log::debug!("received 0 sized msg, investigating disconnect"); + log::debug!("tcp: received 0 sized msg, investigating disconnect"); // give the socket some time to fully realize disconnect tokio::time::sleep(Duration::from_millis(50)).await; let r = stream.ready(Interest::READABLE | Interest::WRITABLE).await; @@ -91,13 +91,14 @@ async fn handle_client( } len += n; - log::debug!( - "Thing::ClientMessage - n: {n}, len: {len}, mblen: {maybe_cmd_len:?}, buflen: {}", - buf.len() - ); + // log::debug!( + // "Thing::ClientMessage - n: {n}, len: {len}, mblen: {maybe_cmd_len:?}, buflen: {}", + // buf.len() + // ); + // not enough bytes to get command length if len < HEADER { - log::debug!("read more header"); + // log::debug!("read more header"); continue; } @@ -107,14 +108,14 @@ async fn handle_client( let cmd = u32::from_be_bytes((&buf[..HEADER]).try_into().unwrap()) as usize; // buf.resize(HEADER + cmd, 0); maybe_cmd_len = Some(cmd); - log::debug!("header received, cmdlen: {cmd}"); + // log::debug!("header received, cmdlen: {cmd}"); cmd } }; // not enough bytes to parse command, get some more if len < HEADER + cmd_len { - log::debug!("read more command"); + // log::debug!("read more command"); continue; } @@ -124,14 +125,14 @@ async fn handle_client( match cmd { cmd::Concrete::Subscribe(c) => { let sub = SubscriptionId::Tcp(addr, id); - dbg!(&sub); + dbg!((&c, &sub)); if let Err(e) = robot.sub.send((c.0, sub, SubStatus::Subscribe)) { log::error!("event bus sub error: {e}"); }; } cmd::Concrete::Unsubscribe(c) => { let unsub = SubscriptionId::Tcp(addr, id); - dbg!(&unsub); + dbg!((&c, &unsub)); if let Err(e) = robot.sub.send((c.0, unsub, SubStatus::Unsubscribe)) { log::error!("event bus sub error: {e}"); };