From 66e435046edaed479d5724cf9a77f5e006012981 Mon Sep 17 00:00:00 2001 From: Diva M Date: Thu, 15 Feb 2024 18:14:10 -0500 Subject: [PATCH] wip --- src/transport/quinn.rs | 75 +++++++++++++++++++++++++++--------------- tests/math.rs | 2 ++ tests/quinn.rs | 8 ++++- 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/src/transport/quinn.rs b/src/transport/quinn.rs index 77ed943..3dbd37c 100644 --- a/src/transport/quinn.rs +++ b/src/transport/quinn.rs @@ -61,7 +61,7 @@ pub struct QuinnServerEndpoint { impl QuinnServerEndpoint { /// handles RPC requests from a connection /// - /// to cleanly shutdown the handler, drop the receiver side of the sender. + /// to cleanly shutdown tee handler, drop the receiver side of the sender. async fn connection_handler(connection: quinn::Connection, sender: flume::Sender) { loop { tracing::debug!("Awaiting incoming bidi substream on existing connection..."); @@ -106,6 +106,7 @@ impl QuinnServerEndpoint { tracing::debug!("Spawning connection handler..."); tokio::spawn(Self::connection_handler(conection, sender.clone())); } + tracing::debug!("endpoint handler finished"); } /// Create a new server channel, given a quinn endpoint. @@ -311,8 +312,6 @@ impl QuinnConnection { let mut receiver = Receiver::new(&requests); - // a pending request to open a bi-directional stream that was received with a lost - // connection let mut pending_request: Option< oneshot::Sender>, > = None; @@ -322,8 +321,12 @@ impl QuinnConnection { tokio::select! { // wait for a new connection to be opened conn_result = reconnect.as_mut() => { + tracing::trace!("tick: connection result"); match conn_result { - Ok(new_connection) => connection = Some(new_connection), + Ok(new_connection) => { + connection = Some(new_connection); + tracing::debug!("got new connection"); + }, Err(e) => { let connection_err = match e { ReconnectErr::Connect(e) => { @@ -331,10 +334,13 @@ impl QuinnConnection { // ConnectionError, not a ConnectError. I'm mapping this now to // some ConnectionError since before it was not even reported. // Maybe adjust the type? - tracing::warn!("error calling connect: {}", e); + tracing::warn!(%e, "error calling connect"); quinn::ConnectionError::Reset }, - ReconnectErr::Connection(e) => e, + ReconnectErr::Connection(e) => { + tracing::warn!(%e, "failed to connect"); + e + }, }; if let Some(request) = pending_request.take() { if request.send(Err(connection_err)).is_err() { @@ -346,8 +352,10 @@ impl QuinnConnection { } // wait for a new request as long as there is no pending one req = receiver.next(), if pending_request.is_none() => { + tracing::trace!("tick: bidi request"); match req { Some(request) => { + tracing::debug!("got new bidi request"); pending_request = Some(request) }, None => { @@ -361,6 +369,11 @@ impl QuinnConnection { } } + tracing::trace!( + "connection is some {}; request is some {}", + connection.is_some(), + pending_request.is_some() + ); if let Some(connection) = connection.as_mut() { if let Some(request) = pending_request.take() { match connection.open_bi().await { @@ -473,32 +486,40 @@ impl Future for ReconnectHandler { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.state.poison() { - ConnectionState::NotConnected => match self.endpoint.connect(self.addr, &self.name) { - Ok(connecting) => { - self.state = ConnectionState::Connecting(connecting); - self.poll(cx) - } - Err(e) => { - self.state = ConnectionState::NotConnected; - Poll::Ready(Err(ReconnectErr::Connect(e))) - } - }, - ConnectionState::Connecting(mut connecting) => match connecting.poll_unpin(cx) { - Poll::Ready(res) => match res { - Ok(connection) => { - self.state = ConnectionState::Connected; - Poll::Ready(Ok(connection)) + ConnectionState::NotConnected => { + tracing::debug!(addr = %self.addr, name = self.name, "calling connect"); + + match self.endpoint.connect(self.addr, &self.name) { + Ok(connecting) => { + self.state = ConnectionState::Connecting(connecting); + self.poll(cx) } Err(e) => { self.state = ConnectionState::NotConnected; - Poll::Ready(Err(ReconnectErr::Connection(e))) + Poll::Ready(Err(ReconnectErr::Connect(e))) } - }, - Poll::Pending => { - self.state = ConnectionState::Connecting(connecting); - Poll::Pending } - }, + } + ConnectionState::Connecting(mut connecting) => { + tracing::debug!(addr = %self.addr, name = self.name, "awaiting connect"); + + match connecting.poll_unpin(cx) { + Poll::Ready(res) => match res { + Ok(connection) => { + self.state = ConnectionState::Connected; + Poll::Ready(Ok(connection)) + } + Err(e) => { + self.state = ConnectionState::NotConnected; + Poll::Ready(Err(ReconnectErr::Connection(e))) + } + }, + Poll::Pending => { + self.state = ConnectionState::Connecting(connecting); + Poll::Pending + } + } + } ConnectionState::Connected => { // waiting for a request to open a new connection, nothing to do self.state = ConnectionState::Connected; diff --git a/tests/math.rs b/tests/math.rs index 583b7ca..9d9eccc 100644 --- a/tests/math.rs +++ b/tests/math.rs @@ -164,7 +164,9 @@ impl ComputeService { let service = ComputeService; while received < count { received += 1; + tracing::debug!("before accept"); let (req, chan) = s.accept().await?; + tracing::debug!("after accept"); let service = service.clone(); tokio::spawn(async move { use ComputeRequest::*; diff --git a/tests/quinn.rs b/tests/quinn.rs index 6a6068d..5c3ae30 100644 --- a/tests/quinn.rs +++ b/tests/quinn.rs @@ -154,7 +154,7 @@ async fn server_away_and_back() -> anyhow::Result<()> { // send a request. No server available so it should fail let e = client.rpc(Sqr(4)).await.unwrap_err(); - println!("{e}"); + tracing::info!(%e, "got expected request failure"); // create the RPC Server let server = Endpoint::server(server_config.clone(), server_addr)?; @@ -164,6 +164,7 @@ async fn server_away_and_back() -> anyhow::Result<()> { // send the first request and wait for the response to ensure everything works as expected let SqrResponse(response) = client.rpc(Sqr(4)).await.unwrap(); + tracing::info!(%response, "got expected response"); assert_eq!(response, 16); let server = server_handle.await.unwrap().unwrap(); @@ -171,14 +172,19 @@ async fn server_away_and_back() -> anyhow::Result<()> { // wait for drop to free the socket tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + tracing::info!("SERVER DROPPED"); + // make the server run again let server = Endpoint::server(server_config, server_addr)?; let connection = transport::quinn::QuinnServerEndpoint::new(server)?; let server = RpcServer::::new(connection); let server_handle = tokio::task::spawn(ComputeService::server_bounded(server, 5)); + tracing::info!("Server spawned"); // server is running, this should work + tracing::info!("sending Sqr(3)"); let SqrResponse(response) = client.rpc(Sqr(3)).await.unwrap(); + tracing::info!(%response, "got expected response"); assert_eq!(response, 9); server_handle.abort();