From 9b26d2583326ee563dc9ad6fe482d1e3fbebf933 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 3 Jun 2024 21:41:03 +0300 Subject: [PATCH 01/10] state sync chunk lifetime fixes --- grovedb/src/lib.rs | 6 +- grovedb/src/replication.rs | 456 +++--------------- grovedb/src/replication/state_sync_session.rs | 377 +++++++++++++++ tutorials/src/bin/replication.rs | 18 +- 4 files changed, 459 insertions(+), 398 deletions(-) create mode 100644 grovedb/src/replication/state_sync_session.rs diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index fd11f10d..a68dc1b3 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -319,11 +319,11 @@ impl GroveDb { /// Opens a Merk at given path for with direct write access. Intended for /// replication purposes. - fn open_merk_for_replication<'db, 'b, B>( + fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>( &'db self, path: SubtreePath<'b, B>, - tx: &'db Transaction, - ) -> Result>, Error> + tx: &'tx Transaction<'db>, + ) -> Result>, Error> where B: AsRef<[u8]> + 'b, { diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 11b0cb6c..93c42a29 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -1,170 +1,40 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, -}; +mod state_sync_session; + +use std::pin::Pin; use grovedb_merk::{ ed::Encode, - merk::restore::Restorer, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, }; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::RocksDbStorage; -#[rustfmt::skip] -use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; -use crate::{replication, Error, GroveDb, Transaction, TransactionArg}; - -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub use self::state_sync_session::MultiStateSyncSession; +use self::state_sync_session::SubtreesMetadata; +use crate::{Error, GroveDb, TransactionArg}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -#[derive(Default)] -struct SubtreeStateSyncInfo<'db> { - // Current Chunk restorer - restorer: Option>>, - // Set of global chunk ids requested to be fetched and pending for processing. For the - // description of global chunk id check fetch_chunk(). - pending_chunks: BTreeSet>, - // Number of processed chunks in current prefix (Path digest) - num_processed_chunks: usize, -} - -impl<'a> SubtreeStateSyncInfo<'a> { - // Function to create an instance of SubtreeStateSyncInfo with default values - pub fn new() -> Self { - Self::default() - } -} - -// Struct governing state sync -pub struct MultiStateSyncInfo<'db> { - // Map of current processing subtrees - // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo - current_prefixes: BTreeMap>, - // Set of processed prefixes (Path digests) - processed_prefixes: BTreeSet, - // Version of state sync protocol, - version: u16, -} - -impl<'db> Default for MultiStateSyncInfo<'db> { - fn default() -> Self { - Self { - current_prefixes: BTreeMap::new(), - processed_prefixes: BTreeSet::new(), - version: CURRENT_STATE_SYNC_VERSION, - } - } -} - -// Struct containing information about current subtrees found in GroveDB -pub struct SubtreesMetadata { - // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent - // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree - // elem_value_hash are needed when verifying the new constructed subtree after wards. - pub data: BTreeMap>, CryptoHash, CryptoHash)>, -} - -impl SubtreesMetadata { - pub fn new() -> SubtreesMetadata { - SubtreesMetadata { - data: BTreeMap::new(), - } - } -} - -impl Default for SubtreesMetadata { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for SubtreesMetadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); - writeln!( - f, - " prefix:{:?} -> path:{:?}", - hex::encode(prefix), - metadata_path_str - ); - } - Ok(()) - } -} - -// Converts a path into a human-readable string (for debugging) -pub fn util_path_to_string(path: &[Vec]) -> Vec { - let mut subtree_path_str: Vec = vec![]; - for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); - subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), - ); - } - subtree_path_str -} - -// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] -pub fn util_split_global_chunk_id( - global_chunk_id: &[u8], -) -> Result<(crate::SubtreePrefix, Vec), Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; - Ok((chunk_prefix_key, chunk_id.to_vec())) -} - -pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { - let mut res = vec![]; - for op in chunk { - op.encode_into(&mut res) - .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; +#[cfg(feature = "full")] +impl GroveDb { + pub fn start_syncing_session(&self) -> Pin> { + MultiStateSyncSession::new(self.start_transaction()) } - Ok(res) -} -pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { - let decoder = Decoder::new(&chunk); - let mut res = vec![]; - for op in decoder { - match op { - Ok(op) => res.push(op), - Err(e) => { - return Err(Error::CorruptedData(format!( - "unable to decode chunk: {}", - e - ))); - } - } + pub fn commit_session(&self, session: Pin>) { + // we do not care about the cost + let _ = self.commit_transaction(session.into_transaction()); } - Ok(res) -} -#[cfg(feature = "full")] -impl GroveDb { // Returns the discovered subtrees found recursively along with their associated // metadata Params: // tx: Transaction. Function returns the data by opening merks at given tx. // TODO: Add a SubTreePath as param and start searching from that path instead // of root (as it is now) pub fn get_subtrees_metadata(&self, tx: TransactionArg) -> Result { - let mut subtrees_metadata = crate::replication::SubtreesMetadata::new(); + let mut subtrees_metadata = SubtreesMetadata::new(); let subtrees_root = self.find_subtrees(&SubtreePath::empty(), tx).value?; for subtree in subtrees_root.into_iter() { @@ -341,271 +211,87 @@ impl GroveDb { } } - // Starts a state sync process (should be called by ABCI when OfferSnapshot - // method is called) Params: - // state_sync_info: Consumed StateSyncInfo - // app_hash: Snapshot's AppHash - // tx: Transaction for the state sync - // Returns the first set of global chunk ids that can be fetched from sources (+ - // the StateSyncInfo transferring ownership back to the caller) + /// Starts a state sync process of a snapshot with `app_hash` root hash, + /// should be called by ABCI when OfferSnapshot method is called. + /// Returns the first set of global chunk ids that can be fetched from + /// sources and a new sync session. pub fn start_snapshot_syncing<'db>( &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, app_hash: CryptoHash, - tx: &'db Transaction, version: u16, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + ) -> Result<(Vec>, Pin>>), Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( "Unsupported state sync protocol version".to_string(), )); } - if version != state_sync_info.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - - let mut res = vec![]; - - if !state_sync_info.current_prefixes.is_empty() - || !state_sync_info.processed_prefixes.is_empty() - { - return Err(Error::InternalError( - "GroveDB has already started a snapshot syncing", - )); - } - println!( - " starting:{:?}...", - replication::util_path_to_string(&[]) - ); + println!(" starting:{:?}...", util_path_to_string(&[])); - let mut root_prefix_state_sync_info = SubtreeStateSyncInfo::default(); let root_prefix = [0u8; 32]; - if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) { - let restorer = Restorer::new(merk, app_hash, None); - root_prefix_state_sync_info.restorer = Some(restorer); - root_prefix_state_sync_info.pending_chunks.insert(vec![]); - state_sync_info - .current_prefixes - .insert(root_prefix, root_prefix_state_sync_info); - res.push(root_prefix.to_vec()); - } else { - return Err(Error::InternalError("Unable to open merk for replication")); - } + let mut session = self.start_syncing_session(); + session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?; - Ok((res, state_sync_info)) + Ok((vec![root_prefix.to_vec()], session)) } +} - // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is - // called) Params: - // state_sync_info: Consumed MultiStateSyncInfo - // chunk: (Global chunk id, Chunk proof operators encoded in bytes) - // tx: Transaction for the state sync - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - pub fn apply_chunk<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - chunk: (&[u8], Vec), - tx: &'db Transaction, - version: u16, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { - // For now, only CURRENT_STATE_SYNC_VERSION is supported - if version != CURRENT_STATE_SYNC_VERSION { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - if version != state_sync_info.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - - let mut next_chunk_ids = vec![]; - - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?; - - if state_sync_info.current_prefixes.is_empty() { - return Err(Error::InternalError("GroveDB is not in syncing mode")); - } - if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { - if let Ok((res, mut new_subtree_state_sync)) = - self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) - { - if !res.is_empty() { - for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); - } - - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - Ok((next_chunk_ids, state_sync_info)) - } else { - if !new_subtree_state_sync.pending_chunks.is_empty() { - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - return Ok((vec![], state_sync_info)); - } - - // Subtree is finished. We can save it. - match new_subtree_state_sync.restorer.take() { - None => Err(Error::InternalError("Unable to finalize subtree")), - Some(restorer) => { - if (new_subtree_state_sync.num_processed_chunks > 0) - && (restorer.finalize().is_err()) - { - return Err(Error::InternalError("Unable to finalize Merk")); - } - state_sync_info.processed_prefixes.insert(chunk_prefix); - - // Subtree was successfully save. Time to discover new subtrees that - // need to be processed - let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?; - if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { - println!( - " path:{:?} done (num_processed_chunks:{:?})", - replication::util_path_to_string(&value.0), - new_subtree_state_sync.num_processed_chunks - ); - } - - if let Ok((res, new_state_sync_info)) = - self.discover_subtrees(state_sync_info, subtrees_metadata, tx) - { - next_chunk_ids.extend(res); - Ok((next_chunk_ids, new_state_sync_info)) - } else { - Err(Error::InternalError("Unable to discover Subtrees")) - } - } - } - } - } else { - Err(Error::InternalError("Unable to process incoming chunk")) - } - } else { - Err(Error::InternalError("Invalid incoming prefix")) - } +// Converts a path into a human-readable string (for debugging) +pub fn util_path_to_string(path: &[Vec]) -> Vec { + let mut subtree_path_str: Vec = vec![]; + for subtree in path { + let string = std::str::from_utf8(subtree).expect("should be able to convert path"); + subtree_path_str.push( + string + .parse() + .expect("should be able to parse path to string"), + ); } + subtree_path_str +} - // Apply a chunk using the given SubtreeStateSyncInfo - // state_sync_info: Consumed SubtreeStateSyncInfo - // chunk_id: Local chunk id - // chunk_data: Chunk proof operators encoded in bytes - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the SubtreeStateSyncInfo transferring ownership back to the caller) - fn apply_inner_chunk<'db>( - &'db self, - mut state_sync_info: SubtreeStateSyncInfo<'db>, - chunk_id: &[u8], - chunk_data: Vec, - ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { - let mut res = vec![]; - - match &mut state_sync_info.restorer { - Some(restorer) => { - if !state_sync_info.pending_chunks.contains(chunk_id) { - return Err(Error::InternalError( - "Incoming global_chunk_id not expected", - )); - } - state_sync_info.pending_chunks.remove(chunk_id); - if !chunk_data.is_empty() { - match util_decode_vec_ops(chunk_data) { - Ok(ops) => { - match restorer.process_chunk(chunk_id, ops) { - Ok(next_chunk_ids) => { - state_sync_info.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - state_sync_info - .pending_chunks - .insert(next_chunk_id.clone()); - res.push(next_chunk_id); - } - } - _ => { - return Err(Error::InternalError( - "Unable to process incoming chunk", - )); - } - }; - } - Err(_) => { - return Err(Error::CorruptedData( - "Unable to decode incoming chunk".to_string(), - )); - } - } - } - } - _ => { - return Err(Error::InternalError("Invalid internal state (restorer")); - } - } - - Ok((res, state_sync_info)) +// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] +pub fn util_split_global_chunk_id( + global_chunk_id: &[u8], +) -> Result<(crate::SubtreePrefix, Vec), Error> { + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); } - // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in - // subtrees_metadata and returns the root global chunk ids for all of those - // new subtrees. state_sync_info: Consumed MultiStateSyncInfo - // subtrees_metadata: Metadata about discovered subtrees - // chunk_data: Chunk proof operators - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - fn discover_subtrees<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - subtrees_metadata: SubtreesMetadata, - tx: &'db Transaction, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { - let mut res = vec![]; - - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) - && !state_sync_info.current_prefixes.contains_key(prefix) - { - let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata; - - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - println!( - " path:{:?} starting...", - replication::util_path_to_string(&prefix_metadata.0) - ); - - let mut subtree_state_sync_info = SubtreeStateSyncInfo::default(); - if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) { - let restorer = - Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); - subtree_state_sync_info.restorer = Some(restorer); - subtree_state_sync_info.pending_chunks.insert(vec![]); + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); + let mut array = [0u8; 32]; + array.copy_from_slice(chunk_prefix); + let chunk_prefix_key: crate::SubtreePrefix = array; + Ok((chunk_prefix_key, chunk_id.to_vec())) +} - state_sync_info - .current_prefixes - .insert(*prefix, subtree_state_sync_info); +pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { + let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) +} - let root_chunk_prefix = prefix.to_vec(); - res.push(root_chunk_prefix.to_vec()); - } else { - return Err(Error::InternalError("Unable to open Merk for replication")); - } +pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); } } - - Ok((res, state_sync_info)) } + Ok(res) } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs new file mode 100644 index 00000000..bfc12b51 --- /dev/null +++ b/grovedb/src/replication/state_sync_session.rs @@ -0,0 +1,377 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt, + marker::PhantomPinned, + pin::Pin, +}; + +use grovedb_merk::{CryptoHash, Restorer}; +use grovedb_path::SubtreePath; +use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; + +use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION}; +use crate::{replication::util_path_to_string, Error, GroveDb, Transaction}; + +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; + +struct SubtreeStateSyncInfo<'db> { + /// Current Chunk restorer + restorer: Restorer>, + /// Set of global chunk ids requested to be fetched and pending for + /// processing. For the description of global chunk id check + /// fetch_chunk(). + pending_chunks: BTreeSet>, + /// Number of processed chunks in current prefix (Path digest) + num_processed_chunks: usize, +} + +impl<'db> SubtreeStateSyncInfo<'db> { + // Apply a chunk using the given SubtreeStateSyncInfo + // state_sync_info: Consumed SubtreeStateSyncInfo + // chunk_id: Local chunk id + // chunk_data: Chunk proof operators encoded in bytes + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the SubtreeStateSyncInfo transferring ownership back to the caller) + fn apply_inner_chunk( + &mut self, + chunk_id: &[u8], + chunk_data: Vec, + ) -> Result>, Error> { + let mut res = vec![]; + + if !self.pending_chunks.contains(chunk_id) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected", + )); + } + self.pending_chunks.remove(chunk_id); + if !chunk_data.is_empty() { + match util_decode_vec_ops(chunk_data) { + Ok(ops) => { + match self.restorer.process_chunk(chunk_id, ops) { + Ok(next_chunk_ids) => { + self.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + self.pending_chunks.insert(next_chunk_id.clone()); + res.push(next_chunk_id); + } + } + _ => { + return Err(Error::InternalError("Unable to process incoming chunk")); + } + }; + } + Err(_) => { + return Err(Error::CorruptedData( + "Unable to decode incoming chunk".to_string(), + )); + } + } + } + + Ok(res) + } +} + +impl<'tx> SubtreeStateSyncInfo<'tx> { + pub fn new(restorer: Restorer>) -> Self { + SubtreeStateSyncInfo { + restorer, + pending_chunks: Default::default(), + num_processed_chunks: 0, + } + } +} + +// Struct governing state sync +pub struct MultiStateSyncSession<'db> { + // Map of current processing subtrees + // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo + current_prefixes: BTreeMap>, + // Set of processed prefixes (Path digests) + processed_prefixes: BTreeSet, + // Version of state sync protocol, + pub(crate) version: u16, + // Transaction goes last to be dropped last as well + transaction: Transaction<'db>, + _pin: PhantomPinned, +} + +impl<'db> MultiStateSyncSession<'db> { + /// Initializes a new state sync session. + pub fn new(transaction: Transaction<'db>) -> Pin> { + Box::pin(MultiStateSyncSession { + transaction, + current_prefixes: Default::default(), + processed_prefixes: Default::default(), + version: CURRENT_STATE_SYNC_VERSION, + _pin: PhantomPinned, + }) + } + + pub fn is_empty(&self) -> bool { + self.current_prefixes.is_empty() + } + + pub fn is_sync_completed(&self) -> bool { + for (_, subtree_state_info) in self.current_prefixes.iter() { + if !subtree_state_info.pending_chunks.is_empty() { + return false; + } + } + return true; + } + + pub fn into_transaction(self: Pin>) -> Transaction<'db> { + // SAFETY: the struct isn't used anymore and no one will refer to transaction + // address again + unsafe { Pin::into_inner_unchecked(self) }.transaction + } + + pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>( + self: &mut Pin>>, + db: &'db GroveDb, + path: SubtreePath<'b, B>, + hash: CryptoHash, + actual_hash: Option, + chunk_prefix: [u8; 32], + ) -> Result<(), Error> { + // SAFETY: we get an immutable reference of a transaction that stays behind + // `Pin` so this reference shall remain valid for the whole session + // object lifetime. + let transaction_ref: &'db Transaction<'db> = unsafe { + let tx: &mut Transaction<'db> = + &mut Pin::into_inner_unchecked(self.as_mut()).transaction; + &*(tx as *mut _) + }; + + if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) { + let restorer = Restorer::new(merk, hash, actual_hash); + let mut sync_info = SubtreeStateSyncInfo::new(restorer); + sync_info.pending_chunks.insert(vec![]); + self.as_mut() + .current_prefixes() + .insert(chunk_prefix, sync_info); + Ok(()) + } else { + Err(Error::InternalError("Unable to open merk for replication")) + } + } + + fn current_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeMap> { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.current_prefixes + } + + fn processed_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeSet { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.processed_prefixes + } + + /// Applies a chunk, shuold be called by ABCI when `ApplySnapshotChunk` + /// method is called. `chunk` is a pair of global chunk id and an + /// encoded proof. + pub fn apply_chunk( + self: &mut Pin>>, + db: &'db GroveDb, + chunk: (&[u8], Vec), + version: u16, + ) -> Result>, Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + if version != self.version { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + + let mut next_chunk_ids = vec![]; + + let (global_chunk_id, chunk_data) = chunk; + let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?; + + if self.is_empty() { + return Err(Error::InternalError("GroveDB is not in syncing mode")); + } + + let current_prefixes = self.as_mut().current_prefixes(); + let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { + return Err(Error::InternalError("Unable to process incoming chunk")); + }; + let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else { + return Err(Error::InternalError("Invalid incoming prefix")); + }; + + if !res.is_empty() { + for local_chunk_id in res.iter() { + let mut next_global_chunk_id = chunk_prefix.to_vec(); + next_global_chunk_id.extend(local_chunk_id.to_vec()); + next_chunk_ids.push(next_global_chunk_id); + } + + Ok(next_chunk_ids) + } else { + if !subtree_state_sync.pending_chunks.is_empty() { + return Ok(vec![]); + } + + // Subtree is finished. We can save it. + if (subtree_state_sync.num_processed_chunks > 0) + && (current_prefixes + .remove(&chunk_prefix) + .expect("prefix exists") + .restorer + .finalize() + .is_err()) + { + return Err(Error::InternalError("Unable to finalize Merk")); + } + self.as_mut().processed_prefixes().insert(chunk_prefix); + + // // Subtree was successfully save. Time to discover new subtrees that + // // need to be processed + // if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { + // println!( + // " path:{:?} done (num_processed_chunks:{:?})", + // util_path_to_string(&value.0), + // subtree_state_sync.num_processed_chunks + // ); + // } + + if let Ok(res) = self.discover_subtrees(db) { + next_chunk_ids.extend(res); + Ok(next_chunk_ids) + } else { + Err(Error::InternalError("Unable to discover Subtrees")) + } + } + } + + /// Prepares sync session for the freshly discovered subtrees and returns + /// global chunk ids of those new subtrees. + fn discover_subtrees( + self: &mut Pin>>, + db: &'db GroveDb, + ) -> Result>, Error> { + let subtrees_metadata = db.get_subtrees_metadata(Some(&self.transaction))?; + + let mut res = vec![]; + + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !self.processed_prefixes.contains(prefix) + && !self.current_prefixes.contains_key(prefix) + { + let (current_path, actual_value_hash, elem_value_hash) = &prefix_metadata; + + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + println!( + " path:{:?} starting...", + util_path_to_string(&prefix_metadata.0) + ); + + self.add_subtree_sync_info( + db, + path.into(), + elem_value_hash.clone(), + Some(actual_value_hash.clone()), + prefix.clone(), + )?; + res.push(prefix.to_vec()); + } + } + + Ok(res) + } +} + +// impl<'db> Default for MultiStateSyncInfo<'db> { +// fn default() -> Self { +// Self { +// current_prefixes: BTreeMap::new(), +// processed_prefixes: BTreeSet::new(), +// version: CURRENT_STATE_SYNC_VERSION, +// } +// } +// } + +// fn lol(db: &GroveDb) -> MultiStateSyncSession { +// let mut sync = MultiStateSyncSession { +// transaction: db.start_transaction(), +// current_prefixes: Default::default(), +// processed_prefixes: Default::default(), +// version: 0, +// }; + +// sync.current_prefixes.insert( +// b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_owned(), +// SubtreeStateSyncInfo { +// restorer: Some(Restorer::new( +// db.open_merk_for_replication(SubtreePath::empty(), +// &sync.transaction) .unwrap(), +// b"11111111111111111111111111111111".to_owned(), +// None, +// )), +// pending_chunks: Default::default(), +// num_processed_chunks: 0, +// }, +// ); + +// let ass: Option<&mut SubtreeStateSyncInfo> = +// sync.current_prefixes.values_mut().next(); + +// let ass2: &mut SubtreeStateSyncInfo = ass.unwrap(); + +// ass2.apply_inner_chunk(b"a", vec![]).unwrap(); + +// sync +// } + +// Struct containing information about current subtrees found in GroveDB +pub struct SubtreesMetadata { + // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent + // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree + // elem_value_hash are needed when verifying the new constructed subtree after wards. + pub data: BTreeMap>, CryptoHash, CryptoHash)>, +} + +impl SubtreesMetadata { + pub fn new() -> SubtreesMetadata { + SubtreesMetadata { + data: BTreeMap::new(), + } + } +} + +impl Default for SubtreesMetadata { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for SubtreesMetadata { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for (prefix, metadata) in self.data.iter() { + let metadata_path = &metadata.0; + let metadata_path_str = util_path_to_string(metadata_path); + writeln!( + f, + " prefix:{:?} -> path:{:?}", + hex::encode(prefix), + metadata_path_str + )?; + } + Ok(()) + } +} diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index f3e09532..440cb64e 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -5,7 +5,7 @@ use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; -use grovedb::replication::MultiStateSyncInfo; +use grovedb::replication::MultiStateSyncSession; const MAIN_ΚΕΥ: &[u8] = b"key_main"; const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty"; @@ -101,10 +101,7 @@ fn main() { println!("{:?}", subtrees_metadata_source); println!("\n######### db_checkpoint_0 -> db_destination state sync"); - let state_info = MultiStateSyncInfo::default(); - let tx = db_destination.start_transaction(); - sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx).unwrap(); - db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction"); + sync_db_demo(&db_checkpoint_0, &db_destination, /*&mut state_sync_session*/).unwrap(); println!("\n######### verify db_destination"); let incorrect_hashes = db_destination.verify_grovedb(None).unwrap(); @@ -241,11 +238,9 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec) { fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, - state_sync_info: MultiStateSyncInfo, - target_tx: &Transaction, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); - let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?; + let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; let mut chunk_queue : VecDeque> = VecDeque::new(); @@ -253,11 +248,14 @@ fn sync_db_demo( while let Some(chunk_id) = chunk_queue.pop_front() { let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx, CURRENT_STATE_SYNC_VERSION)?; - state_sync_info = new_state_sync_info; + let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); } + if session.is_sync_completed() { + target_db.commit_session(session); + } + Ok(()) } From f1c212c7a8f38599f15e9928a5f8402f5e14fb97 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 3 Jun 2024 22:31:47 +0300 Subject: [PATCH 02/10] cherry-picked #301 --- grovedb/src/replication.rs | 30 +++++++++++-------- grovedb/src/replication/state_sync_session.rs | 14 +++++---- tutorials/src/bin/replication.rs | 7 +++-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 93c42a29..3b8a860a 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -19,8 +19,8 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; #[cfg(feature = "full")] impl GroveDb { - pub fn start_syncing_session(&self) -> Pin> { - MultiStateSyncSession::new(self.start_transaction()) + pub fn start_syncing_session(&self, app_hash:[u8; 32]) -> Pin> { + MultiStateSyncSession::new(self.start_transaction(), app_hash) } pub fn commit_session(&self, session: Pin>) { @@ -130,11 +130,9 @@ impl GroveDb { )); } - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix_key, chunk_id) = + util_split_global_chunk_id(global_chunk_id, root_app_hash)?; let subtrees_metadata = self.get_subtrees_metadata(tx)?; @@ -157,7 +155,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -187,7 +185,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -219,7 +217,7 @@ impl GroveDb { &'db self, app_hash: CryptoHash, version: u16, - ) -> Result<(Vec>, Pin>>), Error> { + ) -> Result>>, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -231,10 +229,11 @@ impl GroveDb { let root_prefix = [0u8; 32]; - let mut session = self.start_syncing_session(); + let mut session = self.start_syncing_session(app_hash); + session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?; - Ok((vec![root_prefix.to_vec()], session)) + Ok(session) } } @@ -255,6 +254,7 @@ pub fn util_path_to_string(path: &[Vec]) -> Vec { // Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] pub fn util_split_global_chunk_id( global_chunk_id: &[u8], + app_hash: [u8; 32], ) -> Result<(crate::SubtreePrefix, Vec), Error> { let chunk_prefix_length: usize = 32; if global_chunk_id.len() < chunk_prefix_length { @@ -263,6 +263,12 @@ pub fn util_split_global_chunk_id( )); } + if global_chunk_id == app_hash { + let array_of_zeros: [u8; 32] = [0; 32]; + let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; + return Ok((root_chunk_prefix_key, vec![])); + } + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); let mut array = [0u8; 32]; array.copy_from_slice(chunk_prefix); diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index bfc12b51..b8d23e0b 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -90,6 +90,8 @@ pub struct MultiStateSyncSession<'db> { current_prefixes: BTreeMap>, // Set of processed prefixes (Path digests) processed_prefixes: BTreeSet, + // Root app_hash + app_hash: [u8; 32], // Version of state sync protocol, pub(crate) version: u16, // Transaction goes last to be dropped last as well @@ -99,11 +101,12 @@ pub struct MultiStateSyncSession<'db> { impl<'db> MultiStateSyncSession<'db> { /// Initializes a new state sync session. - pub fn new(transaction: Transaction<'db>) -> Pin> { + pub fn new(transaction: Transaction<'db>, app_hash:[u8; 32]) -> Pin> { Box::pin(MultiStateSyncSession { transaction, current_prefixes: Default::default(), processed_prefixes: Default::default(), + app_hash, version: CURRENT_STATE_SYNC_VERSION, _pin: PhantomPinned, }) @@ -180,7 +183,8 @@ impl<'db> MultiStateSyncSession<'db> { pub fn apply_chunk( self: &mut Pin>>, db: &'db GroveDb, - chunk: (&[u8], Vec), + global_chunk_id: &[u8], + chunk: Vec, version: u16, ) -> Result>, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported @@ -197,8 +201,8 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?; + let (chunk_prefix, chunk_id) = + util_split_global_chunk_id(global_chunk_id, self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); @@ -208,7 +212,7 @@ impl<'db> MultiStateSyncSession<'db> { let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { return Err(Error::InternalError("Unable to process incoming chunk")); }; - let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else { + let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk) else { return Err(Error::InternalError("Invalid incoming prefix")); }; diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 440cb64e..f8dab81b 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -240,15 +240,16 @@ fn sync_db_demo( target_db: &GroveDb, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); - let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; + let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; let mut chunk_queue : VecDeque> = VecDeque::new(); - chunk_queue.extend(chunk_ids); + // The very first chunk to fetch is always identified by the root app_hash + chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?; + let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); } From 0e940590076224c570ad051eff848da6681ff0fc Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 3 Jun 2024 22:32:25 +0300 Subject: [PATCH 03/10] fmt --- grovedb/src/replication.rs | 2 +- grovedb/src/replication/state_sync_session.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 3b8a860a..f7eb3e2e 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -19,7 +19,7 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; #[cfg(feature = "full")] impl GroveDb { - pub fn start_syncing_session(&self, app_hash:[u8; 32]) -> Pin> { + pub fn start_syncing_session(&self, app_hash: [u8; 32]) -> Pin> { MultiStateSyncSession::new(self.start_transaction(), app_hash) } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index b8d23e0b..891585ab 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -101,7 +101,7 @@ pub struct MultiStateSyncSession<'db> { impl<'db> MultiStateSyncSession<'db> { /// Initializes a new state sync session. - pub fn new(transaction: Transaction<'db>, app_hash:[u8; 32]) -> Pin> { + pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin> { Box::pin(MultiStateSyncSession { transaction, current_prefixes: Default::default(), @@ -201,8 +201,7 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (chunk_prefix, chunk_id) = - util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); From ee76ebf1fbeed2ee235a87510d8c654ec0bd235f Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Wed, 5 Jun 2024 19:14:46 +0300 Subject: [PATCH 04/10] better debug info --- grovedb/src/replication.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index f7eb3e2e..dafd3683 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -241,11 +241,9 @@ impl GroveDb { pub fn util_path_to_string(path: &[Vec]) -> Vec { let mut subtree_path_str: Vec = vec![]; for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); + let string = std::str::from_utf8(&subtree).unwrap_or_else(|_| ""); subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), + string.to_string() ); } subtree_path_str From 90ebcac20effede599dbeb05a738e29299af6138 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 6 Jun 2024 15:55:52 +0300 Subject: [PATCH 05/10] get storage context by subtree prefix --- storage/src/rocksdb_storage/storage.rs | 21 ++++++++++++++++++++- storage/src/storage.rs | 19 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index a396b75f..ff4260a8 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -58,7 +58,7 @@ use crate::{ const BLAKE_BLOCK_LEN: usize = 64; -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; fn blake_block_count(len: usize) -> usize { if len == 0 { @@ -472,6 +472,15 @@ impl<'db> Storage<'db> for RocksDbStorage { .map(|prefix| PrefixedRocksDbStorageContext::new(&self.db, prefix, batch)) } + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext + { + PrefixedRocksDbStorageContext::new(&self.db, prefix, batch).wrap_with_cost(OperationCost::default()) + } + fn get_transactional_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, @@ -486,6 +495,16 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext + { + PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch).wrap_with_cost(OperationCost::default()) + } + fn get_immediate_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 5ef26e06..4c42d276 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -43,6 +43,8 @@ use grovedb_visualize::visualize_to_vec; use crate::{worst_case_costs::WorstKeyLength, Error}; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; + /// Top-level storage_cost abstraction. /// Should be able to hold storage_cost connection and to start transaction when /// needed. All query operations will be exposed using [StorageContext]. @@ -89,6 +91,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make storage context for a subtree with prefix, keeping all write + /// operations inside a `batch` if provided. + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext; + /// Make context for a subtree on transactional data, keeping all write /// operations inside a `batch` if provided. fn get_transactional_storage_context<'b, B>( @@ -100,6 +110,15 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data, keeping all write + /// operations inside a `batch` if provided. + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Make context for a subtree on transactional data that will apply all /// operations straight to the storage. fn get_immediate_storage_context<'b, B>( From 03e66c055b5c8e7778cd4d8583c280ea341668a5 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 6 Jun 2024 19:21:32 +0300 Subject: [PATCH 06/10] benchmark --- tutorials/src/bin/replication.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index f8dab81b..c1d28377 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; use std::path::Path; +use std::time::Instant; use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction}; use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; @@ -43,14 +44,14 @@ fn populate_db(grovedb_path: String) -> GroveDb { let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=55 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_1], i * batch_size, i * batch_size + batch_size - 1, &tx); } let _ = db.commit_transaction(tx); let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=55 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_2], i * batch_size, i * batch_size + batch_size - 1, &tx); } let _ = db.commit_transaction(tx); @@ -239,6 +240,7 @@ fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, ) -> Result<(), grovedb::Error> { + let start_time = Instant::now(); let app_hash = source_db.root_hash(None).value.unwrap(); let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; @@ -256,7 +258,10 @@ fn sync_db_demo( if session.is_sync_completed() { target_db.commit_session(session); } - + let elapsed = start_time.elapsed(); + println!("state_synced in {:.2?}", elapsed); + + Ok(()) } From 401c760eefb86e1f2550e863b6f1ef2848ac9468 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 6 Jun 2024 20:00:46 +0300 Subject: [PATCH 07/10] fix --- grovedb/src/lib.rs | 16 +- grovedb/src/replication.rs | 208 +++++++++++++++++- grovedb/src/replication/state_sync_session.rs | 42 +++- tutorials/src/bin/replication.rs | 3 +- 4 files changed, 246 insertions(+), 23 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index a68dc1b3..138c32bc 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -323,7 +323,7 @@ impl GroveDb { &'db self, path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, - ) -> Result>, Error> + ) -> Result<(Merk>, Option>, bool), Error> where B: AsRef<[u8]> + 'b, { @@ -350,29 +350,37 @@ impl GroveDb { .unwrap()?; let is_sum_tree = element.is_sum_tree(); if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element { + Ok(( Merk::open_layered_with_root_key( storage, - root_key, + root_key.clone(), is_sum_tree, Some(&Element::value_defined_cost_for_serialized_value), ) .map_err(|_| { Error::CorruptedData("cannot open a subtree with given root key".to_owned()) }) - .unwrap() + .unwrap()?, + root_key, + is_sum_tree + )) } else { Err(Error::CorruptedPath( "cannot open a subtree as parent exists but is not a tree", )) } } else { + Ok(( Merk::open_base( storage, false, None::<&fn(&[u8]) -> Option>, ) .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) - .unwrap() + .unwrap()?, + None, + false + )) } } diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index dafd3683..171b413d 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -2,18 +2,14 @@ mod state_sync_session; use std::pin::Pin; -use grovedb_merk::{ - ed::Encode, - proofs::{Decoder, Op}, - tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, - ChunkProducer, -}; +use grovedb_merk::{ed::Encode, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, Merk}; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::RocksDbStorage; +use grovedb_storage::Storage; pub use self::state_sync_session::MultiStateSyncSession; use self::state_sync_session::SubtreesMetadata; -use crate::{Error, GroveDb, TransactionArg}; +use crate::{Element, Error, error, GroveDb, replication, TransactionArg}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; @@ -209,6 +205,113 @@ impl GroveDb { } } + pub fn fetch_chunk_2( + &self, + global_chunk_id: &[u8], + tx: TransactionArg, + version: u16, + ) -> Result, Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix, root_key, is_sum_tree, chunk_id) = + replication::util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; + + match tx { + None => { + let storage = self + .db + .get_storage_context_by_subtree_prefix(chunk_prefix, None).value; + if root_key.is_some() { + let merk = Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + ).value; + match merk { + Ok(m) => { + if m.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let chunk_producer_res = ChunkProducer::new(&m); + match chunk_producer_res { + Ok(mut chunk_producer) => { + let chunk_res = chunk_producer.chunk(&chunk_id); + match chunk_res { + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_layered fail_0:{}", e), + )), + }, + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_layered fail_1:{}", e), + )), + } + } + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_layered fail_2:{}", e), + )), + } + } + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_layered fail_3:{}", e), + )), + } + } + else { + let merk = Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + ).value; + match merk { + Ok(m) => { + if m.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let chunk_producer_res = ChunkProducer::new(&m); + match chunk_producer_res { + Ok(mut chunk_producer) => { + let chunk_res = chunk_producer.chunk(&chunk_id); + match chunk_res { + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_base fail_0:{}", e), + )), + }, + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_base fail_1:{}", e), + )), + } + } + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_base fail_2:{}", e), + )), + } + } + Err(e) => Err(Error::CorruptedData( + format!("2_no_tx_base fail_3:{}", e), + )), + } + } + + } + Some(t) => { + Ok(vec![]) + } + } + } + /// Starts a state sync process of a snapshot with `app_hash` root hash, /// should be called by ABCI when OfferSnapshot method is called. /// Returns the first set of global chunk ids that can be fetched from @@ -274,6 +377,97 @@ pub fn util_split_global_chunk_id( Ok((chunk_prefix_key, chunk_id.to_vec())) } + +pub fn util_split_global_chunk_id_2( + global_chunk_id: &[u8], + app_hash: &[u8], +) -> Result<(crate::SubtreePrefix, Option>, bool, Vec), Error> { + //println!("got>{}", hex::encode(global_chunk_id)); + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); + } + + if global_chunk_id == app_hash { + let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; + return Ok((root_chunk_prefix_key, None, false, vec![])); + } + + let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); + + let root_key_size_length: usize = 1; + if remaining.len() < root_key_size_length { + return Err(Error::CorruptedData( + "unable to decode root key size".to_string(), + )); + } + let (root_key_size, remaining) = remaining.split_at(root_key_size_length); + if remaining.len() < root_key_size[0] as usize { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); + let is_sum_tree_length: usize = 1; + if remaining.len() < is_sum_tree_length { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); + + let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key.try_into() + .map_err(|_| { + error::Error::CorruptedData( + "unable to construct subtree".to_string(), + ) + })?; + + if !root_key.is_empty() { + Ok((subtree_prefix, Some(root_key.to_vec()), is_sum_tree[0] != 0, chunk_id.to_vec())) + } + else { + Ok((subtree_prefix, None, is_sum_tree[0] != 0, chunk_id.to_vec())) + } +} + +// Create the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] +pub fn util_create_global_chunk_id_2( + subtree_prefix: [u8; blake3::OUT_LEN], + root_key_opt: Option>, + is_sum_tree:bool, + chunk_id: Vec +) -> (Vec){ + let mut res = vec![]; + + res.extend(subtree_prefix); + + let mut root_key_len = 0u8; + let mut root_key_vec = vec![]; + if let Some(root_key) = root_key_opt { + res.push(root_key.len() as u8); + res.extend(root_key.clone()); + root_key_len = root_key.len() as u8; + root_key_vec = root_key; + } + else { + res.push(0u8); + } + + let mut is_sum_tree_v = 0u8; + if is_sum_tree { + is_sum_tree_v = 1u8; + } + res.push(is_sum_tree_v); + + + res.extend(chunk_id.to_vec()); + //println!("snd>{}|{}|{}|{}|{:?}", hex::encode(res.clone()), root_key_len, hex::encode(root_key_vec), is_sum_tree_v, chunk_id); + res +} + pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { let mut res = vec![]; for op in chunk { diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index 891585ab..c7b49ae3 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -9,8 +9,8 @@ use grovedb_merk::{CryptoHash, Restorer}; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; -use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION}; -use crate::{replication::util_path_to_string, Error, GroveDb, Transaction}; +use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2}; +use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication}; pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; @@ -20,6 +20,8 @@ struct SubtreeStateSyncInfo<'db> { /// Set of global chunk ids requested to be fetched and pending for /// processing. For the description of global chunk id check /// fetch_chunk(). + root_key: Option>, + is_sum_tree: bool, pending_chunks: BTreeSet>, /// Number of processed chunks in current prefix (Path digest) num_processed_chunks: usize, @@ -77,6 +79,8 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { pub fn new(restorer: Restorer>) -> Self { SubtreeStateSyncInfo { restorer, + root_key: None, + is_sum_tree: false, pending_chunks: Default::default(), num_processed_chunks: 0, } @@ -138,7 +142,7 @@ impl<'db> MultiStateSyncSession<'db> { hash: CryptoHash, actual_hash: Option, chunk_prefix: [u8; 32], - ) -> Result<(), Error> { + ) -> Result<(Vec), Error> { // SAFETY: we get an immutable reference of a transaction that stays behind // `Pin` so this reference shall remain valid for the whole session // object lifetime. @@ -148,14 +152,17 @@ impl<'db> MultiStateSyncSession<'db> { &*(tx as *mut _) }; - if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) { + if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path, transaction_ref) { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); sync_info.pending_chunks.insert(vec![]); + sync_info.root_key = root_key.clone(); + sync_info.is_sum_tree = is_sum_tree; self.as_mut() .current_prefixes() .insert(chunk_prefix, sync_info); - Ok(()) + let x = util_create_global_chunk_id_2(chunk_prefix, root_key, is_sum_tree, vec![]); + Ok((x)) } else { Err(Error::InternalError("Unable to open merk for replication")) } @@ -201,7 +208,10 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + // [OLD_WAY] + //let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + // [NEW_WAY] + let (chunk_prefix, _, _, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); @@ -217,9 +227,13 @@ impl<'db> MultiStateSyncSession<'db> { if !res.is_empty() { for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); + // [NEW_WAY] + let x = util_create_global_chunk_id_2(chunk_prefix, subtree_state_sync.root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone()); + next_chunk_ids.push(x); + // [OLD_WAY] + //let mut next_global_chunk_id = chunk_prefix.to_vec(); + //next_global_chunk_id.extend(local_chunk_id.to_vec()); + //next_chunk_ids.push(next_global_chunk_id); } Ok(next_chunk_ids) @@ -284,14 +298,20 @@ impl<'db> MultiStateSyncSession<'db> { util_path_to_string(&prefix_metadata.0) ); - self.add_subtree_sync_info( + let x = self.add_subtree_sync_info( db, path.into(), elem_value_hash.clone(), Some(actual_value_hash.clone()), prefix.clone(), )?; - res.push(prefix.to_vec()); + + // [NEW_WAY] + res.push(x); + // [OLD_WAY] + //let root_chunk_prefix = prefix.to_vec(); + //res.push(root_chunk_prefix.to_vec()); + //res.push(prefix.to_vec()); } } diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index c1d28377..35383fb3 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -250,7 +250,8 @@ fn sync_db_demo( chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { - let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; + //let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; + let ops = source_db.fetch_chunk_2(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); } From dc78326de1132c58f5aee891b30f743db62d7974 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Wed, 12 Jun 2024 17:46:34 +0300 Subject: [PATCH 08/10] get_immediate_storage_context_by_subtree_prefix --- storage/src/rocksdb_storage/storage.rs | 9 +++++++++ storage/src/storage.rs | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index ff4260a8..7e031167 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -518,6 +518,15 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext + { + PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix).wrap_with_cost(OperationCost::default()) + } + fn commit_multi_context_batch( &self, batch: StorageBatch, diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 4c42d276..019b0e48 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -129,6 +129,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data that will apply all + /// operations straight to the storage. + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Creates a database checkpoint in a specified path fn create_checkpoint>(&self, path: P) -> Result<(), Error>; From bd17ca380e0d1b617f9cc9af3090cf71109c6488 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Wed, 12 Jun 2024 17:58:29 +0300 Subject: [PATCH 09/10] cleanup --- grovedb/src/replication.rs | 109 ------------------ grovedb/src/replication/state_sync_session.rs | 70 +---------- tutorials/src/bin/replication.rs | 9 +- 3 files changed, 3 insertions(+), 185 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 171b413d..3ff88e40 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -96,16 +96,6 @@ impl GroveDb { Ok(subtrees_metadata) } - // Fetch a chunk by global chunk id (should be called by ABCI when - // LoadSnapshotChunk method is called) Params: - // global_chunk_id: Global chunk id in the following format: - // [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros - // = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to - // the root of the given chunk. Traversal instructions are "1" for left, and - // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization - // as a subtree can be big hence traversal instructions for the deepest chunks - // tx: Transaction. Function returns the data by opening merks at given tx. - // Returns the Chunk proof operators for the requested chunk encoded in bytes pub fn fetch_chunk( &self, global_chunk_id: &[u8], @@ -119,105 +109,6 @@ impl GroveDb { )); } - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - let root_app_hash = self.root_hash(tx).value?; - let (chunk_prefix_key, chunk_id) = - util_split_global_chunk_id(global_chunk_id, root_app_hash)?; - - let subtrees_metadata = self.get_subtrees_metadata(tx)?; - - match subtrees_metadata.data.get(&chunk_prefix_key) { - Some(path_data) => { - let subtree = &path_data.0; - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - - match tx { - None => { - let merk = self - .open_non_transactional_merk_at_path(path.into(), None) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - Some(t) => { - let merk = self - .open_transactional_merk_at_path(path.into(), t, None) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - } - } - None => Err(Error::CorruptedData("Prefix not found".to_string())), - } - } - - pub fn fetch_chunk_2( - &self, - global_chunk_id: &[u8], - tx: TransactionArg, - version: u16, - ) -> Result, Error> { - // For now, only CURRENT_STATE_SYNC_VERSION is supported - if version != CURRENT_STATE_SYNC_VERSION { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - let root_app_hash = self.root_hash(tx).value?; let (chunk_prefix, root_key, is_sum_tree, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index c7b49ae3..5dc7d665 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -255,16 +255,6 @@ impl<'db> MultiStateSyncSession<'db> { } self.as_mut().processed_prefixes().insert(chunk_prefix); - // // Subtree was successfully save. Time to discover new subtrees that - // // need to be processed - // if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { - // println!( - // " path:{:?} done (num_processed_chunks:{:?})", - // util_path_to_string(&value.0), - // subtree_state_sync.num_processed_chunks - // ); - // } - if let Ok(res) = self.discover_subtrees(db) { next_chunk_ids.extend(res); Ok(next_chunk_ids) @@ -319,48 +309,6 @@ impl<'db> MultiStateSyncSession<'db> { } } -// impl<'db> Default for MultiStateSyncInfo<'db> { -// fn default() -> Self { -// Self { -// current_prefixes: BTreeMap::new(), -// processed_prefixes: BTreeSet::new(), -// version: CURRENT_STATE_SYNC_VERSION, -// } -// } -// } - -// fn lol(db: &GroveDb) -> MultiStateSyncSession { -// let mut sync = MultiStateSyncSession { -// transaction: db.start_transaction(), -// current_prefixes: Default::default(), -// processed_prefixes: Default::default(), -// version: 0, -// }; - -// sync.current_prefixes.insert( -// b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_owned(), -// SubtreeStateSyncInfo { -// restorer: Some(Restorer::new( -// db.open_merk_for_replication(SubtreePath::empty(), -// &sync.transaction) .unwrap(), -// b"11111111111111111111111111111111".to_owned(), -// None, -// )), -// pending_chunks: Default::default(), -// num_processed_chunks: 0, -// }, -// ); - -// let ass: Option<&mut SubtreeStateSyncInfo> = -// sync.current_prefixes.values_mut().next(); - -// let ass2: &mut SubtreeStateSyncInfo = ass.unwrap(); - -// ass2.apply_inner_chunk(b"a", vec![]).unwrap(); - -// sync -// } - // Struct containing information about current subtrees found in GroveDB pub struct SubtreesMetadata { // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent @@ -381,20 +329,4 @@ impl Default for SubtreesMetadata { fn default() -> Self { Self::new() } -} - -impl fmt::Debug for SubtreesMetadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); - writeln!( - f, - " prefix:{:?} -> path:{:?}", - hex::encode(prefix), - metadata_path_str - )?; - } - Ok(()) - } -} +} \ No newline at end of file diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 35383fb3..cbbbb8aa 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -97,12 +97,8 @@ fn main() { let root_hash_destination = db_destination.root_hash(None).unwrap().unwrap(); println!("root_hash_destination: {:?}", hex::encode(root_hash_destination)); - println!("\n######### source_subtree_metadata of db_source"); - let subtrees_metadata_source = db_source.get_subtrees_metadata(None).unwrap(); - println!("{:?}", subtrees_metadata_source); - println!("\n######### db_checkpoint_0 -> db_destination state sync"); - sync_db_demo(&db_checkpoint_0, &db_destination, /*&mut state_sync_session*/).unwrap(); + sync_db_demo(&db_checkpoint_0, &db_destination).unwrap(); println!("\n######### verify db_destination"); let incorrect_hashes = db_destination.verify_grovedb(None).unwrap(); @@ -250,8 +246,7 @@ fn sync_db_demo( chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { - //let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let ops = source_db.fetch_chunk_2(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; + let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); } From 442d00a9d8b584f0dfd2f9925e3b839d23036278 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 13 Jun 2024 15:30:32 +0300 Subject: [PATCH 10/10] temp work --- grovedb/src/lib.rs | 71 ++++++ grovedb/src/operations/auxiliary.rs | 45 ++++ grovedb/src/replication.rs | 205 ++++++++++-------- grovedb/src/replication/state_sync_session.rs | 39 +++- tutorials/src/bin/replication.rs | 4 + 5 files changed, 278 insertions(+), 86 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 138c32bc..2c481908 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -199,6 +199,7 @@ use grovedb_merk::{ tree::{combine_hash, value_hash}, BatchEntry, CryptoHash, KVIterator, Merk, }; +use grovedb_merk::ChunkProducer; use grovedb_path::SubtreePath; #[cfg(feature = "full")] use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; @@ -223,6 +224,7 @@ pub use crate::error::Error; #[cfg(feature = "full")] use crate::util::{root_merk_optional_tx, storage_context_optional_tx}; use crate::Error::MerkError; +use crate::replication::util_encode_vec_ops; #[cfg(feature = "full")] type Hash = [u8; 32]; @@ -317,6 +319,41 @@ impl GroveDb { } } + fn open_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + tx: &'db Transaction, + batch: Option<&'db StorageBatch>, + ) -> CostResult, Error> + { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_transactional_storage_context_by_subtree_prefix(prefix, batch, tx) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + ).map_err(|_| { + Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) + }).add_cost(cost) + } + else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + ).map_err(|_| { + Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) + }).add_cost(cost) + } + } + /// Opens a Merk at given path for with direct write access. Intended for /// replication purposes. fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>( @@ -445,6 +482,40 @@ impl GroveDb { } } + fn open_non_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + batch: Option<&'db StorageBatch>, + ) -> CostResult, Error> + { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_storage_context_by_subtree_prefix(prefix, batch) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + ).map_err(|_| { + Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) + }).add_cost(cost) + } + else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + ).map_err(|_| { + Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) + }).add_cost(cost) + } + } + /// Creates a checkpoint pub fn create_checkpoint>(&self, path: P) -> Result<(), Error> { self.db.create_checkpoint(path).map_err(|e| e.into()) diff --git a/grovedb/src/operations/auxiliary.rs b/grovedb/src/operations/auxiliary.rs index 1b6b884d..d2696fda 100644 --- a/grovedb/src/operations/auxiliary.rs +++ b/grovedb/src/operations/auxiliary.rs @@ -166,4 +166,49 @@ impl GroveDb { } Ok(result).wrap_with_cost(cost) } + + /// Finds keys which are trees for a given subtree. + /// One element means a key of a `merk`, n > 1 elements mean relative path + /// for a deeply nested subtree. + pub fn find_subtrees_non_recursive>( + &self, + path: &SubtreePath, + transaction: TransactionArg, + ) -> CostResult>>, Error> { + let mut cost = OperationCost::default(); + + // TODO: remove conversion to vec; + // However, it's not easy for a reason: + // new keys to enqueue are taken from raw iterator which returns Vec; + // changing that to slice is hard as cursor should be moved for next iteration + // which requires exclusive (&mut) reference, also there is no guarantee that + // slice which points into storage internals will remain valid if raw + // iterator got altered so why that reference should be exclusive; + // + // Update: there are pinned views into RocksDB to return slices of data, perhaps + // there is something for iterators + + let mut queue: Vec>> = vec![path.to_vec()]; + let mut result: Vec>> = queue.clone(); + + while let Some(q) = queue.pop() { + let subtree_path: SubtreePath> = q.as_slice().into(); + // Get the correct subtree with q_ref as path + storage_context_optional_tx!(self.db, subtree_path, None, transaction, storage, { + let storage = storage.unwrap_add_cost(&mut cost); + let mut raw_iter = Element::iterator(storage.raw_iter()).unwrap_add_cost(&mut cost); + while let Some((key, value)) = + cost_return_on_error!(&mut cost, raw_iter.next_element()) + { + if value.is_tree() { + let mut sub_path = q.clone(); + sub_path.push(key.to_vec()); + queue.push(sub_path.clone()); + result.push(sub_path); + } + } + }) + } + Ok(result).wrap_with_cost(cost) + } } diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 3ff88e40..3e9b6afb 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -1,15 +1,17 @@ mod state_sync_session; use std::pin::Pin; +use grovedb_costs::{cost_return_on_error, CostResult, CostsExt, OperationCost}; use grovedb_merk::{ed::Encode, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, Merk}; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::RocksDbStorage; -use grovedb_storage::Storage; +use grovedb_storage::{Storage, StorageContext}; pub use self::state_sync_session::MultiStateSyncSession; use self::state_sync_session::SubtreesMetadata; use crate::{Element, Error, error, GroveDb, replication, TransactionArg}; +use crate::replication::state_sync_session::{SubtreeMetadata, SubtreePrefix}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; @@ -24,6 +26,72 @@ impl GroveDb { let _ = self.commit_transaction(session.into_transaction()); } + pub fn get_subtree_metadata_by_prefix( + &self, + transaction: TransactionArg, + path: Vec>, + ) -> Result, Error> { + let mut res = vec![]; + + if let Some(tx) = transaction { + let current_path = SubtreePath::from(path.as_slice()); + let storage = self.db.get_transactional_storage_context(current_path, None, tx) + .value; + let mut raw_iter = Element::iterator(storage.raw_iter()).value; + while let Ok(Some((key, value))) = raw_iter.next_element().value + { + match value { + Element::Tree(ref root_key, _) => {} + Element::SumTree(ref root_key, _, _) => {} + _ => {} + } + if value.is_tree() { + + } + } + } + /* + if let Some(tx) = transaction { + let storage = self.db.get_transactional_storage_context_by_subtree_prefix(prefix, None, tx) + .unwrap_add_cost(&mut cost); + let mut raw_iter = Element::iterator(storage.raw_iter()).unwrap_add_cost(&mut cost); + while let Some((key, value)) = + cost_return_on_error!(&mut cost, raw_iter.next_element()) + { + match value { + + } + if value.is_tree() { + + } + } + } +*/ + //let storage = self.get_transactional_storage_context_by_subtree_prefix() + /* + while let Some(q) = queue.pop() { + let subtree_path: SubtreePath> = q.as_slice().into(); + // Get the correct subtree with q_ref as path + storage_context_optional_tx!(self.db, subtree_path, None, transaction, storage, { + let storage = storage.unwrap_add_cost(&mut cost); + let mut raw_iter = Element::iterator(storage.raw_iter()).unwrap_add_cost(&mut cost); + while let Some((key, value)) = + cost_return_on_error!(&mut cost, raw_iter.next_element()) + { + if value.is_tree() { + let mut sub_path = q.clone(); + sub_path.push(key.to_vec()); + queue.push(sub_path.clone()); + result.push(sub_path); + } + } + }) + } + Ok(result).wrap_with_cost(cost) + */ + Ok(res) + } + // Returns the discovered subtrees found recursively along with their associated // metadata Params: // tx: Transaction. Function returns the data by opening merks at given tx. @@ -99,7 +167,7 @@ impl GroveDb { pub fn fetch_chunk( &self, global_chunk_id: &[u8], - tx: TransactionArg, + transaction: TransactionArg, version: u16, ) -> Result, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported @@ -109,96 +177,63 @@ impl GroveDb { )); } - let root_app_hash = self.root_hash(tx).value?; + let root_app_hash = self.root_hash(transaction).value?; let (chunk_prefix, root_key, is_sum_tree, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; - match tx { + // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor and type are different) + match transaction { None => { - let storage = self - .db - .get_storage_context_by_subtree_prefix(chunk_prefix, None).value; - if root_key.is_some() { - let merk = Merk::open_layered_with_root_key( - storage, - root_key, - is_sum_tree, - Some(&Element::value_defined_cost_for_serialized_value), - ).value; - match merk { - Ok(m) => { - if m.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&m); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_layered fail_0:{}", e), - )), - }, - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_layered fail_1:{}", e), - )), - } - } - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_layered fail_2:{}", e), - )), - } - } - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_layered fail_3:{}", e), - )), - } + let merk = self.open_non_transactional_merk_by_prefix(chunk_prefix, + root_key, + is_sum_tree, None) + .value + .map_err(|e| Error::CorruptedData( + format!("failed to open merk by prefix non-tx:{} with:{}", e, hex::encode(chunk_prefix)), + ))?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } - else { - let merk = Merk::open_base( - storage, - false, - Some(&Element::value_defined_cost_for_serialized_value), - ).value; - match merk { - Ok(m) => { - if m.is_empty_tree().unwrap() { - return Ok(vec![]); - } - let chunk_producer_res = ChunkProducer::new(&m); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_base fail_0:{}", e), - )), - }, - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_base fail_1:{}", e), - )), - } - } - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_base fail_2:{}", e), - )), - } - } - Err(e) => Err(Error::CorruptedData( - format!("2_no_tx_base fail_3:{}", e), - )), - } + let mut chunk_producer = ChunkProducer::new(&merk) + .map_err(|e| Error::CorruptedData( + format!("failed to create chunk producer by prefix non-tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let ((chunk,_)) = chunk_producer.chunk(&chunk_id) + .map_err(|e| Error::CorruptedData( + format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let op_bytes = util_encode_vec_ops(chunk) + .map_err(|e| Error::CorruptedData( + format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + Ok(op_bytes) + } + Some(tx) => { + let merk = self.open_transactional_merk_by_prefix(chunk_prefix, + root_key, + is_sum_tree, tx, None) + .value + .map_err(|e| Error::CorruptedData( + format!("failed to open merk by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } - } - Some(t) => { - Ok(vec![]) + let mut chunk_producer = ChunkProducer::new(&merk) + .map_err(|e| Error::CorruptedData( + format!("failed to create chunk producer by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let ((chunk,_)) = chunk_producer.chunk(&chunk_id) + .map_err(|e| Error::CorruptedData( + format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let op_bytes = util_encode_vec_ops(chunk) + .map_err(|e| Error::CorruptedData( + format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + Ok(op_bytes) } } } @@ -225,7 +260,7 @@ impl GroveDb { let mut session = self.start_syncing_session(app_hash); - session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?; + session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix, vec![], vec![])?; Ok(session) } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index 5dc7d665..f98193ff 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -4,6 +4,8 @@ use std::{ marker::PhantomPinned, pin::Pin, }; +use std::fs::Metadata; +use grovedb_costs::CostsExt; use grovedb_merk::{CryptoHash, Restorer}; use grovedb_path::SubtreePath; @@ -11,9 +13,12 @@ use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2}; use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication}; +use crate::util::storage_context_optional_tx; pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub(crate) type SubtreeMetadata = (SubtreePrefix, Vec>, /*Option>, bool,*/ CryptoHash, CryptoHash); + struct SubtreeStateSyncInfo<'db> { /// Current Chunk restorer restorer: Restorer>, @@ -23,6 +28,7 @@ struct SubtreeStateSyncInfo<'db> { root_key: Option>, is_sum_tree: bool, pending_chunks: BTreeSet>, + current_path: Vec>, /// Number of processed chunks in current prefix (Path digest) num_processed_chunks: usize, } @@ -82,6 +88,7 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { root_key: None, is_sum_tree: false, pending_chunks: Default::default(), + current_path: vec![], num_processed_chunks: 0, } } @@ -142,6 +149,8 @@ impl<'db> MultiStateSyncSession<'db> { hash: CryptoHash, actual_hash: Option, chunk_prefix: [u8; 32], + parent_path: Vec>, + current_path: Vec, ) -> Result<(Vec), Error> { // SAFETY: we get an immutable reference of a transaction that stays behind // `Pin` so this reference shall remain valid for the whole session @@ -152,12 +161,13 @@ impl<'db> MultiStateSyncSession<'db> { &*(tx as *mut _) }; - if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path, transaction_ref) { + if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path.clone(), transaction_ref) { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); sync_info.pending_chunks.insert(vec![]); sync_info.root_key = root_key.clone(); sync_info.is_sum_tree = is_sum_tree; + println!("{}", format!("adding:{} {:?} {} {:?}", hex::encode(chunk_prefix), root_key.clone(), is_sum_tree, util_path_to_string(path.to_vec().as_slice()))); self.as_mut() .current_prefixes() .insert(chunk_prefix, sync_info); @@ -168,6 +178,15 @@ impl<'db> MultiStateSyncSession<'db> { } } + pub fn add_subtree_sync_info_2<'b, B: AsRef<[u8]>>( + self: &mut Pin>>, + db: &'db GroveDb, + metadata: SubtreeMetadata + ) -> Result<(Vec), Error> { + let (prefix, path, hash, actual_hash) = metadata; + Ok(vec![]) + } + fn current_prefixes( self: Pin<&mut MultiStateSyncSession<'db>>, ) -> &mut BTreeMap> { @@ -294,6 +313,8 @@ impl<'db> MultiStateSyncSession<'db> { elem_value_hash.clone(), Some(actual_value_hash.clone()), prefix.clone(), + vec![], + vec![], )?; // [NEW_WAY] @@ -329,4 +350,20 @@ impl Default for SubtreesMetadata { fn default() -> Self { Self::new() } +} + +impl fmt::Debug for SubtreesMetadata { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for (prefix, metadata) in self.data.iter() { + let metadata_path = &metadata.0; + let metadata_path_str = util_path_to_string(metadata_path); + writeln!( + f, + " prefix:{:?} -> path:{:?}", + hex::encode(prefix), + metadata_path_str + )?; + } + Ok(()) + } } \ No newline at end of file diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index cbbbb8aa..36da5924 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -97,6 +97,10 @@ fn main() { let root_hash_destination = db_destination.root_hash(None).unwrap().unwrap(); println!("root_hash_destination: {:?}", hex::encode(root_hash_destination)); + println!("\n######### source_subtree_metadata of db_source"); + let subtrees_metadata_source = db_source.get_subtrees_metadata(None).unwrap(); + println!("{:?}", subtrees_metadata_source); + println!("\n######### db_checkpoint_0 -> db_destination state sync"); sync_db_demo(&db_checkpoint_0, &db_destination).unwrap();