From 3c8135b2419da95d36d73a01ebb1d54684957068 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Wed, 29 Jan 2025 00:50:46 +0100 Subject: [PATCH] wip --- crates/bench/benches/delete_table.rs | 107 ++++-- .../locking_tx_datastore/committed_state.rs | 7 +- .../locking_tx_datastore/delete_table.rs | 324 ++++++++++++++++++ .../db/datastore/locking_tx_datastore/mod.rs | 1 + .../datastore/locking_tx_datastore/mut_tx.rs | 25 +- .../locking_tx_datastore/state_view.rs | 10 +- .../locking_tx_datastore/tx_state.rs | 27 +- crates/table/src/table.rs | 4 +- 8 files changed, 449 insertions(+), 56 deletions(-) create mode 100644 crates/core/src/db/datastore/locking_tx_datastore/delete_table.rs diff --git a/crates/bench/benches/delete_table.rs b/crates/bench/benches/delete_table.rs index faee303bfc2..c2f186a5df9 100644 --- a/crates/bench/benches/delete_table.rs +++ b/crates/bench/benches/delete_table.rs @@ -46,20 +46,22 @@ fn gen_row_pointers(iters: u64) -> impl Iterator { } fn bench_custom(g: &mut BenchmarkGroup<'_, WallTime>, name: &str, run: impl Fn(u64) -> Duration) { - g.bench_function(name, |b| b.iter_custom(|i| run(i))); + g.bench_function(name, |b| b.iter_custom(&run)); } fn bench_delete_table(c: &mut Criterion) { let name = DT::NAME; let mut g = c.benchmark_group(name); + let row_size = black_box(FIXED_ROW_SIZE); + let new_dt = || DT::new(row_size); bench_custom(&mut g, "mixed", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); gen_row_pointers(i) .map(|ptr| time(|| dt.contains(ptr)) + time(|| dt.insert(ptr))) .sum() }); bench_custom(&mut g, "mixed_random", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); let mut ptrs = gen_row_pointers(i).collect_vec(); let mut rng = ThreadRng::default(); ptrs.shuffle(&mut rng); @@ -68,11 +70,11 @@ fn bench_delete_table(c: &mut Criterion) { .sum() }); bench_custom(&mut g, "insert", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); gen_row_pointers(i).map(|ptr| time(|| dt.insert(ptr))).sum() }); bench_custom(&mut g, "contains_for_half", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); gen_row_pointers(i) .enumerate() .map(|(i, ptr)| { @@ -84,7 +86,7 @@ fn bench_delete_table(c: &mut Criterion) { .sum() }); bench_custom(&mut g, "contains_for_full", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); gen_row_pointers(i) .map(|ptr| { black_box(dt.insert(ptr)); @@ -93,14 +95,14 @@ fn bench_delete_table(c: &mut Criterion) { .sum() }); bench_custom(&mut g, "remove", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); for ptr in gen_row_pointers(i) { black_box(dt.insert(ptr)); } gen_row_pointers(i).map(|ptr| time(|| dt.remove(ptr))).sum() }); bench_custom(&mut g, "iter", |i| { - let mut dt = DT::new(FIXED_ROW_SIZE); + let mut dt = new_dt(); for ptr in gen_row_pointers(i) { black_box(dt.insert(ptr)); } @@ -113,9 +115,10 @@ trait DeleteTable { const NAME: &'static str; fn new(fixed_row_size: Size) -> Self; fn contains(&self, ptr: RowPointer) -> bool; - fn insert(&mut self, ptr: RowPointer); + fn insert(&mut self, ptr: RowPointer) -> bool; fn remove(&mut self, ptr: RowPointer); fn iter(&self) -> impl Iterator; + fn len(&self) -> usize; } struct DTBTree(BTreeSet); @@ -128,8 +131,8 @@ impl DeleteTable for DTBTree { fn contains(&self, ptr: RowPointer) -> bool { self.0.contains(&ptr) } - fn insert(&mut self, ptr: RowPointer) { - self.0.insert(ptr); + fn insert(&mut self, ptr: RowPointer) -> bool { + self.0.insert(ptr) } fn remove(&mut self, ptr: RowPointer) { self.0.remove(&ptr); @@ -137,6 +140,9 @@ impl DeleteTable for DTBTree { fn iter(&self) -> impl Iterator { self.0.iter().copied() } + fn len(&self) -> usize { + self.0.len() + } } struct DTHashSet(HashSet); @@ -149,8 +155,8 @@ impl DeleteTable for DTHashSet { fn contains(&self, ptr: RowPointer) -> bool { self.0.contains(&ptr) } - fn insert(&mut self, ptr: RowPointer) { - self.0.insert(ptr); + fn insert(&mut self, ptr: RowPointer) -> bool { + self.0.insert(ptr) } fn remove(&mut self, ptr: RowPointer) { self.0.remove(&ptr); @@ -158,6 +164,9 @@ impl DeleteTable for DTHashSet { fn iter(&self) -> impl Iterator { self.0.iter().copied() } + fn len(&self) -> usize { + self.0.len() + } } struct DTHashSetFH(foldhash::HashSet); @@ -170,8 +179,8 @@ impl DeleteTable for DTHashSetFH { fn contains(&self, ptr: RowPointer) -> bool { self.0.contains(&ptr) } - fn insert(&mut self, ptr: RowPointer) { - self.0.insert(ptr); + fn insert(&mut self, ptr: RowPointer) -> bool { + self.0.insert(ptr) } fn remove(&mut self, ptr: RowPointer) { self.0.remove(&ptr); @@ -179,10 +188,14 @@ impl DeleteTable for DTHashSetFH { fn iter(&self) -> impl Iterator { self.0.iter().copied() } + fn len(&self) -> usize { + self.0.len() + } } struct DTPageAndBitSet { deleted: Vec>, + len: usize, fixed_row_size: Size, } @@ -191,6 +204,7 @@ impl DeleteTable for DTPageAndBitSet { fn new(fixed_row_size: Size) -> Self { Self { deleted: <_>::default(), + len: 0, fixed_row_size, } } @@ -201,7 +215,7 @@ impl DeleteTable for DTPageAndBitSet { _ => false, } } - fn insert(&mut self, ptr: RowPointer) { + fn insert(&mut self, ptr: RowPointer) -> bool { let fixed_row_size = self.fixed_row_size; let page_idx = ptr.page_index().idx(); let bitset_idx = ptr.page_offset() / fixed_row_size; @@ -209,11 +223,20 @@ impl DeleteTable for DTPageAndBitSet { let new_set = || FixedBitSet::new(PageOffset::PAGE_END.idx().div_ceil(fixed_row_size.len())); match self.deleted.get_mut(page_idx) { - Some(Some(set)) => set.set(bitset_idx, true), + Some(Some(set)) => { + let added = !set.get(bitset_idx); + set.set(bitset_idx, true); + if added { + self.len += 1; + } + added + } Some(slot) => { let mut set = new_set(); set.set(bitset_idx, true); *slot = Some(set); + self.len += 1; + true } None => { let pages = self.deleted.len(); @@ -225,6 +248,8 @@ impl DeleteTable for DTPageAndBitSet { let mut set = new_set(); set.set(bitset_idx, true); self.deleted.push(Some(set)); + self.len += 1; + true } } } @@ -233,6 +258,9 @@ impl DeleteTable for DTPageAndBitSet { let page_idx = ptr.page_index().idx(); let bitset_idx = ptr.page_offset() / fixed_row_size; if let Some(Some(set)) = self.deleted.get_mut(page_idx) { + if set.get(bitset_idx) { + self.len -= 1; + } set.set(bitset_idx, false); } } @@ -248,6 +276,9 @@ impl DeleteTable for DTPageAndBitSet { }) }) } + fn len(&self) -> usize { + self.len + } } #[derive(Clone, Copy)] @@ -266,6 +297,7 @@ impl OffsetRange { type OffsetRanges = SmallVec<[OffsetRange; 4]>; struct DTPageAndOffsetRanges { deleted: Vec, + len: usize, fixed_row_size: Size, } @@ -285,11 +317,13 @@ fn find_range_to_insert_offset( ranges: &OffsetRanges, offset: PageOffset, fixed_row_size: Size, -) -> Result<(bool, usize), usize> { +) -> Result<(bool, bool, usize), usize> { let mut extend_end = true; + let mut exists = false; ranges .binary_search_by(|&OffsetRange { start, end }| { extend_end = true; + exists = false; match end.cmp(&offset) { // `end + row_size = offset` => we can just extend `end = offset`. Ordering::Less if end.0 + fixed_row_size.0 == offset.0 => Ordering::Equal, @@ -297,11 +331,17 @@ fn find_range_to_insert_offset( Ordering::Less => Ordering::Less, // `offset` is already covered, so don't do anything, // but `end = offset` is a no-op. - Ordering::Equal => Ordering::Equal, + Ordering::Equal => { + exists = true; + Ordering::Equal + } // `end` is greater, but we may be covered by `start` instead. Ordering::Greater => match start.cmp(&offset) { // `offset` is within the range, so don't do anything. - Ordering::Less | Ordering::Equal => Ordering::Equal, + Ordering::Less | Ordering::Equal => { + exists = true; + Ordering::Equal + } // `start - row_size = offset` => we can just extend `start = offset`. Ordering::Greater if start.0 - fixed_row_size.0 == offset.0 => { extend_end = false; @@ -312,7 +352,7 @@ fn find_range_to_insert_offset( }, } }) - .map(|idx| (extend_end, idx)) + .map(|idx| (extend_end, exists, idx)) } impl DeleteTable for DTPageAndOffsetRanges { @@ -320,6 +360,7 @@ impl DeleteTable for DTPageAndOffsetRanges { fn new(fixed_row_size: Size) -> Self { Self { deleted: <_>::default(), + len: 0, fixed_row_size, } } @@ -333,7 +374,7 @@ impl DeleteTable for DTPageAndOffsetRanges { _ => false, } } - fn insert(&mut self, ptr: RowPointer) { + fn insert(&mut self, ptr: RowPointer) -> bool { let fixed_row_size = self.fixed_row_size; let page_idx = ptr.page_index().idx(); let page_offset = ptr.page_offset(); @@ -346,17 +387,20 @@ impl DeleteTable for DTPageAndOffsetRanges { self.deleted.push(SmallVec::new()); } self.deleted[page_idx].push(OffsetRange::point(page_offset)); - return; + self.len += 1; + return true; }; - let (extend_end, range_idx) = match find_range_to_insert_offset(ranges, page_offset, fixed_row_size) { + let (extend_end, exists, range_idx) = match find_range_to_insert_offset(ranges, page_offset, fixed_row_size) { Err(range_idx) => { // Not found, so add a point range. ranges.insert(range_idx, OffsetRange::point(page_offset)); - return; + self.len += 1; + return true; } Ok(x) => x, }; + if extend_end { let next = range_idx + 1; let new_end = if let Some(r) = ranges @@ -383,6 +427,12 @@ impl DeleteTable for DTPageAndOffsetRanges { ranges[range_idx].start = page_offset; }; } + + let added = !exists; + if added { + self.len += 1; + } + added } fn remove(&mut self, ptr: RowPointer) { let fixed_row_size = self.fixed_row_size; @@ -396,6 +446,8 @@ impl DeleteTable for DTPageAndOffsetRanges { return; }; + self.len -= 1; + let range = &mut ranges[idx]; let is_start = range.start == page_offset; let is_end = range.end == page_offset; @@ -431,14 +483,19 @@ impl DeleteTable for DTPageAndOffsetRanges { .map(move |po| RowPointer::new(false, pi, po, SquashedOffset::COMMITTED_STATE)) }) } + fn len(&self) -> usize { + self.len + } } criterion_group!( delete_table, + /* bench_delete_table::, bench_delete_table::, bench_delete_table::, bench_delete_table::, + */ bench_delete_table::, // best so far. ); criterion_main!(delete_table); diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 29326dfd1d6..67b6dc8cc1e 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -1,8 +1,9 @@ use super::{ datastore::Result, + delete_table::DeleteTable, sequence::{Sequence, SequencesState}, state_view::{IterByColRangeTx, StateView}, - tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, + tx_state::{IndexIdMap, RemovedIndexIdSet, TxState}, IterByColEqTx, }; use crate::{ @@ -546,7 +547,7 @@ impl CommittedState { // holds only committed rows which should be deleted, // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`, // so no need to check before applying the deletes. - for row_ptr in row_ptrs.iter().copied() { + for row_ptr in row_ptrs.iter() { debug_assert!(row_ptr.squashed_offset().is_committed_state()); // TODO: re-write `TxData` to remove `ProductValue`s @@ -698,6 +699,6 @@ impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> { fn next(&mut self) -> Option { self.committed_rows - .find(|row_ref| !self.del_table.contains(&row_ref.pointer())) + .find(|row_ref| !self.del_table.contains(row_ref.pointer())) } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/delete_table.rs b/crates/core/src/db/datastore/locking_tx_datastore/delete_table.rs new file mode 100644 index 00000000000..fb155d40fa8 --- /dev/null +++ b/crates/core/src/db/datastore/locking_tx_datastore/delete_table.rs @@ -0,0 +1,324 @@ +/* +use core::cmp::Ordering; +use smallvec::SmallVec; +use spacetimedb_table::indexes::{PageIndex, PageOffset, RowPointer, Size, SquashedOffset}; + +pub(super) struct DeleteTable { + deleted: Vec, + len: usize, + fixed_row_size: Size, +} + +type OffsetRanges = SmallVec<[OffsetRange; 4]>; + +#[derive(Clone, Copy)] +struct OffsetRange { + start: PageOffset, + end: PageOffset, +} + +impl OffsetRange { + fn point(offset: PageOffset) -> Self { + Self { + start: offset, + end: offset, + } + } +} + +#[inline] +fn find_range_to_insert_offset( + ranges: &OffsetRanges, + offset: PageOffset, + fixed_row_size: Size, +) -> Result<(bool, bool, usize), usize> { + let mut extend_end = true; + let mut exists = false; + ranges + .binary_search_by(|&OffsetRange { start, end }| { + extend_end = true; + exists = false; + match end.cmp(&offset) { + // `end + row_size = offset` => we can just extend `end = offset`. + Ordering::Less if end.0 + fixed_row_size.0 == offset.0 => Ordering::Equal, + // Cannot extend this range, so let's not find it. + Ordering::Less => Ordering::Less, + // `offset` is already covered, so don't do anything, + // but `end = offset` is a no-op. + Ordering::Equal => { + exists = true; + Ordering::Equal + } + // `end` is greater, but we may be covered by `start` instead. + Ordering::Greater => match start.cmp(&offset) { + // `offset` is within the range, so don't do anything. + Ordering::Less | Ordering::Equal => { + exists = true; + Ordering::Equal + } + // `start - row_size = offset` => we can just extend `start = offset`. + Ordering::Greater if start.0 - fixed_row_size.0 == offset.0 => { + extend_end = false; + Ordering::Equal + } + // Range is entirely greater than offset. + Ordering::Greater => Ordering::Greater, + }, + } + }) + .map(|idx| (extend_end, exists, idx)) +} + +fn cmp_start_end(start: &T, end: &T, needle: &T) -> Ordering { + match (start.cmp(needle), end.cmp(needle)) { + // start = needle or start < offset <= end => we have a match. + (Ordering::Less, Ordering::Equal | Ordering::Greater) | (Ordering::Equal, _) => Ordering::Equal, + // start <= end < needle => no match. + (Ordering::Less, Ordering::Less) => Ordering::Less, + // start <= end > needle => no match. + (Ordering::Greater, _) => Ordering::Greater, + } +} + +impl DeleteTable { + pub fn new(fixed_row_size: Size) -> Self { + Self { + deleted: <_>::default(), + len: 0, + fixed_row_size, + } + } + + pub fn contains(&self, ptr: RowPointer) -> bool { + let page_idx = ptr.page_index().idx(); + let page_offset = ptr.page_offset(); + match self.deleted.get(page_idx) { + Some(ranges) => ranges + .binary_search_by(|r| cmp_start_end(&r.start, &r.end, &page_offset)) + .is_ok(), + _ => false, + } + } + + pub fn insert(&mut self, ptr: RowPointer) -> bool { + let fixed_row_size = self.fixed_row_size; + let page_idx = ptr.page_index().idx(); + let page_offset = ptr.page_offset(); + + let Some(ranges) = self.deleted.get_mut(page_idx) else { + let pages = self.deleted.len(); + let after = 1 + page_idx; + self.deleted.reserve(after - pages); + for _ in pages..after { + self.deleted.push(SmallVec::new()); + } + self.deleted[page_idx].push(OffsetRange::point(page_offset)); + self.len += 1; + return true; + }; + + let (extend_end, exists, range_idx) = match find_range_to_insert_offset(ranges, page_offset, fixed_row_size) { + Err(range_idx) => { + // Not found, so add a point range. + ranges.insert(range_idx, OffsetRange::point(page_offset)); + self.len += 1; + return true; + } + Ok(x) => x, + }; + + if extend_end { + let next = range_idx + 1; + let new_end = if let Some(r) = ranges + .get(next) + .copied() + .filter(|r| r.start.0 - fixed_row_size.0 == page_offset.0) + { + ranges.remove(next); + r.end + } else { + page_offset + }; + ranges[range_idx].end = new_end; + } else { + let prev = range_idx.saturating_sub(1); + if let Some(r) = ranges + .get(prev) + .copied() + .filter(|r| r.end.0 + fixed_row_size.0 == page_offset.0) + { + ranges[range_idx].start = r.start; + ranges.remove(prev); + } else { + ranges[range_idx].start = page_offset; + }; + } + + let added = !exists; + if added { + self.len += 1; + } + added + } + + pub fn remove(&mut self, ptr: RowPointer) { + let fixed_row_size = self.fixed_row_size; + let page_idx = ptr.page_index().idx(); + let page_offset = ptr.page_offset(); + + let Some(ranges) = self.deleted.get_mut(page_idx) else { + return; + }; + let Ok(idx) = ranges.binary_search_by(|r| cmp_start_end(&r.start, &r.end, &page_offset)) else { + return; + }; + + self.len -= 1; + + let range = &mut ranges[idx]; + let is_start = range.start == page_offset; + let is_end = range.end == page_offset; + match (is_start, is_end) { + // Remove the point range. + (true, true) => drop(ranges.remove(idx)), + // Narrow the start. + (true, false) => range.start += fixed_row_size, + // Narrow the end. + (false, true) => range.end -= fixed_row_size, + // Split the range. + (false, false) => { + // Derive the second range, to the right of the hole. + let end = range.end; + let start = PageOffset(page_offset.0 + fixed_row_size.0); + let new = OffsetRange { start, end }; + // Adjust the first range, to the left of the hole. + range.end.0 = page_offset.0 - fixed_row_size.0; + // Add the second range. + ranges.insert(idx + 1, new); + } + } + } + + pub fn iter(&self) -> impl '_ + Iterator { + (0..) + .map(PageIndex) + .zip(self.deleted.iter()) + .flat_map(move |(pi, ranges)| { + ranges + .iter() + .flat_map(|range| (range.start.0..=range.end.0).step_by(self.fixed_row_size.0 as usize)) + .map(PageOffset) + .map(move |po| RowPointer::new(false, pi, po, SquashedOffset::COMMITTED_STATE)) + }) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} +*/ + +use spacetimedb_table::{ + fixed_bit_set::FixedBitSet, + indexes::{PageIndex, PageOffset, RowPointer, Size, SquashedOffset}, +}; + +pub(super) struct DeleteTable { + deleted: Vec>, + len: usize, + fixed_row_size: Size, +} + +impl DeleteTable { + pub fn new(fixed_row_size: Size) -> Self { + Self { + deleted: <_>::default(), + len: 0, + fixed_row_size, + } + } + + pub fn contains(&self, ptr: RowPointer) -> bool { + let page_idx = ptr.page_index().idx(); + match self.deleted.get(page_idx) { + Some(Some(set)) => set.get(ptr.page_offset() / self.fixed_row_size), + _ => false, + } + } + + pub fn insert(&mut self, ptr: RowPointer) -> bool { + let fixed_row_size = self.fixed_row_size; + let page_idx = ptr.page_index().idx(); + let bitset_idx = ptr.page_offset() / fixed_row_size; + + let new_set = || FixedBitSet::new(PageOffset::PAGE_END.idx().div_ceil(fixed_row_size.len())); + + match self.deleted.get_mut(page_idx) { + Some(Some(set)) => { + let added = !set.get(bitset_idx); + set.set(bitset_idx, true); + if added { + self.len += 1; + } + added + } + Some(slot) => { + let mut set = new_set(); + set.set(bitset_idx, true); + *slot = Some(set); + self.len += 1; + true + } + None => { + let pages = self.deleted.len(); + let after = 1 + page_idx; + self.deleted.reserve(after - pages); + for _ in pages..page_idx { + self.deleted.push(None); + } + let mut set = new_set(); + set.set(bitset_idx, true); + self.deleted.push(Some(set)); + self.len += 1; + true + } + } + } + + pub fn remove(&mut self, ptr: RowPointer) { + let fixed_row_size = self.fixed_row_size; + let page_idx = ptr.page_index().idx(); + let bitset_idx = ptr.page_offset() / fixed_row_size; + if let Some(Some(set)) = self.deleted.get_mut(page_idx) { + if set.get(bitset_idx) { + self.len -= 1; + } + set.set(bitset_idx, false); + } + } + + pub fn iter(&self) -> impl '_ + Iterator { + (0..) + .map(PageIndex) + .zip(self.deleted.iter()) + .filter_map(|(pi, set)| Some((pi, set.as_ref()?))) + .flat_map(move |(pi, set)| { + set.iter_set().map(move |idx| { + let po = PageOffset(idx as u16 * self.fixed_row_size.0); + RowPointer::new(false, pi, po, SquashedOffset::COMMITTED_STATE) + }) + }) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index b40ba83107d..da9c611d24e 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -7,6 +7,7 @@ pub use mut_tx::MutTxId; mod sequence; pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; +mod delete_table; pub(crate) mod tx; mod tx_state; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 8bc95752679..af489a20cc0 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1,10 +1,11 @@ use super::{ committed_state::CommittedState, datastore::{record_metrics, Result}, + delete_table::DeleteTable, sequence::{Sequence, SequencesState}, state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView}, tx::TxId, - tx_state::{DeleteTable, IndexIdMap, TxState}, + tx_state::{IndexIdMap, TxState}, SharedMutexGuard, SharedWriteGuard, }; use crate::db::datastore::system_tables::{ @@ -1170,7 +1171,7 @@ impl<'a> Iterator for BTreeScan<'a> { impl<'a> Iterator for IndexScanFilterDeleted<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - self.iter.find(|row| !self.deletes.contains(&row.pointer())) + self.iter.find(|row| !self.deletes.contains(row.pointer())) } } @@ -1324,7 +1325,7 @@ impl MutTxId { // It's possible that `row` appears in the committed state, // but is marked as deleted. // In this case, undelete it, so it remains in the committed state. - delete_table.remove(&commit_ptr); + delete_table.remove(commit_ptr); // No new row was inserted, but return `committed_ptr`. let blob_store = &self.committed_state_write_lock.blob_store; @@ -1347,7 +1348,7 @@ impl MutTxId { if let Err(e) = commit_table.check_unique_constraints( tx_row_ref, |ixs| ixs, - |commit_ptr| delete_table.contains(&commit_ptr), + |commit_ptr| delete_table.contains(commit_ptr), ) { // There was a constraint violation, so undo the insertion. tx_table.delete(tx_blob_store, tx_row_ptr, |_| {}); @@ -1468,7 +1469,7 @@ impl MutTxId { new_row, // Don't check this index since we'll do a 1-1 old/new replacement. |ixs| ixs.filter(|(&id, _)| id != ignore_index_id), - |commit_ptr| commit_ptr == old_ptr || del_table.contains(&commit_ptr), + |commit_ptr| commit_ptr == old_ptr || del_table.contains(commit_ptr), ) .map_err(IndexError::from) .map_err(Into::into) @@ -1504,7 +1505,7 @@ impl MutTxId { .committed_state_write_lock .get_index_by_id_with_table(table_id, index_id) .and_then(|index| find_old_row(tx_row_ref, index).0.map(|ptr| (index, ptr))) - .filter(|(_, ptr)| !del_table.contains(ptr)) + .filter(|(_, ptr)| !del_table.contains(*ptr)) { // 1. Ensure the index is unique. // 2. Ensure the new row doesn't violate any other committed state unique indices. @@ -1536,7 +1537,7 @@ impl MutTxId { let (old_ptr, needle) = find_old_row(tx_row_ref, tx_index); let res = old_ptr // If we have an old committed state row, ensure it hasn't been deleted in our tx. - .filter(|ptr| ptr.squashed_offset() == SquashedOffset::TX_STATE || !del_table.contains(ptr)) + .filter(|ptr| ptr.squashed_offset() == SquashedOffset::TX_STATE || !del_table.contains(*ptr)) .ok_or_else(|| IndexError::KeyNotFound(index_id, needle).into()) .and_then(|old_ptr| { ensure_unique(index_id, tx_index)?; @@ -1606,11 +1607,15 @@ impl MutTxId { Ok(table.delete(blob_store, row_pointer, |_| ()).is_some()) } SquashedOffset::COMMITTED_STATE => { + let commit_table = self + .committed_state_write_lock + .get_table(table_id) + .expect("there's a row in committed state so there should be a committed table"); // NOTE: We trust the `row_pointer` refers to an extant row, // and check only that it hasn't yet been deleted. - let delete_table = self.tx_state.get_delete_table_mut(table_id); - - Ok(delete_table.insert(row_pointer)) + let delete_table = self.tx_state.get_delete_table_mut(table_id, commit_table); + delete_table.insert(row_pointer); + Ok(true) } _ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", row_pointer), } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 8f3533febe1..41c0be1faa3 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -1,8 +1,4 @@ -use super::{ - committed_state::CommittedState, - datastore::Result, - tx_state::{DeleteTable, TxState}, -}; +use super::{committed_state::CommittedState, datastore::Result, delete_table::DeleteTable, tx_state::TxState}; use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx; use crate::{ db::datastore::system_tables::{ @@ -261,7 +257,7 @@ impl<'a> Iterator for IterMutTx<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) { + if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(row_ref.pointer())) { return next; } } @@ -358,7 +354,7 @@ impl<'a> Iterator for IndexSeekIterIdWithDeletedMutTx<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - .and_then(|i| i.find(|row_ref| !self.del_table.contains(&row_ref.pointer()))) + .and_then(|i| i.find(|row_ref| !self.del_table.contains(row_ref.pointer()))) { // TODO(metrics): This doesn't actually fetch a row. // Move this counter to `RowRef::read_row`. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index 3b8b354b5c4..72633f93e44 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -1,5 +1,6 @@ +use super::delete_table::DeleteTable; use core::ops::RangeBounds; -use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; +use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::AlgebraicValue; use spacetimedb_table::{ @@ -10,8 +11,6 @@ use spacetimedb_table::{ }; use std::collections::{btree_map, BTreeMap}; -pub(super) type DeleteTable = HashSet; - /// A mapping to find the actual index given an `IndexId`. pub(super) type IndexIdMap = IntMap; pub(super) type RemovedIndexIdSet = IntSet; @@ -154,7 +153,7 @@ impl TxState { ); self.delete_tables .get(&table_id) - .map(|tbl| tbl.contains(&row_ptr)) + .map(|tbl| tbl.contains(row_ptr)) .unwrap_or(false) } @@ -164,8 +163,8 @@ impl TxState { } /// Guarantees that the `table_id` returns a `DeleteTable`. - pub(super) fn get_delete_table_mut(&mut self, table_id: TableId) -> &mut DeleteTable { - self.delete_tables.entry(table_id).or_default() + pub(super) fn get_delete_table_mut(&mut self, table_id: TableId, commit_table: &Table) -> &mut DeleteTable { + get_delete_table_mut(&mut self.delete_tables, table_id, commit_table) } pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> { @@ -185,17 +184,17 @@ impl TxState { &'this mut DeleteTable, )> { let insert_tables = &mut self.insert_tables; - let delete_tables = &mut self.delete_tables; let blob_store = &mut self.blob_store; let idx_map = &mut self.index_id_map; - let tbl = match insert_tables.entry(table_id) { + let table = match insert_tables.entry(table_id) { btree_map::Entry::Vacant(e) => { let new_table = template?.clone_structure(SquashedOffset::TX_STATE); e.insert(new_table) } btree_map::Entry::Occupied(e) => e.into_mut(), }; - Some((tbl, blob_store, idx_map, delete_tables.entry(table_id).or_default())) + let delete_table = get_delete_table_mut(&mut self.delete_tables, table_id, table); + Some((table, blob_store, idx_map, delete_table)) } /// Assumes that the insert and delete tables exist for `table_id` and fetches them. @@ -217,3 +216,13 @@ impl TxState { (tx_table, tx_blob_store, delete_table) } } + +fn get_delete_table_mut<'a>( + delete_tables: &'a mut BTreeMap, + table_id: TableId, + table: &Table, +) -> &'a mut DeleteTable { + delete_tables + .entry(table_id) + .or_insert_with(|| DeleteTable::new(table.row_size())) +} diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 06569ed44bc..c573b6b5466 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1655,8 +1655,8 @@ impl Table { } /// Returns the row size for a row in the table. - fn row_size(&self) -> Size { - self.inner.row_layout.size() + pub fn row_size(&self) -> Size { + self.row_layout().size() } /// Returns the layout for a row in the table.