Skip to content

Commit

Permalink
feat(network): resolve TCP addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 10, 2023
1 parent 8083cf3 commit bbbfcfa
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 24 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- network: DNS resolve for TCP transport.

### Fixed
- network: unrouted requests don't lead to infinite waiting anymore ([#116]).
- network: release routed if flow closed.
Expand Down
16 changes: 5 additions & 11 deletions elfo-network/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#[cfg(unix)]
use std::path::PathBuf;
use std::{net::SocketAddr, str::FromStr, time::Duration};
use std::{str::FromStr, time::Duration};

use derive_more::Display;
use eyre::{bail, Result, WrapErr};
use eyre::{bail, Result};
use serde::{
de::{self, Deserializer},
Deserialize, Serialize,
Expand Down Expand Up @@ -51,7 +51,7 @@ fn default_attempt_interval() -> Duration {
#[derive(Debug, Clone, Hash, PartialEq, Eq, Display, Serialize)]
pub(crate) enum Transport {
#[display(fmt = "tcp://{}", _0)]
Tcp(SocketAddr),
Tcp(String),
#[cfg(unix)]
#[display(fmt = "uds://{}", "_0.display()")]
Uds(PathBuf),
Expand All @@ -70,10 +70,7 @@ impl FromStr for Transport {

match protocol {
"" => bail!("protocol must be specified ({PROTOCOLS})"),
"tcp" => addr
.parse()
.map(Transport::Tcp)
.wrap_err("invalid TCP address"),
"tcp" => Ok(Transport::Tcp(addr.into())),
#[cfg(unix)]
"uds" => {
eyre::ensure!(
Expand Down Expand Up @@ -102,8 +99,6 @@ impl<'de> Deserialize<'de> for Transport {

#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};

use super::*;

#[test]
Expand All @@ -130,10 +125,9 @@ mod tests {
.starts_with("unknown protocol"));

// TCP
let expected = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4242);
assert_eq!(
Transport::from_str("tcp://127.0.0.1:4242").unwrap(),
Transport::Tcp(expected)
Transport::Tcp("127.0.0.1:4242".into())
);
assert_eq!(
Transport::from_str("tcp://foobar").unwrap_err().to_string(),
Expand Down
1 change: 1 addition & 0 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl Discovery {
.into_iter()
.for_each(|(local_group_no, remote_group_no)| {
// TODO: save stream to cancel later.
// TODO: connect without DNS resolving here.
self.open_connection(
&transport,
ConnectionRole::Data(internode::SwitchToData {
Expand Down
4 changes: 2 additions & 2 deletions elfo-network/src/socket/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl From<uds::Socket> for Socket {

pub(super) async fn connect(addr: &Transport) -> Result<Socket> {
match addr {
Transport::Tcp(addr) => tcp::connect(*addr).await.map(Into::into),
Transport::Tcp(addr) => tcp::connect(addr).await.map(Into::into),
#[cfg(unix)]
Transport::Uds(addr) => uds::connect(addr).await.map(Into::into),
}
Expand All @@ -123,7 +123,7 @@ pub(super) async fn connect(addr: &Transport) -> Result<Socket> {
pub(super) async fn listen(addr: &Transport) -> Result<impl Stream<Item = Socket> + 'static> {
match addr {
Transport::Tcp(addr) => {
let result = tcp::listen(*addr).await.map(|s| s.map(Into::into));
let result = tcp::listen(addr).await.map(|s| s.map(Into::into));
#[cfg(unix)]
let result = result.map(Either::Left);
result
Expand Down
6 changes: 3 additions & 3 deletions elfo-network/src/socket/raw/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ fn prepare_stream(stream: TcpStream) -> Result<Socket> {
Ok(Socket { read, write, info })
}

pub(super) async fn connect(addr: SocketAddr) -> Result<Socket> {
pub(super) async fn connect(addr: &str) -> Result<Socket> {
prepare_stream(TcpStream::connect(addr).await?)
}

pub(super) async fn listen(addr: SocketAddr) -> Result<impl Stream<Item = Socket> + 'static> {
pub(super) async fn listen(addr: &str) -> Result<impl Stream<Item = Socket> + 'static> {
let listener = TcpListener::bind(addr).await?;

let accept = move |listener: TcpListener| async move {
Expand All @@ -61,7 +61,7 @@ pub(super) async fn listen(addr: SocketAddr) -> Result<impl Stream<Item = Socket
warn!(
message = "cannot accept TCP connection",
error = %err,
addr = %addr,
// TODO: addr
);

// Continue listening.
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/network/alice.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ path = "example.alice.{class}.dump"

[system.network]
# TCP
#listen = ["tcp://127.0.0.1:9200"]
#discovery.predefined = ["tcp://127.0.0.1:9201"]
listen = ["tcp://127.0.0.1:9200"]
discovery.predefined = ["tcp://localhost:9201"]

# UDS (unix only)
listen = ["uds://example-alice.socket"]
discovery.predefined = ["uds://example-bob.socket"]
#listen = ["uds://example-alice.socket"]
#discovery.predefined = ["uds://example-bob.socket"]
8 changes: 4 additions & 4 deletions examples/examples/network/bob.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ path = "example.bob.{class}.dump"

[system.network]
# TCP
#listen = ["tcp://127.0.0.1:9201"]
#discovery.predefined = ["tcp://127.0.0.1:9200"]
listen = ["tcp://127.0.0.1:9201"]
discovery.predefined = ["tcp://127.0.0.1:9200"]

# UDS (unix only)
listen = ["uds://example-bob.socket"]
discovery.predefined = ["uds://example-alice.socket"]
#listen = ["uds://example-bob.socket"]
#discovery.predefined = ["uds://example-alice.socket"]

0 comments on commit bbbfcfa

Please sign in to comment.