Skip to content

Commit

Permalink
cherry-picked #301
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielides committed Jun 3, 2024
1 parent 9b26d25 commit f1c212c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
30 changes: 18 additions & 12 deletions grovedb/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1;

#[cfg(feature = "full")]
impl GroveDb {
pub fn start_syncing_session(&self) -> Pin<Box<MultiStateSyncSession>> {
MultiStateSyncSession::new(self.start_transaction())
pub fn start_syncing_session(&self, app_hash:[u8; 32]) -> Pin<Box<MultiStateSyncSession>> {
MultiStateSyncSession::new(self.start_transaction(), app_hash)
}

pub fn commit_session(&self, session: Pin<Box<MultiStateSyncSession>>) {
Expand Down Expand Up @@ -130,11 +130,9 @@ impl GroveDb {
));
}

let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length);

let mut array = [0u8; 32];
array.copy_from_slice(chunk_prefix);
let chunk_prefix_key: crate::SubtreePrefix = array;
let root_app_hash = self.root_hash(tx).value?;
let (chunk_prefix_key, chunk_id) =
util_split_global_chunk_id(global_chunk_id, root_app_hash)?;

let subtrees_metadata = self.get_subtrees_metadata(tx)?;

Expand All @@ -157,7 +155,7 @@ impl GroveDb {
let chunk_producer_res = ChunkProducer::new(&merk);
match chunk_producer_res {
Ok(mut chunk_producer) => {
let chunk_res = chunk_producer.chunk(chunk_id);
let chunk_res = chunk_producer.chunk(&chunk_id);
match chunk_res {
Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
Ok(op_bytes) => Ok(op_bytes),
Expand Down Expand Up @@ -187,7 +185,7 @@ impl GroveDb {
let chunk_producer_res = ChunkProducer::new(&merk);
match chunk_producer_res {
Ok(mut chunk_producer) => {
let chunk_res = chunk_producer.chunk(chunk_id);
let chunk_res = chunk_producer.chunk(&chunk_id);
match chunk_res {
Ok((chunk, _)) => match util_encode_vec_ops(chunk) {
Ok(op_bytes) => Ok(op_bytes),
Expand Down Expand Up @@ -219,7 +217,7 @@ impl GroveDb {
&'db self,
app_hash: CryptoHash,
version: u16,
) -> Result<(Vec<Vec<u8>>, Pin<Box<MultiStateSyncSession<'db>>>), Error> {
) -> Result<Pin<Box<MultiStateSyncSession<'db>>>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
Expand All @@ -231,10 +229,11 @@ impl GroveDb {

let root_prefix = [0u8; 32];

let mut session = self.start_syncing_session();
let mut session = self.start_syncing_session(app_hash);

session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?;

Ok((vec![root_prefix.to_vec()], session))
Ok(session)
}
}

Expand All @@ -255,6 +254,7 @@ pub fn util_path_to_string(path: &[Vec<u8>]) -> Vec<String> {
// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID]
pub fn util_split_global_chunk_id(
global_chunk_id: &[u8],
app_hash: [u8; 32],
) -> Result<(crate::SubtreePrefix, Vec<u8>), Error> {
let chunk_prefix_length: usize = 32;
if global_chunk_id.len() < chunk_prefix_length {
Expand All @@ -263,6 +263,12 @@ pub fn util_split_global_chunk_id(
));
}

if global_chunk_id == app_hash {
let array_of_zeros: [u8; 32] = [0; 32];
let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros;
return Ok((root_chunk_prefix_key, vec![]));
}

let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length);
let mut array = [0u8; 32];
array.copy_from_slice(chunk_prefix);
Expand Down
14 changes: 9 additions & 5 deletions grovedb/src/replication/state_sync_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub struct MultiStateSyncSession<'db> {
current_prefixes: BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>>,
// Set of processed prefixes (Path digests)
processed_prefixes: BTreeSet<SubtreePrefix>,
// Root app_hash
app_hash: [u8; 32],
// Version of state sync protocol,
pub(crate) version: u16,
// Transaction goes last to be dropped last as well
Expand All @@ -99,11 +101,12 @@ pub struct MultiStateSyncSession<'db> {

impl<'db> MultiStateSyncSession<'db> {
/// Initializes a new state sync session.
pub fn new(transaction: Transaction<'db>) -> Pin<Box<Self>> {
pub fn new(transaction: Transaction<'db>, app_hash:[u8; 32]) -> Pin<Box<Self>> {
Box::pin(MultiStateSyncSession {
transaction,
current_prefixes: Default::default(),
processed_prefixes: Default::default(),
app_hash,
version: CURRENT_STATE_SYNC_VERSION,
_pin: PhantomPinned,
})
Expand Down Expand Up @@ -180,7 +183,8 @@ impl<'db> MultiStateSyncSession<'db> {
pub fn apply_chunk(
self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
db: &'db GroveDb,
chunk: (&[u8], Vec<u8>),
global_chunk_id: &[u8],
chunk: Vec<u8>,
version: u16,
) -> Result<Vec<Vec<u8>>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
Expand All @@ -197,8 +201,8 @@ impl<'db> MultiStateSyncSession<'db> {

let mut next_chunk_ids = vec![];

let (global_chunk_id, chunk_data) = chunk;
let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?;
let (chunk_prefix, chunk_id) =
util_split_global_chunk_id(global_chunk_id, self.app_hash)?;

if self.is_empty() {
return Err(Error::InternalError("GroveDB is not in syncing mode"));
Expand All @@ -208,7 +212,7 @@ impl<'db> MultiStateSyncSession<'db> {
let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else {
return Err(Error::InternalError("Unable to process incoming chunk"));
};
let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else {
let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk) else {
return Err(Error::InternalError("Invalid incoming prefix"));
};

Expand Down
7 changes: 4 additions & 3 deletions tutorials/src/bin/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,16 @@ fn sync_db_demo(
target_db: &GroveDb,
) -> Result<(), grovedb::Error> {
let app_hash = source_db.root_hash(None).value.unwrap();
let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?;
let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?;

let mut chunk_queue : VecDeque<Vec<u8>> = VecDeque::new();

chunk_queue.extend(chunk_ids);
// The very first chunk to fetch is always identified by the root app_hash
chunk_queue.push_back(app_hash.to_vec());

while let Some(chunk_id) = chunk_queue.pop_front() {
let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?;
let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION)?;
chunk_queue.extend(more_chunks);
}

Expand Down

0 comments on commit f1c212c

Please sign in to comment.