Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielides committed Apr 30, 2024
1 parent 884a8ee commit 01da079
Show file tree
Hide file tree
Showing 2 changed files with 293 additions and 283 deletions.
281 changes: 0 additions & 281 deletions grovedb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,285 +1025,4 @@ impl GroveDb {
}
Ok(issues)
}

/// Returns the discovered subtrees found recursively along with their
/// associated metadata.
///
/// Params:
/// tx: Transaction. Function returns the data by opening merks at given tx.
///
/// For the root subtree (which has no parent) the two hashes default to
/// zeroed `CryptoHash`es, since there is no parent element to read from.
// TODO: Add a SubTreePath as param and start searching from that path instead
// of root (as it is now)
pub fn get_subtrees_metadata<'db>(
    &'db self,
    tx: &'db Transaction,
) -> Result<SubtreesMetadata, Error> {
    let mut subtrees_metadata = crate::SubtreesMetadata::new();

    // Discover every subtree reachable from the root at the given tx.
    let subtrees_root = self.find_subtrees(&SubtreePath::empty(), Some(tx)).value?;
    for subtree in subtrees_root.into_iter() {
        let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
        let path: &[&[u8]] = &subtree_path;
        let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap();

        let current_path = SubtreePath::from(path);

        // Destructure the parent path directly instead of the
        // `is_some()` + `unwrap()` anti-pattern.
        if let Some((parent_path, _)) = current_path.derive_parent() {
            let parent_merk = self
                .open_transactional_merk_at_path(parent_path, tx, None)
                .value?;
            // A non-root subtree always has at least one path segment.
            let parent_key = subtree.last().expect("non-root path cannot be empty");
            let (elem_value, elem_value_hash) = parent_merk
                .get_value_and_value_hash(
                    parent_key,
                    true,
                    None::<&fn(&[u8]) -> Option<ValueDefinedCostType>>,
                )
                .value
                .expect("should get value hash")
                .expect("value hash should be some");

            // Recompute the hash of the stored element value so callers can
            // compare it with the hash recorded in the parent.
            let actual_value_hash = value_hash(&elem_value).unwrap();
            subtrees_metadata.data.insert(
                prefix,
                (current_path.to_vec(), actual_value_hash, elem_value_hash),
            );
        } else {
            // Root subtree: no parent element, record placeholder hashes.
            subtrees_metadata.data.insert(
                prefix,
                (
                    current_path.to_vec(),
                    CryptoHash::default(),
                    CryptoHash::default(),
                ),
            );
        }
    }
    Ok(subtrees_metadata)
}

// Fetch a chunk by global chunk id (should be called by ABCI when
// LoadSnapshotChunk method is called) Params:
// global_chunk_id: Global chunk id in the following format:
// [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros
// = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to
// the root of the given chunk. Traversal instructions are "1" for left, and
// "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization
// as a subtree can be big hence traversal instructions for the deepest chunks
// tx: Transaction. Function returns the data by opening merks at given tx.
// TODO: Make this tx optional: None -> Use latest data
// Returns the Chunk proof operators for the requested chunk
pub fn fetch_chunk<'db>(
&'db self,
global_chunk_id: &[u8],
tx: &'db Transaction,
) -> Result<Vec<Op>, Error> {
let chunk_prefix_length: usize = 32;
if global_chunk_id.len() < chunk_prefix_length {
return Err(Error::CorruptedData(
"expected global chunk id of at least 32 length".to_string(),
));
}

let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length);

let mut array = [0u8; 32];
array.copy_from_slice(chunk_prefix);
let chunk_prefix_key: SubtreePrefix = array;

let subtrees_metadata = self.get_subtrees_metadata(tx)?;

match subtrees_metadata.data.get(&chunk_prefix_key) {
Some(path_data) => {
let subtree = &path_data.0;
let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

let merk = self
.open_transactional_merk_at_path(path.into(), tx, None)
.value?;

if merk.is_empty_tree().unwrap() {
return Ok(vec![]);
}

let chunk_producer_res = ChunkProducer::new(&merk);
match chunk_producer_res {
Ok(mut chunk_producer) => {
let chunk_res = chunk_producer
.chunk(String::from_utf8(chunk_id.to_vec()).unwrap().as_str());
match chunk_res {
Ok((chunk, _)) => Ok(chunk),
Err(_) => Err(Error::CorruptedData(
"Unable to create to load chunk".to_string(),
)),
}
}
Err(_) => Err(Error::CorruptedData(
"Unable to create Chunk producer".to_string(),
)),
}
}
None => Err(Error::CorruptedData("Prefix not found".to_string())),
}
}

// Starts a state sync process (should be called by ABCI when OfferSnapshot
// method is called) Params:
// state_sync_info: Consumed StateSyncInfo
// app_hash: Snapshot's AppHash
// tx: Transaction for the state sync
// Returns the first set of global chunk ids that can be fetched from sources (+
// the StateSyncInfo transferring ownership back to the caller)
pub fn start_snapshot_syncing<'db>(
&'db self,
mut state_sync_info: StateSyncInfo<'db>,
app_hash: CryptoHash,
tx: &'db Transaction,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
let mut res = vec![];

