Skip to content

Commit

Permalink
fix: combinator tests (freenet#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Oct 27, 2024
1 parent 2092a52 commit a3e55d6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 43 deletions.
5 changes: 0 additions & 5 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ static CLIENT_ID: AtomicUsize = AtomicUsize::new(1);
impl ClientId {
pub const FIRST: Self = ClientId(0);

#[cfg(test)]
pub(crate) const fn new(id: usize) -> ClientId {
Self(id)
}

pub fn next() -> Self {
ClientId(CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
}
Expand Down
80 changes: 42 additions & 38 deletions crates/core/src/client_events/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ async fn client_fn(
host_msg = rx.recv() => {
if let Some((client_id, response)) = host_msg {
if client.send(client_id, response).await.is_err() {
eprintln!("disconnected host");
break;
}
} else {
Expand Down Expand Up @@ -204,20 +203,18 @@ mod test {
.await
.ok_or_else::<ClientError, _>(|| ErrorKind::ChannelClosed.into())?;
assert_eq!(id, self.id);
eprintln!("#{}, received msg {id}", self.id);
Ok(OpenRequest::new(
ClientId::new(id),
ClientId::next(),
Box::new(ClientRequest::Disconnect { cause: None }),
))
})
}

fn send(
&mut self,
id: ClientId,
_id: ClientId,
_response: Result<HostResponse, ClientError>,
) -> BoxFuture<'_, Result<(), ClientError>> {
assert_eq!(id.0, self.id);
async {
self.tx
.send(self.id)
Expand Down Expand Up @@ -245,47 +242,51 @@ mod test {
}

#[tokio::test]
async fn combinator_recv() {
async fn test_recv() {
let (proxies, mut senders, _) = setup_proxies();
let mut combinator = ClientEventsCombinator::new(proxies);

let _senders: Vec<Sender<usize>> = tokio::task::spawn(async move {
let sending = async {
for _ in 1..4 {
for (id, tx) in senders.iter_mut().enumerate() {
tx.send(id).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
eprintln!("sent msg {id}");
tx.send(id).await?;
}
}
senders
})
.await
.unwrap();
Ok::<_, Box<dyn std::error::Error>>(senders)
};

for _ in 0..3 {
for i in 1..4 {
let OpenRequest { client_id: id, .. } = combinator.recv().await.unwrap();
eprintln!("received {i}: {id:?}");
assert_eq!(ClientId::new(i), id);
let combinator = async {
let client_ids = combinator
.internal_clients
.keys()
.cloned()
.collect::<Vec<_>>();
for _ in 0..3 {
for id in client_ids.iter() {
let OpenRequest {
client_id: req_id, ..
} = combinator.recv().await?;
assert_eq!(*id, req_id);
}
}
}
Ok::<_, Box<dyn std::error::Error>>(())
};

try_join!(sending, combinator).unwrap();
}

#[ignore]
#[tokio::test]
async fn test_send() {
let (proxies, _, mut receivers) = setup_proxies();
let (proxies, mut senders, mut receivers) = setup_proxies();
let mut combinator = ClientEventsCombinator::new(proxies);

for idx in 0..3 {
let client_id = ClientId::new(idx);
// Insert each client ID mapping into the combinator's internal clients.
combinator
.internal_clients
.insert(client_id, (idx, client_id));
// Create the internal client mapping implicitly.
for (idx, sender) in senders.iter_mut().enumerate() {
sender.send(idx).await.unwrap();
combinator.recv().await.unwrap();
}

let receivers = async move {
let receiving = async {
// Test sending a response through the combinator for each proxy.
for (idx, receiver) in receivers.iter_mut().enumerate() {
// Assert that the receiver received the expected message.
Expand All @@ -294,25 +295,28 @@ mod test {
.await
.ok_or(format!("missing {idx} sender"))?;
assert_eq!(received_id, idx);
println!(
"Receiver {} confirmed send for client ID: {}",
idx, received_id
);
}
Ok::<_, Box<dyn std::error::Error>>(())
};

let senders = async {
for idx in 0..3 {
let sending = async {
for (i, cli_id) in combinator
.internal_clients
.keys()
.cloned()
.collect::<Vec<_>>()
.into_iter()
.enumerate()
{
// Send a sample response through the combinator.
combinator
.send(ClientId::new(idx), Ok(HostResponse::Ok))
.send(cli_id, Ok(HostResponse::Ok))
.await
.map_err(|err| format!("Send failed for client {idx}: {err}",))?;
.map_err(|err| format!("Send failed for client {i}: {err}",))?;
}
Ok::<_, Box<dyn std::error::Error>>(())
};

try_join!(senders, receivers).unwrap();
try_join!(sending, receiving).unwrap();
}
}

0 comments on commit a3e55d6

Please sign in to comment.