From 7628068a92a91c90538cac607cee6a54ff11773b Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 6 Jun 2024 19:00:29 +0300 Subject: [PATCH] very dirty commit --- grovedb/src/lib.rs | 16 +- grovedb/src/replication.rs | 203 +++++++++++++++++- grovedb/src/replication/state_sync_session.rs | 42 +++- tutorials/src/bin/replication.rs | 3 +- 4 files changed, 241 insertions(+), 23 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index a68dc1b3..138c32bc 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -323,7 +323,7 @@ impl GroveDb { &'db self, path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, - ) -> Result>, Error> + ) -> Result<(Merk>, Option>, bool), Error> where B: AsRef<[u8]> + 'b, { @@ -350,29 +350,37 @@ impl GroveDb { .unwrap()?; let is_sum_tree = element.is_sum_tree(); if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element { + Ok(( Merk::open_layered_with_root_key( storage, - root_key, + root_key.clone(), is_sum_tree, Some(&Element::value_defined_cost_for_serialized_value), ) .map_err(|_| { Error::CorruptedData("cannot open a subtree with given root key".to_owned()) }) - .unwrap() + .unwrap()?, + root_key, + is_sum_tree + )) } else { Err(Error::CorruptedPath( "cannot open a subtree as parent exists but is not a tree", )) } } else { + Ok(( Merk::open_base( storage, false, None::<&fn(&[u8]) -> Option>, ) .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) - .unwrap() + .unwrap()?, + None, + false + )) } } diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index dafd3683..77daeaea 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -2,18 +2,14 @@ mod state_sync_session; use std::pin::Pin; -use grovedb_merk::{ - ed::Encode, - proofs::{Decoder, Op}, - tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, - ChunkProducer, -}; +use grovedb_merk::{ed::Encode, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, Merk}; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::RocksDbStorage; +use grovedb_storage::Storage; pub use self::state_sync_session::MultiStateSyncSession; use self::state_sync_session::SubtreesMetadata; -use crate::{Error, GroveDb, TransactionArg}; +use crate::{Element, Error, error, GroveDb, replication, TransactionArg}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; @@ -209,6 +205,113 @@ impl GroveDb { } } + pub fn fetch_chunk_2( + &self, + global_chunk_id: &[u8], + tx: TransactionArg, + version: u16, + ) -> Result, Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix, root_key, is_sum_tree, chunk_id) = + replication::util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; + + match tx { + None => { + let storage = self + .db + .get_storage_context_by_subtree_prefix(chunk_prefix, None).value; + if root_key.is_some() { + let merk = Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + ).value; + match merk { + Ok(m) => { + if m.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let chunk_producer_res = ChunkProducer::new(&m); + match chunk_producer_res { + Ok(mut chunk_producer) => { + let chunk_res = chunk_producer.chunk(&chunk_id); + match chunk_res { + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + }, + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + } + } + Err(_) => Err(Error::CorruptedData( + "Unable to create Chunk producer".to_string(), + )), + } + } + Err(e) => Err(Error::CorruptedData( + "Unable to open merk".to_string(), + )), + } + } + else { + let merk = Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + ).value; + match merk { + Ok(m) => { + if m.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let chunk_producer_res = ChunkProducer::new(&m); + match chunk_producer_res { + Ok(mut chunk_producer) => { + let chunk_res = chunk_producer.chunk(&chunk_id); + match chunk_res { + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + }, + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + } + } + Err(_) => Err(Error::CorruptedData( + "Unable to create Chunk producer".to_string(), + )), + } + } + Err(e) => Err(Error::CorruptedData( + "Unable to open merk".to_string(), + )), + } + } + + } + Some(t) => { + Ok(vec![]) + } + } + } + /// Starts a state sync process of a snapshot with `app_hash` root hash, /// should be called by ABCI when OfferSnapshot method is called. /// Returns the first set of global chunk ids that can be fetched from @@ -274,6 +377,92 @@ pub fn util_split_global_chunk_id( Ok((chunk_prefix_key, chunk_id.to_vec())) } + +pub fn util_split_global_chunk_id_2( + global_chunk_id: &[u8], + app_hash: &[u8], +) -> Result<(crate::SubtreePrefix, Option>, bool, Vec), Error> { + //println!("got>{}", hex::encode(global_chunk_id)); + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); + } + + if global_chunk_id == app_hash { + let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; + return Ok((root_chunk_prefix_key, None, false, vec![])); + } + + let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); + + let root_key_size_length: usize = 1; + if remaining.len() < root_key_size_length { + return Err(Error::CorruptedData( + "unable to decode root key size".to_string(), + )); + } + let (root_key_size, remaining) = remaining.split_at(root_key_size_length); + if remaining.len() < root_key_size[0] as usize { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); + let is_sum_tree_length: usize = 1; + if remaining.len() < is_sum_tree_length { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); + + let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key.try_into() + .map_err(|_| { + error::Error::CorruptedData( + "unable to construct subtree".to_string(), + ) + })?; + + Ok((subtree_prefix, Some(root_key.to_vec()), is_sum_tree[0] != 0, chunk_id.to_vec())) +} + +// Create the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] +pub fn util_create_global_chunk_id_2( + subtree_prefix: [u8; blake3::OUT_LEN], + root_key_opt: Option>, + is_sum_tree:bool, + chunk_id: Vec +) -> (Vec){ + let mut res = vec![]; + + res.extend(subtree_prefix); + + let mut root_key_len = 0u8; + let mut root_key_vec = vec![]; + if let Some(root_key) = root_key_opt { + res.push(root_key.len() as u8); + res.extend(root_key.clone()); + root_key_len = root_key.len() as u8; + root_key_vec = root_key; + } + else { + res.push(0u8); + } + + let mut is_sum_tree_v = 0u8; + if is_sum_tree { + is_sum_tree_v = 1u8; + } + res.push(is_sum_tree_v); + + + res.extend(chunk_id.to_vec()); + //println!("snd>{}|{}|{}|{}|{:?}", hex::encode(res.clone()), root_key_len, hex::encode(root_key_vec), is_sum_tree_v, chunk_id); + res +} + pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { let mut res = vec![]; for op in chunk { diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index 891585ab..c7b49ae3 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -9,8 +9,8 @@ use grovedb_merk::{CryptoHash, Restorer}; use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext; -use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION}; -use crate::{replication::util_path_to_string, Error, GroveDb, Transaction}; +use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2}; +use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication}; pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; @@ -20,6 +20,8 @@ struct SubtreeStateSyncInfo<'db> { /// Set of global chunk ids requested to be fetched and pending for /// processing. For the description of global chunk id check /// fetch_chunk(). + root_key: Option>, + is_sum_tree: bool, pending_chunks: BTreeSet>, /// Number of processed chunks in current prefix (Path digest) num_processed_chunks: usize, @@ -77,6 +79,8 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { pub fn new(restorer: Restorer>) -> Self { SubtreeStateSyncInfo { restorer, + root_key: None, + is_sum_tree: false, pending_chunks: Default::default(), num_processed_chunks: 0, } @@ -138,7 +142,7 @@ impl<'db> MultiStateSyncSession<'db> { hash: CryptoHash, actual_hash: Option, chunk_prefix: [u8; 32], - ) -> Result<(), Error> { + ) -> Result<(Vec), Error> { // SAFETY: we get an immutable reference of a transaction that stays behind // `Pin` so this reference shall remain valid for the whole session // object lifetime. @@ -148,14 +152,17 @@ impl<'db> MultiStateSyncSession<'db> { &*(tx as *mut _) }; - if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) { + if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path, transaction_ref) { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); sync_info.pending_chunks.insert(vec![]); + sync_info.root_key = root_key.clone(); + sync_info.is_sum_tree = is_sum_tree; self.as_mut() .current_prefixes() .insert(chunk_prefix, sync_info); - Ok(()) + let x = util_create_global_chunk_id_2(chunk_prefix, root_key, is_sum_tree, vec![]); + Ok((x)) } else { Err(Error::InternalError("Unable to open merk for replication")) } @@ -201,7 +208,10 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + // [OLD_WAY] + //let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + // [NEW_WAY] + let (chunk_prefix, _, _, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); @@ -217,9 +227,13 @@ impl<'db> MultiStateSyncSession<'db> { if !res.is_empty() { for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); + // [NEW_WAY] + let x = util_create_global_chunk_id_2(chunk_prefix, subtree_state_sync.root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone()); + next_chunk_ids.push(x); + // [OLD_WAY] + //let mut next_global_chunk_id = chunk_prefix.to_vec(); + //next_global_chunk_id.extend(local_chunk_id.to_vec()); + //next_chunk_ids.push(next_global_chunk_id); } Ok(next_chunk_ids) @@ -284,14 +298,20 @@ impl<'db> MultiStateSyncSession<'db> { util_path_to_string(&prefix_metadata.0) ); - self.add_subtree_sync_info( + let x = self.add_subtree_sync_info( db, path.into(), elem_value_hash.clone(), Some(actual_value_hash.clone()), prefix.clone(), )?; - res.push(prefix.to_vec()); + + // [NEW_WAY] + res.push(x); + // [OLD_WAY] + //let root_chunk_prefix = prefix.to_vec(); + //res.push(root_chunk_prefix.to_vec()); + //res.push(prefix.to_vec()); } } diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index f8dab81b..a79d98a4 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -248,7 +248,8 @@ fn sync_db_demo( chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { - let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; + //let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; + let ops = source_db.fetch_chunk_2(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); }