Skip to content

Commit

Permalink
chore: Run rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
qdot committed Oct 20, 2023
1 parent e3642a3 commit e91d5ec
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,21 @@ pub struct BtleplugAdapterTask {
event_sender: Sender<HardwareCommunicationManagerEvent>,
command_receiver: Receiver<BtleplugAdapterCommand>,
adapter_connected: Arc<AtomicBool>,
requires_keepalive: bool
requires_keepalive: bool,
}

impl BtleplugAdapterTask {
pub fn new(
event_sender: Sender<HardwareCommunicationManagerEvent>,
command_receiver: Receiver<BtleplugAdapterCommand>,
adapter_connected: Arc<AtomicBool>,
requires_keepalive: bool
requires_keepalive: bool,
) -> Self {
Self {
event_sender,
command_receiver,
adapter_connected,
requires_keepalive
requires_keepalive,
}
}

Expand Down Expand Up @@ -118,7 +118,7 @@ impl BtleplugAdapterTask {
&properties.services,
peripheral.clone(),
adapter.clone(),
self.requires_keepalive
self.requires_keepalive,
));
if self
.event_sender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::sync::mpsc::{channel, Sender};

#[derive(Default, Clone)]
pub struct BtlePlugCommunicationManagerBuilder {
require_keepalive: bool
require_keepalive: bool,
}

impl BtlePlugCommunicationManagerBuilder {
Expand All @@ -39,7 +39,10 @@ impl HardwareCommunicationManagerBuilder for BtlePlugCommunicationManagerBuilder
&mut self,
sender: Sender<HardwareCommunicationManagerEvent>,
) -> Box<dyn HardwareCommunicationManager> {
Box::new(BtlePlugCommunicationManager::new(sender, self.require_keepalive))
Box::new(BtlePlugCommunicationManager::new(
sender,
self.require_keepalive,
))
}
}

Expand All @@ -50,12 +53,20 @@ pub struct BtlePlugCommunicationManager {
}

impl BtlePlugCommunicationManager {
pub fn new(event_sender: Sender<HardwareCommunicationManagerEvent>, require_keepalive: bool) -> Self {
pub fn new(
event_sender: Sender<HardwareCommunicationManagerEvent>,
require_keepalive: bool,
) -> Self {
let (sender, receiver) = channel(256);
let adapter_connected = Arc::new(AtomicBool::new(false));
let adapter_connected_clone = adapter_connected.clone();
async_manager::spawn(async move {
let mut task = BtleplugAdapterTask::new(event_sender, receiver, adapter_connected_clone, require_keepalive);
let mut task = BtleplugAdapterTask::new(
event_sender,
receiver,
adapter_connected_clone,
require_keepalive,
);
task.run().await;
});
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub(super) struct BtleplugHardwareConnector<T: Peripheral + 'static> {
services: Vec<Uuid>,
device: T,
adapter: Adapter,
requires_keepalive: bool
requires_keepalive: bool,
}

impl<T: Peripheral> BtleplugHardwareConnector<T> {
Expand All @@ -65,15 +65,15 @@ impl<T: Peripheral> BtleplugHardwareConnector<T> {
services: &[Uuid],
device: T,
adapter: Adapter,
requires_keepalive: bool
requires_keepalive: bool,
) -> Self {
Self {
name: name.to_owned(),
manufacturer_data: manufacturer_data.clone(),
services: services.to_vec(),
device,
adapter,
requires_keepalive
requires_keepalive,
}
}
}
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<T: Peripheral> HardwareConnector for BtleplugHardwareConnector<T> {
&self.name,
self.device.clone(),
self.adapter.clone(),
self.requires_keepalive
self.requires_keepalive,
)))
}
}
Expand All @@ -131,7 +131,7 @@ pub struct BtleplugHardwareSpecializer<T: Peripheral + 'static> {
name: String,
device: T,
adapter: Adapter,
requires_keepalive: bool
requires_keepalive: bool,
}

impl<T: Peripheral> BtleplugHardwareSpecializer<T> {
Expand All @@ -140,7 +140,7 @@ impl<T: Peripheral> BtleplugHardwareSpecializer<T> {
name: name.to_owned(),
device,
adapter,
requires_keepalive
requires_keepalive,
}
}
}
Expand Down
17 changes: 7 additions & 10 deletions buttplug/src/server/device/hardware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
pub mod communication;

use std::{
fmt::Debug,
time::Duration, sync::Arc
};
use std::{fmt::Debug, sync::Arc, time::Duration};

