Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clear subtree #272

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions grovedb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ pub enum Error {
/// Deleting non empty tree
DeletingNonEmptyTree(&'static str),

#[error("clearing tree with subtrees not allowed error: {0}")]
/// Clearing tree with subtrees not allowed
ClearingTreeWithSubtreesNotAllowed(&'static str),

// Client allowed errors
#[error("just in time element flags client error: {0}")]
/// Just in time element flags client error
Expand Down
317 changes: 315 additions & 2 deletions grovedb/src/operations/delete/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
storage_cost::removal::{StorageRemovedBytes, StorageRemovedBytes::BasicStorageRemoval},
CostResult, CostsExt, OperationCost,
};
use grovedb_merk::{proofs::Query, KVIterator};
#[cfg(feature = "full")]
use grovedb_merk::{Error as MerkError, Merk, MerkOptions};
use grovedb_path::SubtreePath;
Expand All @@ -55,13 +56,37 @@
Storage, StorageBatch, StorageContext,
};

use crate::util::merk_optional_tx_path_not_empty;
#[cfg(feature = "full")]
use crate::{
batch::{GroveDbOp, Op},
util::{storage_context_optional_tx, storage_context_with_parent_optional_tx},
Element, ElementFlags, Error, GroveDb, Transaction, TransactionArg,
};
use crate::{raw_decode, util::merk_optional_tx_path_not_empty};

#[cfg(feature = "full")]
#[derive(Clone)]
/// Clear options
pub struct ClearOptions {
/// Check for Subtrees
pub check_for_subtrees: bool,
/// Allow deleting non empty trees if we check for subtrees
pub allow_deleting_subtrees: bool,
/// If we check for subtrees, and we don't allow deleting and there are
/// some, should we error?
pub trying_to_clear_with_subtrees_returns_error: bool,
}

#[cfg(feature = "full")]
impl Default for ClearOptions {
fn default() -> Self {
ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: false,
trying_to_clear_with_subtrees_returns_error: true,
}
}
}

#[cfg(feature = "full")]
#[derive(Clone)]
Expand Down Expand Up @@ -138,6 +163,180 @@
})
}

/// Delete all elements in a specified subtree
/// Returns if we successfully cleared the subtree
fn clear_subtree<'b, B, P>(

Check warning on line 168 in grovedb/src/operations/delete/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

methods `clear_subtree` and `clear_subtree_with_costs` are never used

