From 2e684d5a7bbcf5e6bc32791ca7cded6d1488d63c Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Wed, 29 May 2024 21:35:25 +0300 Subject: [PATCH] electrum: Handle connection termination more consistently, fix thread leak See https://github.com/Blockstream/electrs/pull/74#issuecomment-2131309562 --- src/electrum/server.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/electrum/server.rs b/src/electrum/server.rs index c3fc6d072..916487a70 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -588,19 +588,18 @@ impl Connection { } } - fn parse_requests(mut reader: BufReader, tx: SyncSender) -> Result<()> { + fn parse_requests(mut reader: BufReader, tx: &SyncSender) -> Result<()> { loop { + warn!("parse request loop"); let mut line = Vec::::new(); reader .read_until(b'\n', &mut line) .chain_err(|| "failed to read a request")?; if line.is_empty() { - tx.send(Message::Done).chain_err(|| "channel closed")?; return Ok(()); } else { if line.starts_with(&[22, 3, 1]) { // (very) naive SSL handshake detection - let _ = tx.send(Message::Done); bail!("invalid request - maybe SSL-encrypted data?: {:?}", line) } match String::from_utf8(line) { @@ -608,7 +607,6 @@ impl Connection { .send(Message::Request(req)) .chain_err(|| "channel closed")?, Err(err) => { - let _ = tx.send(Message::Done); bail!("invalid UTF8: {}", err) } } @@ -616,13 +614,21 @@ impl Connection { } } + fn reader_thread(reader: BufReader, tx: SyncSender) -> Result<()> { + let result = Connection::parse_requests(reader, &tx); + if let Err(e) = tx.send(Message::Done) { + warn!("failed closing channel: {}", e); + } + result + } + pub fn run(mut self, receiver: Receiver) { self.stats.clients.inc(); conditionally_log_rpc_event!(self, json!({ "event": "connection established" })); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let sender = self.sender.clone(); - let child = spawn_thread("reader", || Connection::parse_requests(reader, sender)); + let child = spawn_thread("reader", || Connection::reader_thread(reader, sender)); if let Err(e) = self.handle_replies(receiver) { error!( "[{}] connection handling failed: {}",