use crate::{
core::{
Expand All @@ -16,9 +13,9 @@ use async_trait::async_trait;
use futures::future::BoxFuture;
use futures_util::FutureExt;
use getset::{CopyGetters, Getters};
use instant::Instant;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
use instant::Instant;

/// Parameters for reading data from a [Hardware](crate::device::Hardware) endpoint
///
Expand Down Expand Up @@ -247,9 +244,9 @@ pub struct Hardware {
/// Internal implementation details
internal_impl: Box<dyn HardwareInternal>,
/// Requires a keepalive signal to be sent by the Server Device class
#[getset(get_copy="pub")]
#[getset(get_copy = "pub")]
requires_keepalive: bool,
last_write_time: Arc<RwLock<Instant>>
last_write_time: Arc<RwLock<Instant>>,
}

impl Hardware {
Expand All @@ -265,7 +262,7 @@ impl Hardware {
endpoints: endpoints.into(),
internal_impl,
requires_keepalive: false,
last_write_time: Arc::new(RwLock::new(Instant::now()))
last_write_time: Arc::new(RwLock::new(Instant::now())),
}
}

Expand Down Expand Up @@ -329,14 +326,14 @@ impl Hardware {
&self,
msg: &HardwareWriteCmd,
) -> BoxFuture<'static, Result<(), ButtplugDeviceError>> {

let write_fut = self.internal_impl.write_value(msg);
if self.requires_keepalive {
let last_write_time = self.last_write_time.clone();
async move {
*last_write_time.write().await = Instant::now();
write_fut.await
}.boxed()
}
.boxed()
} else {
write_fut
}
Expand Down
2 changes: 1 addition & 1 deletion buttplug/src/server/device/protocol/adrienlastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct AdrienLastic {}

impl ProtocolHandler for AdrienLastic {
fn keepalive_strategy(&self) -> super::ProtocolKeepaliveStrategy {
super::ProtocolKeepaliveStrategy::RepeatLastPacketStrategy
super::ProtocolKeepaliveStrategy::RepeatLastPacketStrategy
}

fn handle_scalar_vibrate_cmd(
Expand Down
7 changes: 5 additions & 2 deletions buttplug/src/server/device/protocol/lovense.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,13 @@ pub struct Lovense {
}

impl ProtocolHandler for Lovense {

fn keepalive_strategy(&self) -> super::ProtocolKeepaliveStrategy {
// For Lovense, we'll just repeat the device type packet and drop the result.
super::ProtocolKeepaliveStrategy::RepeatPacketStrategy(HardwareWriteCmd::new(Endpoint::Tx, b"DeviceType;".to_vec(), false))
super::ProtocolKeepaliveStrategy::RepeatPacketStrategy(HardwareWriteCmd::new(
Endpoint::Tx,
b"DeviceType;".to_vec(),
false,
))
}

fn handle_scalar_cmd(
Expand Down
6 changes: 3 additions & 3 deletions buttplug/src/server/device/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ use std::{collections::HashMap, sync::Arc};
/// things alive. Currently this only applies to iOS backgrounding with bluetooth devices, but since
/// we never know which of our hundreds of supported devices someone might connect, we need context
/// as to which keepalive strategy to use.
///
///
/// When choosing a keepalive strategy for a protocol:
///
///
/// - All protocols use NoStrategy by default. For many devices, sending trash will break them in
/// very weird ways and we can't risk that, so we need to know the protocol context.
/// - If the protocol already needs its own keepalive (Satisfyer, Mysteryvibe, etc...), use
Expand All @@ -159,7 +159,7 @@ pub enum ProtocolKeepaliveStrategy {
/// will be useful for most devices that purely use scalar commands.
RepeatLastPacketStrategy,
/// Call a specific method on the protocol implementation to generate keepalive packets.
CustomStrategy
CustomStrategy,
}

pub trait ProtocolIdentifierFactory: Send + Sync {
Expand Down
11 changes: 7 additions & 4 deletions buttplug/src/server/device/protocol/thehandy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,22 @@ pub struct TheHandy {
}

impl ProtocolHandler for TheHandy {

fn keepalive_strategy(&self) -> super::ProtocolKeepaliveStrategy {
let ping_payload = handyplug::Payload {
messages: vec![handyplug::Message {
message: Some(handyplug::message::Message::Ping(Ping { id: 999 })),
}]
}],
};
let mut ping_buf = vec![];
ping_payload
.encode(&mut ping_buf)
.expect("Infallible encode.");

super::ProtocolKeepaliveStrategy::RepeatPacketStrategy(HardwareWriteCmd::new(Endpoint::Tx, ping_buf, true))

super::ProtocolKeepaliveStrategy::RepeatPacketStrategy(HardwareWriteCmd::new(
Endpoint::Tx,
ping_buf,
true,
))
}

fn handle_fleshlight_launch_fw12_cmd(
Expand Down
48 changes: 37 additions & 11 deletions buttplug/src/server/device/server_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
},
ButtplugServerResultFuture,
},
util::{self, stream::convert_broadcast_receiver_to_stream, async_manager},
util::{self, async_manager, stream::convert_broadcast_receiver_to_stream},
};
use core::hash::{Hash, Hasher};
use dashmap::DashSet;
Expand All @@ -56,7 +56,12 @@ use tokio_stream::StreamExt;

use super::{
configuration::{ProtocolDeviceAttributes, ServerDeviceMessageAttributes},
protocol::{generic_command_manager::GenericCommandManager, ProtocolSpecializer, ProtocolKeepaliveStrategy}, hardware::HardwareWriteCmd,
hardware::HardwareWriteCmd,
protocol::{
generic_command_manager::GenericCommandManager,
ProtocolKeepaliveStrategy,
ProtocolSpecializer,
},
};

#[derive(Debug)]
Expand Down Expand Up @@ -164,11 +169,19 @@ pub(super) async fn build_server_device(

// We now have fully initialized hardware, return a server device.
let device = ServerDevice::new(identifier, handler, hardware, &attrs);

// If we need a keepalive with a packet replay, set this up via stopping the device on connect.
if requires_keepalive && matches!(strategy, ProtocolKeepaliveStrategy::RepeatLastPacketStrategy) {
if requires_keepalive
&& matches!(
strategy,
ProtocolKeepaliveStrategy::RepeatLastPacketStrategy
)
{
if let Err(e) = device.handle_stop_device_cmd().await {
return Err(ButtplugDeviceError::DeviceConnectionError(format!("Error setting up keepalive: {}", e)));
return Err(ButtplugDeviceError::DeviceConnectionError(format!(
"Error setting up keepalive: {}",
e
)));
}
}

Expand All @@ -183,7 +196,7 @@ pub struct ServerDevice {
/// Unique identifier for the device
identifier: ServerDeviceIdentifier,
raw_subscribed_endpoints: Arc<DashSet<Endpoint>>,
keepalive_packet: Arc<RwLock<Option<HardwareWriteCmd>>>
keepalive_packet: Arc<RwLock<Option<HardwareWriteCmd>>>,
}
impl Debug for ServerDevice {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down Expand Up @@ -220,7 +233,12 @@ impl ServerDevice {
let keepalive_packet = Arc::new(RwLock::new(None));
let gcm = GenericCommandManager::new(attributes);
// If we've gotten here, we know our hardware is connected. This means we can start the keepalive if it's required.
if hardware.requires_keepalive() && !matches!(handler.keepalive_strategy(), ProtocolKeepaliveStrategy::NoStrategy) {
if hardware.requires_keepalive()
&& !matches!(
handler.keepalive_strategy(),
ProtocolKeepaliveStrategy::NoStrategy
)
{
let hardware = hardware.clone();
let strategy = handler.keepalive_strategy();
let keepalive_packet = keepalive_packet.clone();
Expand All @@ -243,11 +261,14 @@ impl ServerDevice {
break;
}
}
},
}
_ => {
info!("Protocol keepalive strategy {:?} not implemented, replacing with NoStrategy", strategy);
info!(
"Protocol keepalive strategy {:?} not implemented, replacing with NoStrategy",
strategy
);
}
}
}
}
// Arbitrary wait time for now.
util::sleep(wait_duration).await;
Expand Down Expand Up @@ -548,7 +569,12 @@ impl ServerDevice {
// disconnected.
for command in commands {
hardware.parse_message(&command).await?;
if hardware.requires_keepalive() && matches!(keepalive_type, ProtocolKeepaliveStrategy::RepeatLastPacketStrategy) {
if hardware.requires_keepalive()
&& matches!(
keepalive_type,
ProtocolKeepaliveStrategy::RepeatLastPacketStrategy
)
{
if let HardwareCommand::Write(command) = command {
*keepalive_packet.write().await = Some(command);
}
Expand Down

0 comments on commit e91d5ec

Please sign in to comment.