diff --git a/luda-editor/new-server/bptree/src/bp_map/backend.rs b/luda-editor/new-server/bptree/src/bp_map/backend.rs index 8a34c6394..65a69290f 100644 --- a/luda-editor/new-server/bptree/src/bp_map/backend.rs +++ b/luda-editor/new-server/bptree/src/bp_map/backend.rs @@ -140,6 +140,24 @@ impl Backend { FeBeRequest::Close => { close_requested = true; } + FeBeRequest::FileSize { tx } => { + let file_size = operator.file_size().await; + let tx_result; + match file_size { + Ok(file_size) => { + tx_result = Ok(file_size); + result = Ok(()); + } + Err(err) => { + tx_result = Err(()); + result = Err(err.into()); + } + } + txs.push(Tx::FileSize { + tx, + result: tx_result, + }); + } } } @@ -230,6 +248,14 @@ impl Backend { Err(()) }); } + Tx::FileSize { tx, result } => { + _ = tx.send(if no_error { + assert!(result.is_ok()); + result + } else { + Err(()) + }); + } }); } @@ -296,4 +322,8 @@ enum Tx { tx: oneshot::Sender>, ()>>, result: std::result::Result>, ()>, }, + FileSize { + tx: oneshot::Sender>, + result: std::result::Result, + }, } diff --git a/luda-editor/new-server/bptree/src/bp_map/cache.rs b/luda-editor/new-server/bptree/src/bp_map/cache.rs index d73da08a6..15863af89 100644 --- a/luda-editor/new-server/bptree/src/bp_map/cache.rs +++ b/luda-editor/new-server/bptree/src/bp_map/cache.rs @@ -192,6 +192,11 @@ impl PageCache { pub(crate) fn load_full(&self) -> CachedPages { self.inner.load_full() } + + pub(crate) fn header(&self) -> Option
{ + let guard = self.inner.load(); + Some(*guard.get(&PageRange::HEADER)?.as_page().as_header()) + } } #[derive(Debug)] diff --git a/luda-editor/new-server/bptree/src/bp_map/frontend.rs b/luda-editor/new-server/bptree/src/bp_map/frontend.rs index dd1264412..5c31d0d20 100644 --- a/luda-editor/new-server/bptree/src/bp_map/frontend.rs +++ b/luda-editor/new-server/bptree/src/bp_map/frontend.rs @@ -150,6 +150,14 @@ impl BpMap { }), } } + pub async fn file_size(&self) -> Result { + if let Some(header) = self.cache.header() { + return Ok(header.file_size()); + } + + let (tx, rx) = oneshot::channel(); + self.send_request(FeBeRequest::FileSize { tx }, rx).await + } async fn send_request( &self, request: FeBeRequest, diff --git a/luda-editor/new-server/bptree/src/bp_map/mod.rs b/luda-editor/new-server/bptree/src/bp_map/mod.rs index cc762a984..7f74bfde6 100644 --- a/luda-editor/new-server/bptree/src/bp_map/mod.rs +++ b/luda-editor/new-server/bptree/src/bp_map/mod.rs @@ -71,6 +71,9 @@ enum FeBeRequest { exclusive_start_key: Option, tx: oneshot::Sender>, ()>>, }, + FileSize { + tx: oneshot::Sender>, + }, } #[cfg(test)] @@ -563,4 +566,60 @@ mod test { assert_eq!(entry.value.as_ref(), i.to_le_bytes()); } } + + #[tokio::test] + async fn test_insert_delete_insert() { + let path = std::env::temp_dir().join("bp_map::bp_map_test_insert_delete_insert"); + if let Err(err) = std::fs::remove_file(&path) { + if err.kind() != std::io::ErrorKind::NotFound { + panic!("{:?}", err); + } + } + let wal_path = path.with_extension("wal"); + if let Err(err) = std::fs::remove_file(&wal_path) { + if err.kind() != std::io::ErrorKind::NotFound { + panic!("{:?}", err); + } + } + let shadow_path = path.with_extension("shadow"); + if let Err(err) = std::fs::remove_file(&shadow_path) { + if err.kind() != std::io::ErrorKind::NotFound { + panic!("{:?}", err); + } + } + std::fs::create_dir_all(path.parent().unwrap()).unwrap(); + + let map = BpMap::new(path, TEST_COUNT as usize / 2).await.unwrap(); + let mut join_set = JoinSet::new(); + for i in 1..=TEST_COUNT { + let map = map.clone(); + join_set + .spawn(async move { map.insert(i as Key, i.to_le_bytes().to_vec().into()).await }); + } + + join_set.join_all().await; + + let size = map.file_size().await.unwrap(); + + let mut join_set = JoinSet::new(); + for i in 1..=TEST_COUNT { + let map = map.clone(); + join_set.spawn(async move { map.delete(i as Key).await }); + } + + join_set.join_all().await; + + let mut join_set = JoinSet::new(); + for i in 1..=TEST_COUNT { + let map = map.clone(); + join_set + .spawn(async move { map.insert(i as Key, i.to_le_bytes().to_vec().into()).await }); + } + + join_set.join_all().await; + + let new_size = map.file_size().await.unwrap(); + + assert_eq!(size, new_size); + } } diff --git a/luda-editor/new-server/bptree/src/bp_map/operator.rs b/luda-editor/new-server/bptree/src/bp_map/operator.rs index 0c2df47a9..4ecd46a91 100644 --- a/luda-editor/new-server/bptree/src/bp_map/operator.rs +++ b/luda-editor/new-server/bptree/src/bp_map/operator.rs @@ -27,7 +27,7 @@ impl Operator { let mut route = self.find_route_for_insertion(key).await?; let leaf_node_offset = route.pop().unwrap(); - let right_node_offset = self.reserve_page_offset().await; + let right_node_offset = self.reserve_page_offset().await?; let leaf_node = self .page_mut(leaf_node_offset, PageBlockTypeHint::Node) @@ -36,7 +36,7 @@ impl Operator { if !leaf_node.is_full() { leaf_node.insert(key, record_page_range); - self.rollback_reserve_page_offset(right_node_offset).await; + self.rollback_reserve_page_offset(right_node_offset).await?; return Ok(()); } @@ -48,17 +48,17 @@ impl Operator { ); if route.is_empty() { - assert_eq!(leaf_node_offset, self.header().await.root_node_offset); + assert_eq!(leaf_node_offset, self.header().await?.root_node_offset); let internal_node = InternalNode::new(&[center_key], &[leaf_node_offset, right_node_offset]); - let internal_node_offset = self.reserve_page_offset().await; + let internal_node_offset = self.reserve_page_offset().await?; self.blocks_updated.insert( PageRange::page(internal_node_offset), PageBlock::Page(Page::InternalNode(internal_node)), ); - self.header_mut().await.root_node_offset = internal_node_offset; + self.header_mut().await?.root_node_offset = internal_node_offset; return Ok(()); } @@ -75,19 +75,19 @@ impl Operator { else { return Ok(()); }; - right_node_offset = self.reserve_page_offset().await; + right_node_offset = self.reserve_page_offset().await?; self.blocks_updated.insert( PageRange::page(right_node_offset), PageBlock::Page(Page::InternalNode(right_node)), ); - if node_offset != self.header().await.root_node_offset { + if node_offset != self.header().await?.root_node_offset { center_key = next_center_key; continue; } assert!(route.is_empty()); - let new_root_node_offset = self.reserve_page_offset().await; + let new_root_node_offset = self.reserve_page_offset().await?; let new_root_node = InternalNode::new(&[next_center_key], &[node_offset, right_node_offset]); self.blocks_updated.insert( @@ -169,6 +169,9 @@ impl Operator { } } } + pub async fn file_size(&mut self) -> Result { + Ok(self.header().await?.file_size()) + } pub fn done(self) -> Done { Done { pages_read_from_file: self.blocks_read_from_file, @@ -176,7 +179,7 @@ impl Operator { } } async fn find_route_for_insertion(&mut self, key: Key) -> Result> { - let mut node_offset = self.header().await.root_node_offset; + let mut node_offset = self.header().await?.root_node_offset; let mut route = vec![]; loop { @@ -231,11 +234,11 @@ impl Operator { .unwrap() .as_page_mut()) } - async fn reserve_page_offset(&mut self) -> PageOffset { - self.header_mut().await.next_page_offset.fetch_increase(1) + async fn reserve_page_offset(&mut self) -> Result { + Ok(self.header_mut().await?.next_page_offset.fetch_increase(1)) } // async fn pop_free_page(&mut self) -> Result> { - // let free_page_stack_top_page_offset = self.header().await.free_page_stack_top_page_offset; + // let free_page_stack_top_page_offset = self.header().await?.free_page_stack_top_page_offset; // if free_page_stack_top_page_offset.is_null() { // return Ok(None); // } @@ -249,26 +252,26 @@ impl Operator { // let next_page_offset = stack_node.next_page_offset; // if stack_node.is_empty() { - // self.header_mut().await.free_page_stack_top_page_offset = next_page_offset; + // self.header_mut().await?.free_page_stack_top_page_offset = next_page_offset; // } // Ok(Some(page_offset)) // } - async fn header(&mut self) -> &Header { - self.page(PageOffset::HEADER, PageBlockTypeHint::Header) - .await - .unwrap() - .as_header() + async fn header(&mut self) -> Result<&Header> { + Ok(self + .page(PageOffset::HEADER, PageBlockTypeHint::Header) + .await? + .as_header()) } - async fn header_mut(&mut self) -> &mut Header { - self.page_mut(PageOffset::HEADER, PageBlockTypeHint::Header) - .await - .unwrap() - .as_header_mut() + async fn header_mut(&mut self) -> Result<&mut Header> { + Ok(self + .page_mut(PageOffset::HEADER, PageBlockTypeHint::Header) + .await? + .as_header_mut()) } async fn find_leaf_node_for(&mut self, key: Key) -> Result { - let mut node_offset = self.header().await.root_node_offset; + let mut node_offset = self.header().await?.root_node_offset; loop { let page_block = self @@ -325,7 +328,7 @@ impl Operator { let page_offset = self .header_mut() - .await + .await? .next_page_offset .fetch_increase(page_count as usize); @@ -337,12 +340,13 @@ impl Operator { Ok(block_page_range) } - async fn rollback_reserve_page_offset(&mut self, right_node_offset: PageOffset) { + async fn rollback_reserve_page_offset(&mut self, right_node_offset: PageOffset) -> Result<()> { assert_eq!( - self.header().await.next_page_offset.as_u32(), + self.header().await?.next_page_offset.as_u32(), right_node_offset.as_u32() + 1 ); - self.header_mut().await.next_page_offset.decrease(); + self.header_mut().await?.next_page_offset.decrease(); + Ok(()) } } diff --git a/luda-editor/new-server/bptree/src/bp_map/pages.rs b/luda-editor/new-server/bptree/src/bp_map/pages.rs index 4156bda05..c9cfd383e 100644 --- a/luda-editor/new-server/bptree/src/bp_map/pages.rs +++ b/luda-editor/new-server/bptree/src/bp_map/pages.rs @@ -47,7 +47,7 @@ pub(crate) trait Deserialize { fn from_slice(slice: &[u8]) -> Self; } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct Header { /// Would be null pub free_page_stack_top_page_offset: PageOffset, @@ -68,6 +68,10 @@ impl Header { next_page_offset, } } + + pub fn file_size(&self) -> usize { + self.next_page_offset.file_offset() + } } impl Serialize for Header { fn to_vec(&self) -> Vec {