Commit

very dirty commit
ogabrielides committed Jun 6, 2024
1 parent 90ebcac commit 7628068
Showing 4 changed files with 241 additions and 23 deletions.
16 changes: 12 additions & 4 deletions grovedb/src/lib.rs
@@ -323,7 +323,7 @@ impl GroveDb {
&'db self,
path: SubtreePath<'b, B>,
tx: &'tx Transaction<'db>,
) -> Result<Merk<PrefixedRocksDbImmediateStorageContext<'tx>>, Error>
) -> Result<(Merk<PrefixedRocksDbImmediateStorageContext<'tx>>, Option<Vec<u8>>, bool), Error>
where
B: AsRef<[u8]> + 'b,
{
@@ -350,29 +350,37 @@ impl GroveDb {
.unwrap()?;
let is_sum_tree = element.is_sum_tree();
if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element {
Ok((
Merk::open_layered_with_root_key(
storage,
root_key,
root_key.clone(),
is_sum_tree,
Some(&Element::value_defined_cost_for_serialized_value),
)
.map_err(|_| {
Error::CorruptedData("cannot open a subtree with given root key".to_owned())
})
.unwrap()
.unwrap()?,
root_key,
is_sum_tree
))
} else {
Err(Error::CorruptedPath(
"cannot open a subtree as parent exists but is not a tree",
))
}
} else {
Ok((
Merk::open_base(
storage,
false,
None::<&fn(&[u8]) -> Option<ValueDefinedCostType>>,
)
.map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned()))
.unwrap()
.unwrap()?,
None,
false
))
}
}
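
The widened return value is what the replication session consumes later in state_sync_session.rs. A minimal caller-side sketch (illustrative only, not part of the diff; it assumes a db: GroveDb, a subtree path, and an open tx: Transaction in scope, inside a function returning Result<_, Error>):

    // Hypothetical caller: the Merk now comes back together with the subtree's
    // root key (None for the root tree) and its sum-tree flag.
    let (merk, root_key, is_sum_tree) = db.open_merk_for_replication(path, &tx)?;
    // root_key and is_sum_tree are exactly what the new global chunk id format encodes.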

203 changes: 196 additions & 7 deletions grovedb/src/replication.rs
@@ -2,18 +2,14 @@ mod state_sync_session;

use std::pin::Pin;

use grovedb_merk::{
ed::Encode,
proofs::{Decoder, Op},
tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash},
ChunkProducer,
};
use grovedb_merk::{ed::Encode, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, Merk};
use grovedb_path::SubtreePath;
use grovedb_storage::rocksdb_storage::RocksDbStorage;
use grovedb_storage::Storage;

pub use self::state_sync_session::MultiStateSyncSession;
use self::state_sync_session::SubtreesMetadata;
use crate::{Error, GroveDb, TransactionArg};
use crate::{Element, Error, error, GroveDb, replication, TransactionArg};

pub const CURRENT_STATE_SYNC_VERSION: u16 = 1;

@@ -209,6 +205,113 @@ impl GroveDb {
}
}

    /// Fetches a chunk by its global chunk id and returns the chunk's proof
    /// operators encoded as bytes.
    pub fn fetch_chunk_2(
        &self,
        global_chunk_id: &[u8],
        tx: TransactionArg,
        version: u16,
    ) -> Result<Vec<u8>, Error> {
        // For now, only CURRENT_STATE_SYNC_VERSION is supported
        if version != CURRENT_STATE_SYNC_VERSION {
            return Err(Error::CorruptedData(
                "Unsupported state sync protocol version".to_string(),
            ));
        }

        let root_app_hash = self.root_hash(tx).value?;
        let (chunk_prefix, root_key, is_sum_tree, chunk_id) =
            replication::util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?;

        match tx {
            None => {
                let storage = self
                    .db
                    .get_storage_context_by_subtree_prefix(chunk_prefix, None)
                    .value;
                if root_key.is_some() {
                    let merk = Merk::open_layered_with_root_key(
                        storage,
                        root_key,
                        is_sum_tree,
                        Some(&Element::value_defined_cost_for_serialized_value),
                    )
                    .value;
                    match merk {
                        Ok(m) => {
                            if m.is_empty_tree().unwrap() {
                                return Ok(vec![]);
                            }

                            let chunk_producer_res = ChunkProducer::new(&m);
                            match chunk_producer_res {
                                Ok(mut chunk_producer) => {
                                    let chunk_res = chunk_producer.chunk(&chunk_id);
                                    match chunk_res {
                                        Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
                                            Ok(op_bytes) => Ok(op_bytes),
                                            Err(_) => Err(Error::CorruptedData(
                                                "Unable to encode chunk ops".to_string(),
                                            )),
                                        },
                                        Err(_) => Err(Error::CorruptedData(
                                            "Unable to load chunk".to_string(),
                                        )),
                                    }
                                }
                                Err(_) => Err(Error::CorruptedData(
                                    "Unable to create chunk producer".to_string(),
                                )),
                            }
                        }
                        Err(_) => {
                            Err(Error::CorruptedData("Unable to open merk".to_string()))
                        }
                    }
                } else {
                    let merk = Merk::open_base(
                        storage,
                        false,
                        Some(&Element::value_defined_cost_for_serialized_value),
                    )
                    .value;
                    match merk {
                        Ok(m) => {
                            if m.is_empty_tree().unwrap() {
                                return Ok(vec![]);
                            }

                            let chunk_producer_res = ChunkProducer::new(&m);
                            match chunk_producer_res {
                                Ok(mut chunk_producer) => {
                                    let chunk_res = chunk_producer.chunk(&chunk_id);
                                    match chunk_res {
                                        Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
                                            Ok(op_bytes) => Ok(op_bytes),
                                            Err(_) => Err(Error::CorruptedData(
                                                "Unable to encode chunk ops".to_string(),
                                            )),
                                        },
                                        Err(_) => Err(Error::CorruptedData(
                                            "Unable to load chunk".to_string(),
                                        )),
                                    }
                                }
                                Err(_) => Err(Error::CorruptedData(
                                    "Unable to create chunk producer".to_string(),
                                )),
                            }
                        }
                        Err(_) => {
                            Err(Error::CorruptedData("Unable to open merk".to_string()))
                        }
                    }
                }
            }
            Some(_t) => {
                // Chunk fetching within an explicit transaction is not
                // implemented yet; an empty chunk is returned for now.
                Ok(vec![])
            }
        }
    }
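
For reference, a sketch of the loop that drives fetch_chunk_2 (illustrative only; it mirrors the sync loop in tutorials/src/bin/replication.rs shown further down, assuming source_db, target_db and session are set up as in that tutorial):

    // Start from the app hash, which doubles as the first global chunk id,
    // then keep fetching whatever chunk ids the session asks for next.
    let app_hash = source_db.root_hash(None).value?;
    let mut queue = std::collections::VecDeque::from([app_hash.to_vec()]);
    while let Some(chunk_id) = queue.pop_front() {
        let ops = source_db.fetch_chunk_2(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
        let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?;
        queue.extend(more_chunks);
    }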

/// Starts a state sync process of a snapshot with `app_hash` root hash,
/// should be called by ABCI when OfferSnapshot method is called.
/// Returns the first set of global chunk ids that can be fetched from
@@ -274,6 +377,92 @@ pub fn util_split_global_chunk_id(
Ok((chunk_prefix_key, chunk_id.to_vec()))
}


/// Splits a global chunk id of the form
/// [SUBTREE_PREFIX : SIZE_ROOT_KEY : ROOT_KEY : IS_SUM_TREE : CHUNK_ID]
/// into (subtree prefix, optional root key, is_sum_tree flag, local chunk id).
pub fn util_split_global_chunk_id_2(
    global_chunk_id: &[u8],
    app_hash: &[u8],
) -> Result<(crate::SubtreePrefix, Option<Vec<u8>>, bool, Vec<u8>), Error> {
    let chunk_prefix_length: usize = 32;
    if global_chunk_id.len() < chunk_prefix_length {
        return Err(Error::CorruptedData(
            "expected global chunk id of at least 32 length".to_string(),
        ));
    }

    // The app hash itself addresses the root chunk of the root subtree.
    if global_chunk_id == app_hash {
        let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32];
        return Ok((root_chunk_prefix_key, None, false, vec![]));
    }

    let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length);

    let root_key_size_length: usize = 1;
    if remaining.len() < root_key_size_length {
        return Err(Error::CorruptedData(
            "unable to decode root key size".to_string(),
        ));
    }
    let (root_key_size, remaining) = remaining.split_at(root_key_size_length);
    if remaining.len() < root_key_size[0] as usize {
        return Err(Error::CorruptedData("unable to decode root key".to_string()));
    }
    let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize);

    let is_sum_tree_length: usize = 1;
    if remaining.len() < is_sum_tree_length {
        return Err(Error::CorruptedData(
            "unable to decode is_sum_tree flag".to_string(),
        ));
    }
    let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length);

    let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key.try_into().map_err(|_| {
        error::Error::CorruptedData("unable to construct subtree".to_string())
    })?;

    Ok((
        subtree_prefix,
        Some(root_key.to_vec()),
        is_sum_tree[0] != 0,
        chunk_id.to_vec(),
    ))
}

/// Builds a global chunk id of the form
/// [SUBTREE_PREFIX : SIZE_ROOT_KEY : ROOT_KEY : IS_SUM_TREE : CHUNK_ID]
/// from its parts.
pub fn util_create_global_chunk_id_2(
    subtree_prefix: [u8; blake3::OUT_LEN],
    root_key_opt: Option<Vec<u8>>,
    is_sum_tree: bool,
    chunk_id: Vec<u8>,
) -> Vec<u8> {
    let mut res = vec![];

    res.extend(subtree_prefix);

    if let Some(root_key) = root_key_opt {
        res.push(root_key.len() as u8);
        res.extend(root_key);
    } else {
        res.push(0u8);
    }

    res.push(if is_sum_tree { 1u8 } else { 0u8 });

    res.extend(chunk_id);
    res
}
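
A quick round trip of the two helpers above (illustrative sketch, not part of the diff; the prefix, root key and local chunk id values are made up):

    // Encode a global chunk id, then split it back into its parts.
    let prefix = [7u8; blake3::OUT_LEN];
    let global_id = util_create_global_chunk_id_2(
        prefix,
        Some(vec![1, 2, 3]), // subtree root key
        false,               // not a sum tree
        vec![0x01],          // local chunk id
    );
    // A zero app hash differs from global_id, so the root-chunk shortcut is skipped.
    let (p, root_key, is_sum_tree, chunk_id) =
        util_split_global_chunk_id_2(&global_id, &[0u8; 32])
            .expect("well-formed global chunk id");
    assert_eq!(
        (p, root_key, is_sum_tree, chunk_id),
        (prefix, Some(vec![1, 2, 3]), false, vec![0x01])
    );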

pub fn util_encode_vec_ops(chunk: Vec<Op>) -> Result<Vec<u8>, Error> {
let mut res = vec![];
for op in chunk {
42 changes: 31 additions & 11 deletions grovedb/src/replication/state_sync_session.rs
@@ -9,8 +9,8 @@ use grovedb_merk::{CryptoHash, Restorer};
use grovedb_path::SubtreePath;
use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext;

use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION};
use crate::{replication::util_path_to_string, Error, GroveDb, Transaction};
use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2};
use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication};

pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN];

@@ -20,6 +20,8 @@ struct SubtreeStateSyncInfo<'db> {
/// Set of global chunk ids requested to be fetched and pending for
/// processing. For the description of global chunk id check
/// fetch_chunk().
root_key: Option<Vec<u8>>,
is_sum_tree: bool,
pending_chunks: BTreeSet<Vec<u8>>,
/// Number of processed chunks in current prefix (Path digest)
num_processed_chunks: usize,
Expand Down Expand Up @@ -77,6 +79,8 @@ impl<'tx> SubtreeStateSyncInfo<'tx> {
pub fn new(restorer: Restorer<PrefixedRocksDbImmediateStorageContext<'tx>>) -> Self {
SubtreeStateSyncInfo {
restorer,
root_key: None,
is_sum_tree: false,
pending_chunks: Default::default(),
num_processed_chunks: 0,
}
Expand Down Expand Up @@ -138,7 +142,7 @@ impl<'db> MultiStateSyncSession<'db> {
hash: CryptoHash,
actual_hash: Option<CryptoHash>,
chunk_prefix: [u8; 32],
) -> Result<(), Error> {
) -> Result<Vec<u8>, Error> {
// SAFETY: we get an immutable reference of a transaction that stays behind
// `Pin` so this reference shall remain valid for the whole session
// object lifetime.
@@ -148,14 +152,17 @@ impl<'db> MultiStateSyncSession<'db> {
&*(tx as *mut _)
};

if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) {
if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path, transaction_ref) {
let restorer = Restorer::new(merk, hash, actual_hash);
let mut sync_info = SubtreeStateSyncInfo::new(restorer);
sync_info.pending_chunks.insert(vec![]);
sync_info.root_key = root_key.clone();
sync_info.is_sum_tree = is_sum_tree;
self.as_mut()
.current_prefixes()
.insert(chunk_prefix, sync_info);
Ok(())
let root_global_chunk_id = util_create_global_chunk_id_2(chunk_prefix, root_key, is_sum_tree, vec![]);
Ok(root_global_chunk_id)
} else {
Err(Error::InternalError("Unable to open merk for replication"))
}
@@ -201,7 +208,10 @@ impl<'db> MultiStateSyncSession<'db> {

let mut next_chunk_ids = vec![];

let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?;
let (chunk_prefix, _, _, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &self.app_hash)?;

if self.is_empty() {
return Err(Error::InternalError("GroveDB is not in syncing mode"));
@@ -217,9 +227,13 @@ impl<'db> MultiStateSyncSession<'db> {

if !res.is_empty() {
for local_chunk_id in res.iter() {
let mut next_global_chunk_id = chunk_prefix.to_vec();
next_global_chunk_id.extend(local_chunk_id.to_vec());
next_chunk_ids.push(next_global_chunk_id);
let next_global_chunk_id = util_create_global_chunk_id_2(chunk_prefix, subtree_state_sync.root_key.clone(), subtree_state_sync.is_sum_tree, local_chunk_id.clone());
next_chunk_ids.push(next_global_chunk_id);
}

Ok(next_chunk_ids)
@@ -284,14 +298,20 @@ impl<'db> MultiStateSyncSession<'db> {
util_path_to_string(&prefix_metadata.0)
);

self.add_subtree_sync_info(
let root_global_chunk_id = self.add_subtree_sync_info(
db,
path.into(),
elem_value_hash.clone(),
Some(actual_value_hash.clone()),
prefix.clone(),
)?;
res.push(prefix.to_vec());

res.push(root_global_chunk_id);
}
}

3 changes: 2 additions & 1 deletion tutorials/src/bin/replication.rs
@@ -248,7 +248,8 @@ fn sync_db_demo(
chunk_queue.push_back(app_hash.to_vec());

while let Some(chunk_id) = chunk_queue.pop_front() {
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
let ops = source_db.fetch_chunk_2(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?;
chunk_queue.extend(more_chunks);
}
