From ef48367029ae8e25c3268a731bf0ff7411407d1b Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Sat, 4 May 2024 16:56:34 +0300 Subject: [PATCH 1/5] multi subtrees sync --- grovedb/src/replication.rs | 294 +++++++++++++++++++------------ tutorials/src/bin/replication.rs | 36 +++- 2 files changed, 205 insertions(+), 125 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index f018053e..920af3ff 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -20,23 +20,27 @@ pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -// Struct governing state sync -pub struct StateSyncInfo<'db> { +pub struct SubtreeStateSyncInfo<'db> { // Current Chunk restorer pub restorer: Option>>, - // Set of processed prefixes (Path digests) - pub processed_prefixes: BTreeSet, - // Current processed prefix (Path digest) - pub current_prefix: Option, // Set of global chunk ids requested to be fetched and pending for processing. For the // description of global chunk id check fetch_chunk(). - pub pending_chunks: BTreeSet>, + pub pending_chunks: BTreeSet, // Number of processed chunks in current prefix (Path digest) pub num_processed_chunks: usize, // Version of state sync protocol, pub version: u16, } +// Struct governing state sync +pub struct MultiStateSyncInfo<'db> { + // Map of current processing subtrees + // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo + pub current_prefixes: BTreeMap>, + // Set of processed prefixes (Path digests) + pub processed_prefixes: BTreeSet, +} + // Struct containing information about current subtrees found in GroveDB pub struct SubtreesMetadata { // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent @@ -109,19 +113,25 @@ pub fn util_split_global_chunk_id( #[cfg(feature = "full")] impl GroveDb { - pub fn create_state_sync_info(&self) -> StateSyncInfo { + pub fn create_subtree_state_sync_info(&self) -> SubtreeStateSyncInfo { let pending_chunks = BTreeSet::new(); - let processed_prefixes = BTreeSet::new(); - StateSyncInfo { + SubtreeStateSyncInfo { restorer: None, - processed_prefixes, - current_prefix: None, pending_chunks, num_processed_chunks: 0, version: CURRENT_STATE_SYNC_VERSION, } } + pub fn create_multi_state_sync_info(&self) -> MultiStateSyncInfo { + let processed_prefixes = BTreeSet::new(); + let current_prefixes = BTreeMap::default(); + MultiStateSyncInfo { + current_prefixes, + processed_prefixes, + } + } + // Returns the discovered subtrees found recursively along with their associated // metadata Params: // tx: Transaction. Function returns the data by opening merks at given tx. @@ -304,11 +314,11 @@ impl GroveDb { // the StateSyncInfo transferring ownership back to the caller) pub fn start_snapshot_syncing<'db>( &'db self, - mut state_sync_info: StateSyncInfo<'db>, + mut state_sync_info: MultiStateSyncInfo<'db>, app_hash: CryptoHash, tx: &'db Transaction, version: u16, - ) -> Result<(Vec>, StateSyncInfo), Error> { + ) -> Result<(Vec>, MultiStateSyncInfo), Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -323,53 +333,43 @@ impl GroveDb { let mut res = vec![]; - match ( - &mut state_sync_info.restorer, - &state_sync_info.current_prefix, - ) { - (None, None) => { - if state_sync_info.pending_chunks.is_empty() - && state_sync_info.processed_prefixes.is_empty() - { - let root_prefix = [0u8; 32]; - if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) { - let restorer = Restorer::new(merk, app_hash, None); - state_sync_info.restorer = Some(restorer); - state_sync_info.current_prefix = Some(root_prefix); - state_sync_info.pending_chunks.insert(root_prefix.to_vec()); - - res.push(root_prefix.to_vec()); - } else { - return Err(Error::InternalError("Unable to open merk for replication")); - } - } else { - return Err(Error::InternalError("Invalid internal state sync info")); - } - } - _ => { - return Err(Error::InternalError( - "GroveDB has already started a snapshot syncing", - )); - } + if !state_sync_info.current_prefixes.is_empty() || !state_sync_info.processed_prefixes.is_empty() { + return Err(Error::InternalError( + "GroveDB has already started a snapshot syncing", + )); + } + + let mut root_prefix_state_sync_info = self.create_subtree_state_sync_info(); + let root_prefix = [0u8; 32]; + if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) { + let restorer = Restorer::new(merk, app_hash, None); + root_prefix_state_sync_info.restorer = Some(restorer); + root_prefix_state_sync_info.pending_chunks.insert("".to_string()); + state_sync_info.current_prefixes.insert(root_prefix, root_prefix_state_sync_info); + + res.push(root_prefix.to_vec()); + } else { + return Err(Error::InternalError("Unable to open merk for replication")); } Ok((res, state_sync_info)) } + // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is // called) Params: - // state_sync_info: Consumed StateSyncInfo + // state_sync_info: Consumed MultiStateSyncInfo // chunk: (Global chunk id, Chunk proof operators) // tx: Transaction for the state sync // Returns the next set of global chunk ids that can be fetched from sources (+ - // the StateSyncInfo transferring ownership back to the caller) + // the MultiStateSyncInfo transferring ownership back to the caller) pub fn apply_chunk<'db>( &'db self, - mut state_sync_info: StateSyncInfo<'db>, + mut state_sync_info: MultiStateSyncInfo<'db>, chunk: (&[u8], Vec), tx: &'db Transaction, version: u16, - ) -> Result<(Vec>, StateSyncInfo), Error> { + ) -> Result<(Vec>, MultiStateSyncInfo), Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -382,36 +382,107 @@ impl GroveDb { )); } - let mut res = vec![]; + let mut next_chunk_ids = vec![]; let (global_chunk_id, chunk_data) = chunk; let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?; - match ( - &mut state_sync_info.restorer, - &state_sync_info.current_prefix, - ) { - (Some(restorer), Some(ref current_prefix)) => { - if *current_prefix != chunk_prefix { - return Err(Error::InternalError("Invalid incoming prefix")); + if state_sync_info.current_prefixes.is_empty() { + return Err(Error::InternalError("GroveDB is not in syncing mode")); + } + if let Some(mut subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { + if let Ok((res, mut new_subtree_state_sync)) = self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) { + if !res.is_empty() { + for local_chunk_id in res.iter() { + let mut next_global_chunk_id = chunk_prefix.to_vec(); + next_global_chunk_id.extend(local_chunk_id.as_bytes().to_vec()); + next_chunk_ids.push(next_global_chunk_id); + } + + // re-insert subtree_state_sync in state_sync_info + state_sync_info.current_prefixes.insert(chunk_prefix, new_subtree_state_sync); + Ok((next_chunk_ids, state_sync_info)) } - if !state_sync_info.pending_chunks.contains(global_chunk_id) { + else { + if !new_subtree_state_sync.pending_chunks.is_empty() { + // re-insert subtree_state_sync in state_sync_info + state_sync_info.current_prefixes.insert(chunk_prefix, new_subtree_state_sync); + return Ok((vec![], state_sync_info)) + } + + // Subtree is finished. We can save it. + match new_subtree_state_sync.restorer.take() { + None => { + return Err(Error::InternalError("Unable to finalize subtree")); + } + Some(restorer) => { + if (new_subtree_state_sync.num_processed_chunks > 0) && (restorer.finalize().is_err()) { + return Err(Error::InternalError("Unable to finalize Merk")); + } + state_sync_info.processed_prefixes.insert(chunk_prefix); + + + // Subtree was successfully save. Time to discover new subtrees that need to be processed + let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; + if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { + println!( + " path:{:?} done num_processed_chunks:{:?}", + replication::util_path_to_string(&value.0), + new_subtree_state_sync.num_processed_chunks + ); + } + + if let Ok((res,new_state_sync_info)) = self.discover_subtrees(state_sync_info, subtrees_metadata, tx) { + next_chunk_ids.extend(res); + Ok((next_chunk_ids, new_state_sync_info)) + } + else { + return Err(Error::InternalError("Unable to discover Subtrees")); + } + } + } + } + } + else { + return Err(Error::InternalError("Unable to process incoming chunk")); + } + } + else { + return Err(Error::InternalError("Invalid incoming prefix")); + } + } + + // Apply a chunk using the given SubtreeStateSyncInfo + // state_sync_info: Consumed SubtreeStateSyncInfo + // chunk_id: Local chunk id + // chunk_data: Chunk proof operators + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the SubtreeStateSyncInfo transferring ownership back to the caller) + fn apply_inner_chunk<'db>( + &'db self, + mut state_sync_info: SubtreeStateSyncInfo<'db>, + chunk_id: &str, + chunk_data: Vec, + ) -> Result<(Vec, SubtreeStateSyncInfo), Error> { + let mut res = vec![]; + + match &mut state_sync_info.restorer { + Some(restorer) => { + if !state_sync_info.pending_chunks.contains(chunk_id) { return Err(Error::InternalError( "Incoming global_chunk_id not expected", )); } - state_sync_info.pending_chunks.remove(global_chunk_id); + state_sync_info.pending_chunks.remove(chunk_id); if !chunk_data.is_empty() { match restorer.process_chunk(&chunk_id, chunk_data) { Ok(next_chunk_ids) => { state_sync_info.num_processed_chunks += 1; for next_chunk_id in next_chunk_ids { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(next_chunk_id.to_vec()); state_sync_info .pending_chunks - .insert(next_global_chunk_id.clone()); - res.push(next_global_chunk_id); + .insert(next_chunk_id.clone()); + res.push(next_chunk_id); } } _ => { @@ -419,70 +490,61 @@ impl GroveDb { } }; } - } + }, _ => { - return Err(Error::InternalError("GroveDB is not in syncing mode")); + return Err(Error::InternalError("Invalid internal state (restorer")); } } - if res.is_empty() { - if !state_sync_info.pending_chunks.is_empty() { - return Ok((res, state_sync_info)); - } - match ( - state_sync_info.restorer.take(), - state_sync_info.current_prefix.take(), - ) { - (Some(restorer), Some(current_prefix)) => { - if (state_sync_info.num_processed_chunks > 0) && (restorer.finalize().is_err()) - { - return Err(Error::InternalError("Unable to finalize merk")); - } - state_sync_info.processed_prefixes.insert(current_prefix); - - let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; - if let Some(value) = subtrees_metadata.data.get(¤t_prefix) { - println!( - " path:{:?} done", - replication::util_path_to_string(&value.0) - ); - } + Ok((res, state_sync_info)) + } - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) { - let (current_path, s_actual_value_hash, s_elem_value_hash) = - &prefix_metadata; + // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in subtrees_metadata + // and returns the root global chunk ids for all of those new subtrees. + // state_sync_info: Consumed MultiStateSyncInfo + // subtrees_metadata: Metadata about discovered subtrees + // chunk_data: Chunk proof operators + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the MultiStateSyncInfo transferring ownership back to the caller) + fn discover_subtrees<'db>( + &'db self, + mut state_sync_info: MultiStateSyncInfo<'db>, + subtrees_metadata: SubtreesMetadata, + tx: &'db Transaction, + ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + let mut res = vec![]; - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !state_sync_info.processed_prefixes.contains(prefix) && !state_sync_info.current_prefixes.contains_key(prefix){ + let (current_path, s_actual_value_hash, s_elem_value_hash) = + &prefix_metadata; - if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) { - let restorer = Restorer::new( - merk, - *s_elem_value_hash, - Some(*s_actual_value_hash), - ); - state_sync_info.restorer = Some(restorer); - state_sync_info.current_prefix = Some(*prefix); - state_sync_info.num_processed_chunks = 0; + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + println!( + " starting:{:?}...", + replication::util_path_to_string(&prefix_metadata.0) + ); + + let mut subtree_state_sync_info = self.create_subtree_state_sync_info(); + if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) { + let restorer = Restorer::new( + merk, + *s_elem_value_hash, + Some(*s_actual_value_hash), + ); + subtree_state_sync_info.restorer = Some(restorer); + subtree_state_sync_info.pending_chunks.insert("".to_string()); - let root_chunk_prefix = prefix.to_vec(); - state_sync_info - .pending_chunks - .insert(root_chunk_prefix.clone()); - res.push(root_chunk_prefix); - } else { - return Err(Error::InternalError( - "Unable to open merk for replication", - )); - } - break; - } - } - } - _ => { - return Err(Error::InternalError("Unable to finalize tree")); + state_sync_info.current_prefixes.insert(*prefix, subtree_state_sync_info); + + let root_chunk_prefix = prefix.to_vec(); + res.push(root_chunk_prefix.to_vec()); + } else { + return Err(Error::InternalError( + "Unable to open Merk for replication", + )); } } } diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 5911c8e7..9f2ba25e 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -1,16 +1,18 @@ use std::collections::VecDeque; use std::path::Path; -use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction, replication::StateSyncInfo}; +use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction}; use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; -use grovedb_path::{SubtreePath}; +use grovedb::replication::MultiStateSyncInfo; const MAIN_ΚΕΥ: &[u8] = b"key_main"; const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty"; const KEY_INT_0: &[u8] = b"key_int_0"; +const KEY_INT_1: &[u8] = b"key_int_1"; +const KEY_INT_2: &[u8] = b"key_int_2"; const KEY_INT_REF_0: &[u8] = b"key_int_ref_0"; const KEY_INT_A: &[u8] = b"key_sum_0"; const ROOT_PATH: &[&[u8]] = &[]; @@ -29,14 +31,30 @@ fn populate_db(grovedb_path: String) -> GroveDb { insert_empty_tree_db(&db, ROOT_PATH, MAIN_ΚΕΥ); insert_empty_tree_db(&db, ROOT_PATH, MAIN_ΚΕΥ_EMPTY); insert_empty_tree_db(&db, &[MAIN_ΚΕΥ], KEY_INT_0); + insert_empty_tree_db(&db, &[MAIN_ΚΕΥ], KEY_INT_1); + insert_empty_tree_db(&db, &[MAIN_ΚΕΥ], KEY_INT_2); let tx = db.start_transaction(); - let batch_size = 100; - for i in 0..=10 { + let batch_size = 50; + for i in 0..=5 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_0], i * batch_size, i * batch_size + batch_size - 1, &tx); } let _ = db.commit_transaction(tx); + let tx = db.start_transaction(); + let batch_size = 50; + for i in 0..=5 { + insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_1], i * batch_size, i * batch_size + batch_size - 1, &tx); + } + let _ = db.commit_transaction(tx); + + let tx = db.start_transaction(); + let batch_size = 50; + for i in 0..=5 { + insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_2], i * batch_size, i * batch_size + batch_size - 1, &tx); + } + let _ = db.commit_transaction(tx); + insert_empty_tree_db(&db, &[MAIN_ΚΕΥ], KEY_INT_REF_0); let tx_2 = db.start_transaction(); @@ -46,8 +64,8 @@ fn populate_db(grovedb_path: String) -> GroveDb { insert_empty_sum_tree_db(&db, &[MAIN_ΚΕΥ], KEY_INT_A); let tx_3 = db.start_transaction(); - insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_A], 1, 100, &tx_3); - insert_sum_element_db(&db, &[MAIN_ΚΕΥ, KEY_INT_A], 101, 150, &tx_3); + insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_A], 1, 500, &tx_3); + insert_sum_element_db(&db, &[MAIN_ΚΕΥ, KEY_INT_A], 501, 550, &tx_3); let _ = db.commit_transaction(tx_3); db } @@ -83,7 +101,7 @@ fn main() { println!("{:?}", subtrees_metadata_source); println!("\n######### db_checkpoint_0 -> db_destination state sync"); - let state_info = db_destination.create_state_sync_info(); + let state_info = db_destination.create_multi_state_sync_info(); let tx = db_destination.start_transaction(); sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx).unwrap(); db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction"); @@ -204,7 +222,7 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec) { let path_query = PathQuery::new_unsized(path_vec, query.clone()); let (elements, _) = db - .query_item_value(&path_query, true, false,true, None) + .query_item_value(&path_query, true, false, true, None) .unwrap() .expect("expected successful get_path_query"); for e in elements.into_iter() { @@ -223,7 +241,7 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec) { fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, - state_sync_info: StateSyncInfo, + state_sync_info: MultiStateSyncInfo, target_tx: &Transaction, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); From 1824bf844511a1d6668f722f463a382fb8e481d0 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Wed, 8 May 2024 13:24:00 +0300 Subject: [PATCH 2/5] adjustements --- grovedb/src/replication.rs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 920af3ff..0de57a78 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -20,16 +20,14 @@ pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -pub struct SubtreeStateSyncInfo<'db> { +struct SubtreeStateSyncInfo<'db> { // Current Chunk restorer pub restorer: Option>>, // Set of global chunk ids requested to be fetched and pending for processing. For the // description of global chunk id check fetch_chunk(). - pub pending_chunks: BTreeSet, + pub pending_chunks: BTreeSet>, // Number of processed chunks in current prefix (Path digest) pub num_processed_chunks: usize, - // Version of state sync protocol, - pub version: u16, } // Struct governing state sync @@ -39,6 +37,8 @@ pub struct MultiStateSyncInfo<'db> { pub current_prefixes: BTreeMap>, // Set of processed prefixes (Path digests) pub processed_prefixes: BTreeSet, + // Version of state sync protocol, + pub version: u16, } // Struct containing information about current subtrees found in GroveDB @@ -70,7 +70,7 @@ impl fmt::Debug for SubtreesMetadata { let metadata_path_str = util_path_to_string(metadata_path); writeln!( f, - " prefix:{:?} -> path:{:?}\n", + " prefix:{:?} -> path:{:?}", hex::encode(prefix), metadata_path_str ); @@ -119,7 +119,6 @@ impl GroveDb { restorer: None, pending_chunks, num_processed_chunks: 0, - version: CURRENT_STATE_SYNC_VERSION, } } @@ -129,6 +128,7 @@ impl GroveDb { MultiStateSyncInfo { current_prefixes, processed_prefixes, + version: CURRENT_STATE_SYNC_VERSION, } } @@ -339,12 +339,17 @@ impl GroveDb { )); } + println!( + " starting:{:?}...", + replication::util_path_to_string(&vec![]) + ); + let mut root_prefix_state_sync_info = self.create_subtree_state_sync_info(); let root_prefix = [0u8; 32]; if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) { let restorer = Restorer::new(merk, app_hash, None); root_prefix_state_sync_info.restorer = Some(restorer); - root_prefix_state_sync_info.pending_chunks.insert("".to_string()); + root_prefix_state_sync_info.pending_chunks.insert(vec![]); state_sync_info.current_prefixes.insert(root_prefix, root_prefix_state_sync_info); res.push(root_prefix.to_vec()); @@ -395,7 +400,7 @@ impl GroveDb { if !res.is_empty() { for local_chunk_id in res.iter() { let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.as_bytes().to_vec()); + next_global_chunk_id.extend(local_chunk_id.to_vec()); next_chunk_ids.push(next_global_chunk_id); } @@ -426,7 +431,7 @@ impl GroveDb { let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { println!( - " path:{:?} done num_processed_chunks:{:?}", + " path:{:?} done (num_processed_chunks:{:?})", replication::util_path_to_string(&value.0), new_subtree_state_sync.num_processed_chunks ); @@ -461,9 +466,9 @@ impl GroveDb { fn apply_inner_chunk<'db>( &'db self, mut state_sync_info: SubtreeStateSyncInfo<'db>, - chunk_id: &str, + chunk_id: &[u8], chunk_data: Vec, - ) -> Result<(Vec, SubtreeStateSyncInfo), Error> { + ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { let mut res = vec![]; match &mut state_sync_info.restorer { @@ -523,7 +528,7 @@ impl GroveDb { current_path.iter().map(|vec| vec.as_slice()).collect(); let path: &[&[u8]] = &subtree_path; println!( - " starting:{:?}...", + " path:{:?} starting...", replication::util_path_to_string(&prefix_metadata.0) ); @@ -535,7 +540,7 @@ impl GroveDb { Some(*s_actual_value_hash), ); subtree_state_sync_info.restorer = Some(restorer); - subtree_state_sync_info.pending_chunks.insert("".to_string()); + subtree_state_sync_info.pending_chunks.insert(vec![]); state_sync_info.current_prefixes.insert(*prefix, subtree_state_sync_info); From 0bafb9f2ca8eb6d92439f4224b7058da87cfe933 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Wed, 8 May 2024 18:40:38 +0300 Subject: [PATCH 3/5] fmt --- grovedb/src/replication.rs | 84 ++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 0de57a78..db74a62b 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -333,7 +333,9 @@ impl GroveDb { let mut res = vec![]; - if !state_sync_info.current_prefixes.is_empty() || !state_sync_info.processed_prefixes.is_empty() { + if !state_sync_info.current_prefixes.is_empty() + || !state_sync_info.processed_prefixes.is_empty() + { return Err(Error::InternalError( "GroveDB has already started a snapshot syncing", )); @@ -350,7 +352,9 @@ impl GroveDb { let restorer = Restorer::new(merk, app_hash, None); root_prefix_state_sync_info.restorer = Some(restorer); root_prefix_state_sync_info.pending_chunks.insert(vec![]); - state_sync_info.current_prefixes.insert(root_prefix, root_prefix_state_sync_info); + state_sync_info + .current_prefixes + .insert(root_prefix, root_prefix_state_sync_info); res.push(root_prefix.to_vec()); } else { @@ -360,7 +364,6 @@ impl GroveDb { Ok((res, state_sync_info)) } - // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is // called) Params: // state_sync_info: Consumed MultiStateSyncInfo @@ -395,8 +398,11 @@ impl GroveDb { if state_sync_info.current_prefixes.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); } - if let Some(mut subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { - if let Ok((res, mut new_subtree_state_sync)) = self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) { + if let Some(mut subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) + { + if let Ok((res, mut new_subtree_state_sync)) = + self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) + { if !res.is_empty() { for local_chunk_id in res.iter() { let mut next_global_chunk_id = chunk_prefix.to_vec(); @@ -405,14 +411,17 @@ impl GroveDb { } // re-insert subtree_state_sync in state_sync_info - state_sync_info.current_prefixes.insert(chunk_prefix, new_subtree_state_sync); + state_sync_info + .current_prefixes + .insert(chunk_prefix, new_subtree_state_sync); Ok((next_chunk_ids, state_sync_info)) - } - else { + } else { if !new_subtree_state_sync.pending_chunks.is_empty() { // re-insert subtree_state_sync in state_sync_info - state_sync_info.current_prefixes.insert(chunk_prefix, new_subtree_state_sync); - return Ok((vec![], state_sync_info)) + state_sync_info + .current_prefixes + .insert(chunk_prefix, new_subtree_state_sync); + return Ok((vec![], state_sync_info)); } // Subtree is finished. We can save it. @@ -421,13 +430,15 @@ impl GroveDb { return Err(Error::InternalError("Unable to finalize subtree")); } Some(restorer) => { - if (new_subtree_state_sync.num_processed_chunks > 0) && (restorer.finalize().is_err()) { + if (new_subtree_state_sync.num_processed_chunks > 0) + && (restorer.finalize().is_err()) + { return Err(Error::InternalError("Unable to finalize Merk")); } state_sync_info.processed_prefixes.insert(chunk_prefix); - - // Subtree was successfully save. Time to discover new subtrees that need to be processed + // Subtree was successfully save. Time to discover new subtrees that + // need to be processed let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { println!( @@ -437,22 +448,21 @@ impl GroveDb { ); } - if let Ok((res,new_state_sync_info)) = self.discover_subtrees(state_sync_info, subtrees_metadata, tx) { + if let Ok((res, new_state_sync_info)) = + self.discover_subtrees(state_sync_info, subtrees_metadata, tx) + { next_chunk_ids.extend(res); Ok((next_chunk_ids, new_state_sync_info)) - } - else { + } else { return Err(Error::InternalError("Unable to discover Subtrees")); } } } } - } - else { + } else { return Err(Error::InternalError("Unable to process incoming chunk")); } - } - else { + } else { return Err(Error::InternalError("Invalid incoming prefix")); } } @@ -484,9 +494,7 @@ impl GroveDb { Ok(next_chunk_ids) => { state_sync_info.num_processed_chunks += 1; for next_chunk_id in next_chunk_ids { - state_sync_info - .pending_chunks - .insert(next_chunk_id.clone()); + state_sync_info.pending_chunks.insert(next_chunk_id.clone()); res.push(next_chunk_id); } } @@ -495,7 +503,7 @@ impl GroveDb { } }; } - }, + } _ => { return Err(Error::InternalError("Invalid internal state (restorer")); } @@ -504,9 +512,9 @@ impl GroveDb { Ok((res, state_sync_info)) } - // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in subtrees_metadata - // and returns the root global chunk ids for all of those new subtrees. - // state_sync_info: Consumed MultiStateSyncInfo + // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in + // subtrees_metadata and returns the root global chunk ids for all of those + // new subtrees. state_sync_info: Consumed MultiStateSyncInfo // subtrees_metadata: Metadata about discovered subtrees // chunk_data: Chunk proof operators // Returns the next set of global chunk ids that can be fetched from sources (+ @@ -520,9 +528,10 @@ impl GroveDb { let mut res = vec![]; for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) && !state_sync_info.current_prefixes.contains_key(prefix){ - let (current_path, s_actual_value_hash, s_elem_value_hash) = - &prefix_metadata; + if !state_sync_info.processed_prefixes.contains(prefix) + && !state_sync_info.current_prefixes.contains_key(prefix) + { + let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata; let subtree_path: Vec<&[u8]> = current_path.iter().map(|vec| vec.as_slice()).collect(); @@ -534,22 +543,19 @@ impl GroveDb { let mut subtree_state_sync_info = self.create_subtree_state_sync_info(); if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) { - let restorer = Restorer::new( - merk, - *s_elem_value_hash, - Some(*s_actual_value_hash), - ); + let restorer = + Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); subtree_state_sync_info.restorer = Some(restorer); subtree_state_sync_info.pending_chunks.insert(vec![]); - state_sync_info.current_prefixes.insert(*prefix, subtree_state_sync_info); + state_sync_info + .current_prefixes + .insert(*prefix, subtree_state_sync_info); let root_chunk_prefix = prefix.to_vec(); res.push(root_chunk_prefix.to_vec()); } else { - return Err(Error::InternalError( - "Unable to open Merk for replication", - )); + return Err(Error::InternalError("Unable to open Merk for replication")); } } } From 0ee72658dcec024003601e11ad0954a73442fc79 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 9 May 2024 11:10:33 +0300 Subject: [PATCH 4/5] clippy warnings --- grovedb/src/replication.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index db74a62b..9aa1ba5b 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -22,23 +22,23 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; struct SubtreeStateSyncInfo<'db> { // Current Chunk restorer - pub restorer: Option>>, + restorer: Option>>, // Set of global chunk ids requested to be fetched and pending for processing. For the // description of global chunk id check fetch_chunk(). - pub pending_chunks: BTreeSet>, + pending_chunks: BTreeSet>, // Number of processed chunks in current prefix (Path digest) - pub num_processed_chunks: usize, + num_processed_chunks: usize, } // Struct governing state sync pub struct MultiStateSyncInfo<'db> { // Map of current processing subtrees // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo - pub current_prefixes: BTreeMap>, + current_prefixes: BTreeMap>, // Set of processed prefixes (Path digests) - pub processed_prefixes: BTreeSet, + processed_prefixes: BTreeSet, // Version of state sync protocol, - pub version: u16, + version: u16, } // Struct containing information about current subtrees found in GroveDB @@ -113,7 +113,7 @@ pub fn util_split_global_chunk_id( #[cfg(feature = "full")] impl GroveDb { - pub fn create_subtree_state_sync_info(&self) -> SubtreeStateSyncInfo { + fn create_subtree_state_sync_info(&self) -> SubtreeStateSyncInfo { let pending_chunks = BTreeSet::new(); SubtreeStateSyncInfo { restorer: None, @@ -343,7 +343,7 @@ impl GroveDb { println!( " starting:{:?}...", - replication::util_path_to_string(&vec![]) + replication::util_path_to_string(&[]) ); let mut root_prefix_state_sync_info = self.create_subtree_state_sync_info(); @@ -398,7 +398,7 @@ impl GroveDb { if state_sync_info.current_prefixes.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); } - if let Some(mut subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) + if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { if let Ok((res, mut new_subtree_state_sync)) = self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) @@ -427,7 +427,7 @@ impl GroveDb { // Subtree is finished. We can save it. match new_subtree_state_sync.restorer.take() { None => { - return Err(Error::InternalError("Unable to finalize subtree")); + Err(Error::InternalError("Unable to finalize subtree")) } Some(restorer) => { if (new_subtree_state_sync.num_processed_chunks > 0) @@ -454,16 +454,16 @@ impl GroveDb { next_chunk_ids.extend(res); Ok((next_chunk_ids, new_state_sync_info)) } else { - return Err(Error::InternalError("Unable to discover Subtrees")); + Err(Error::InternalError("Unable to discover Subtrees")) } } } } } else { - return Err(Error::InternalError("Unable to process incoming chunk")); + Err(Error::InternalError("Unable to process incoming chunk")) } } else { - return Err(Error::InternalError("Invalid incoming prefix")); + Err(Error::InternalError("Invalid incoming prefix")) } } @@ -490,7 +490,7 @@ impl GroveDb { } state_sync_info.pending_chunks.remove(chunk_id); if !chunk_data.is_empty() { - match restorer.process_chunk(&chunk_id, chunk_data) { + match restorer.process_chunk(chunk_id, chunk_data) { Ok(next_chunk_ids) => { state_sync_info.num_processed_chunks += 1; for next_chunk_id in next_chunk_ids { From a7e0008a59eebb1b239ef2f3debe7d5bfdfdec0b Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 9 May 2024 11:12:26 +0300 Subject: [PATCH 5/5] fmt --- grovedb/src/replication.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 9aa1ba5b..6cb9e3fb 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -398,8 +398,7 @@ impl GroveDb { if state_sync_info.current_prefixes.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); } - if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) - { + if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { if let Ok((res, mut new_subtree_state_sync)) = self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) { @@ -426,9 +425,7 @@ impl GroveDb { // Subtree is finished. We can save it. match new_subtree_state_sync.restorer.take() { - None => { - Err(Error::InternalError("Unable to finalize subtree")) - } + None => Err(Error::InternalError("Unable to finalize subtree")), Some(restorer) => { if (new_subtree_state_sync.num_processed_chunks > 0) && (restorer.finalize().is_err())