diff --git a/src/daemon.rs b/src/daemon.rs index cd4f81cd6..5db81dd96 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,7 +11,7 @@ use std::time::Duration; use base64::prelude::{Engine, BASE64_STANDARD}; use error_chain::ChainedError; use hex::FromHex; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -81,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option { fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result { if let Some(reply_obj) = reply.as_object_mut() { - if let Some(err) = reply_obj.get("error") { + if let Some(err) = reply_obj.get_mut("error") { if !err.is_null() { if let Some(code) = parse_error_code(&err) { match code { // RPC_IN_WARMUP -> retry by later reconnection -28 => bail!(ErrorKind::Connection(err.to_string())), - _ => bail!("{} RPC error: {}", method, err), + code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())), } } } @@ -442,29 +442,37 @@ impl Daemon { } } - // Send requests in parallel over multiple connections as individual JSON-RPC requests (with no JSON-RPC batching) - fn requests(&self, method: &str, params_list: &[Value]) -> Result> { + // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching), + // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned. + fn requests(&self, method: &str, params_list: Vec) -> Result> { + self.requests_iter(method, params_list).collect() + } + + // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them. + // Errors are included in the iterator and do not terminate other pending requests. 
+ fn requests_iter<'a>( + &'a self, + method: &'a str, + params_list: Vec, + ) -> impl ParallelIterator> + 'a { let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(self.daemon_parallelism) .thread_name(|i| format!("rpc-requests-{}", i)) .build() .unwrap(); - thread_pool.install(|| { - params_list - .par_iter() - .map(|params| { - // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as - // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped. - thread_local!(static DAEMON_INSTANCE: OnceCell = OnceCell::new()); - - DAEMON_INSTANCE.with(|daemon| { - daemon - .get_or_init(|| self.retry_reconnect()) - .retry_request(method, params) - }) + thread_pool.install(move || { + params_list.into_par_iter().map(move |params| { + // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as + // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped. + thread_local!(static DAEMON_INSTANCE: OnceCell = OnceCell::new()); + + DAEMON_INSTANCE.with(|daemon| { + daemon + .get_or_init(|| self.retry_reconnect()) + .retry_request(&method, ¶ms) }) - .collect() + }) }) } @@ -491,12 +499,12 @@ impl Daemon { pub fn getblockheaders(&self, heights: &[usize]) -> Result> { let heights: Vec = heights.iter().map(|height| json!([height])).collect(); let params_list: Vec = self - .requests("getblockhash", &heights)? + .requests("getblockhash", heights)? .into_iter() .map(|hash| json!([hash, /*verbose=*/ false])) .collect(); let mut result = vec![]; - for h in self.requests("getblockheader", ¶ms_list)? { + for h in self.requests("getblockheader", params_list)? 
{ result.push(header_from_value(h)?); } Ok(result) @@ -518,7 +526,7 @@ impl Daemon { .iter() .map(|hash| json!([hash, /*verbose=*/ false])) .collect(); - let values = self.requests("getblock", ¶ms_list)?; + let values = self.requests("getblock", params_list)?; let mut blocks = vec![]; for value in values { blocks.push(block_from_value(value)?); @@ -526,14 +534,29 @@ impl Daemon { Ok(blocks) } - pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result> { - let params_list: Vec = txhashes + /// Fetch the given transactions in parallel over multiple threads and RPC connections, + /// ignoring any missing ones and returning whatever is available. + pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { + const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; + + let params_list: Vec = txids .iter() .map(|txhash| json!([txhash, /*verbose=*/ false])) .collect(); - let values = self.requests("getrawtransaction", ¶ms_list)?; - values.into_iter().map(tx_from_value).collect() + self.requests_iter("getrawtransaction", params_list) + .filter_map(|res| match res { + Ok(val) => Some(tx_from_value(val)), + // Ignore 'tx not found' errors + Err(Error(ErrorKind::RpcError(code, _, _), _)) + if code == RPC_INVALID_ADDRESS_OR_KEY => + { + None + } + // Terminate iteration if any other errors are encountered + Err(e) => Some(Err(e)), + }) + .collect() } pub fn gettransaction_raw( @@ -574,7 +597,7 @@ impl Daemon { let params_list: Vec = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect(); Ok(self - .requests("estimatesmartfee", ¶ms_list)? + .requests("estimatesmartfee", params_list)? .iter() .zip(conf_targets) .filter_map(|(reply, target)| { diff --git a/src/errors.rs b/src/errors.rs index cec50ccef..c708d7dda 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -9,6 +9,11 @@ error_chain! 
{ display("Connection error: {}", msg) } + RpcError(code: i64, error: serde_json::Value, method: String) { + description("RPC error") + display("{} RPC error {}: {}", method, code, error) + } + Interrupt(sig: i32) { description("Interruption by external signal") display("Iterrupted by signal {}", sig) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 770c328da..b260c09af 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -276,7 +276,7 @@ impl Mempool { &self.backlog_stats.0 } - pub fn old_txids(&self) -> HashSet { + pub fn txids_set(&self) -> HashSet { return HashSet::from_iter(self.txstore.keys().cloned()); } @@ -291,7 +291,11 @@ impl Mempool { pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { if self.txstore.get(txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { - self.add(vec![tx]) + self.add({ + let mut txs_map = HashMap::new(); + txs_map.insert(tx.txid(), tx); + txs_map + }) } else { bail!("add_by_txid cannot find {}", txid); } @@ -300,17 +304,13 @@ impl Mempool { } } - fn add(&mut self, txs: Vec) -> Result<()> { + fn add(&mut self, txs_map: HashMap) -> Result<()> { self.delta .with_label_values(&["add"]) - .observe(txs.len() as f64); + .observe(txs_map.len() as f64); let _timer = self.latency.with_label_values(&["add"]).start_timer(); - let spent_prevouts = get_prev_outpoints(&txs); - let txs_map = txs - .into_iter() - .map(|tx| (tx.txid(), tx)) - .collect::>(); + let spent_prevouts = get_prev_outpoints(txs_map.values()); // Lookup spent prevouts that were funded within the same `add` batch let mut txos = HashMap::new(); @@ -487,31 +487,53 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + /// Sync our local view of the mempool with the bitcoind RPC. pub fn update(mempool: &Arc>, daemon: &Daemon) -> Result<()> { let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer(); - // 1. 
Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it - let old_txids = mempool.read().unwrap().old_txids(); - let all_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect(); + // Continuously attempt to fetch mempool transactions until we're able to get them in full + let mut fetched_txs = HashMap::::new(); + let mut indexed_txids = mempool.read().unwrap().txids_set(); + loop { + // Get bitcoind's current list of mempool txids + let all_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; + + // Remove evicted mempool transactions + mempool + .write() + .unwrap() + .remove(indexed_txids.difference(&all_txids).collect()); - // 2. Remove missing transactions. Even if we are unable to download new transactions from - // the daemon, we still want to remove the transactions that are no longer in the mempool. - mempool.write().unwrap().remove(txids_to_remove); + indexed_txids.retain(|txid| all_txids.contains(txid)); + fetched_txs.retain(|txid, _| all_txids.contains(txid)); - // 3. 
Download the new transactions from the daemon's mempool - let new_txids: Vec<&Txid> = all_txids.difference(&old_txids).collect(); - let txs_to_add = daemon - .gettransactions(&new_txids) - .chain_err(|| format!("failed to get {} transactions", new_txids.len()))?; + // Fetch missing transactions from bitcoind + let new_txids = all_txids + .iter() + .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid)) + .collect::>(); + let new_txs = daemon.gettransactions_available(&new_txids)?; + let fetched_count = new_txs.len(); + fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx))); + + // Retry if any transactions were evicted from the mempool before we managed to get them + if fetched_count != new_txids.len() { + warn!( + "failed to fetch {} mempool txs, retrying...", + new_txids.len() - fetched_count + ); + } else { + break; + } + } - // 4. Update local mempool to match daemon's state + // Add fetched transactions to our view of the mempool { let mut mempool = mempool.write().unwrap(); - // Add new transactions - mempool.add(txs_to_add)?; + + mempool.add(fetched_txs)?; mempool .count diff --git a/src/util/transaction.rs b/src/util/transaction.rs index 9becde94b..4daa8547b 100644 --- a/src/util/transaction.rs +++ b/src/util/transaction.rs @@ -96,15 +96,14 @@ pub fn extract_tx_prevouts<'a>( .collect() } -pub fn get_prev_outpoints(txs: &[Transaction]) -> BTreeSet { - txs.iter() - .flat_map(|tx| { - tx.input - .iter() - .filter(|txin| has_prevout(txin)) - .map(|txin| txin.previous_output) - }) - .collect() +pub fn get_prev_outpoints<'a>(txs: impl Iterator) -> BTreeSet { + txs.flat_map(|tx| { + tx.input + .iter() + .filter(|txin| has_prevout(txin)) + .map(|txin| txin.previous_output) + }) + .collect() } pub fn serialize_outpoint(outpoint: &OutPoint, serializer: S) -> Result