Skip to content

Commit

Permalink
Implement suggest_scan_ranges
Browse files Browse the repository at this point in the history
This implements a priority queue backed by the wallet database for scan
range ordering. The scan queue is updated on each call to `put_blocks`
or to `update_chain_tip`.
  • Loading branch information
nuttycom committed Jul 7, 2023
1 parent 4d5dc28 commit 8c18f88
Show file tree
Hide file tree
Showing 8 changed files with 926 additions and 30 deletions.
36 changes: 21 additions & 15 deletions zcash_client_backend/src/data_api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Interfaces for wallet data persistence & low-level wallet utilities.

use std::cmp;
use std::collections::HashMap;
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::{cmp, ops::Range};

use incrementalmerkletree::Retention;
use secrecy::SecretVec;
Expand All @@ -29,9 +29,11 @@ use crate::{
};

use self::chain::CommitmentTreeRoot;
use self::scanning::ScanRange;

pub mod chain;
pub mod error;
pub mod scanning;
pub mod wallet;

pub const SAPLING_SHARD_HEIGHT: u8 = sapling::NOTE_COMMITMENT_TREE_DEPTH / 2;
Expand Down Expand Up @@ -88,11 +90,7 @@ pub trait WalletRead {
/// to the wallet are detected.
///
/// [`CompactBlock`]: crate::proto::compact_formats::CompactBlock
fn suggest_scan_ranges(
&self,
batch_size: usize,
limit: usize,
) -> Result<Vec<Range<BlockHeight>>, Self::Error>;
fn suggest_scan_ranges(&self) -> Result<Vec<ScanRange>, Self::Error>;

/// Returns the default target height (for the block in which a new
/// transaction would be mined) and anchor height (to use for a new
Expand Down Expand Up @@ -501,6 +499,14 @@ pub trait WalletWrite: WalletRead {
block: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error>;

/// Updates the wallet's view of the blockchain.
///
/// This method is used to provide the wallet with information about the state of the
/// blockchain. It should be called on wallet startup prior to calling
/// [`WalletRead::suggest_scan_ranges`] in order to provide the wallet with the information it
/// needs to correctly prioritize scanning operations.
fn update_chain_tip(&mut self, block_metadata: BlockMetadata) -> Result<(), Self::Error>;

/// Caches a decrypted transaction in the persistent wallet store.
fn store_decrypted_tx(
&mut self,
Expand Down Expand Up @@ -569,7 +575,7 @@ pub mod testing {
use incrementalmerkletree::Address;
use secrecy::{ExposeSecret, SecretVec};
use shardtree::{memory::MemoryShardStore, ShardTree, ShardTreeError};
use std::{collections::HashMap, convert::Infallible, ops::Range};
use std::{collections::HashMap, convert::Infallible};

use zcash_primitives::{
block::BlockHash,
Expand All @@ -591,9 +597,9 @@ pub mod testing {
};

use super::{
chain::CommitmentTreeRoot, BlockMetadata, DecryptedTransaction, NullifierQuery,
ScannedBlock, SentTransaction, WalletCommitmentTrees, WalletRead, WalletWrite,
SAPLING_SHARD_HEIGHT,
chain::CommitmentTreeRoot, scanning::ScanRange, BlockMetadata, DecryptedTransaction,
NullifierQuery, ScannedBlock, SentTransaction, WalletCommitmentTrees, WalletRead,
WalletWrite, SAPLING_SHARD_HEIGHT,
};

pub struct MockWalletDb {
Expand Down Expand Up @@ -634,11 +640,7 @@ pub mod testing {
Ok(None)
}

fn suggest_scan_ranges(
&self,
_batch_size: usize,
_limit: usize,
) -> Result<Vec<Range<BlockHeight>>, Self::Error> {
fn suggest_scan_ranges(&self) -> Result<Vec<ScanRange>, Self::Error> {
Ok(vec![])
}

Expand Down Expand Up @@ -780,6 +782,10 @@ pub mod testing {
Ok(vec![])
}

fn update_chain_tip(&mut self, _block_metadata: BlockMetadata) -> Result<(), Self::Error> {
Ok(())
}

fn store_decrypted_tx(
&mut self,
_received_tx: DecryptedTransaction,
Expand Down
77 changes: 77 additions & 0 deletions zcash_client_backend/src/data_api/scanning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::ops::Range;

use zcash_primitives::consensus::BlockHeight;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScanPriority {
/// Block ranges that have already been scanned have lowest priority.
Scanned,
/// Block ranges to be scanned to advance the fully-scanned height.
Historic,
/// Block ranges adjacent to wallet open heights.
OpenAdjacent,
/// Blocks that must be scanned to complete note commitment tree shards adjacent to found notes.
FoundNote,
/// Blocks that must be scanned to complete the latest note commitment tree shard.
ChainTip,
/// A previously-scanned range that must be verified has highest priority.
Verify,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanRange {
block_range: Range<BlockHeight>,
priority: ScanPriority,
}

impl ScanRange {
pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
ScanRange {
block_range,
priority,
}
}

pub fn block_range(&self) -> &Range<BlockHeight> {
&self.block_range
}
pub fn priority(&self) -> ScanPriority {
self.priority
}

pub fn truncate_left(&self, block_height: BlockHeight) -> Option<Self> {
if block_height >= self.block_range.end {
None
} else {
Some(ScanRange {
block_range: block_height..self.block_range.end,
priority: self.priority,
})
}
}

pub fn truncate_right(&self, block_height: BlockHeight) -> Option<Self> {
if block_height <= self.block_range.start {
None
} else {
Some(ScanRange {
block_range: self.block_range.start..block_height,
priority: self.priority,
})
}
}

pub fn split_at(&self, p: BlockHeight) -> (Self, Self) {
assert!(p > self.block_range.start && p < self.block_range.end);
(
ScanRange {
block_range: self.block_range.start..p,
priority: self.priority,
},
ScanRange {
block_range: p..self.block_range.end,
priority: self.priority,
},
)
}
}
4 changes: 3 additions & 1 deletion zcash_client_sqlite/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ where
let mut rows = stmt_blocks
.query(params![
from_height.map_or(0u32, u32::from),
limit.unwrap_or(u32::max_value()),
limit
.and_then(|l| u32::try_from(l).ok())
.unwrap_or(u32::MAX)
])
.map_err(to_chain_error)?;

Expand Down
55 changes: 42 additions & 13 deletions zcash_client_sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use either::Either;
use rusqlite::{self, Connection};
use secrecy::{ExposeSecret, SecretVec};
use std::{borrow::Borrow, collections::HashMap, convert::AsRef, fmt, io, ops::Range, path::Path};
use wallet::commitment_tree::put_shard_roots;

use incrementalmerkletree::Position;
use shardtree::{ShardTree, ShardTreeError};
Expand All @@ -58,6 +57,7 @@ use zcash_client_backend::{
data_api::{
self,
chain::{BlockSource, CommitmentTreeRoot},
scanning::ScanRange,
BlockMetadata, DecryptedTransaction, NullifierQuery, PoolType, Recipient, ScannedBlock,
SentTransaction, ShieldedProtocol, WalletCommitmentTrees, WalletRead, WalletWrite,
SAPLING_SHARD_HEIGHT,
Expand All @@ -80,13 +80,18 @@ use {
pub mod chain;
pub mod error;
pub mod serialization;

pub mod wallet;
use wallet::commitment_tree::put_shard_roots;

/// The maximum number of blocks the wallet is allowed to rewind. This is
/// consistent with the bound in zcashd, and allows block data deeper than
/// this delta from the chain tip to be pruned.
pub(crate) const PRUNING_DEPTH: u32 = 100;

/// The number of blocks to re-verify when the chain tip is updated.
pub(crate) const VALIDATION_DEPTH: u32 = 10;

pub(crate) const SAPLING_TABLES_PREFIX: &str = "sapling";

/// A newtype wrapper for sqlite primary key values for the notes
Expand Down Expand Up @@ -167,12 +172,8 @@ impl<C: Borrow<rusqlite::Connection>, P: consensus::Parameters> WalletRead for W
wallet::block_fully_scanned(self.conn.borrow())
}

fn suggest_scan_ranges(
&self,
_batch_size: usize,
_limit: usize,
) -> Result<Vec<Range<BlockHeight>>, Self::Error> {
todo!()
fn suggest_scan_ranges(&self) -> Result<Vec<ScanRange>, Self::Error> {
wallet::scanning::suggest_scan_ranges(self.conn.borrow()).map_err(SqliteClientError::from)
}

fn get_min_unspent_height(&self) -> Result<Option<BlockHeight>, Self::Error> {
Expand Down Expand Up @@ -401,16 +402,19 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
blocks: Vec<ScannedBlock<sapling::Nullifier>>,
) -> Result<Vec<Self::NoteRef>, Self::Error> {
self.transactionally(|wdb| {
let start_position = blocks.first().map(|block| {
Position::from(
u64::from(block.metadata().sapling_tree_size())
- u64::try_from(block.sapling_commitments().len()).unwrap(),
let start_positions = blocks.first().map(|block| {
(
block.height(),
Position::from(
u64::from(block.metadata().sapling_tree_size())
- u64::try_from(block.sapling_commitments().len()).unwrap(),
),
)
});
let mut wallet_note_ids = vec![];
let mut sapling_commitments = vec![];
let mut end_height = None;

let mut note_positions = vec![];
for block in blocks.into_iter() {
if end_height.iter().any(|prev| block.height() != *prev + 1) {
return Err(SqliteClientError::NonSequentialBlocks);
Expand Down Expand Up @@ -442,13 +446,21 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>
}
}

note_positions.extend(block.transactions().iter().flat_map(|wtx| {
wtx.sapling_outputs
.iter()
.map(|out| out.note_commitment_tree_position())
}));

end_height = Some(block.height());
sapling_commitments.extend(block.into_sapling_commitments().into_iter());
}

// We will have a start position and an end height in all cases where `blocks` is
// non-empty.
if let Some((start_position, end_height)) = start_position.zip(end_height) {
if let Some(((start_height, start_position), end_height)) =
start_positions.zip(end_height)
{
// Update the Sapling note commitment tree with all newly read note commitments
let mut sapling_commitments = sapling_commitments.into_iter();
wdb.with_sapling_tree_mut::<_, _, SqliteClientError>(move |sapling_tree| {
Expand All @@ -458,12 +470,29 @@ impl<P: consensus::Parameters> WalletWrite for WalletDb<rusqlite::Connection, P>

// Update now-expired transactions that didn't get mined.
wallet::update_expired_notes(wdb.conn.0, end_height)?;

wallet::scanning::scan_complete(
wdb.conn.0,
&wdb.params,
Range {
start: start_height,
end: end_height + 1,
},
&note_positions,
)?;
}

Ok(wallet_note_ids)
})
}

fn update_chain_tip(&mut self, block_metadata: BlockMetadata) -> Result<(), Self::Error> {
let tx = self.conn.transaction()?;
wallet::scanning::update_chain_tip(&tx, &self.params, block_metadata)?;
tx.commit()?;
Ok(())
}

fn store_decrypted_tx(
&mut self,
d_tx: DecryptedTransaction,
Expand Down
2 changes: 2 additions & 0 deletions zcash_client_sqlite/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use rusqlite::{self, named_params, OptionalExtension, ToSql};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io::{self, Cursor};

use zcash_client_backend::data_api::ShieldedProtocol;
use zcash_primitives::transaction::TransactionData;

Expand Down Expand Up @@ -109,6 +110,7 @@ use {
pub(crate) mod commitment_tree;
pub mod init;
pub(crate) mod sapling;
pub(crate) mod scanning;

pub(crate) const BLOCK_SAPLING_FRONTIER_ABSENT: &[u8] = &[0x0];

Expand Down
10 changes: 10 additions & 0 deletions zcash_client_sqlite/src/wallet/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,16 @@ mod tests {
FOREIGN KEY (block) REFERENCES blocks(height),
CONSTRAINT witness_height UNIQUE (note, block)
)",
"CREATE TABLE scan_queue (
block_range_start INTEGER NOT NULL,
block_range_end INTEGER NOT NULL,
priority INTEGER NOT NULL,
CONSTRAINT range_start_uniq UNIQUE (block_range_start),
CONSTRAINT range_end_uniq UNIQUE (block_range_end),
CONSTRAINT range_bounds_order CHECK (
block_range_start < block_range_end
)
)",
"CREATE TABLE schemer_migrations (
id blob PRIMARY KEY
)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use schemer_rusqlite::RusqliteMigration;
use shardtree::ShardTree;
use uuid::Uuid;

use zcash_client_backend::data_api::SAPLING_SHARD_HEIGHT;
use zcash_client_backend::data_api::{
scanning::{ScanPriority, ScanRange},
SAPLING_SHARD_HEIGHT,
};
use zcash_primitives::{
consensus::BlockHeight,
merkle_tree::{read_commitment_tree, read_incremental_witness},
Expand All @@ -20,8 +23,10 @@ use zcash_primitives::{

use crate::{
wallet::{
block_height_extrema,
commitment_tree::SqliteShardStore,
init::{migrations::received_notes_nullable_nf, WalletMigrationError},
scanning::insert_queue_entries,
},
PRUNING_DEPTH, SAPLING_TABLES_PREFIX,
};
Expand Down Expand Up @@ -184,6 +189,27 @@ impl RusqliteMigration for Migration {
}
}

// Establish the scan queue & wallet history table
transaction.execute_batch(
"CREATE TABLE scan_queue (
block_range_start INTEGER NOT NULL,
block_range_end INTEGER NOT NULL,
priority INTEGER NOT NULL,
CONSTRAINT range_start_uniq UNIQUE (block_range_start),
CONSTRAINT range_end_uniq UNIQUE (block_range_end),
CONSTRAINT range_bounds_order CHECK (
block_range_start < block_range_end
)
);",
)?;

if let Some((start, end)) = block_height_extrema(transaction)? {
insert_queue_entries(
transaction,
Some(ScanRange::from_parts(start..end, ScanPriority::Historic)).iter(),
)?;
}

Ok(())
}

Expand Down
Loading

0 comments on commit 8c18f88

Please sign in to comment.