Skip to content

Commit

Permalink
Merge pull request #51 from n0-computer/clean-disconnect
Browse files Browse the repository at this point in the history
fix: explicitly close channel when client is dropped
  • Loading branch information
rklaehn authored Jul 13, 2023
2 parents 6cbf62b + 532b554 commit 8f1e7da
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quic-rpc"
version = "0.6.0"
version = "0.6.1"
edition = "2021"
authors = ["Rüdiger Klaehn <[email protected]>"]
keywords = ["api", "protocol", "network", "rpc"]
Expand Down
21 changes: 11 additions & 10 deletions src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl<In: RpcMessage, Out: RpcMessage> QuinnConnection<In, Out> {
addr: SocketAddr,
name: String,
requests: flume::Receiver<oneshot::Sender<Result<SocketInner, quinn::ConnectionError>>>,
) -> result::Result<(), flume::RecvError> {
) {
'outer: loop {
tracing::debug!("Connecting to {} as {}", addr, name);
let connecting = match endpoint.connect(addr, &name) {
Expand All @@ -321,7 +321,14 @@ impl<In: RpcMessage, Out: RpcMessage> QuinnConnection<In, Out> {
};
loop {
tracing::debug!("Awaiting request for new bidi substream...");
let request = requests.recv_async().await?;
let request = match requests.recv_async().await {
Ok(request) => request,
Err(_) => {
tracing::debug!("client dropped");
connection.close(0u32.into(), b"requester dropped");
break;
}
};
tracing::debug!("Got request for new bidi substream");
match connection.open_bi().await {
Ok(pair) => {
Expand All @@ -347,14 +354,8 @@ impl<In: RpcMessage, Out: RpcMessage> QuinnConnection<In, Out> {
name: String,
requests: flume::Receiver<oneshot::Sender<Result<SocketInner, quinn::ConnectionError>>>,
) {
if Self::reconnect_handler_inner(endpoint, addr, name, requests)
.await
.is_err()
{
tracing::info!("Reconnect handler finished");
} else {
unreachable!()
}
Self::reconnect_handler_inner(endpoint, addr, name, requests).await;
tracing::info!("Reconnect handler finished");
}

/// Create a new channel
Expand Down

0 comments on commit 8f1e7da

Please sign in to comment.