Skip to content

Commit

Permalink
WIP: add basic tcp server/sync client implementation
Browse files Browse the repository at this point in the history
server hangs on camloc getposition

events are untested so they probably don't work

disconnect logic is a little finnicky but it might just work
  • Loading branch information
beni69 committed Jul 30, 2023
1 parent 82415de commit c6100a4
Show file tree
Hide file tree
Showing 16 changed files with 420 additions and 135 deletions.
12 changes: 12 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,15 @@ required-features = ["gpio"]
[[example]]
name = "controller"
required-features = ["roland", "gpio", "camloc"]

[[example]]
name = "sensor"
required-features = ["roland"]

[[example]]
name = "circle"
required-features = ["roland"]

[[example]]
name = "tcp"
required-features = ["roland"]
5 changes: 4 additions & 1 deletion client/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use roblib::roland::{LedColor, Roland};
use roblib_client::{transports::udp::Udp, Result, Robot};
use roblib_client::{
transports::{tcp::Tcp, udp::Udp},
Result, Robot,
};
use std::{thread::sleep, time::Duration};

fn main() -> Result<()> {
Expand Down
16 changes: 16 additions & 0 deletions client/examples/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use roblib::roland::Roland;
use roblib_client::{transports::tcp::Tcp, Result, Robot};
use std::{thread::sleep, time::Duration};

fn main() -> Result<()> {
let ip = std::env::args()
.nth(1)
.unwrap_or_else(|| "localhost:1110".into());

let robot = Robot::new(Tcp::connect(ip)?);

let up = robot.drive(0., 0.)?;
dbg!(up);

Ok(())
}
4 changes: 3 additions & 1 deletion client/src/transports/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use anyhow::Result;
use roblib::{cmd::Command, event::Event};

pub mod http;
// pub mod http;
pub mod tcp;
pub mod udp;
// pub mod ws;

pub(self) const ID_START: u32 = 1;

pub trait Transport {
fn cmd<C>(&self, cmd: C) -> Result<C::Return>
where
Expand Down
63 changes: 46 additions & 17 deletions client/src/transports/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use super::{Subscribable, Transport};
use anyhow::Result;
use roblib::{cmd, event};
use roblib::{
cmd::{self, has_return, Command},
event,
};
use serde::Deserialize;
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, io::Write, sync::Arc};

type D<'a> = bincode::Deserializer<
bincode::de::read::IoReader<&'a std::net::TcpStream>,
Expand Down Expand Up @@ -38,48 +41,74 @@ impl Tcp {

Ok(Self {
inner,
id: 0.into(),
id: super::ID_START.into(),
socket,
})
}

fn listen(inner: Arc<TcpInner>, mut socket: std::net::TcpStream) -> Result<()> {
let bin = bincode::options();
loop {
let running = inner.running.read().unwrap();
if !*running {
return Ok(());
}
drop(running);

let id: u32 = bincode::deserialize_from(&mut socket)?;
let id: u32 = bincode::Options::deserialize_from(bin, &mut socket)?;
let mut handlers = inner.handlers.lock().unwrap();

let Some(entry) = handlers.get_mut(&id) else {
let Some(handler) = handlers.get_mut(&id) else {
return Err(anyhow::Error::msg("received response for unknown id"));
};

entry(bincode::Deserializer::with_reader(
&socket,
bincode::DefaultOptions::new(),
))?;
handler(bincode::Deserializer::with_reader(&socket, bin))?;
}
}

fn cmd_id<C>(&self, cmd: C, id: u32) -> Result<C::Return>
where
C: Command,
C::Return: Send + 'static,
{
let concrete: cmd::Concrete = cmd.into();
let buf = bincode::Options::serialize(bincode::options(), &(id, concrete))?;
(&self.socket).write_all(&(buf.len() as u32).to_be_bytes())?;
(&self.socket).write_all(&buf)?;
dbg!(&buf);

Ok(if has_return::<C>() {
let (tx, rx) = std::sync::mpsc::sync_channel(1);

let a: Handler = Box::new(move |mut des: D| {
let r = C::Return::deserialize(&mut des)?;
tx.send(r).unwrap();
Ok::<(), anyhow::Error>(())
});

self.inner.handlers.lock().unwrap().insert(id, a);

rx.recv()?
} else {
unsafe { std::mem::zeroed() }
})
}
}

impl Transport for Tcp {
fn cmd<C>(&self, cmd: C) -> anyhow::Result<C::Return>
where
C: roblib::cmd::Command,
C: Command,
C::Return: Send + 'static,
{
let concrete: cmd::Concrete = cmd.into();

let mut id_handle = self.id.lock().unwrap();
bincode::serialize_into(&self.socket, &(*id_handle, concrete))?;

*id_handle += 1;
let id = *id_handle;
*id_handle = id + 1;
drop(id_handle);

todo!()
let res = self.cmd_id(cmd, id);
self.inner.handlers.lock().unwrap().remove(&id);
res
}
}

Expand Down Expand Up @@ -107,7 +136,7 @@ impl Subscribable for Tcp {
return Err(anyhow::Error::msg("already subscribed to this event"));
}

bincode::serialize_into(&self.socket, &(id, cmd))?;
bincode::Options::serialize_into(bincode::options(), &self.socket, &(id, cmd))?;

*id_handle += 1;

Expand Down
14 changes: 8 additions & 6 deletions client/src/transports/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Udp {
std::thread::spawn(move || Self::recieve(i2, sock2));

Ok(Self {
id: 0.into(),
id: super::ID_START.into(),
inner,
sock,
})
Expand All @@ -61,7 +61,7 @@ impl Udp {
let buf = &buf[..len];

let mut curs = Cursor::new(buf);
let id: u32 = bincode::deserialize_from(&mut curs)?;
let id: u32 = bincode::Options::deserialize_from(bincode::options(), &mut curs)?;
if let Some(h) = inner.handlers.lock().unwrap().get_mut(&id) {
let pos = curs.position() as usize;
let rest = &curs.into_inner()[pos..];
Expand All @@ -77,7 +77,10 @@ impl Udp {
C::Return: Send + 'static,
{
let concrete: cmd::Concrete = cmd.into();
self.sock.send(&bincode::serialize(&(id, concrete))?)?;
self.sock.send(&bincode::Options::serialize(
bincode::options(),
&(id, concrete),
)?)?;

Ok(if has_return::<C>() {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
Expand Down Expand Up @@ -109,9 +112,7 @@ impl Transport for Udp {
drop(id_handle);

let res = self.cmd_id(cmd, id);

self.inner.handlers.lock().unwrap().remove(&id);

res
}
}
Expand Down Expand Up @@ -141,7 +142,8 @@ impl Subscribable for Udp {
let ev = ev.into();
let cmd: cmd::Concrete = cmd::Unsubscribe(ev).into();

self.sock.send(&bincode::serialize(&cmd)?)?;
self.sock
.send(&bincode::Options::serialize(bincode::options(), &cmd)?)?;

let id = self.inner.events.lock().unwrap().remove(&ev).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion roblib/src/camloc/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cmd::Command;
use roblib_macro::Command;

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct GetPosition;
impl Command for GetPosition {
const PREFIX: char = 'P';
Expand Down
49 changes: 39 additions & 10 deletions roblib/src/cmd/concrete.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use std::fmt::Display;

use crate::cmd::{self, Command};
use serde::{
de::{self, SeqAccess, Visitor},
ser::SerializeStruct,
Deserialize, Serialize,
};

use crate::cmd::{self, Command};

pub enum Concrete {
#[cfg(feature = "roland")]
MoveRobot(cmd::MoveRobot),
Expand Down Expand Up @@ -46,6 +43,44 @@ pub enum Concrete {
Nop(cmd::Nop),
GetUptime(cmd::GetUptime),
}
impl std::fmt::Debug for Concrete {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(feature = "roland")]
Self::MoveRobot(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::MoveRobotByAngle(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::StopRobot(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::Led(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::RolandServo(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::Buzzer(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::TrackSensor(v) => v.fmt(f),
#[cfg(feature = "roland")]
Self::UltraSensor(v) => v.fmt(f),
#[cfg(feature = "gpio")]
Self::PinMode(v) => v.fmt(f),
#[cfg(feature = "gpio")]
Self::ReadPin(v) => v.fmt(f),
#[cfg(feature = "gpio")]
Self::WritePin(v) => v.fmt(f),
#[cfg(feature = "gpio")]
Self::Pwm(v) => v.fmt(f),
#[cfg(feature = "gpio")]
Self::Servo(v) => v.fmt(f),
#[cfg(feature = "camloc")]
Self::GetPosition(v) => v.fmt(f),
Self::Subscribe(v) => v.fmt(f),
Self::Unsubscribe(v) => v.fmt(f),
Self::Nop(v) => v.fmt(f),
Self::GetUptime(v) => v.fmt(f),
}
}
}

// TODO: automatize Concrete impls
impl Concrete {
Expand Down Expand Up @@ -392,9 +427,3 @@ impl<'de> Deserialize<'de> for Concrete {
deserializer.deserialize_struct("Concrete", &["prefix", "cmd"], ConcreteVisitor)
}
}

impl Display for Concrete {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}
8 changes: 4 additions & 4 deletions roblib/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ pub const fn has_return<C: Command>() -> bool {
std::mem::size_of::<C::Return>() != 0
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct Subscribe(pub event::ConcreteType);
impl Command for Subscribe {
const PREFIX: char = '+';
type Return = ();
}
#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct Unsubscribe(pub event::ConcreteType);
impl Command for Unsubscribe {
const PREFIX: char = '-';
type Return = ();
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct Nop;
impl Command for Nop {
const PREFIX: char = '0';
type Return = ();
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct GetUptime;
impl Command for GetUptime {
const PREFIX: char = 'U';
Expand Down
10 changes: 5 additions & 5 deletions roblib/src/gpio/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
use crate::cmd::Command;
use roblib_macro::Command;

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct PinMode(pub u8, pub super::Mode);
impl Command for PinMode {
const PREFIX: char = 'p';
type Return = ();
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct ReadPin(pub u8);
impl Command for ReadPin {
const PREFIX: char = 'r';
type Return = bool;
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct WritePin(pub u8, pub bool);
impl Command for WritePin {
const PREFIX: char = 'w';
type Return = ();
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct Pwm(pub u8, pub f64, pub f64);
impl Command for Pwm {
const PREFIX: char = 'W';
type Return = ();
}

#[derive(Command, serde::Serialize, serde::Deserialize)]
#[derive(Command, serde::Serialize, serde::Deserialize, Debug)]
pub struct Servo(pub u8, pub f64);
impl Command for Servo {
const PREFIX: char = 'V';
Expand Down
Loading

0 comments on commit c6100a4

Please sign in to comment.