From 24d34febc86826a7a39dc73af0229b84700e80b6 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Mon, 18 Mar 2024 08:11:30 +0200 Subject: [PATCH] Threading-related fixes from upstream romanz/electrs Changes were taken from the latest romanz/electrs rpc.rs implementation prior to the major refactoring in v0.9.0, after which the codebases diverged significantly (https://github.com/romanz/electrs/blob/af6ff09a275ec12b6fd0d6a101637f4710902a3c/src/rpc.rs). The relevant changes include (not a complete list): https://github.com/romanz/electrs/pull/284 https://github.com/romanz/electrs/pull/233 https://github.com/romanz/electrs/commit/a3bfdda32a1c8cab248d3525786a433d1e415d1e https://github.com/romanz/electrs/pull/195 https://github.com/romanz/electrs/pull/523 (the only post-v0.9 change, a very minor one) This fixes a memory leak that could be reproduced using the following script, which opens and closes 500k connections with a concurrency of 20: $ seq 1 500000 | xargs -I {} -n 1 -P 20 sh -c 'echo '\''{"id":{},"method":"server.version","params":[]}'\''| nc 127.0.0.1 50001 -v -N' Before the fixes, memory usage kept growing as more connections were made, reaching around 35MB after 500k connections. After the fixes, memory usage stays steady at around 25MB and does not grow with more connections. 
--- src/electrum/server.rs | 58 ++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 811cf0138..c3fc6d072 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{Sender, SyncSender, TrySendError}; +use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Instant; @@ -25,9 +25,7 @@ use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::new_index::{Query, Utxo}; use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof}; -use crate::util::{ - create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry, SyncChannel, -}; +use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry}; const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION"); const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4); @@ -105,7 +103,7 @@ struct Connection { status_hashes: HashMap, // ScriptHash -> StatusHash stream: TcpStream, addr: SocketAddr, - chan: SyncChannel, + sender: SyncSender, stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] @@ -118,6 +116,7 @@ impl Connection { query: Arc, stream: TcpStream, addr: SocketAddr, + sender: SyncSender, stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, @@ -129,7 +128,7 @@ impl Connection { status_hashes: HashMap::new(), stream, addr, - chan: SyncChannel::new(10), + sender, stats, txs_limit, #[cfg(feature = "electrum-discovery")] @@ -352,7 +351,7 @@ impl Connection { let tx = params.get(0).chain_err(|| "missing tx")?; let tx = tx.as_str().chain_err(|| "non-string 
tx")?.to_string(); let txid = self.query.broadcast_raw(&tx)?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { + if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) { warn!("failed to issue PeriodicUpdate after broadcast: {}", e); } Ok(json!(txid)) @@ -391,9 +390,10 @@ impl Connection { let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash) .chain_err(|| "cannot create merkle proof")?; Ok(json!({ - "block_height": blockid.height, - "merkle": merkle, - "pos": pos})) + "block_height": blockid.height, + "merkle": merkle, + "pos": pos + })) } fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result { @@ -409,7 +409,8 @@ impl Connection { Ok(json!({ "tx_hash": txid, - "merkle" : merkle})) + "merkle" : merkle + })) } fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result { @@ -523,10 +524,10 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self) -> Result<()> { + fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { - let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?; + let msg = receiver.recv().chain_err(|| "channel closed")?; let start_time = Instant::now(); trace!("RPC {:?}", msg); match msg { @@ -587,7 +588,7 @@ impl Connection { } } - fn handle_requests(mut reader: BufReader, tx: SyncSender) -> Result<()> { + fn parse_requests(mut reader: BufReader, tx: SyncSender) -> Result<()> { loop { let mut line = Vec::::new(); reader @@ -615,14 +616,14 @@ impl Connection { } } - pub fn run(mut self) { + pub fn run(mut self, receiver: Receiver) { self.stats.clients.inc(); conditionally_log_rpc_event!(self, json!({ "event": "connection established" })); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); - let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies() { + let 
sender = self.sender.clone(); + let child = spawn_thread("reader", || Connection::parse_requests(reader, sender)); + if let Err(e) = self.handle_replies(receiver) { error!( "[{}] connection handling failed: {}", self.addr, @@ -698,14 +699,15 @@ impl RPC { let mut senders = senders.lock().unwrap(); match msg { Notification::Periodic => { - for sender in senders.split_off(0) { + senders.retain(|sender| { if let Err(TrySendError::Disconnected(_)) = sender.try_send(Message::PeriodicUpdate) { - continue; + false // drop disconnected clients + } else { + true } - senders.push(sender); - } + }) } Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done } @@ -792,14 +794,16 @@ impl RPC { let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { - // explicitely scope the shadowed variables for the new thread + // explicitly scope the shadowed variables for the new thread let query = Arc::clone(&query); - let senders = Arc::clone(&senders); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); + let rpc_logging = config.electrum_rpc_logging.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); - let rpc_logging = config.electrum_rpc_logging.clone(); + + let (sender, receiver) = mpsc::sync_channel(10); + senders.lock().unwrap().push(sender.clone()); let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); @@ -807,14 +811,14 @@ impl RPC { query, stream, addr, + sender, stats, txs_limit, #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, ); - senders.lock().unwrap().push(conn.chan.sender()); - conn.run(); + conn.run(receiver); info!("[{}] disconnected peer", addr); let _ = garbage_sender.send(std::thread::current().id()); });