Skip to content

Commit

Permalink
Before using free stack
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Nov 14, 2024
1 parent 48ed3be commit 5dd0ed8
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 29 deletions.
30 changes: 30 additions & 0 deletions luda-editor/new-server/bptree/src/bp_map/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,24 @@ impl Backend {
FeBeRequest::Close => {
close_requested = true;
}
FeBeRequest::FileSize { tx } => {
let file_size = operator.file_size().await;
let tx_result;
match file_size {
Ok(file_size) => {
tx_result = Ok(file_size);
result = Ok(());
}
Err(err) => {
tx_result = Err(());
result = Err(err.into());
}
}
txs.push(Tx::FileSize {
tx,
result: tx_result,
});
}
}
}

Expand Down Expand Up @@ -230,6 +248,14 @@ impl Backend {
Err(())
});
}
Tx::FileSize { tx, result } => {
_ = tx.send(if no_error {
assert!(result.is_ok());
result
} else {
Err(())
});
}
});
}

Expand Down Expand Up @@ -296,4 +322,8 @@ enum Tx {
tx: oneshot::Sender<std::result::Result<Option<Vec<Entry>>, ()>>,
result: std::result::Result<Option<Vec<Entry>>, ()>,
},
FileSize {
tx: oneshot::Sender<std::result::Result<usize, ()>>,
result: std::result::Result<usize, ()>,
},
}
5 changes: 5 additions & 0 deletions luda-editor/new-server/bptree/src/bp_map/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ impl PageCache {
pub(crate) fn load_full(&self) -> CachedPages {
self.inner.load_full()
}

pub(crate) fn header(&self) -> Option<Header> {
let guard = self.inner.load();
Some(*guard.get(&PageRange::HEADER)?.as_page().as_header())
}
}

#[derive(Debug)]
Expand Down
8 changes: 8 additions & 0 deletions luda-editor/new-server/bptree/src/bp_map/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ impl BpMap {
}),
}
}
pub async fn file_size(&self) -> Result<usize> {
if let Some(header) = self.cache.header() {
return Ok(header.file_size());
}

let (tx, rx) = oneshot::channel();
self.send_request(FeBeRequest::FileSize { tx }, rx).await
}
async fn send_request<T>(
&self,
request: FeBeRequest,
Expand Down
59 changes: 59 additions & 0 deletions luda-editor/new-server/bptree/src/bp_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ enum FeBeRequest {
exclusive_start_key: Option<u128>,
tx: oneshot::Sender<std::result::Result<Option<Vec<Entry>>, ()>>,
},
FileSize {
tx: oneshot::Sender<std::result::Result<usize, ()>>,
},
}

#[cfg(test)]
Expand Down Expand Up @@ -563,4 +566,60 @@ mod test {
assert_eq!(entry.value.as_ref(), i.to_le_bytes());
}
}