warning: methods `clear_subtree` and `clear_subtree_with_costs` are never used --> grovedb/src/operations/delete/mod.rs:168:8 | 127 | impl GroveDb { | ------------ methods in this implementation ... 168 | fn clear_subtree<'b, B, P>( | ^^^^^^^^^^^^^ ... 186 | fn clear_subtree_with_costs<'b, B, P>( | ^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(dead_code)]` on by default
&self,
path: P,
options: Option<ClearOptions>,
transaction: TransactionArg,
) -> Result<bool, Error>
where
B: AsRef<[u8]> + 'b,
P: Into<SubtreePath<'b, B>>,
{
self.clear_subtree_with_costs(path, options, transaction)
.unwrap()
}

/// Delete all elements in a specified subtree and get back costs
/// Warning: The costs for this operation are not yet correct, hence we
/// should keep this private for now
/// Returns if we successfully cleared the subtree
fn clear_subtree_with_costs<'b, B, P>(
&self,
path: P,
options: Option<ClearOptions>,
transaction: TransactionArg,
) -> CostResult<bool, Error>
where
B: AsRef<[u8]> + 'b,
P: Into<SubtreePath<'b, B>>,
{
let subtree_path: SubtreePath<B> = path.into();
let mut cost = OperationCost::default();
let batch = StorageBatch::new();

let options = options.unwrap_or_default();

if let Some(transaction) = transaction {
let mut merk_to_clear = cost_return_on_error!(
&mut cost,
self.open_transactional_merk_at_path(
subtree_path.clone(),
transaction,
Some(&batch)
)
);

if options.check_for_subtrees {
let mut all_query = Query::new();
all_query.insert_all();

let mut element_iterator =
KVIterator::new(merk_to_clear.storage.raw_iter(), &all_query).unwrap();

// delete all nested subtrees
while let Some((key, element_value)) =
element_iterator.next_kv().unwrap_add_cost(&mut cost)
{
let element = raw_decode(&element_value).unwrap();
if element.is_tree() {
if options.allow_deleting_subtrees {
cost_return_on_error!(
&mut cost,
self.delete(
subtree_path.clone(),
key.as_slice(),
Some(DeleteOptions {
allow_deleting_non_empty_trees: true,
deleting_non_empty_trees_returns_error: false,
..Default::default()
}),
Some(transaction),
)
);
} else if options.trying_to_clear_with_subtrees_returns_error {
return Err(Error::ClearingTreeWithSubtreesNotAllowed(
"options do not allow to clear this merk tree as it contains \
subtrees",
))
.wrap_with_cost(cost);
} else {
return Ok(false).wrap_with_cost(cost);
}
}
}
}

// delete non subtree values
cost_return_on_error!(&mut cost, merk_to_clear.clear().map_err(Error::MerkError));

// propagate changes
let mut merk_cache: HashMap<SubtreePath<B>, Merk<PrefixedRocksDbTransactionContext>> =
HashMap::default();
merk_cache.insert(subtree_path.clone(), merk_to_clear);
cost_return_on_error!(
&mut cost,
self.propagate_changes_with_transaction(
merk_cache,
subtree_path.clone(),
transaction,
&batch,
)
);
} else {
let mut merk_to_clear = cost_return_on_error!(
&mut cost,
self.open_non_transactional_merk_at_path(subtree_path.clone(), Some(&batch))
);

if options.check_for_subtrees {
let mut all_query = Query::new();
all_query.insert_all();

let mut element_iterator =
KVIterator::new(merk_to_clear.storage.raw_iter(), &all_query).unwrap();

// delete all nested subtrees
while let Some((key, element_value)) =
element_iterator.next_kv().unwrap_add_cost(&mut cost)
{
let element = raw_decode(&element_value).unwrap();
if options.allow_deleting_subtrees {
if element.is_tree() {
cost_return_on_error!(
&mut cost,
self.delete(
subtree_path.clone(),
key.as_slice(),
Some(DeleteOptions {
allow_deleting_non_empty_trees: true,
deleting_non_empty_trees_returns_error: false,
..Default::default()
}),
None
)
);
}
} else if options.trying_to_clear_with_subtrees_returns_error {
return Err(Error::ClearingTreeWithSubtreesNotAllowed(
"options do not allow to clear this merk tree as it contains subtrees",
))
.wrap_with_cost(cost);
} else {
return Ok(false).wrap_with_cost(cost);
}
}
}

// delete non subtree values
cost_return_on_error!(&mut cost, merk_to_clear.clear().map_err(Error::MerkError));

// propagate changes
let mut merk_cache: HashMap<SubtreePath<B>, Merk<PrefixedRocksDbStorageContext>> =
HashMap::default();
merk_cache.insert(subtree_path.clone(), merk_to_clear);
cost_return_on_error!(
&mut cost,
self.propagate_changes_without_transaction(
merk_cache,
subtree_path.clone(),
&batch,
)
);
}

cost_return_on_error!(
&mut cost,
self.db
.commit_multi_context_batch(batch, transaction)
.map_err(Into::into)
);

Ok(true).wrap_with_cost(cost)
}

/// Delete element with sectional storage function
pub fn delete_with_sectional_storage_function<B: AsRef<[u8]>>(
&self,
Expand Down Expand Up @@ -738,7 +937,7 @@
use pretty_assertions::assert_eq;

use crate::{
operations::delete::{delete_up_tree::DeleteUpTreeOptions, DeleteOptions},
operations::delete::{delete_up_tree::DeleteUpTreeOptions, ClearOptions, DeleteOptions},
tests::{
common::EMPTY_PATH, make_empty_grovedb, make_test_grovedb, ANOTHER_TEST_LEAF, TEST_LEAF,
},
Expand Down Expand Up @@ -1445,4 +1644,118 @@
}
);
}

#[test]
fn test_subtree_clear() {
let element = Element::new_item(b"ayy".to_vec());

let db = make_test_grovedb();

// Insert some nested subtrees
db.insert(
[TEST_LEAF].as_ref(),
b"key1",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 1 insert");
db.insert(
[TEST_LEAF, b"key1"].as_ref(),
b"key2",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 2 insert");

// Insert an element into subtree
db.insert(
[TEST_LEAF, b"key1", b"key2"].as_ref(),
b"key3",
element,
None,
None,
)
.unwrap()
.expect("successful value insert");
db.insert(
[TEST_LEAF].as_ref(),
b"key4",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 3 insert");

let key1_tree = db
.get([TEST_LEAF].as_ref(), b"key1", None)
.unwrap()
.unwrap();
assert!(!matches!(key1_tree, Element::Tree(None, _)));
let key1_merk = db
.open_non_transactional_merk_at_path([TEST_LEAF, b"key1"].as_ref().into(), None)
.unwrap()
.unwrap();
assert_ne!(key1_merk.root_hash().unwrap(), [0; 32]);

let root_hash_before_clear = db.root_hash(None).unwrap().unwrap();
db.clear_subtree([TEST_LEAF, b"key1"].as_ref(), None, None)
.expect_err("unable to delete subtree");

let success = db
.clear_subtree(
[TEST_LEAF, b"key1"].as_ref(),
Some(ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: false,
trying_to_clear_with_subtrees_returns_error: false,
}),
None,
)
.expect("expected no error");
assert!(!success);

let success = db
.clear_subtree(
[TEST_LEAF, b"key1"].as_ref(),
Some(ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: true,
trying_to_clear_with_subtrees_returns_error: false,
}),
None,
)
.expect("unable to delete subtree");

assert!(success);

assert!(matches!(
db.get([TEST_LEAF, b"key1"].as_ref(), b"key2", None)
.unwrap(),
Err(Error::PathKeyNotFound(_))
));
assert!(matches!(
db.get([TEST_LEAF, b"key1", b"key2"].as_ref(), b"key3", None)
.unwrap(),
Err(Error::PathParentLayerNotFound(_))
));
let key1_tree = db
.get([TEST_LEAF].as_ref(), b"key1", None)
.unwrap()
.unwrap();
assert!(matches!(key1_tree, Element::Tree(None, _)));

let key1_merk = db
.open_non_transactional_merk_at_path([TEST_LEAF, b"key1"].as_ref().into(), None)
.unwrap()
.unwrap();
assert_eq!(key1_merk.root_hash().unwrap(), [0; 32]);

let root_hash_after_clear = db.root_hash(None).unwrap().unwrap();
assert_ne!(root_hash_before_clear, root_hash_after_clear);
}
}
Loading