Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/chunk lifetimes #300

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 85 additions & 6 deletions grovedb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ use grovedb_merk::{
tree::{combine_hash, value_hash},
BatchEntry, CryptoHash, KVIterator, Merk,
};
use grovedb_merk::ChunkProducer;
use grovedb_path::SubtreePath;
#[cfg(feature = "full")]
use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext;
Expand All @@ -223,6 +224,7 @@ pub use crate::error::Error;
#[cfg(feature = "full")]
use crate::util::{root_merk_optional_tx, storage_context_optional_tx};
use crate::Error::MerkError;
use crate::replication::util_encode_vec_ops;

#[cfg(feature = "full")]
type Hash = [u8; 32];
Expand Down Expand Up @@ -317,13 +319,48 @@ impl GroveDb {
}
}

fn open_transactional_merk_by_prefix<'db>(
&'db self,
prefix: SubtreePrefix,
root_key: Option<Vec<u8>>,
is_sum_tree: bool,
tx: &'db Transaction,
batch: Option<&'db StorageBatch>,
) -> CostResult<Merk<PrefixedRocksDbTransactionContext>, Error>
{
let mut cost = OperationCost::default();
let storage = self
.db
.get_transactional_storage_context_by_subtree_prefix(prefix, batch, tx)
.unwrap_add_cost(&mut cost);
if root_key.is_some() {
Merk::open_layered_with_root_key(
storage,
root_key,
is_sum_tree,
Some(&Element::value_defined_cost_for_serialized_value),
).map_err(|_| {
Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned())
}).add_cost(cost)
}
else {
Merk::open_base(
storage,
false,
Some(&Element::value_defined_cost_for_serialized_value),
).map_err(|_| {
Error::CorruptedData("cannot open a root subtree by prefix".to_owned())
}).add_cost(cost)
}
}

/// Opens a Merk at given path for with direct write access. Intended for
/// replication purposes.
fn open_merk_for_replication<'db, 'b, B>(
fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>(
&'db self,
path: SubtreePath<'b, B>,
tx: &'db Transaction,
) -> Result<Merk<PrefixedRocksDbImmediateStorageContext<'db>>, Error>
tx: &'tx Transaction<'db>,
) -> Result<(Merk<PrefixedRocksDbImmediateStorageContext<'tx>>, Option<Vec<u8>>, bool), Error>
where
B: AsRef<[u8]> + 'b,
{
Expand All @@ -350,29 +387,37 @@ impl GroveDb {
.unwrap()?;
let is_sum_tree = element.is_sum_tree();
if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element {
Ok((
Merk::open_layered_with_root_key(
storage,
root_key,
root_key.clone(),
is_sum_tree,
Some(&Element::value_defined_cost_for_serialized_value),
)
.map_err(|_| {
Error::CorruptedData("cannot open a subtree with given root key".to_owned())
})
.unwrap()
.unwrap()?,
root_key,
is_sum_tree
))
} else {
Err(Error::CorruptedPath(
"cannot open a subtree as parent exists but is not a tree",
))
}
} else {
Ok((
Merk::open_base(
storage,
false,
None::<&fn(&[u8]) -> Option<ValueDefinedCostType>>,
)
.map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned()))
.unwrap()
.unwrap()?,
None,
false
))
}
}

Expand Down Expand Up @@ -437,6 +482,40 @@ impl GroveDb {
}
}

fn open_non_transactional_merk_by_prefix<'db>(
&'db self,
prefix: SubtreePrefix,
root_key: Option<Vec<u8>>,
is_sum_tree: bool,
batch: Option<&'db StorageBatch>,
) -> CostResult<Merk<PrefixedRocksDbStorageContext>, Error>
{
let mut cost = OperationCost::default();
let storage = self
.db
.get_storage_context_by_subtree_prefix(prefix, batch)
.unwrap_add_cost(&mut cost);
if root_key.is_some() {
Merk::open_layered_with_root_key(
storage,
root_key,
is_sum_tree,
Some(&Element::value_defined_cost_for_serialized_value),
).map_err(|_| {
Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned())
}).add_cost(cost)
}
else {
Merk::open_base(
storage,
false,
Some(&Element::value_defined_cost_for_serialized_value),
).map_err(|_| {
Error::CorruptedData("cannot open a root subtree by prefix".to_owned())
}).add_cost(cost)
}
}

/// Creates a checkpoint
pub fn create_checkpoint<P: AsRef<Path>>(&self, path: P) -> Result<(), Error> {
self.db.create_checkpoint(path).map_err(|e| e.into())
Expand Down
45 changes: 45 additions & 0 deletions grovedb/src/operations/auxiliary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,49 @@ impl GroveDb {
}
Ok(result).wrap_with_cost(cost)
}

/// Finds keys which are trees for a given subtree.
/// One element means a key of a `merk`, n > 1 elements mean relative path
/// for a deeply nested subtree.
pub fn find_subtrees_non_recursive<B: AsRef<[u8]>>(
&self,
path: &SubtreePath<B>,
transaction: TransactionArg,
) -> CostResult<Vec<Vec<Vec<u8>>>, Error> {
let mut cost = OperationCost::default();

// TODO: remove conversion to vec;
// However, it's not easy for a reason:
// new keys to enqueue are taken from raw iterator which returns Vec<u8>;
// changing that to slice is hard as cursor should be moved for next iteration
// which requires exclusive (&mut) reference, also there is no guarantee that
// slice which points into storage internals will remain valid if raw
// iterator got altered so why that reference should be exclusive;
//
// Update: there are pinned views into RocksDB to return slices of data, perhaps
// there is something for iterators

let mut queue: Vec<Vec<Vec<u8>>> = vec![path.to_vec()];
let mut result: Vec<Vec<Vec<u8>>> = queue.clone();

while let Some(q) = queue.pop() {
let subtree_path: SubtreePath<Vec<u8>> = q.as_slice().into();
// Get the correct subtree with q_ref as path
storage_context_optional_tx!(self.db, subtree_path, None, transaction, storage, {
let storage = storage.unwrap_add_cost(&mut cost);
let mut raw_iter = Element::iterator(storage.raw_iter()).unwrap_add_cost(&mut cost);
while let Some((key, value)) =
cost_return_on_error!(&mut cost, raw_iter.next_element())
{
if value.is_tree() {
let mut sub_path = q.clone();
sub_path.push(key.to_vec());
queue.push(sub_path.clone());
result.push(sub_path);
}
}
})
}
Ok(result).wrap_with_cost(cost)
}
}
Loading