diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4c1bd0ad7..107ea714d 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithHistory, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::Sleeper; @@ -77,10 +79,19 @@ where let mut last_active_indices = BTreeMap::::new(); for keychain in keychains { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithHistory { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, ) @@ -125,7 +136,7 @@ where fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_history(), parallel_requests, ) .await?, @@ -279,12 +290,12 @@ async fn chain_update( async fn fetch_txs_with_keychain_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> where - I: Iterator> + Send, + I: Iterator> + Send, S: Sleeper + Clone + Send + Sync, { type TxsOfSpkIndex = (u32, Vec); @@ -292,18 +303,22 @@ where let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids: HashSet = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); let client = client.clone(); async move { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; + let txs = client + .scripthash_txs(&spk_with_history.spk, last_seen) + .await?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -344,6 +359,8 @@ where } } + update.missing = spk_txids.difference(inserted_txs).cloned().collect(); + Ok((update, last_active_index)) } @@ -358,18 +375,21 @@ where async fn fetch_txs_with_spks( client: &esplora_client::AsyncClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, S: Sleeper + Clone + Send + Sync, { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), usize::MAX, parallel_requests, ) diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 655055b33..f64212bb3 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,9 @@ use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithHistory, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; @@ -67,10 +69,19 @@ impl EsploraExt for esplora_client::BlockingClient { let mut last_active_indices = BTreeMap::::new(); for keychain in request.keychains() { let keychain_spks = request.iter_spks(keychain.clone()); + let spks_with_history = keychain_spks.into_iter().map(|(i, spk)| { + ( + i, + SpkWithHistory { + spk, + txids: HashSet::::new(), + }, + ) + }); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, &mut inserted_txs, - keychain_spks, + spks_with_history, stop_gap, parallel_requests, )?; @@ -116,7 +127,7 @@ impl EsploraExt for esplora_client::BlockingClient { tx_update.extend(fetch_txs_with_spks( self, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_history(), parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( @@ -248,10 +259,10 @@ fn chain_update( Ok(tip) } -fn fetch_txs_with_keychain_spks>>( +fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - mut keychain_spks: I, + mut spks_with_history: I, stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> { @@ -260,19 +271,21 @@ fn fetch_txs_with_keychain_spks>>( let mut update = TxUpdate::::default(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut spk_txids: HashSet = HashSet::new(); loop { - let handles = keychain_spks + let handles = spks_with_history .by_ref() .take(parallel_requests) - .map(|(spk_index, spk)| { + .map(|(spk_index, spk_with_history)| { + spk_txids.extend(&spk_with_history.txids); std::thread::spawn({ let client = client.clone(); move || -> Result { let mut last_seen = None; let mut spk_txs = Vec::new(); loop { - let txs = client.scripthash_txs(&spk, last_seen)?; + let txs = client.scripthash_txs(&spk_with_history.spk, last_seen)?; let tx_count = txs.len(); last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); @@ -315,6 +328,8 @@ fn fetch_txs_with_keychain_spks>>( } } + update.missing = spk_txids.difference(inserted_txs).cloned().collect(); + Ok((update, last_active_index)) } @@ -326,16 +341,19 @@ fn fetch_txs_with_keychain_spks>>( /// requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. -fn fetch_txs_with_spks>( +fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, inserted_txs: &mut HashSet, - spks: I, + spks_with_history: I, parallel_requests: usize, ) -> Result, Error> { fetch_txs_with_keychain_spks( client, inserted_txs, - spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), + spks_with_history + .into_iter() + .enumerate() + .map(|(i, spk_with_history)| (i as u32, spk_with_history)), usize::MAX, parallel_requests, )