#[tokio::test]
async fn test_insert_delete_insert() {
let path = std::env::temp_dir().join("bp_map::bp_map_test_insert_delete_insert");
if let Err(err) = std::fs::remove_file(&path) {
if err.kind() != std::io::ErrorKind::NotFound {
panic!("{:?}", err);
}
}
let wal_path = path.with_extension("wal");
if let Err(err) = std::fs::remove_file(&wal_path) {
if err.kind() != std::io::ErrorKind::NotFound {
panic!("{:?}", err);
}
}
let shadow_path = path.with_extension("shadow");
if let Err(err) = std::fs::remove_file(&shadow_path) {
if err.kind() != std::io::ErrorKind::NotFound {
panic!("{:?}", err);
}
}
std::fs::create_dir_all(path.parent().unwrap()).unwrap();

let map = BpMap::new(path, TEST_COUNT as usize / 2).await.unwrap();
let mut join_set = JoinSet::new();
for i in 1..=TEST_COUNT {
let map = map.clone();
join_set
.spawn(async move { map.insert(i as Key, i.to_le_bytes().to_vec().into()).await });
}

join_set.join_all().await;

let size = map.file_size().await.unwrap();

let mut join_set = JoinSet::new();
for i in 1..=TEST_COUNT {
let map = map.clone();
join_set.spawn(async move { map.delete(i as Key).await });
}

join_set.join_all().await;

let mut join_set = JoinSet::new();
for i in 1..=TEST_COUNT {
let map = map.clone();
join_set
.spawn(async move { map.insert(i as Key, i.to_le_bytes().to_vec().into()).await });
}

join_set.join_all().await;

let new_size = map.file_size().await.unwrap();

assert_eq!(size, new_size);
}
}
60 changes: 32 additions & 28 deletions luda-editor/new-server/bptree/src/bp_map/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Operator {
let mut route = self.find_route_for_insertion(key).await?;
let leaf_node_offset = route.pop().unwrap();

let right_node_offset = self.reserve_page_offset().await;
let right_node_offset = self.reserve_page_offset().await?;

let leaf_node = self
.page_mut(leaf_node_offset, PageBlockTypeHint::Node)
Expand All @@ -36,7 +36,7 @@ impl Operator {

if !leaf_node.is_full() {
leaf_node.insert(key, record_page_range);
self.rollback_reserve_page_offset(right_node_offset).await;
self.rollback_reserve_page_offset(right_node_offset).await?;
return Ok(());
}

Expand All @@ -48,17 +48,17 @@ impl Operator {
);

if route.is_empty() {
assert_eq!(leaf_node_offset, self.header().await.root_node_offset);
assert_eq!(leaf_node_offset, self.header().await?.root_node_offset);

let internal_node =
InternalNode::new(&[center_key], &[leaf_node_offset, right_node_offset]);
let internal_node_offset = self.reserve_page_offset().await;
let internal_node_offset = self.reserve_page_offset().await?;
self.blocks_updated.insert(
PageRange::page(internal_node_offset),
PageBlock::Page(Page::InternalNode(internal_node)),
);

self.header_mut().await.root_node_offset = internal_node_offset;
self.header_mut().await?.root_node_offset = internal_node_offset;
return Ok(());
}

Expand All @@ -75,19 +75,19 @@ impl Operator {
else {
return Ok(());
};
right_node_offset = self.reserve_page_offset().await;
right_node_offset = self.reserve_page_offset().await?;
self.blocks_updated.insert(
PageRange::page(right_node_offset),
PageBlock::Page(Page::InternalNode(right_node)),
);

if node_offset != self.header().await.root_node_offset {
if node_offset != self.header().await?.root_node_offset {
center_key = next_center_key;
continue;
}

assert!(route.is_empty());
let new_root_node_offset = self.reserve_page_offset().await;
let new_root_node_offset = self.reserve_page_offset().await?;
let new_root_node =
InternalNode::new(&[next_center_key], &[node_offset, right_node_offset]);
self.blocks_updated.insert(
Expand Down Expand Up @@ -169,14 +169,17 @@ impl Operator {
}
}
}
pub async fn file_size(&mut self) -> Result<usize> {
Ok(self.header().await?.file_size())
}
pub fn done(self) -> Done {
Done {
pages_read_from_file: self.blocks_read_from_file,
updated_pages: self.blocks_updated,
}
}
async fn find_route_for_insertion(&mut self, key: Key) -> Result<Vec<PageOffset>> {
let mut node_offset = self.header().await.root_node_offset;
let mut node_offset = self.header().await?.root_node_offset;
let mut route = vec![];

loop {
Expand Down Expand Up @@ -231,11 +234,11 @@ impl Operator {
.unwrap()
.as_page_mut())
}
async fn reserve_page_offset(&mut self) -> PageOffset {
self.header_mut().await.next_page_offset.fetch_increase(1)
async fn reserve_page_offset(&mut self) -> Result<PageOffset> {
Ok(self.header_mut().await?.next_page_offset.fetch_increase(1))
}
// async fn pop_free_page(&mut self) -> Result<Option<PageOffset>> {
// let free_page_stack_top_page_offset = self.header().await.free_page_stack_top_page_offset;
// let free_page_stack_top_page_offset = self.header().await?.free_page_stack_top_page_offset;
// if free_page_stack_top_page_offset.is_null() {
// return Ok(None);
// }
Expand All @@ -249,26 +252,26 @@ impl Operator {
// let next_page_offset = stack_node.next_page_offset;

// if stack_node.is_empty() {
// self.header_mut().await.free_page_stack_top_page_offset = next_page_offset;
// self.header_mut().await?.free_page_stack_top_page_offset = next_page_offset;
// }

// Ok(Some(page_offset))
// }
async fn header(&mut self) -> &Header {
self.page(PageOffset::HEADER, PageBlockTypeHint::Header)
.await
.unwrap()
.as_header()
async fn header(&mut self) -> Result<&Header> {
Ok(self
.page(PageOffset::HEADER, PageBlockTypeHint::Header)
.await?
.as_header())
}

async fn header_mut(&mut self) -> &mut Header {
self.page_mut(PageOffset::HEADER, PageBlockTypeHint::Header)
.await
.unwrap()
.as_header_mut()
async fn header_mut(&mut self) -> Result<&mut Header> {
Ok(self
.page_mut(PageOffset::HEADER, PageBlockTypeHint::Header)
.await?
.as_header_mut())
}
async fn find_leaf_node_for(&mut self, key: Key) -> Result<PageOffset> {
let mut node_offset = self.header().await.root_node_offset;
let mut node_offset = self.header().await?.root_node_offset;

loop {
let page_block = self
Expand Down Expand Up @@ -325,7 +328,7 @@ impl Operator {

let page_offset = self
.header_mut()
.await
.await?
.next_page_offset
.fetch_increase(page_count as usize);

Expand All @@ -337,12 +340,13 @@ impl Operator {
Ok(block_page_range)
}

async fn rollback_reserve_page_offset(&mut self, right_node_offset: PageOffset) {
async fn rollback_reserve_page_offset(&mut self, right_node_offset: PageOffset) -> Result<()> {
assert_eq!(
self.header().await.next_page_offset.as_u32(),
self.header().await?.next_page_offset.as_u32(),
right_node_offset.as_u32() + 1
);
self.header_mut().await.next_page_offset.decrease();
self.header_mut().await?.next_page_offset.decrease();
Ok(())
}
}

Expand Down
6 changes: 5 additions & 1 deletion luda-editor/new-server/bptree/src/bp_map/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(crate) trait Deserialize {
fn from_slice(slice: &[u8]) -> Self;
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct Header {
/// Would be null
pub free_page_stack_top_page_offset: PageOffset,
Expand All @@ -68,6 +68,10 @@ impl Header {
next_page_offset,
}
}

pub fn file_size(&self) -> usize {
self.next_page_offset.file_offset()
}
}
impl Serialize for Header {
fn to_vec(&self) -> Vec<u8> {
Expand Down

0 comments on commit 5dd0ed8

Please sign in to comment.