From 65a75120fbc600fee8b2450833f69d7806364ee4 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 13 May 2024 18:21:57 +0300 Subject: [PATCH] feat: State sync APIs should use encoded Ops (#297) * state sync APIs use encoded ops * fmt * clippy * fmt * refactor --- grovedb/src/replication.rs | 88 +++++++++++++++++++++++++++++-------- merk/src/proofs/encoding.rs | 19 ++++++++ 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 6cb9e3fb..c42d6b7c 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -4,8 +4,9 @@ use std::{ }; use grovedb_merk::{ + ed::Encode, merk::restore::Restorer, - proofs::Op, + proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, }; @@ -111,6 +112,32 @@ pub fn util_split_global_chunk_id( Ok((chunk_prefix_key, chunk_id.to_vec())) } +pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { + let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) +} + +pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); + } + } + } + Ok(res) +} + #[cfg(feature = "full")] impl GroveDb { fn create_subtree_state_sync_info(&self) -> SubtreeStateSyncInfo { @@ -213,13 +240,13 @@ impl GroveDb { // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization // as a subtree can be big hence traversal instructions for the deepest chunks // tx: Transaction. Function returns the data by opening merks at given tx. - // Returns the Chunk proof operators for the requested chunk + // Returns the Chunk proof operators for the requested chunk encoded in bytes pub fn fetch_chunk( &self, global_chunk_id: &[u8], tx: TransactionArg, version: u16, - ) -> Result, Error> { + ) -> Result, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -263,7 +290,12 @@ impl GroveDb { Ok(mut chunk_producer) => { let chunk_res = chunk_producer.chunk(chunk_id); match chunk_res { - Ok((chunk, _)) => Ok(chunk), + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + }, Err(_) => Err(Error::CorruptedData( "Unable to create to load chunk".to_string(), )), @@ -288,7 +320,12 @@ impl GroveDb { Ok(mut chunk_producer) => { let chunk_res = chunk_producer.chunk(chunk_id); match chunk_res { - Ok((chunk, _)) => Ok(chunk), + Ok((chunk, _)) => match util_encode_vec_ops(chunk) { + Ok(op_bytes) => Ok(op_bytes), + Err(_) => Err(Error::CorruptedData( + "Unable to create to load chunk".to_string(), + )), + }, Err(_) => Err(Error::CorruptedData( "Unable to create to load chunk".to_string(), )), @@ -367,14 +404,14 @@ impl GroveDb { // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is // called) Params: // state_sync_info: Consumed MultiStateSyncInfo - // chunk: (Global chunk id, Chunk proof operators) + // chunk: (Global chunk id, Chunk proof operators encoded in bytes) // tx: Transaction for the state sync // Returns the next set of global chunk ids that can be fetched from sources (+ // the MultiStateSyncInfo transferring ownership back to the caller) pub fn apply_chunk<'db>( &'db self, mut state_sync_info: MultiStateSyncInfo<'db>, - chunk: (&[u8], Vec), + chunk: (&[u8], Vec), tx: &'db Transaction, version: u16, ) -> Result<(Vec>, MultiStateSyncInfo), Error> { @@ -467,14 +504,14 @@ impl GroveDb { // Apply a chunk using the given SubtreeStateSyncInfo // state_sync_info: Consumed SubtreeStateSyncInfo // chunk_id: Local chunk id - // chunk_data: Chunk proof operators + // chunk_data: Chunk proof operators encoded in bytes // Returns the next set of global chunk ids that can be fetched from sources (+ // the SubtreeStateSyncInfo transferring ownership back to the caller) fn apply_inner_chunk<'db>( &'db self, mut state_sync_info: SubtreeStateSyncInfo<'db>, chunk_id: &[u8], - chunk_data: Vec, + chunk_data: Vec, ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { let mut res = vec![]; @@ -487,18 +524,31 @@ impl GroveDb { } state_sync_info.pending_chunks.remove(chunk_id); if !chunk_data.is_empty() { - match restorer.process_chunk(chunk_id, chunk_data) { - Ok(next_chunk_ids) => { - state_sync_info.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - state_sync_info.pending_chunks.insert(next_chunk_id.clone()); - res.push(next_chunk_id); - } + match util_decode_vec_ops(chunk_data) { + Ok(ops) => { + match restorer.process_chunk(chunk_id, ops) { + Ok(next_chunk_ids) => { + state_sync_info.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + state_sync_info + .pending_chunks + .insert(next_chunk_id.clone()); + res.push(next_chunk_id); + } + } + _ => { + return Err(Error::InternalError( + "Unable to process incoming chunk", + )); + } + }; } - _ => { - return Err(Error::InternalError("Unable to process incoming chunk")); + Err(_) => { + return Err(Error::CorruptedData( + "Unable to decode incoming chunk".to_string(), + )); } - }; + } } } _ => { diff --git a/merk/src/proofs/encoding.rs b/merk/src/proofs/encoding.rs index 6b5f95b0..b0e31484 100644 --- a/merk/src/proofs/encoding.rs +++ b/merk/src/proofs/encoding.rs @@ -464,6 +464,7 @@ impl<'a> Iterator for Decoder<'a> { mod test { use super::super::{Node, Op}; use crate::{ + proofs::Decoder, tree::HASH_LENGTH, TreeFeatureType::{BasicMerkNode, SummedMerkNode}, }; @@ -994,6 +995,24 @@ mod test { assert_eq!(op, Op::Child); } + #[test] + fn decode_multiple_child() { + let bytes = [0x11, 0x11, 0x11, 0x10]; + let mut decoder = Decoder { + bytes: &bytes, + offset: 0, + }; + + let mut vecop = vec![]; + for op in decoder { + match op { + Ok(op) => vecop.push(op), + Err(e) => eprintln!("Error decoding: {:?}", e), + } + } + assert_eq!(vecop, vec![Op::Child, Op::Child, Op::Child, Op::Parent]); + } + #[test] fn decode_parent_inverted() { let bytes = [0x12];