Skip to content

Commit

Permalink
moving some utils to skunk-utils
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraef committed Jul 12, 2024
1 parent 0991884 commit 9ff6df3
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 91 deletions.
1 change: 1 addition & 0 deletions skunk-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ semver-macro = "0.1.0"
serde = { version = "1.0.201", features = ["derive"] }
thiserror = "1.0.61"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.11"
toml = "0.8.12"
toml_edit = { version = "0.22.15", features = ["serde"] }
tower-http = { version = "0.5.2", features = ["fs"] }
Expand Down
14 changes: 8 additions & 6 deletions skunk-cli/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ use skunk::{
Passthrough,
Proxy,
},
util::{
error::ResultExt,
CancellationToken,
},
};
use skunk_util::error::ResultExt;
use tokio::{
net::TcpStream,
task::JoinSet,
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use crate::{
Expand Down Expand Up @@ -151,20 +149,24 @@ pub async fn run(environment: Environment, args: ProxyArgs) -> Result<(), Error>
let shutdown = shutdown.clone();
let interface = interface.clone();
async move {
if args.pcap.enabled {
let _hostapd = if args.pcap.ap {
let country_code = std::env::var("HOSTAPD_CC")
.expect("Environment variable `HOSTAPD_CC` not set. You need to set this variable to your country code.");

tracing::info!("Starting hostapd");
let mut hostapd = pcap::ap::Builder::new(&interface, &country_code)
.with_channel(11)
.with_graceful_shutdown(shutdown.clone())
.start()?;

tracing::info!("Waiting for hostapd to configure the interface...");
hostapd.ready().await?;
tracing::info!("hostapd ready");

Some(hostapd)
}
else {
None
};

let _network = VirtualNetwork::new(&interface)?;
shutdown.cancelled().await;
Expand Down
2 changes: 1 addition & 1 deletion skunk-cli/src/util/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use skunk::util::CancellationToken;
use tokio_util::sync::CancellationToken;

/// Resolves when the application receives SIGTERM on unix systems, or never on
/// other systems.
Expand Down
6 changes: 5 additions & 1 deletion skunk-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ edition = "2021"

[features]
default = []
full = ["trigger"]
trigger = ["dep:tokio"]
ordered-multimap = ["dep:ahash", "dep:hashbrown"]
error = []

[dependencies]
ahash = { version = "0.8.11", optional = true }
hashbrown = { version = "0.14.5", optional = true }
tokio = { version = "1.37.0", default-features = false, features = ["sync"], optional = true }
tracing = "0.1.40"
File renamed without changes.
6 changes: 6 additions & 0 deletions skunk-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
#[cfg(feature = "trigger")]
pub mod trigger;

#[cfg(feature = "error")]
pub mod error;

#[cfg(feature = "ordered-multimap")]
pub mod ordered_multimap;
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,12 @@ impl<'a, K, V, H> OccupiedEntryMut<'a, K, V, H> {

pub fn append(&mut self, key: K, value: V) -> &mut V {
let tail = self.list().tail;
let bucket = self.map.buckets.get_index(&self.bucket);

let index = self.map.pairs.push(Pair {
key,
value,
next: None,
prev: Some(tail),
bucket,
});

self.map.pairs.get_mut(tail).next = Some(index);
Expand Down Expand Up @@ -582,14 +580,12 @@ impl<'a, K, V, H> VacantEntryMut<'a, K, V, H> {
count: 1,
},
);
let bucket_index = self.map.buckets.get_index(&bucket);

let index2 = self.map.pairs.push(Pair {
key,
value,
next: None,
prev: None,
bucket: bucket_index,
});
assert_eq!(index, index2);

Expand Down Expand Up @@ -878,16 +874,12 @@ impl<K, V> ExactSizeIterator for IntoIter<K, V> {}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct PairIndex(usize);

#[derive(Clone, Copy)]
struct BucketIndex(usize);

#[derive(Clone)]
struct Pair<K, V> {
key: K,
value: V,
next: Option<PairIndex>,
prev: Option<PairIndex>,
bucket: BucketIndex,
}

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -978,10 +970,6 @@ impl<H: BuildHasher> Buckets<H> {
}

impl<H> Buckets<H> {
pub fn get_index(&self, bucket: &Bucket<List>) -> BucketIndex {
BucketIndex(unsafe { self.inner.bucket_index(bucket) })
}

pub fn remove(&mut self, bucket: Bucket<List>) -> (List, InsertSlot) {
unsafe { self.inner.remove(bucket) }
}
Expand Down
7 changes: 5 additions & 2 deletions skunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ path = "../byst"
[dependencies.skunk-macros]
path = "../skunk-macros"

[dependencies.skunk-util]
path = "../skunk-util"
features = ["error", "ordered-multimap"]

[dependencies]
ahash = "0.8.11"
bitflags = "2.5.0"
bytes = "1.6.0"
crc = "3.2.1"
Expand Down Expand Up @@ -83,6 +86,6 @@ tempfile = "3.10.1"
thiserror = "1.0.60"
tokio = { version = "1.37.0", features = ["macros", "net", "io-util", "process"] }
tokio-rustls = { version = "0.26.0", optional = true }
tokio-util = "0.7.11"
#tokio-util = "0.7.11"
tracing = "0.1.40"
url = { version = "2.5.0", features = ["serde"] }
6 changes: 2 additions & 4 deletions skunk/src/protocol/inet/dhcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ use byst::{
},
util::for_tuple,
};
use skunk_util::ordered_multimap::OrderedMultiMap;

use crate::util::{
network_enum,
ordered_multimap::OrderedMultiMap,
};
use crate::util::network_enum;

/// A [DHCP message][1]
///
Expand Down
84 changes: 36 additions & 48 deletions skunk/src/proxy/pcap/ap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use tempfile::NamedTempFile;
use tokio::{
io::{
AsyncBufReadExt,
AsyncRead,
BufReader,
},
process::Command,
sync::watch,
sync::{
oneshot,
watch,
},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use super::interface::Interface;
Expand Down Expand Up @@ -65,7 +66,6 @@ pub struct Builder<'a> {
hw_mode: HwMode,
channel: Option<u8>,
password: Option<&'a str>,
shutdown: CancellationToken,
ready: Option<watch::Sender<bool>>,
}

Expand All @@ -80,7 +80,6 @@ impl<'a> Builder<'a> {
hw_mode: Default::default(),
channel: None,
password: None,
shutdown: Default::default(),
ready: None,
}
}
Expand Down Expand Up @@ -115,11 +114,6 @@ impl<'a> Builder<'a> {
self
}

pub fn with_graceful_shutdown(mut self, shutdown: CancellationToken) -> Self {
self.shutdown = shutdown;
self
}

pub fn write_config(&self, mut writer: impl Write) -> Result<(), Error> {
writeln!(writer, "interface={}", self.interface.name())?;
writeln!(writer, "driver={}", <&'static str>::from(self.driver))?;
Expand Down Expand Up @@ -156,41 +150,57 @@ impl<'a> Builder<'a> {
tracing::debug!(parent: &span, "spawning hostapd");

let (ready_tx, ready_rx) = watch::channel(false);
let shutdown = CancellationToken::new();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();

let join_handle = tokio::spawn({
let shutdown = shutdown.clone();
let join_handle = tokio::spawn(
async move {
// move temp config file here, so it is only deleted once the process
// terminates.
let _cfg_file = cfg_file;

tokio::select! {
result = handle_stdout(&mut process.stdout, ready_tx) => {
result?;
},
_ = self.shutdown.cancelled() => {},
_ = shutdown.cancelled() => {},
};
let reader = BufReader::new(process.stdout.as_mut().expect("no stdout"));
let mut lines = reader.lines();

loop {
tokio::select! {
result = lines.next_line() => {
if let Some(line) = result? {
if line.contains("AP-ENABLED") {
let _ = ready_tx.send(true);
}
tracing::debug!("{}", line);
}
else {
// EOF on stdout
break;
}
},
_ = &mut shutdown_rx => {
// either the user sent a shutdown Signal through [`HostApd::stop`], or the sender was dropped.
// either case, we're done.
break;
},
};
}

tracing::debug!("killing hostapd");
process.kill().await?;
Ok::<(), Error>(())
}
.instrument(span)
});
.instrument(span),
);

Ok(HostApd {
join_handle,
shutdown,
shutdown_tx,
ready_rx,
})
}
}

pub struct HostApd {
join_handle: JoinHandle<Result<(), Error>>,
shutdown: CancellationToken,
shutdown_tx: oneshot::Sender<()>,
ready_rx: watch::Receiver<bool>,
}

Expand All @@ -206,31 +216,9 @@ impl HostApd {
})
}

pub async fn wait(self) -> Result<(), Error> {
pub async fn stop(self) -> Result<(), Error> {
let _ = self.shutdown_tx.send(());
self.join_handle.await.ok().transpose()?;
Ok(())
}

pub async fn stop(self) -> Result<(), Error> {
self.shutdown.cancel();
self.wait().await
}
}

async fn handle_stdout<S: AsyncRead + Unpin>(
stream_opt: &mut Option<S>,
ready_tx: watch::Sender<bool>,
) -> Result<(), Error> {
if let Some(stream) = stream_opt {
let stream = BufReader::new(stream);
let mut lines = stream.lines();
while let Some(line) = lines.next_line().await? {
let line = line.trim_end();
if line.ends_with("AP-ENABLED") {
let _ = ready_tx.send(true);
}
tracing::debug!("{}", line);
}
}
Ok(())
}
10 changes: 4 additions & 6 deletions skunk/src/proxy/pcap/arp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
};

use futures::TryFutureExt;
use skunk_util::error::ResultExt;
use smallvec::SmallVec;
use tokio::{
sync::{
Expand All @@ -29,12 +30,9 @@ use super::{
SendError,
};
pub use crate::protocol::inet::arp::Packet;
use crate::{
protocol::inet::{
arp::Operation,
MacAddress,
},
util::error::ResultExt,
use crate::protocol::inet::{
arp::Operation,
MacAddress,
};

#[derive(Debug)]
Expand Down
12 changes: 5 additions & 7 deletions skunk/src/proxy/pcap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use skunk_macros::{
ipv4_address,
ipv4_network,
};
use skunk_util::error::ResultExt;
use tokio::sync::mpsc;
use tracing::Instrument;

Expand All @@ -39,13 +40,10 @@ use self::{
ReceiveError,
},
};
use crate::{
protocol::inet::{
ethernet,
ipv4,
MacAddress,
},
util::error::ResultExt,
use crate::protocol::inet::{
ethernet,
ipv4,
MacAddress,
};

#[derive(Debug)]
Expand Down
4 changes: 0 additions & 4 deletions skunk/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
pub(crate) mod boolean;
pub mod crc;
pub mod error;
pub mod io;
pub mod ordered_multimap;

use std::{
fmt::{
Expand All @@ -21,9 +19,7 @@ use std::{
},
};

pub use byst::util::for_tuple;
use parking_lot::Mutex;
pub use tokio_util::sync::CancellationToken;

/// [`Oncelock`](std::sync::OnceLock::get_or_try_init) is not stabilized yet, so
/// we implement it ourselves. Also we inclose the `Arc`, because why not.
Expand Down

0 comments on commit 9ff6df3

Please sign in to comment.