Skip to content

Commit

Permalink
Make sure the chain tip doesn't move while fetching the mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
shesek committed Jun 1, 2024
1 parent 6a14f40 commit a6a796c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
22 changes: 8 additions & 14 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
loop {
match Mempool::update(&mempool, &daemon) {
Ok(_) => break,
Err(e) => {
warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
signal.wait(Duration::from_secs(5), false)?;
},
}

while !Mempool::update(&mempool, &daemon, &tip)? {
// Mempool syncing was aborted because the chain tip moved;
// Index the new block(s) and try again.
tip = indexer.update(&daemon)?;
}

#[cfg(feature = "liquid")]
Expand Down Expand Up @@ -118,7 +115,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
));

loop {

main_loop_count.inc();

if let Err(err) = signal.wait(Duration::from_secs(5), true) {
Expand All @@ -131,14 +127,12 @@ fn run_server(config: Arc<Config>) -> Result<()> {
// Index new blocks
let current_tip = daemon.getbestblockhash()?;
if current_tip != tip {
indexer.update(&daemon)?;
tip = current_tip;
tip = indexer.update(&daemon)?;
};

// Update mempool
if let Err(e) = Mempool::update(&mempool, &daemon) {
// Log the error if the result is an Err
warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
if !Mempool::update(&mempool, &daemon, &tip)? {
warn!("skipped failed mempool update, trying again in 5 seconds");
}

// Update subscribed clients
Expand Down
20 changes: 16 additions & 4 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::iter::FromIterator;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid};
use crate::config::Config;
use crate::daemon::Daemon;
use crate::errors::*;
Expand Down Expand Up @@ -487,8 +487,13 @@ impl Mempool {
.map_or_else(|| vec![], |entries| self._history(entries, limit))
}

/// Sync our local view of the mempool with the bitcoind RPC.
pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
/// Sync our local view of the mempool with the bitcoind Daemon RPC. If the chain tip moves before
/// the mempool is fetched in full, syncing is aborted and an Ok(false) is returned.
pub fn update(
mempool: &Arc<RwLock<Mempool>>,
daemon: &Daemon,
tip: &BlockHash,
) -> Result<bool> {
let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer();

// Continuously attempt to fetch mempool transactions until we're able to get them in full
Expand All @@ -515,6 +520,13 @@ impl Mempool {
.filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
.collect::<Vec<_>>();
let new_txs = daemon.gettransactions_available(&new_txids)?;

// Abort if the chain tip moved while fetching transactions
if daemon.getbestblockhash()? != *tip {
warn!("chain tip moved while updating mempool");
return Ok(false);
}

let fetched_count = new_txs.len();
fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx)));

Expand Down Expand Up @@ -546,7 +558,7 @@ impl Mempool {
}
}

Ok(())
Ok(true)
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl TestRunner {
};

let mut indexer = Indexer::open(Arc::clone(&store), fetch_from, &config, &metrics);
indexer.update(&daemon)?;
let tip = indexer.update(&daemon)?;
indexer.fetch_from(FetchFrom::Bitcoind);

let chain = Arc::new(ChainQuery::new(
Expand All @@ -164,7 +164,7 @@ impl TestRunner {
&metrics,
Arc::clone(&config),
)));
Mempool::update(&mempool, &daemon)?;
assert!(Mempool::update(&mempool, &daemon, &tip)?);

let query = Arc::new(Query::new(
Arc::clone(&chain),
Expand Down Expand Up @@ -195,8 +195,8 @@ impl TestRunner {
}

pub fn sync(&mut self) -> Result<()> {
self.indexer.update(&self.daemon)?;
Mempool::update(&self.mempool, &self.daemon)?;
let tip = self.indexer.update(&self.daemon)?;
assert!(Mempool::update(&self.mempool, &self.daemon, &tip)?);
// force an update for the mempool stats, which are normally cached
self.mempool.write().unwrap().update_backlog_stats();
Ok(())
Expand Down

0 comments on commit a6a796c

Please sign in to comment.