match (
&mut state_sync_info.restorer,
&state_sync_info.current_prefix,
) {
(None, None) => {
if state_sync_info.pending_chunks.is_empty()
&& state_sync_info.processed_prefixes.is_empty()
{
let root_prefix = [0u8; 32];
let merk = self
.open_merk_for_replication(SubtreePath::empty(), tx)
.unwrap();
let restorer = Restorer::new(merk, app_hash, None);
state_sync_info.restorer = Some(restorer);
state_sync_info.current_prefix = Some(root_prefix);
state_sync_info.pending_chunks.insert(root_prefix.to_vec());

res.push(root_prefix.to_vec());
} else {
return Err(Error::InternalError("Invalid internal state sync info"));
}
}
_ => {
return Err(Error::InternalError(
"GroveDB has already started a snapshot syncing",
));
}
}

Ok((res, state_sync_info))
}

// Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is
// called) Params:
// state_sync_info: Consumed StateSyncInfo
// chunk: (Global chunk id, Chunk proof operators)
// tx: Transaction for the state sync
// Returns the next set of global chunk ids that can be fetched from sources (+
// the StateSyncInfo transferring ownership back to the caller)
pub fn apply_chunk<'db>(
&'db self,
mut state_sync_info: StateSyncInfo<'db>,
chunk: (&[u8], Vec<Op>),
tx: &'db Transaction,
) -> Result<(Vec<Vec<u8>>, StateSyncInfo), Error> {
// Global chunk ids of follow-up chunks discovered while processing this one.
let mut res = vec![];

let (global_chunk_id, chunk_data) = chunk;
// Split [SUBTREE_PREFIX:CHUNK_ID] into its two components.
let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?;

// Syncing is active only when both a restorer and a current prefix are set
// (see start_snapshot_syncing).
match (
&mut state_sync_info.restorer,
&state_sync_info.current_prefix,
) {
(Some(restorer), Some(ref current_prefix)) => {
// A chunk must belong to the subtree currently being restored and
// must have been previously announced as pending.
if *current_prefix != chunk_prefix {
return Err(Error::InternalError("Invalid incoming prefix"));
}
if !state_sync_info.pending_chunks.contains(global_chunk_id) {
return Err(Error::InternalError(
"Incoming global_chunk_id not expected",
));
}
state_sync_info.pending_chunks.remove(global_chunk_id);
// An empty chunk payload is silently accepted: it only clears the
// pending entry without feeding the restorer.
if !chunk_data.is_empty() {
match restorer.process_chunk(chunk_id.to_string(), chunk_data) {
Ok(next_chunk_ids) => {
state_sync_info.num_processed_chunks += 1;
// Each child chunk id returned by the restorer is turned
// back into a global id by prepending the subtree prefix.
for next_chunk_id in next_chunk_ids {
let mut next_global_chunk_id = chunk_prefix.to_vec();
next_global_chunk_id.extend(next_chunk_id.as_bytes().to_vec());
state_sync_info
.pending_chunks
.insert(next_global_chunk_id.clone());
res.push(next_global_chunk_id);
}
}
_ => {
return Err(Error::InternalError("Unable to process incoming chunk"));
}
};
}
}
_ => {
return Err(Error::InternalError("GroveDB is not in syncing mode"));
}
}

// No follow-up chunks were produced: the current subtree may be complete.
if res.is_empty() {
// Other chunks of this subtree are still outstanding; wait for them.
if !state_sync_info.pending_chunks.is_empty() {
return Ok((res, state_sync_info));
}
// Take ownership of restorer/current_prefix to finalize this subtree
// and move on to the next unprocessed one.
match (
state_sync_info.restorer.take(),
state_sync_info.current_prefix.take(),
) {
(Some(restorer), Some(current_prefix)) => {
// Only finalize when at least one chunk was actually processed;
// NOTE(review): an empty subtree apparently needs no finalize — confirm.
if (state_sync_info.num_processed_chunks > 0) && (restorer.finalize().is_err())
{
return Err(Error::InternalError("Unable to finalize merk"));
}
state_sync_info.processed_prefixes.insert(current_prefix);

let subtrees_metadata = self.get_subtrees_metadata(tx)?;
if let Some(value) = subtrees_metadata.data.get(&current_prefix) {
println!(
" path:{:?} done",
replication::util_path_to_string(&value.0)
);
}

// Pick the next unprocessed subtree (first match wins) and set up
// a fresh restorer for it; its root chunk becomes pending.
for (prefix, prefix_metadata) in &subtrees_metadata.data {
if !state_sync_info.processed_prefixes.contains(prefix) {
let (current_path, s_actual_value_hash, s_elem_value_hash) =
&prefix_metadata;

let subtree_path: Vec<&[u8]> =
current_path.iter().map(|vec| vec.as_slice()).collect();
let path: &[&[u8]] = &subtree_path;

let merk = self.open_merk_for_replication(path.into(), tx).unwrap();
let restorer =
Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
state_sync_info.restorer = Some(restorer);
state_sync_info.current_prefix = Some(*prefix);
state_sync_info.num_processed_chunks = 0;

// The root chunk of a subtree is addressed by the bare prefix
// (empty traversal instructions).
let root_chunk_prefix = prefix.to_vec();
state_sync_info
.pending_chunks
.insert(root_chunk_prefix.clone());
res.push(root_chunk_prefix);
break;
}
}
}
_ => {
return Err(Error::InternalError("Unable to finalize tree"));
}
}
}

Ok((res, state_sync_info))
}
}
Loading

0 comments on commit 01da079

Please sign in to comment.