From f1c212c7a8f38599f15e9928a5f8402f5e14fb97 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 3 Jun 2024 22:31:47 +0300 Subject: [PATCH] cherry-picked #301 --- grovedb/src/replication.rs | 30 +++++++++++-------- grovedb/src/replication/state_sync_session.rs | 14 +++++---- tutorials/src/bin/replication.rs | 7 +++-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 93c42a29..3b8a860a 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -19,8 +19,8 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; #[cfg(feature = "full")] impl GroveDb { - pub fn start_syncing_session(&self) -> Pin> { - MultiStateSyncSession::new(self.start_transaction()) + pub fn start_syncing_session(&self, app_hash:[u8; 32]) -> Pin> { + MultiStateSyncSession::new(self.start_transaction(), app_hash) } pub fn commit_session(&self, session: Pin>) { @@ -130,11 +130,9 @@ impl GroveDb { )); } - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix_key, chunk_id) = + util_split_global_chunk_id(global_chunk_id, root_app_hash)?; let subtrees_metadata = self.get_subtrees_metadata(tx)?; @@ -157,7 +155,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -187,7 +185,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -219,7 +217,7 @@ impl GroveDb { &'db self, app_hash: CryptoHash, version: u16, - ) -> Result<(Vec>, Pin>>), Error> { + ) -> Result>>, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -231,10 +229,11 @@ impl GroveDb { let root_prefix = [0u8; 32]; - let mut session = self.start_syncing_session(); + let mut session = self.start_syncing_session(app_hash); + session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?; - Ok((vec![root_prefix.to_vec()], session)) + Ok(session) } } @@ -255,6 +254,7 @@ pub fn util_path_to_string(path: &[Vec]) -> Vec { // Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] pub fn util_split_global_chunk_id( global_chunk_id: &[u8], + app_hash: [u8; 32], ) -> Result<(crate::SubtreePrefix, Vec), Error> { let chunk_prefix_length: usize = 32; if global_chunk_id.len() < chunk_prefix_length { @@ -263,6 +263,12 @@ pub fn util_split_global_chunk_id( )); } + if global_chunk_id == app_hash { + let array_of_zeros: [u8; 32] = [0; 32]; + let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; + return Ok((root_chunk_prefix_key, vec![])); + } + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); let mut array = [0u8; 32]; array.copy_from_slice(chunk_prefix); diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index bfc12b51..b8d23e0b 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -90,6 +90,8 @@ pub struct MultiStateSyncSession<'db> { current_prefixes: BTreeMap>, // Set of processed prefixes (Path digests) processed_prefixes: BTreeSet, + // Root app_hash + app_hash: [u8; 32], // Version of state sync protocol, pub(crate) version: u16, // Transaction goes last to be dropped last as well @@ -99,11 +101,12 @@ pub struct MultiStateSyncSession<'db> { impl<'db> MultiStateSyncSession<'db> { /// Initializes a new state sync session. - pub fn new(transaction: Transaction<'db>) -> Pin> { + pub fn new(transaction: Transaction<'db>, app_hash:[u8; 32]) -> Pin> { Box::pin(MultiStateSyncSession { transaction, current_prefixes: Default::default(), processed_prefixes: Default::default(), + app_hash, version: CURRENT_STATE_SYNC_VERSION, _pin: PhantomPinned, }) @@ -180,7 +183,8 @@ impl<'db> MultiStateSyncSession<'db> { pub fn apply_chunk( self: &mut Pin>>, db: &'db GroveDb, - chunk: (&[u8], Vec), + global_chunk_id: &[u8], + chunk: Vec, version: u16, ) -> Result>, Error> { // For now, only CURRENT_STATE_SYNC_VERSION is supported @@ -197,8 +201,8 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?; + let (chunk_prefix, chunk_id) = + util_split_global_chunk_id(global_chunk_id, self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); @@ -208,7 +212,7 @@ impl<'db> MultiStateSyncSession<'db> { let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { return Err(Error::InternalError("Unable to process incoming chunk")); }; - let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else { + let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk) else { return Err(Error::InternalError("Invalid incoming prefix")); }; diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 440cb64e..f8dab81b 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -240,15 +240,16 @@ fn sync_db_demo( target_db: &GroveDb, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); - let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; + let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?; let mut chunk_queue : VecDeque> = VecDeque::new(); - chunk_queue.extend(chunk_ids); + // The very first chunk to fetch is always identified by the root app_hash + chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?; + let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?; chunk_queue.extend(more_chunks); }