diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index af2b5039..35385ba8 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -1025,285 +1025,4 @@ impl GroveDb { } Ok(issues) } - - // Returns the discovered subtrees found recursively along with their associated - // metadata Params: - // tx: Transaction. Function returns the data by opening merks at given tx. - // TODO: Add a SubTreePath as param and start searching from that path instead - // of root (as it is now) - pub fn get_subtrees_metadata<'db>( - &'db self, - tx: &'db Transaction, - ) -> Result { - let mut subtrees_metadata = crate::SubtreesMetadata::new(); - - let subtrees_root = self.find_subtrees(&SubtreePath::empty(), Some(tx)).value?; - for subtree in subtrees_root.into_iter() { - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); - - let current_path = SubtreePath::from(path); - - let parent_path_opt = current_path.derive_parent(); - if parent_path_opt.is_some() { - let parent_path = parent_path_opt.unwrap().0; - let parent_merk = self - .open_transactional_merk_at_path(parent_path, tx, None) - .value?; - let parent_key = subtree.last().unwrap(); - let (elem_value, elem_value_hash) = parent_merk - .get_value_and_value_hash( - parent_key, - true, - None::<&fn(&[u8]) -> Option>, - ) - .value - .expect("should get value hash") - .expect("value hash should be some"); - - let actual_value_hash = value_hash(&elem_value).unwrap(); - subtrees_metadata.data.insert( - prefix, - (current_path.to_vec(), actual_value_hash, elem_value_hash), - ); - } else { - subtrees_metadata.data.insert( - prefix, - ( - current_path.to_vec(), - CryptoHash::default(), - CryptoHash::default(), - ), - ); - } - } - Ok(subtrees_metadata) - } - - // Fetch a chunk by global chunk id (should be called by ABCI when - // LoadSnapshotChunk method is called) Params: - // global_chunk_id: Global chunk id in the following format: - // [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros - // = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to - // the root of the given chunk. Traversal instructions are "1" for left, and - // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization - // as a subtree can be big hence traversal instructions for the deepest chunks - // tx: Transaction. Function returns the data by opening merks at given tx. - // TODO: Make this tx optional: None -> Use latest data - // Returns the Chunk proof operators for the requested chunk - pub fn fetch_chunk<'db>( - &'db self, - global_chunk_id: &[u8], - tx: &'db Transaction, - ) -> Result, Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: SubtreePrefix = array; - - let subtrees_metadata = self.get_subtrees_metadata(tx)?; - - match subtrees_metadata.data.get(&chunk_prefix_key) { - Some(path_data) => { - let subtree = &path_data.0; - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - - let merk = self - .open_transactional_merk_at_path(path.into(), tx, None) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer - .chunk(String::from_utf8(chunk_id.to_vec()).unwrap().as_str()); - match chunk_res { - Ok((chunk, _)) => Ok(chunk), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - None => Err(Error::CorruptedData("Prefix not found".to_string())), - } - } - - // Starts a state sync process (should be called by ABCI when OfferSnapshot - // method is called) Params: - // state_sync_info: Consumed StateSyncInfo - // app_hash: Snapshot's AppHash - // tx: Transaction for the state sync - // Returns the first set of global chunk ids that can be fetched from sources (+ - // the StateSyncInfo transferring ownership back to the caller) - pub fn start_snapshot_syncing<'db>( - &'db self, - mut state_sync_info: StateSyncInfo<'db>, - app_hash: CryptoHash, - tx: &'db Transaction, - ) -> Result<(Vec>, StateSyncInfo), Error> { - let mut res = vec![]; - - match ( - &mut state_sync_info.restorer, - &state_sync_info.current_prefix, - ) { - (None, None) => { - if state_sync_info.pending_chunks.is_empty() - && state_sync_info.processed_prefixes.is_empty() - { - let root_prefix = [0u8; 32]; - let merk = self - .open_merk_for_replication(SubtreePath::empty(), tx) - .unwrap(); - let restorer = Restorer::new(merk, app_hash, None); - state_sync_info.restorer = Some(restorer); - state_sync_info.current_prefix = Some(root_prefix); - state_sync_info.pending_chunks.insert(root_prefix.to_vec()); - - res.push(root_prefix.to_vec()); - } else { - return Err(Error::InternalError("Invalid internal state sync info")); - } - } - _ => { - return Err(Error::InternalError( - "GroveDB has already started a snapshot syncing", - )); - } - } - - Ok((res, state_sync_info)) - } - - // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is - // called) Params: - // state_sync_info: Consumed StateSyncInfo - // chunk: (Global chunk id, Chunk proof operators) - // tx: Transaction for the state sync - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the StateSyncInfo transferring ownership back to the caller) - pub fn apply_chunk<'db>( - &'db self, - mut state_sync_info: StateSyncInfo<'db>, - chunk: (&[u8], Vec), - tx: &'db Transaction, - ) -> Result<(Vec>, StateSyncInfo), Error> { - let mut res = vec![]; - - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?; - - match ( - &mut state_sync_info.restorer, - &state_sync_info.current_prefix, - ) { - (Some(restorer), Some(ref current_prefix)) => { - if *current_prefix != chunk_prefix { - return Err(Error::InternalError("Invalid incoming prefix")); - } - if !state_sync_info.pending_chunks.contains(global_chunk_id) { - return Err(Error::InternalError( - "Incoming global_chunk_id not expected", - )); - } - state_sync_info.pending_chunks.remove(global_chunk_id); - if !chunk_data.is_empty() { - match restorer.process_chunk(chunk_id.to_string(), chunk_data) { - Ok(next_chunk_ids) => { - state_sync_info.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(next_chunk_id.as_bytes().to_vec()); - state_sync_info - .pending_chunks - .insert(next_global_chunk_id.clone()); - res.push(next_global_chunk_id); - } - } - _ => { - return Err(Error::InternalError("Unable to process incoming chunk")); - } - }; - } - } - _ => { - return Err(Error::InternalError("GroveDB is not in syncing mode")); - } - } - - if res.is_empty() { - if !state_sync_info.pending_chunks.is_empty() { - return Ok((res, state_sync_info)); - } - match ( - state_sync_info.restorer.take(), - state_sync_info.current_prefix.take(), - ) { - (Some(restorer), Some(current_prefix)) => { - if (state_sync_info.num_processed_chunks > 0) && (restorer.finalize().is_err()) - { - return Err(Error::InternalError("Unable to finalize merk")); - } - state_sync_info.processed_prefixes.insert(current_prefix); - - let subtrees_metadata = self.get_subtrees_metadata(tx)?; - if let Some(value) = subtrees_metadata.data.get(¤t_prefix) { - println!( - " path:{:?} done", - replication::util_path_to_string(&value.0) - ); - } - - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) { - let (current_path, s_actual_value_hash, s_elem_value_hash) = - &prefix_metadata; - - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - - let merk = self.open_merk_for_replication(path.into(), tx).unwrap(); - let restorer = - Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); - state_sync_info.restorer = Some(restorer); - state_sync_info.current_prefix = Some(*prefix); - state_sync_info.num_processed_chunks = 0; - - let root_chunk_prefix = prefix.to_vec(); - state_sync_info - .pending_chunks - .insert(root_chunk_prefix.clone()); - res.push(root_chunk_prefix); - break; - } - } - } - _ => { - return Err(Error::InternalError("Unable to finalize tree")); - } - } - } - - Ok((res, state_sync_info)) - } } diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index d43c7787..42943c06 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -3,11 +3,18 @@ use std::{ fmt, }; -use grovedb_merk::{merk::restore::Restorer, tree::hash::CryptoHash}; +use grovedb_merk::{ + merk::restore::Restorer, + proofs::Op, + tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, + ChunkProducer, +}; +use grovedb_path::SubtreePath; +use grovedb_storage::rocksdb_storage::RocksDbStorage; #[rustfmt::skip] use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; -use crate::Error; +use crate::{replication, Error, GroveDb, Transaction}; pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; @@ -97,3 +104,287 @@ pub fn util_split_global_chunk_id( )), } } + +#[cfg(feature = "full")] +impl GroveDb { + // Returns the discovered subtrees found recursively along with their associated + // metadata Params: + // tx: Transaction. Function returns the data by opening merks at given tx. + // TODO: Add a SubTreePath as param and start searching from that path instead + // of root (as it is now) + pub fn get_subtrees_metadata<'db>( + &'db self, + tx: &'db Transaction, + ) -> Result { + let mut subtrees_metadata = crate::SubtreesMetadata::new(); + + let subtrees_root = self.find_subtrees(&SubtreePath::empty(), Some(tx)).value?; + for subtree in subtrees_root.into_iter() { + let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); + + let current_path = SubtreePath::from(path); + + let parent_path_opt = current_path.derive_parent(); + if parent_path_opt.is_some() { + let parent_path = parent_path_opt.unwrap().0; + let parent_merk = self + .open_transactional_merk_at_path(parent_path, tx, None) + .value?; + let parent_key = subtree.last().unwrap(); + let (elem_value, elem_value_hash) = parent_merk + .get_value_and_value_hash( + parent_key, + true, + None::<&fn(&[u8]) -> Option>, + ) + .value + .expect("should get value hash") + .expect("value hash should be some"); + + let actual_value_hash = value_hash(&elem_value).unwrap(); + subtrees_metadata.data.insert( + prefix, + (current_path.to_vec(), actual_value_hash, elem_value_hash), + ); + } else { + subtrees_metadata.data.insert( + prefix, + ( + current_path.to_vec(), + CryptoHash::default(), + CryptoHash::default(), + ), + ); + } + } + Ok(subtrees_metadata) + } + + // Fetch a chunk by global chunk id (should be called by ABCI when + // LoadSnapshotChunk method is called) Params: + // global_chunk_id: Global chunk id in the following format: + // [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros + // = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to + // the root of the given chunk. Traversal instructions are "1" for left, and + // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization + // as a subtree can be big hence traversal instructions for the deepest chunks + // tx: Transaction. Function returns the data by opening merks at given tx. + // TODO: Make this tx optional: None -> Use latest data + // Returns the Chunk proof operators for the requested chunk + pub fn fetch_chunk<'db>( + &'db self, + global_chunk_id: &[u8], + tx: &'db Transaction, + ) -> Result, Error> { + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); + } + + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); + + let mut array = [0u8; 32]; + array.copy_from_slice(chunk_prefix); + let chunk_prefix_key: crate::SubtreePrefix = array; + + let subtrees_metadata = self.get_subtrees_metadata(tx)?; + + match subtrees_metadata.data.get(&chunk_prefix_key) { + Some(path_data) => { + let subtree = &path_data.0; + let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + + let merk = self + .open_transactional_merk_at_path(path.into(), tx, None) + .value?; + + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let chunk_producer_res = ChunkProducer::new(&merk); + match chunk_producer_res { + Ok(mut chunk_producer) => { + let chunk_res = chunk_producer + .chunk(String::from_utf8(chunk_id.to_vec()).unwrap().as_str()); + match chunk_res { + Ok((chunk, _)) => Ok(chunk), + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + } + } + Err(_) => Err(Error::CorruptedData( + "Unable to create Chunk producer".to_string(), + )), + } + } + None => Err(Error::CorruptedData("Prefix not found".to_string())), + } + } + + // Starts a state sync process (should be called by ABCI when OfferSnapshot + // method is called) Params: + // state_sync_info: Consumed StateSyncInfo + // app_hash: Snapshot's AppHash + // tx: Transaction for the state sync + // Returns the first set of global chunk ids that can be fetched from sources (+ + // the StateSyncInfo transferring ownership back to the caller) + pub fn start_snapshot_syncing<'db>( + &'db self, + mut state_sync_info: StateSyncInfo<'db>, + app_hash: CryptoHash, + tx: &'db Transaction, + ) -> Result<(Vec>, StateSyncInfo), Error> { + let mut res = vec![]; + + match ( + &mut state_sync_info.restorer, + &state_sync_info.current_prefix, + ) { + (None, None) => { + if state_sync_info.pending_chunks.is_empty() + && state_sync_info.processed_prefixes.is_empty() + { + let root_prefix = [0u8; 32]; + let merk = self + .open_merk_for_replication(SubtreePath::empty(), tx) + .unwrap(); + let restorer = Restorer::new(merk, app_hash, None); + state_sync_info.restorer = Some(restorer); + state_sync_info.current_prefix = Some(root_prefix); + state_sync_info.pending_chunks.insert(root_prefix.to_vec()); + + res.push(root_prefix.to_vec()); + } else { + return Err(Error::InternalError("Invalid internal state sync info")); + } + } + _ => { + return Err(Error::InternalError( + "GroveDB has already started a snapshot syncing", + )); + } + } + + Ok((res, state_sync_info)) + } + + // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is + // called) Params: + // state_sync_info: Consumed StateSyncInfo + // chunk: (Global chunk id, Chunk proof operators) + // tx: Transaction for the state sync + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the StateSyncInfo transferring ownership back to the caller) + pub fn apply_chunk<'db>( + &'db self, + mut state_sync_info: StateSyncInfo<'db>, + chunk: (&[u8], Vec), + tx: &'db Transaction, + ) -> Result<(Vec>, StateSyncInfo), Error> { + let mut res = vec![]; + + let (global_chunk_id, chunk_data) = chunk; + let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?; + + match ( + &mut state_sync_info.restorer, + &state_sync_info.current_prefix, + ) { + (Some(restorer), Some(ref current_prefix)) => { + if *current_prefix != chunk_prefix { + return Err(Error::InternalError("Invalid incoming prefix")); + } + if !state_sync_info.pending_chunks.contains(global_chunk_id) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected", + )); + } + state_sync_info.pending_chunks.remove(global_chunk_id); + if !chunk_data.is_empty() { + match restorer.process_chunk(chunk_id.to_string(), chunk_data) { + Ok(next_chunk_ids) => { + state_sync_info.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + let mut next_global_chunk_id = chunk_prefix.to_vec(); + next_global_chunk_id.extend(next_chunk_id.as_bytes().to_vec()); + state_sync_info + .pending_chunks + .insert(next_global_chunk_id.clone()); + res.push(next_global_chunk_id); + } + } + _ => { + return Err(Error::InternalError("Unable to process incoming chunk")); + } + }; + } + } + _ => { + return Err(Error::InternalError("GroveDB is not in syncing mode")); + } + } + + if res.is_empty() { + if !state_sync_info.pending_chunks.is_empty() { + return Ok((res, state_sync_info)); + } + match ( + state_sync_info.restorer.take(), + state_sync_info.current_prefix.take(), + ) { + (Some(restorer), Some(current_prefix)) => { + if (state_sync_info.num_processed_chunks > 0) && (restorer.finalize().is_err()) + { + return Err(Error::InternalError("Unable to finalize merk")); + } + state_sync_info.processed_prefixes.insert(current_prefix); + + let subtrees_metadata = self.get_subtrees_metadata(tx)?; + if let Some(value) = subtrees_metadata.data.get(¤t_prefix) { + println!( + " path:{:?} done", + replication::util_path_to_string(&value.0) + ); + } + + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !state_sync_info.processed_prefixes.contains(prefix) { + let (current_path, s_actual_value_hash, s_elem_value_hash) = + &prefix_metadata; + + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + + let merk = self.open_merk_for_replication(path.into(), tx).unwrap(); + let restorer = + Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); + state_sync_info.restorer = Some(restorer); + state_sync_info.current_prefix = Some(*prefix); + state_sync_info.num_processed_chunks = 0; + + let root_chunk_prefix = prefix.to_vec(); + state_sync_info + .pending_chunks + .insert(root_chunk_prefix.clone()); + res.push(root_chunk_prefix); + break; + } + } + } + _ => { + return Err(Error::InternalError("Unable to finalize tree")); + } + } + } + + Ok((res, state_sync_info)) + } +}