From 0fbeff509ce7598870506054794026114f76deef Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Wed, 10 Apr 2024 23:17:50 -0400 Subject: [PATCH] fix: readonly/writeonly protocol resource handling --- CHANGELOG.md | 6 + Cargo.toml | 2 +- bevy_rtc/src/client/client.rs | 102 +++++++++++++ bevy_rtc/src/client/mod.rs | 5 +- bevy_rtc/src/client/system_params.rs | 58 ------- bevy_rtc/src/server/client.rs | 219 +++++++++++++++++++++++++++ bevy_rtc/src/server/mod.rs | 4 +- bevy_rtc/src/server/system_params.rs | 125 --------------- 8 files changed, 333 insertions(+), 188 deletions(-) create mode 100644 bevy_rtc/src/client/client.rs delete mode 100644 bevy_rtc/src/client/system_params.rs create mode 100644 bevy_rtc/src/server/client.rs delete mode 100644 bevy_rtc/src/server/system_params.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b410e0..750daaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ Subheadings to categorize changes are `added, changed, deprecated, removed, fixe ## Unreleased +## 0.3.1 + +### fixed + +- A panic that would occur for ro/wo protocols. + ## 0.3.0 ### added diff --git a/Cargo.toml b/Cargo.toml index d03b598..4a7449c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.0" +version = "0.3.1" license = "MIT OR Apache-2.0" description = "A client-server library designed over WebRTC for Bevy" repository = "https://github.com/loopystudios/bevy_rtc" diff --git a/bevy_rtc/src/client/client.rs b/bevy_rtc/src/client/client.rs new file mode 100644 index 0000000..b47c35c --- /dev/null +++ b/bevy_rtc/src/client/client.rs @@ -0,0 +1,102 @@ +use super::router::{IncomingMessages, OutgoingMessages}; +use crate::protocol::Protocol; +use bevy::{ecs::system::SystemParam, prelude::*}; + +#[derive(SystemParam, Debug)] +pub struct RtcClient<'w, M: Protocol> { + // Option is none if it's send-only or read-only. + pub(crate) incoming: Option>>, + pub(crate) outgoing: Option>>, +} + +impl<'w, M: Protocol> RtcClient<'w, M> { + /// Returns the capacity of incoming messages. + pub fn capacity(&self) -> usize { + self.incoming.as_ref().map(|v| v.bound).unwrap_or(0) + } + + /// Returns the number of messages waiting in the buffer without draining them. + pub fn len(&self) -> usize { + self.incoming + .as_ref() + .map(|v| v.messages.len()) + .unwrap_or(0) + } + + /// Returns the number of messages waiting in the buffer without draining them. + pub fn is_empty(&self) -> bool { + self.incoming + .as_ref() + .map(|v| v.messages.is_empty()) + .unwrap_or(true) + } + + /// Clear all messages waiting in the buffer. + pub fn clear(&mut self) { + if let Some(ref mut incoming) = self.incoming { + incoming.messages.clear() + } + } + + /// Consumes all messages in the buffer and iterate on them. + pub fn read(&mut self) -> Vec { + if let Some(ref mut incoming) = self.incoming { + incoming.messages.drain(..).collect() + } else { + panic!( + "Attempting to read from `{}` is not allowed, it is registered write only.", + M::reflect_name() + ); + } + } + + /// Send a payload to the host with reliability. The payload is created with + /// lazy behavior, only when the send rate allows. + pub fn reliable_to_host_with(&mut self, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_host.push(message_fn()); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to the host with no expectation of delivery. The payload + /// is created with lazy behavior, only when the send rate allows. + pub fn unreliable_to_host_with(&mut self, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_host.push(message_fn()); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to the host with reliability. + pub fn reliable_to_host(&mut self, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_host.push(message); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to the host with no expectation of delivery. + pub fn unreliable_to_host(&mut self, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_host.push(message); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } +} diff --git a/bevy_rtc/src/client/mod.rs b/bevy_rtc/src/client/mod.rs index cda255b..33f7ff6 100644 --- a/bevy_rtc/src/client/mod.rs +++ b/bevy_rtc/src/client/mod.rs @@ -1,12 +1,13 @@ +#[allow(clippy::module_inception)] +mod client; mod events; mod plugin; mod router; mod state; -mod system_params; mod systems; +pub use client::RtcClient; pub use events::{RtcClientEvent, RtcClientRequestEvent}; pub use plugin::RtcClientPlugin; pub use router::AddClientProtocolExt; pub use state::{RtcClientState, RtcClientStatus}; -pub use system_params::RtcClient; diff --git a/bevy_rtc/src/client/system_params.rs b/bevy_rtc/src/client/system_params.rs deleted file mode 100644 index 1d468b4..0000000 --- a/bevy_rtc/src/client/system_params.rs +++ /dev/null @@ -1,58 +0,0 @@ -use super::router::{IncomingMessages, OutgoingMessages}; -use crate::protocol::Protocol; -use bevy::{ecs::system::SystemParam, prelude::*}; - -#[derive(SystemParam, Debug)] -pub struct RtcClient<'w, M: Protocol> { - pub(crate) incoming: ResMut<'w, IncomingMessages>, - pub(crate) outgoing: ResMut<'w, OutgoingMessages>, -} - -impl<'w, M: Protocol> RtcClient<'w, M> { - /// Returns the capacity of incoming messages. - pub fn capacity(&self) -> usize { - self.incoming.bound - } - - /// Returns the number of messages waiting in the buffer without draining them. - pub fn len(&self) -> usize { - self.incoming.messages.len() - } - - /// Returns the number of messages waiting in the buffer without draining them. - pub fn is_empty(&self) -> bool { - self.incoming.messages.is_empty() - } - - /// Clear all messages waiting in the buffer. - pub fn clear(&mut self) { - self.incoming.messages.clear() - } - - /// Consumes all messages in the buffer and iterate on them. - pub fn read(&mut self) -> Vec { - self.incoming.messages.drain(..).collect() - } - - /// Send a payload to the host with reliability. The payload is created with - /// lazy behavior, only when the send rate allows. - pub fn reliable_to_host_with(&mut self, message_fn: impl Fn() -> M) { - self.outgoing.reliable_to_host.push(message_fn()); - } - - /// Send a payload to the host with no expectation of delivery. The payload - /// is created with lazy behavior, only when the send rate allows. - pub fn unreliable_to_host_with(&mut self, message_fn: impl Fn() -> M) { - self.outgoing.unreliable_to_host.push(message_fn()); - } - - /// Send a payload to the host with reliability. - pub fn reliable_to_host(&mut self, message: M) { - self.outgoing.reliable_to_host.push(message); - } - - /// Send a payload to the host with no expectation of delivery. - pub fn unreliable_to_host(&mut self, message: M) { - self.outgoing.unreliable_to_host.push(message); - } -} diff --git a/bevy_rtc/src/server/client.rs b/bevy_rtc/src/server/client.rs new file mode 100644 index 0000000..841a586 --- /dev/null +++ b/bevy_rtc/src/server/client.rs @@ -0,0 +1,219 @@ +use super::router::{IncomingMessages, OutgoingMessages}; +use crate::protocol::Protocol; +use bevy::{ecs::system::SystemParam, prelude::*}; +use bevy_matchbox::prelude::PeerId; + +/// A [`SystemParam`] for reading payloads of a particular type. +#[derive(SystemParam, Debug)] +pub struct RtcServer<'w, M: Protocol> { + // Option is none if it's send-only or read-only. + pub(crate) incoming: Option>>, + pub(crate) outgoing: Option>>, +} + +impl<'w, M: Protocol> RtcServer<'w, M> { + /// Returns the capacity of incoming messages. + pub fn capacity(&self) -> usize { + self.incoming.as_ref().map(|v| v.bound).unwrap_or(0) + } + + /// Returns the number of messages waiting in the buffer without draining them. + pub fn len(&self) -> usize { + self.incoming + .as_ref() + .map(|v| v.messages.len()) + .unwrap_or(0) + } + + /// Returns the number of messages waiting in the buffer without draining them. + pub fn is_empty(&self) -> bool { + self.incoming + .as_ref() + .map(|v| v.messages.is_empty()) + .unwrap_or(true) + } + + /// Clear all messages waiting in the buffer. + pub fn clear(&mut self) { + if let Some(ref mut incoming) = self.incoming { + incoming.messages.clear() + } + } + + /// Consumes all messages in the buffer and iterate on them. + pub fn read(&mut self) -> Vec<(PeerId, M)> { + if let Some(ref mut incoming) = self.incoming { + incoming + .messages + .drain() + .fold(vec![], |mut v, (peer, payloads)| { + v.extend(payloads.into_iter().map(|p| (peer, p))); + v + }) + } else { + panic!( + "Attempting to read from `{}` is not allowed, it is registered write only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers with reliability. + pub fn reliable_to_all(&mut self, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_all.push(message); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers with no expectation of delivery. + pub fn unreliable_to_all(&mut self, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_all.push(message); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to a peer with reliability. + pub fn reliable_to_peer(&mut self, peer_id: PeerId, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_peer.push((peer_id, message)); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to a peer with no expectation of delivery. + pub fn unreliable_to_peer(&mut self, peer_id: PeerId, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_peer.push((peer_id, message)); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers except one with reliability. + pub fn reliable_to_all_except(&mut self, peer_id: PeerId, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_all_except.push((peer_id, message)); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers except one with no expectation of + /// delivery. + pub fn unreliable_to_all_except(&mut self, peer_id: PeerId, message: M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_all_except.push((peer_id, message)); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers with reliability. The payload is + /// created with lazy behavior, only when the send rate allows. + pub fn reliable_to_all_with(&mut self, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_all.push(message_fn()); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers with no expectation of delivery. + /// The payload is created with lazy behavior, only when the send rate + /// allows. + pub fn unreliable_to_all_with(&mut self, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_all.push(message_fn()); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to a peer with reliability. The payload is + /// created with lazy behavior, only when the send rate allows. + pub fn reliable_to_peer_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.reliable_to_peer.push((peer_id, message_fn())); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to a peer with no expectation of delivery. + /// The payload is created with lazy behavior, only when the send rate + /// allows. + pub fn unreliable_to_peer_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing.unreliable_to_peer.push((peer_id, message_fn())); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers except one with reliability. The + /// payload is created with lazy behavior, only when the send rate + /// allows. + pub fn reliable_to_all_except_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing + .reliable_to_all_except + .push((peer_id, message_fn())); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } + + /// Send a payload to all connected peers except one with no expectation of + /// delivery. The payload is created with lazy behavior, only when the + /// send rate allows. + pub fn unreliable_to_all_except_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { + if let Some(ref mut outgoing) = self.outgoing { + outgoing + .unreliable_to_all_except + .push((peer_id, message_fn())); + } else { + panic!( + "Attempting to write `{}` is not allowed, it is registered read only.", + M::reflect_name() + ); + } + } +} diff --git a/bevy_rtc/src/server/mod.rs b/bevy_rtc/src/server/mod.rs index 19e32d6..c56da40 100644 --- a/bevy_rtc/src/server/mod.rs +++ b/bevy_rtc/src/server/mod.rs @@ -1,12 +1,12 @@ +mod client; mod events; mod plugin; mod router; mod state; -mod system_params; mod systems; +pub use client::RtcServer; pub use events::RtcServerEvent; pub use plugin::RtcServerPlugin; pub use router::AddServerProtocolExt; pub use state::{RtcServerState, RtcServerStatus}; -pub use system_params::RtcServer; diff --git a/bevy_rtc/src/server/system_params.rs b/bevy_rtc/src/server/system_params.rs deleted file mode 100644 index bbe95be..0000000 --- a/bevy_rtc/src/server/system_params.rs +++ /dev/null @@ -1,125 +0,0 @@ -use super::router::{IncomingMessages, OutgoingMessages}; -use crate::protocol::Protocol; -use bevy::{ecs::system::SystemParam, prelude::*}; -use bevy_matchbox::prelude::PeerId; - -/// A [`SystemParam`] for reading payloads of a particular type. -#[derive(SystemParam, Debug)] -pub struct RtcServer<'w, M: Protocol> { - pub(crate) incoming: ResMut<'w, IncomingMessages>, - pub(crate) outgoing: ResMut<'w, OutgoingMessages>, -} - -impl<'w, M: Protocol> RtcServer<'w, M> { - /// Returns the capacity of incoming messages. - pub fn capacity(&self) -> usize { - self.incoming.bound - } - - /// Returns the number of messages waiting in the buffer without draining them. - pub fn len(&self) -> usize { - self.incoming.messages.len() - } - - /// Returns the number of messages waiting in the buffer without draining them. - pub fn is_empty(&self) -> bool { - self.incoming.messages.is_empty() - } - - /// Consumes all messages in the buffer and iterate on them. - pub fn read(&mut self) -> Vec<(PeerId, M)> { - self.incoming - .messages - .drain() - .fold(vec![], |mut v, (peer, payloads)| { - v.extend(payloads.into_iter().map(|p| (peer, p))); - v - }) - } - - /// Clear all messages waiting in the buffer. - pub fn clear(&mut self) { - self.incoming.messages.clear() - } - - /// Send a payload to all connected peers with reliability. - pub fn reliable_to_all(&mut self, message: M) { - self.outgoing.reliable_to_all.push(message); - } - - /// Send a payload to all connected peers with no expectation of delivery. - pub fn unreliable_to_all(&mut self, message: M) { - self.outgoing.unreliable_to_all.push(message); - } - - /// Send a payload to a peer with reliability. - pub fn reliable_to_peer(&mut self, peer_id: PeerId, message: M) { - self.outgoing.reliable_to_peer.push((peer_id, message)); - } - - /// Send a payload to a peer with no expectation of delivery. - pub fn unreliable_to_peer(&mut self, peer_id: PeerId, message: M) { - self.outgoing.unreliable_to_peer.push((peer_id, message)); - } - - /// Send a payload to all connected peers except one with reliability. - pub fn reliable_to_all_except(&mut self, peer_id: PeerId, message: M) { - self.outgoing - .reliable_to_all_except - .push((peer_id, message)); - } - - /// Send a payload to all connected peers except one with no expectation of - /// delivery. - pub fn unreliable_to_all_except(&mut self, peer_id: PeerId, message: M) { - self.outgoing - .unreliable_to_all_except - .push((peer_id, message)); - } - - /// Send a payload to all connected peers with reliability. The payload is - /// created with lazy behavior, only when the send rate allows. - pub fn reliable_to_all_with(&mut self, message_fn: impl Fn() -> M) { - self.outgoing.reliable_to_all.push(message_fn()); - } - - /// Send a payload to all connected peers with no expectation of delivery. - /// The payload is created with lazy behavior, only when the send rate - /// allows. - pub fn unreliable_to_all_with(&mut self, message_fn: impl Fn() -> M) { - self.outgoing.unreliable_to_all.push(message_fn()); - } - - /// Send a payload to a peer with reliability. The payload is - /// created with lazy behavior, only when the send rate allows. - pub fn reliable_to_peer_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { - self.outgoing.reliable_to_peer.push((peer_id, message_fn())); - } - - /// Send a payload to a peer with no expectation of delivery. - /// The payload is created with lazy behavior, only when the send rate - /// allows. - pub fn unreliable_to_peer_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { - self.outgoing - .unreliable_to_peer - .push((peer_id, message_fn())); - } - - /// Send a payload to all connected peers except one with reliability. The - /// payload is created with lazy behavior, only when the send rate - /// allows. - pub fn reliable_to_all_except_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { - self.outgoing - .reliable_to_all_except - .push((peer_id, message_fn())); - } - - /// Send a payload to all connected peers except one with no expectation of - /// delivery. The payload is created with lazy behavior, only when the - /// send rate allows. - pub fn unreliable_to_all_except_with(&mut self, peer_id: PeerId, message_fn: impl Fn() -> M) { - self.outgoing - .unreliable_to_all_except - .push((peer_id, message_fn())); - } -}