diff --git a/CHANGELOG.md b/CHANGELOG.md index ad6d22a0..9a1f89dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - core/config: impl `FromStr` for `Secret` ([#135]). - core/init: emit the `elfo_start_time_seconds` metric. - core/actor: implement `Display` for `ActorMeta` ([#74]). -- network: add `idle_timeout` to detect and disconnect stuck connections. +- network: add `idle_timeout` to detect and disconnect stuck connections ([#137]). +- network: add the `turmoil` transport for testing distributed actors ([#137]). ### Changed - **BREAKING** core/node: remove `node` module, `NodeNo` is moved to `addr`. @@ -28,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#74]: https://github.com/elfo-rs/elfo/issues/74 [#135]: https://github.com/elfo-rs/elfo/pull/135 [#136]: https://github.com/elfo-rs/elfo/pull/136 +[#137]: https://github.com/elfo-rs/elfo/pull/137 ## [0.2.0-alpha.16] - 2024-07-24 ### Added diff --git a/elfo-network/Cargo.toml b/elfo-network/Cargo.toml index d915cd71..0e4e917c 100644 --- a/elfo-network/Cargo.toml +++ b/elfo-network/Cargo.toml @@ -14,6 +14,9 @@ rust-version.workspace = true [lints] workspace = true +[features] +turmoil06 = ["dep:turmoil06"] + [dependencies] elfo-core = { version = "0.2.0-alpha.16", path = "../elfo-core", features = ["unstable", "network"] } elfo-utils = { version = "0.2.6", path = "../elfo-utils" } @@ -27,7 +30,6 @@ eyre = "0.6.8" fxhash = "0.2.1" futures = "0.3.21" tokio = { workspace = true, features = ["net", "io-util"] } -tokio-util = "0.7" tracing = "0.1.25" parking_lot = "0.12" humantime-serde = "1" @@ -35,6 +37,7 @@ kanal = "0.1.0-pre8" bitflags = "2.3.2" lz4_flex = { version = "0.11.1", default-features = false, features = ["std"] } byteorder = "1.4.3" +turmoil06 = { package = "turmoil", version = "0.6", optional = true } [dev-dependencies] tracing-test = "0.2.4" # TODO: actually unused? diff --git a/elfo-network/src/config.rs b/elfo-network/src/config.rs index 8c78ae5f..2c94f67e 100644 --- a/elfo-network/src/config.rs +++ b/elfo-network/src/config.rs @@ -110,6 +110,12 @@ pub enum Transport { #[cfg(unix)] #[display("uds://{}", "_0.display()")] Uds(PathBuf), + /// Turmoil v0.6 transport ("turmoil06://host"). + /// + /// Useful for testing purposes only. + #[cfg(feature = "turmoil06")] + #[display("turmoil06://{_0}")] + Turmoil06(String), } impl FromStr for Transport { @@ -134,6 +140,8 @@ impl FromStr for Transport { ); Ok(Transport::Uds(PathBuf::from(addr))) } + #[cfg(feature = "turmoil06")] + "turmoil06" => Ok(Transport::Turmoil06(addr.into())), proto => bail!("unknown protocol: {proto}"), } } @@ -184,6 +192,10 @@ mod tests { Transport::from_str("tcp://127.0.0.1:4242").unwrap(), Transport::Tcp("127.0.0.1:4242".into()) ); + assert_eq!( + Transport::from_str("tcp://alice:4242").unwrap(), + Transport::Tcp("alice:4242".into()) + ); // UDS #[cfg(unix)] @@ -201,5 +213,12 @@ mod tests { "path to UDS socket cannot be directory" ); } + + // Turmoil06 + #[cfg(feature = "turmoil06")] + assert_eq!( + Transport::from_str("turmoil06://alice").unwrap(), + Transport::Turmoil06("alice".into()) + ); } } diff --git a/elfo-network/src/socket/idleness.rs b/elfo-network/src/socket/idleness.rs new file mode 100644 index 00000000..dfe8db3d --- /dev/null +++ b/elfo-network/src/socket/idleness.rs @@ -0,0 +1,62 @@ +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Duration, +}; + +use tokio::time::Instant; + +// === IdleTracker === + +/// Measures the time since the last activity on a socket. +/// It's used to implement health checks based on idle timeout. +pub(crate) struct IdleTracker { + track: IdleTrack, + prev_value: u32, + prev_time: Instant, +} + +impl IdleTracker { + pub(super) fn new() -> (Self, IdleTrack) { + let track = IdleTrack(<_>::default()); + let this = Self { + track: track.clone(), + prev_value: track.get(), + prev_time: Instant::now(), + }; + + (this, track) + } + + /// Returns the elapsed time since the last `check()` call + /// that observed any [`IdleTrack::update()`] calls. + pub(crate) fn check(&mut self) -> Duration { + let now = Instant::now(); + let new_value = self.track.get(); + + if self.prev_value != new_value { + self.prev_value = new_value; + self.prev_time = now; + } + + now.duration_since(self.prev_time) + } +} + +// === IdleTrack === + +#[derive(Clone)] +pub(super) struct IdleTrack(Arc); + +impl IdleTrack { + /// Marks this socket as non-idle. + pub(super) fn update(&self) { + self.0.fetch_add(1, Ordering::Relaxed); + } + + fn get(&self) -> u32 { + self.0.load(Ordering::Relaxed) + } +} diff --git a/elfo-network/src/socket/raw/mod.rs b/elfo-network/src/socket/raw/mod.rs index 36d4d959..f7a82cfa 100644 --- a/elfo-network/src/socket/raw/mod.rs +++ b/elfo-network/src/socket/raw/mod.rs @@ -6,14 +6,14 @@ use std::{ use derive_more::Display; use eyre::Result; -use futures::{Stream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -#[cfg(unix)] -use tokio_util::either::Either; use crate::config::Transport; mod tcp; +#[cfg(feature = "turmoil06")] +mod turmoil; mod uds; macro_rules! delegate_call { @@ -22,6 +22,8 @@ macro_rules! delegate_call { Self::Tcp(v) => Pin::new(v).$method($($args),+), #[cfg(unix)] Self::Uds(v) => Pin::new(v).$method($($args),+), + #[cfg(feature = "turmoil06")] + Self::Turmoil06(v) => Pin::new(v).$method($($args),+), } } } @@ -31,12 +33,16 @@ pub(crate) enum SocketInfo { Tcp(tcp::SocketInfo), #[cfg(unix)] Uds(uds::SocketInfo), + #[cfg(feature = "turmoil06")] + Turmoil06(turmoil::SocketInfo), } pub(super) enum OwnedReadHalf { Tcp(tcp::OwnedReadHalf), #[cfg(unix)] Uds(uds::OwnedReadHalf), + #[cfg(feature = "turmoil06")] + Turmoil06(turmoil::OwnedReadHalf), } impl AsyncRead for OwnedReadHalf { @@ -53,6 +59,8 @@ pub(super) enum OwnedWriteHalf { Tcp(tcp::OwnedWriteHalf), #[cfg(unix)] Uds(uds::OwnedWriteHalf), + #[cfg(feature = "turmoil06")] + Turmoil06(turmoil::OwnedWriteHalf), } impl AsyncWrite for OwnedWriteHalf { @@ -81,6 +89,8 @@ impl AsyncWrite for OwnedWriteHalf { Self::Tcp(v) => v.is_write_vectored(), #[cfg(unix)] Self::Uds(v) => v.is_write_vectored(), + #[cfg(feature = "turmoil06")] + Self::Turmoil06(v) => v.is_write_vectored(), } } } @@ -112,23 +122,33 @@ impl From for Socket { } } +#[cfg(feature = "turmoil06")] +impl From for Socket { + fn from(socket: turmoil::Socket) -> Self { + Self { + read: OwnedReadHalf::Turmoil06(socket.read), + write: OwnedWriteHalf::Turmoil06(socket.write), + info: SocketInfo::Turmoil06(socket.info), + } + } +} + pub(super) async fn connect(addr: &Transport) -> Result { match addr { Transport::Tcp(addr) => tcp::connect(addr).await.map(Into::into), #[cfg(unix)] Transport::Uds(addr) => uds::connect(addr).await.map(Into::into), + #[cfg(feature = "turmoil06")] + Transport::Turmoil06(addr) => turmoil::connect(addr).await.map(Into::into), } } -pub(super) async fn listen(addr: &Transport) -> Result + 'static> { - match addr { - Transport::Tcp(addr) => { - let result = tcp::listen(addr).await.map(|s| s.map(Into::into)); - #[cfg(unix)] - let result = result.map(Either::Left); - result - } +pub(super) async fn listen(addr: &Transport) -> Result> { + Ok(match addr { + Transport::Tcp(addr) => Box::pin(tcp::listen(addr).await?.map(Into::into)), #[cfg(unix)] - Transport::Uds(addr) => uds::listen(addr).map(|s| Either::Right(s.map(Into::into))), - } + Transport::Uds(addr) => Box::pin(uds::listen(addr)?.map(Into::into)), + #[cfg(feature = "turmoil06")] + Transport::Turmoil06(addr) => Box::pin(turmoil::listen(addr).await?.map(Into::into)), + }) } diff --git a/elfo-network/src/socket/raw/turmoil.rs b/elfo-network/src/socket/raw/turmoil.rs new file mode 100644 index 00000000..f13d29b7 --- /dev/null +++ b/elfo-network/src/socket/raw/turmoil.rs @@ -0,0 +1,73 @@ +use std::net::SocketAddr; + +use derive_more::Display; +use eyre::{Result, WrapErr}; +use futures::Stream; +use tracing::warn; +use turmoil06::net::{TcpListener, TcpStream}; + +pub(super) use turmoil06::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; + +const PORT: u16 = 0xE1F0; + +#[derive(Clone, Display)] +#[display("turmoil06(local={local}, peer={peer})")] // TODO: use `valuable` after tracing#1570 +pub(crate) struct SocketInfo { + local: String, + peer: String, +} + +pub(super) struct Socket { + pub(super) read: OwnedReadHalf, + pub(super) write: OwnedWriteHalf, + pub(super) info: SocketInfo, +} + +fn prepare_stream(stream: TcpStream) -> Result { + let info = SocketInfo { + local: stringify_addr(stream.local_addr().wrap_err("cannot get local addr")?), + peer: stringify_addr(stream.peer_addr().wrap_err("cannot get peer addr")?), + }; + + let (read, write) = stream.into_split(); + Ok(Socket { read, write, info }) +} + +fn stringify_addr(addr: SocketAddr) -> String { + let (ip, port) = (addr.ip(), addr.port()); + let host = turmoil06::reverse_lookup(ip).unwrap_or_else(|| ip.to_string()); + format!("{host}:{port}") +} + +pub(super) async fn connect(host: &str) -> Result { + prepare_stream(TcpStream::connect((host, PORT)).await?) +} + +pub(super) async fn listen(host: &str) -> Result + 'static> { + let listener = TcpListener::bind((host, PORT)).await?; + + let accept = move |listener: TcpListener| async move { + loop { + let result = listener + .accept() + .await + .map_err(Into::into) + .and_then(|(socket, _)| prepare_stream(socket)); + + match result { + Ok(socket) => return Some((socket, listener)), + Err(err) => { + warn!( + message = "cannot accept TCP connection", + error = %err, + // TODO: addr + ); + + // Continue listening. + } + } + } + }; + + Ok(futures::stream::unfold(listener, accept)) +} diff --git a/elfo/Cargo.toml b/elfo/Cargo.toml index 5f8f00da..1789eab8 100644 --- a/elfo/Cargo.toml +++ b/elfo/Cargo.toml @@ -21,6 +21,7 @@ network = ["elfo-network"] unstable = ["elfo-core/unstable", "elfo-telemeter/unstable", "elfo-test/unstable" ] unstable-stuck-detection = ["elfo-core/unstable-stuck-detection"] tracing-log = ["elfo-logger/tracing-log"] +turmoil06 = ["elfo-network/turmoil06"] [dependencies] elfo-core = { version = "0.2.0-alpha.16", path = "../elfo-core" } @@ -49,6 +50,7 @@ static_assertions = "1.1.0" parking_lot = "0.12" libc = "0.2.97" futures-intrusive = "0.5" +turmoil = "0.6" [package.metadata.docs.rs] all-features = true diff --git a/elfo/tests/common.rs b/elfo/tests/common.rs new file mode 100644 index 00000000..1476acd1 --- /dev/null +++ b/elfo/tests/common.rs @@ -0,0 +1,10 @@ +#![allow(dead_code)] // TODO: combine tests into "it/*" + +// For tests without `elfo::test::proxy`. +pub(crate) fn setup_logger() { + let _ = tracing_subscriber::fmt() + .with_target(false) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .try_init(); +} diff --git a/elfo/tests/remote_messaging.rs b/elfo/tests/remote_messaging.rs new file mode 100644 index 00000000..1acb6e65 --- /dev/null +++ b/elfo/tests/remote_messaging.rs @@ -0,0 +1,133 @@ +#![cfg(feature = "network")] +#![cfg(feature = "turmoil06")] + +use std::{sync::Arc, time::Duration}; + +use tokio::sync::Notify; +use toml::toml; +use tracing::info; + +use elfo::{prelude::*, time::Interval, topology, Topology}; + +mod common; + +#[test] +fn simple() { + common::setup_logger(); + + #[message] + struct SimpleMessage(u64); + + #[message] + struct ProducerTick; + + fn producer() -> Blueprint { + ActorGroup::new().exec(move |mut ctx| async move { + ctx.attach(Interval::new(ProducerTick)) + .start(Duration::from_secs(1)); + + let mut counter = 0; + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + ProducerTick => { + let res = ctx.send(SimpleMessage(counter)).await; + info!("sent message #{counter} => {res:?}"); + counter += 1; + } + }) + } + }) + } + + fn consumer(notify: Arc) -> Blueprint { + ActorGroup::new().exec(move |mut ctx| { + let notify = notify.clone(); + async move { + let mut nos = vec![]; + + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + SimpleMessage(no) => { + info!("received message #{no}"); + nos.push(no); + + if no == 2 { + turmoil::partition("server", "client"); + tokio::time::sleep(Duration::from_secs(1)).await; + turmoil::repair("server", "client"); + } else if no == 8 { + break; + } + } + }) + } + + assert!(nos == [0, 1, 2, 6, 7, 8] || nos == [0, 1, 2, 5, 6, 7, 8]); + + // Terminate the test. + // TODO: expose `system.init` and use `send(TerminateSystem)` instead. + notify.notify_one(); + } + }) + } + + let mut sim = turmoil::Builder::new() + // TODO: We don't actually use I/O, but otherwise the test panics with: + // "there is no signal driver running" + // Need to detect availability of the signal driver in the `Signal` source. + .enable_tokio_io() + .tick_duration(Duration::from_millis(100)) + .build(); + + sim.host("server", || async { + let topology = Topology::empty(); + let configurers = topology.local("system.configurers").entrypoint(); + let network = topology.local("system.network"); + let producers = topology.local("producers"); + let consumers = topology.remote("consumers"); + + producers.route_to(&consumers, |_, _| topology::Outcome::Broadcast); + + network.mount(elfo::batteries::network::new(&topology)); + configurers.mount(elfo::batteries::configurer::fixture( + &topology, + toml! { + [system.network] + listen = ["turmoil06://0.0.0.0"] + ping_interval = "1s" + idle_timeout = "1s" + }, + )); + producers.mount(producer()); + + Ok(elfo::init::try_start(topology).await?) + }); + + sim.client("client", async { + let topology = Topology::empty(); + let configurers = topology.local("system.configurers").entrypoint(); + let network = topology.local("system.network"); + let consumers = topology.local("consumers"); + + network.mount(elfo::batteries::network::new(&topology)); + configurers.mount(elfo::batteries::configurer::fixture( + &topology, + toml! { + [system.network] + discovery.predefined = ["turmoil06://server"] + ping_interval = "1s" + idle_timeout = "1s" + }, + )); + + let notify = Arc::new(Notify::new()); + consumers.mount(consumer(notify.clone())); + + Ok(elfo::_priv::do_start(topology, false, |_, _| async move { + notify.notified().await; + }) + .await?) + }); + + sim.run().unwrap(); +} diff --git a/elfo/tests/request_routing.rs b/elfo/tests/request_routing.rs index 94920fff..0bfa5ace 100644 --- a/elfo/tests/request_routing.rs +++ b/elfo/tests/request_routing.rs @@ -12,23 +12,17 @@ use elfo::{ }; use elfo_core::config::AnyConfig; +mod common; + #[message(ret = u64)] struct TestRequest; #[message] struct NeverSent; -fn setup_logger() { - let _ = tracing_subscriber::fmt() - .with_target(false) - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_test_writer() - .try_init(); -} - #[tokio::test] async fn stealing() { - setup_logger(); + common::setup_logger(); let requester_blueprint = ActorGroup::new().exec(move |mut ctx| async move { info!("sent request"); @@ -79,7 +73,10 @@ async fn stealing() { }) }); - configurers.mount(elfo_configurer::fixture(&topology, AnyConfig::default())); + configurers.mount(elfo::batteries::configurer::fixture( + &topology, + AnyConfig::default(), + )); requester.mount(requester_blueprint); responder.mount(responder_blueprint); thief.mount(thief_blueprint); @@ -94,7 +91,7 @@ async fn stealing() { #[tokio::test] async fn multiple_failures() { - setup_logger(); + common::setup_logger(); fn success_blueprint(id: u64) -> Blueprint { ActorGroup::new().exec(move |mut ctx| async move {