diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 811cf0138..c3fc6d072 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{Sender, SyncSender, TrySendError}; +use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Instant; @@ -25,9 +25,7 @@ use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::new_index::{Query, Utxo}; use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof}; -use crate::util::{ - create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry, SyncChannel, -}; +use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry}; const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION"); const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4); @@ -105,7 +103,7 @@ struct Connection { status_hashes: HashMap, // ScriptHash -> StatusHash stream: TcpStream, addr: SocketAddr, - chan: SyncChannel, + sender: SyncSender, stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] @@ -118,6 +116,7 @@ impl Connection { query: Arc, stream: TcpStream, addr: SocketAddr, + sender: SyncSender, stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, @@ -129,7 +128,7 @@ impl Connection { status_hashes: HashMap::new(), stream, addr, - chan: SyncChannel::new(10), + sender, stats, txs_limit, #[cfg(feature = "electrum-discovery")] @@ -352,7 +351,7 @@ impl Connection { let tx = params.get(0).chain_err(|| "missing tx")?; let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string(); let txid = self.query.broadcast_raw(&tx)?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { + if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) { warn!("failed to issue PeriodicUpdate after broadcast: {}", e); } Ok(json!(txid)) @@ -391,9 +390,10 @@ impl Connection { let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash) .chain_err(|| "cannot create merkle proof")?; Ok(json!({ - "block_height": blockid.height, - "merkle": merkle, - "pos": pos})) + "block_height": blockid.height, + "merkle": merkle, + "pos": pos + })) } fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result { @@ -409,7 +409,8 @@ impl Connection { Ok(json!({ "tx_hash": txid, - "merkle" : merkle})) + "merkle" : merkle + })) } fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result { @@ -523,10 +524,10 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self) -> Result<()> { + fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { - let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?; + let msg = receiver.recv().chain_err(|| "channel closed")?; let start_time = Instant::now(); trace!("RPC {:?}", msg); match msg { @@ -587,7 +588,7 @@ impl Connection { } } - fn handle_requests(mut reader: BufReader, tx: SyncSender) -> Result<()> { + fn parse_requests(mut reader: BufReader, tx: SyncSender) -> Result<()> { loop { let mut line = Vec::::new(); reader @@ -615,14 +616,14 @@ impl Connection { } } - pub fn run(mut self) { + pub fn run(mut self, receiver: Receiver) { self.stats.clients.inc(); conditionally_log_rpc_event!(self, json!({ "event": "connection established" })); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); - let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies() { + let sender = self.sender.clone(); + let child = spawn_thread("reader", || Connection::parse_requests(reader, sender)); + if let Err(e) = self.handle_replies(receiver) { error!( "[{}] connection handling failed: {}", self.addr, @@ -698,14 +699,15 @@ impl RPC { let mut senders = senders.lock().unwrap(); match msg { Notification::Periodic => { - for sender in senders.split_off(0) { + senders.retain(|sender| { if let Err(TrySendError::Disconnected(_)) = sender.try_send(Message::PeriodicUpdate) { - continue; + false // drop disconnected clients + } else { + true } - senders.push(sender); - } + }) } Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done } @@ -792,14 +794,16 @@ impl RPC { let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { - // explicitely scope the shadowed variables for the new thread + // explicitly scope the shadowed variables for the new thread let query = Arc::clone(&query); - let senders = Arc::clone(&senders); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); + let rpc_logging = config.electrum_rpc_logging.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); - let rpc_logging = config.electrum_rpc_logging.clone(); + + let (sender, receiver) = mpsc::sync_channel(10); + senders.lock().unwrap().push(sender.clone()); let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); @@ -807,14 +811,14 @@ impl RPC { query, stream, addr, + sender, stats, txs_limit, #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, ); - senders.lock().unwrap().push(conn.chan.sender()); - conn.run(); + conn.run(receiver); info!("[{}] disconnected peer", addr); let _ = garbage_sender.send(std::thread::current().id()); });