Continuously attempt to fetch mempool transactions
Previously, if any of the mempool transactions were unavailable because they
were evicted between fetching the mempool txids and the transactions
themselves, the mempool syncing operation would be aborted and retried from
scratch.

With this change, we instead keep whatever transactions we were able to fetch,
get the updated list of mempool txids, and retry fetching the missing ones
continuously until we're able to get a full snapshot.
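In outline, the new sync loop (implemented in src/new_index/mempool.rs below) works as follows. This is a simplified sketch; the helpers keep_only, txids_not_yet_held, and store are illustrative, not the actual API:

    loop {
        // Take a fresh snapshot of the daemon's current mempool txids
        let all_txids = daemon.getmempooltxids()?;

        // Drop locally-held txs that were evicted, keeping everything fetched so far
        keep_only(&all_txids);

        // Fetch whatever is still missing; txs evicted in the meantime are skipped
        let missing = txids_not_yet_held(&all_txids);
        let fetched = daemon.gettransactions_available(&missing)?;

        // We have a full snapshot once nothing went missing mid-fetch
        let done = fetched.len() == missing.len();
        store(fetched);
        if done {
            break;
        }
    }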
shesek committed Jun 1, 2024
1 parent c60207a commit 6a14f40
Showing 4 changed files with 111 additions and 62 deletions.
77 changes: 50 additions & 27 deletions src/daemon.rs
@@ -11,7 +11,7 @@ use std::time::Duration;
 use base64::prelude::{Engine, BASE64_STANDARD};
 use error_chain::ChainedError;
 use hex::FromHex;
-use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};

 #[cfg(not(feature = "liquid"))]
@@ -81,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option<i64> {

 fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result<Value> {
     if let Some(reply_obj) = reply.as_object_mut() {
-        if let Some(err) = reply_obj.get("error") {
+        if let Some(err) = reply_obj.get_mut("error") {
             if !err.is_null() {
                 if let Some(code) = parse_error_code(&err) {
                     match code {
                         // RPC_IN_WARMUP -> retry by later reconnection
                         -28 => bail!(ErrorKind::Connection(err.to_string())),
-                        _ => bail!("{} RPC error: {}", method, err),
+                        code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())),
                     }
                 }
             }
@@ -442,29 +442,37 @@ impl Daemon {
         }
     }

-    // Send requests in parallel over multiple connections as individual JSON-RPC requests (with no JSON-RPC batching)
-    fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
+    // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
+    fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
+        self.requests_iter(method, params_list).collect()
+    }
+
+    // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
+    // Errors are included in the iterator and do not terminate other pending requests.
+    fn requests_iter<'a>(
+        &'a self,
+        method: &'a str,
+        params_list: Vec<Value>,
+    ) -> impl ParallelIterator<Item = Result<Value>> + 'a {
         let thread_pool = rayon::ThreadPoolBuilder::new()
             .num_threads(self.daemon_parallelism)
             .thread_name(|i| format!("rpc-requests-{}", i))
             .build()
             .unwrap();

-        thread_pool.install(|| {
-            params_list
-                .par_iter()
-                .map(|params| {
-                    // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
-                    // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
-                    thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
-
-                    DAEMON_INSTANCE.with(|daemon| {
-                        daemon
-                            .get_or_init(|| self.retry_reconnect())
-                            .retry_request(method, params)
-                    })
-                })
-                .collect()
-        })
+        thread_pool.install(move || {
+            params_list.into_par_iter().map(move |params| {
+                // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
+                // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
+                thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
+
+                DAEMON_INSTANCE.with(|daemon| {
+                    daemon
+                        .get_or_init(|| self.retry_reconnect())
+                        .retry_request(&method, &params)
+                })
+            })
+        })
     }

@@ -491,12 +499,12 @@ impl Daemon {
     pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
         let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
         let params_list: Vec<Value> = self
-            .requests("getblockhash", &heights)?
+            .requests("getblockhash", heights)?
             .into_iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
         let mut result = vec![];
-        for h in self.requests("getblockheader", &params_list)? {
+        for h in self.requests("getblockheader", params_list)? {
             result.push(header_from_value(h)?);
         }
         Ok(result)
@@ -518,22 +526,37 @@ impl Daemon {
             .iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
-        let values = self.requests("getblock", &params_list)?;
+        let values = self.requests("getblock", params_list)?;
         let mut blocks = vec![];
         for value in values {
             blocks.push(block_from_value(value)?);
         }
         Ok(blocks)
     }

-    pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
-        let params_list: Vec<Value> = txhashes
+    /// Fetch the given transactions in parallel over multiple threads and RPC connections,
+    /// ignoring any missing ones and returning whatever is available.
+    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<Transaction>> {
+        const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;
+
+        let params_list: Vec<Value> = txids
             .iter()
             .map(|txhash| json!([txhash, /*verbose=*/ false]))
             .collect();

-        let values = self.requests("getrawtransaction", &params_list)?;
-        values.into_iter().map(tx_from_value).collect()
+        self.requests_iter("getrawtransaction", params_list)
+            .filter_map(|res| match res {
+                Ok(val) => Some(tx_from_value(val)),
+                // Ignore 'tx not found' errors
+                Err(Error(ErrorKind::RpcError(code, _, _), _))
+                    if code == RPC_INVALID_ADDRESS_OR_KEY =>
+                {
+                    None
+                }
+                // Terminate iteration if any other errors are encountered
+                Err(e) => Some(Err(e)),
+            })
+            .collect()
     }

     pub fn gettransaction_raw(
@@ -574,7 +597,7 @@ impl Daemon {
         let params_list: Vec<Value> = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();

         Ok(self
-            .requests("estimatesmartfee", &params_list)?
+            .requests("estimatesmartfee", params_list)?
             .iter()
             .zip(conf_targets)
             .filter_map(|(reply, target)| {
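For context on the requests/requests_iter split above: requests remains fail-fast (a single failed reply aborts the whole batch with an Err), while requests_iter yields each reply's Result so the caller decides which errors to tolerate. A hypothetical internal caller, not part of this commit (header_params and tx_params stand in for prepared Vec<Value> parameter lists):

    // Fail-fast: any single RPC error aborts the whole batch
    let headers: Vec<Value> = self.requests("getblockheader", header_params)?;

    // Per-reply results: errors arrive as items while other requests keep running
    let replies: Vec<Result<Value>> = self
        .requests_iter("getrawtransaction", tx_params)
        .collect();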
5 changes: 5 additions & 0 deletions src/errors.rs
@@ -9,6 +9,11 @@ error_chain! {
display("Connection error: {}", msg)
}

RpcError(code: i64, error: serde_json::Value, method: String) {
description("RPC error")
display("{} RPC error {}: {}", method, code, error)
}

Interrupt(sig: i32) {
description("Interruption by external signal")
display("Iterrupted by signal {}", sig)
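The structured (code, error, method) fields are what allow gettransactions_available in daemon.rs above to skip only 'not found' failures. A minimal matching sketch, assuming error_chain's generated Error/ErrorKind types and a hypothetical rpc_result: Result<Value>:

    match rpc_result {
        // -5 (RPC_INVALID_ADDRESS_OR_KEY) is bitcoind's "not found" code
        Err(Error(ErrorKind::RpcError(-5, _, _), _)) => { /* skip the missing tx */ }
        Err(e) => return Err(e),
        Ok(value) => { /* use the reply */ }
    }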
74 changes: 48 additions & 26 deletions src/new_index/mempool.rs
@@ -276,7 +276,7 @@ impl Mempool {
         &self.backlog_stats.0
     }

-    pub fn old_txids(&self) -> HashSet<Txid> {
+    pub fn txids_set(&self) -> HashSet<Txid> {
         return HashSet::from_iter(self.txstore.keys().cloned());
     }

@@ -291,7 +291,11 @@
     pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
         if self.txstore.get(txid).is_none() {
             if let Ok(tx) = daemon.getmempooltx(&txid) {
-                self.add(vec![tx])
+                self.add({
+                    let mut txs_map = HashMap::new();
+                    txs_map.insert(tx.txid(), tx);
+                    txs_map
+                })
             } else {
                 bail!("add_by_txid cannot find {}", txid);
             }
@@ -300,17 +304,13 @@
         }
     }

-    fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
+    fn add(&mut self, txs_map: HashMap<Txid, Transaction>) -> Result<()> {
         self.delta
             .with_label_values(&["add"])
-            .observe(txs.len() as f64);
+            .observe(txs_map.len() as f64);
         let _timer = self.latency.with_label_values(&["add"]).start_timer();

-        let spent_prevouts = get_prev_outpoints(&txs);
-        let txs_map = txs
-            .into_iter()
-            .map(|tx| (tx.txid(), tx))
-            .collect::<HashMap<_, _>>();
+        let spent_prevouts = get_prev_outpoints(txs_map.values());

         // Lookup spent prevouts that were funded within the same `add` batch
         let mut txos = HashMap::new();
@@ -487,31 +487,53 @@ impl Mempool {
             .map_or_else(|| vec![], |entries| self._history(entries, limit))
     }

+    /// Sync our local view of the mempool with the bitcoind RPC.
     pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
         let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer();

-        // 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it
-        let old_txids = mempool.read().unwrap().old_txids();
-        let all_txids = daemon
-            .getmempooltxids()
-            .chain_err(|| "failed to update mempool from daemon")?;
-        let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect();
-
-        // 2. Remove missing transactions. Even if we are unable to download new transactions from
-        // the daemon, we still want to remove the transactions that are no longer in the mempool.
-        mempool.write().unwrap().remove(txids_to_remove);
-
-        // 3. Download the new transactions from the daemon's mempool
-        let new_txids: Vec<&Txid> = all_txids.difference(&old_txids).collect();
-        let txs_to_add = daemon
-            .gettransactions(&new_txids)
-            .chain_err(|| format!("failed to get {} transactions", new_txids.len()))?;
-
-        // 4. Update local mempool to match daemon's state
+        // Continuously attempt to fetch mempool transactions until we're able to get them in full
+        let mut fetched_txs = HashMap::<Txid, Transaction>::new();
+        let mut indexed_txids = mempool.read().unwrap().txids_set();
+        loop {
+            // Get bitcoind's current list of mempool txids
+            let all_txids = daemon
+                .getmempooltxids()
+                .chain_err(|| "failed to update mempool from daemon")?;
+
+            // Remove evicted mempool transactions
+            mempool
+                .write()
+                .unwrap()
+                .remove(indexed_txids.difference(&all_txids).collect());
+
+            indexed_txids.retain(|txid| all_txids.contains(txid));
+            fetched_txs.retain(|txid, _| all_txids.contains(txid));
+
+            // Fetch missing transactions from bitcoind
+            let new_txids = all_txids
+                .iter()
+                .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
+                .collect::<Vec<_>>();
+            let new_txs = daemon.gettransactions_available(&new_txids)?;
+            let fetched_count = new_txs.len();
+            fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx)));
+
+            // Retry if any transactions were evicted from the mempool before we managed to get them
+            if fetched_count != new_txids.len() {
+                warn!(
+                    "failed to fetch {} mempool txs, retrying...",
+                    new_txids.len() - fetched_count
+                );
+            } else {
+                break;
+            }
+        }
+
+        // Add fetched transactions to our view of the mempool
         {
             let mut mempool = mempool.write().unwrap();
-            // Add new transactions
-            mempool.add(txs_to_add)?;
+
+            mempool.add(fetched_txs)?;

             mempool
                 .count
17 changes: 8 additions & 9 deletions src/util/transaction.rs
@@ -96,15 +96,14 @@ pub fn extract_tx_prevouts<'a>(
         .collect()
 }

-pub fn get_prev_outpoints(txs: &[Transaction]) -> BTreeSet<OutPoint> {
-    txs.iter()
-        .flat_map(|tx| {
-            tx.input
-                .iter()
-                .filter(|txin| has_prevout(txin))
-                .map(|txin| txin.previous_output)
-        })
-        .collect()
+pub fn get_prev_outpoints<'a>(txs: impl Iterator<Item = &'a Transaction>) -> BTreeSet<OutPoint> {
+    txs.flat_map(|tx| {
+        tx.input
+            .iter()
+            .filter(|txin| has_prevout(txin))
+            .map(|txin| txin.previous_output)
+    })
+    .collect()
 }

 pub fn serialize_outpoint<S>(outpoint: &OutPoint, serializer: S) -> Result<S::Ok, S::Error>
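Making get_prev_outpoints generic over an iterator of transaction references lets callers feed it from any container without first collecting into a slice; mempool.rs above now passes txs_map.values() directly. A usage sketch (txs_map and txs_vec are hypothetical):

    let from_map = get_prev_outpoints(txs_map.values()); // from a HashMap<Txid, Transaction>
    let from_vec = get_prev_outpoints(txs_vec.iter());   // from a Vec<Transaction>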
