Skip to content

Commit

Permalink
feat: State sync APIs should use encoded Ops (#297)
Browse files Browse the repository at this point in the history
* state sync APIs use encoded ops

* fmt

* clippy

* fmt

* refactor
  • Loading branch information
ogabrielides authored May 13, 2024
1 parent 5b67055 commit 65a7512
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 19 deletions.
88 changes: 69 additions & 19 deletions grovedb/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::{
};

use grovedb_merk::{
ed::Encode,
merk::restore::Restorer,
proofs::Op,
proofs::{Decoder, Op},
tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash},
ChunkProducer,
};
Expand Down Expand Up @@ -111,6 +112,32 @@ pub fn util_split_global_chunk_id(
Ok((chunk_prefix_key, chunk_id.to_vec()))
}

pub fn util_encode_vec_ops(chunk: Vec<Op>) -> Result<Vec<u8>, Error> {
let mut res = vec![];
for op in chunk {
op.encode_into(&mut res)
.map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?;
}
Ok(res)
}

pub fn util_decode_vec_ops(chunk: Vec<u8>) -> Result<Vec<Op>, Error> {
let decoder = Decoder::new(&chunk);
let mut res = vec![];
for op in decoder {
match op {
Ok(op) => res.push(op),
Err(e) => {
return Err(Error::CorruptedData(format!(
"unable to decode chunk: {}",
e
)));
}
}
}
Ok(res)
}

#[cfg(feature = "full")]
impl GroveDb {
fn create_subtree_state_sync_info(&self) -> SubtreeStateSyncInfo {
Expand Down Expand Up @@ -213,13 +240,13 @@ impl GroveDb {
// "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization
// as a subtree can be big hence traversal instructions for the deepest chunks
// tx: Transaction. Function returns the data by opening merks at given tx.
// Returns the Chunk proof operators for the requested chunk
// Returns the Chunk proof operators for the requested chunk encoded in bytes
pub fn fetch_chunk(
&self,
global_chunk_id: &[u8],
tx: TransactionArg,
version: u16,
) -> Result<Vec<Op>, Error> {
) -> Result<Vec<u8>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
Expand Down Expand Up @@ -263,7 +290,12 @@ impl GroveDb {
Ok(mut chunk_producer) => {
let chunk_res = chunk_producer.chunk(chunk_id);
match chunk_res {
Ok((chunk, _)) => Ok(chunk),
Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
Ok(op_bytes) => Ok(op_bytes),
Err(_) => Err(Error::CorruptedData(
"Unable to create to load chunk".to_string(),
)),
},
Err(_) => Err(Error::CorruptedData(
"Unable to create to load chunk".to_string(),
)),
Expand All @@ -288,7 +320,12 @@ impl GroveDb {
Ok(mut chunk_producer) => {
let chunk_res = chunk_producer.chunk(chunk_id);
match chunk_res {
Ok((chunk, _)) => Ok(chunk),
Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
Ok(op_bytes) => Ok(op_bytes),
Err(_) => Err(Error::CorruptedData(
"Unable to create to load chunk".to_string(),
)),
},
Err(_) => Err(Error::CorruptedData(
"Unable to create to load chunk".to_string(),
)),
Expand Down Expand Up @@ -367,14 +404,14 @@ impl GroveDb {
// Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is
// called) Params:
// state_sync_info: Consumed MultiStateSyncInfo
// chunk: (Global chunk id, Chunk proof operators)
// chunk: (Global chunk id, Chunk proof operators encoded in bytes)
// tx: Transaction for the state sync
// Returns the next set of global chunk ids that can be fetched from sources (+
// the MultiStateSyncInfo transferring ownership back to the caller)
pub fn apply_chunk<'db>(
&'db self,
mut state_sync_info: MultiStateSyncInfo<'db>,
chunk: (&[u8], Vec<Op>),
chunk: (&[u8], Vec<u8>),
tx: &'db Transaction,
version: u16,
) -> Result<(Vec<Vec<u8>>, MultiStateSyncInfo), Error> {
Expand Down Expand Up @@ -467,14 +504,14 @@ impl GroveDb {
// Apply a chunk using the given SubtreeStateSyncInfo
// state_sync_info: Consumed SubtreeStateSyncInfo
// chunk_id: Local chunk id
// chunk_data: Chunk proof operators
// chunk_data: Chunk proof operators encoded in bytes
// Returns the next set of global chunk ids that can be fetched from sources (+
// the SubtreeStateSyncInfo transferring ownership back to the caller)
fn apply_inner_chunk<'db>(
&'db self,
mut state_sync_info: SubtreeStateSyncInfo<'db>,
chunk_id: &[u8],
chunk_data: Vec<Op>,
chunk_data: Vec<u8>,
) -> Result<(Vec<Vec<u8>>, SubtreeStateSyncInfo), Error> {
let mut res = vec![];

Expand All @@ -487,18 +524,31 @@ impl GroveDb {
}
state_sync_info.pending_chunks.remove(chunk_id);
if !chunk_data.is_empty() {
match restorer.process_chunk(chunk_id, chunk_data) {
Ok(next_chunk_ids) => {
state_sync_info.num_processed_chunks += 1;
for next_chunk_id in next_chunk_ids {
state_sync_info.pending_chunks.insert(next_chunk_id.clone());
res.push(next_chunk_id);
}
match util_decode_vec_ops(chunk_data) {
Ok(ops) => {
match restorer.process_chunk(chunk_id, ops) {
Ok(next_chunk_ids) => {
state_sync_info.num_processed_chunks += 1;
for next_chunk_id in next_chunk_ids {
state_sync_info
.pending_chunks
.insert(next_chunk_id.clone());
res.push(next_chunk_id);
}
}
_ => {
return Err(Error::InternalError(
"Unable to process incoming chunk",
));
}
};
}
_ => {
return Err(Error::InternalError("Unable to process incoming chunk"));
Err(_) => {
return Err(Error::CorruptedData(
"Unable to decode incoming chunk".to_string(),
));
}
};
}
}
}
_ => {
Expand Down
19 changes: 19 additions & 0 deletions merk/src/proofs/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ impl<'a> Iterator for Decoder<'a> {
mod test {
use super::super::{Node, Op};
use crate::{
proofs::Decoder,
tree::HASH_LENGTH,
TreeFeatureType::{BasicMerkNode, SummedMerkNode},
};
Expand Down Expand Up @@ -994,6 +995,24 @@ mod test {
assert_eq!(op, Op::Child);
}

#[test]
fn decode_multiple_child() {
let bytes = [0x11, 0x11, 0x11, 0x10];
let mut decoder = Decoder {
bytes: &bytes,
offset: 0,
};

let mut vecop = vec![];
for op in decoder {
match op {
Ok(op) => vecop.push(op),
Err(e) => eprintln!("Error decoding: {:?}", e),
}
}
assert_eq!(vecop, vec![Op::Child, Op::Child, Op::Child, Op::Parent]);
}

#[test]
fn decode_parent_inverted() {
let bytes = [0x12];
Expand Down

0 comments on commit 65a7512

Please sign in to comment.