Skip to content

Commit

Permalink
Use free stack
Browse files Browse the repository at this point in the history
  • Loading branch information
namse committed Nov 14, 2024
1 parent 5dd0ed8 commit a75f2c2
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 101 deletions.
5 changes: 0 additions & 5 deletions luda-editor/new-server/bptree/src/bp_map/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ impl ReadFd {
return Err(Error::last_os_error());
}
if len == 0 {
println!("buf_offset: {}", buf_offset);
println!("buf_len: {}", buf_len);
println!("offset: {}", offset);
println!("file_len: {}", file_len(fd).unwrap());
eprintln!("read_exact: Unexpected EOF");
return Err(Error::from(std::io::ErrorKind::UnexpectedEof));
}
buf_offset += len as usize;
Expand Down
7 changes: 6 additions & 1 deletion luda-editor/new-server/bptree/src/bp_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@ mod test {

join_set.join_all().await;

let new_size = map.file_size().await.unwrap();

// increased by storing free stack. and we didn't implemented free stack flush.
assert!((new_size as f32) < (size as f32 * 1.25));

let mut join_set = JoinSet::new();
for i in 1..=TEST_COUNT {
let map = map.clone();
Expand All @@ -620,6 +625,6 @@ mod test {

let new_size = map.file_size().await.unwrap();

assert_eq!(size, new_size);
assert!((new_size as f32) < (size as f32 * 1.25));
}
}
128 changes: 93 additions & 35 deletions luda-editor/new-server/bptree/src/bp_map/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ impl Operator {
.page_mut(leaf_node_offset, PageBlockTypeHint::Node)
.await?
.as_leaf_node_mut();
leaf_node.delete(key);
let entry = leaf_node.delete(key);
assert!(!leaf_node.contains(key));

self.push_free_block(entry.record_page_range).await?;
}

Ok(())
Expand All @@ -125,17 +127,14 @@ impl Operator {
Ok(contains)
}
pub async fn get(&mut self, key: Key) -> Result<Option<Bytes>> {
println!("key: {:?}", key);
let leaf_node_offset = self.find_leaf_node_for(key).await?;
let leaf_node = self
.page(leaf_node_offset, PageBlockTypeHint::Node)
.await?
.as_leaf_node();
let Some(record_page_range) = leaf_node.get_record_page_range(key) else {
println!("not exists: {:?}", key);
return Ok(None);
};
println!("record_page_range: {:?}", record_page_range);
let bytes = self.record(record_page_range).await?;
Ok(Some(bytes))
}
Expand Down Expand Up @@ -237,26 +236,6 @@ impl Operator {
async fn reserve_page_offset(&mut self) -> Result<PageOffset> {
Ok(self.header_mut().await?.next_page_offset.fetch_increase(1))
}
// async fn pop_free_page(&mut self) -> Result<Option<PageOffset>> {
// let free_page_stack_top_page_offset = self.header().await?.free_page_stack_top_page_offset;
// if free_page_stack_top_page_offset.is_null() {
// return Ok(None);
// }

// let stack_node = self
// .page_mut(free_page_stack_top_page_offset)
// .await?
// .as_free_page_stack_node_mut();

// let page_offset = stack_node.pop();
// let next_page_offset = stack_node.next_page_offset;

// if stack_node.is_empty() {
// self.header_mut().await?.free_page_stack_top_page_offset = next_page_offset;
// }

// Ok(Some(page_offset))
// }
async fn header(&mut self) -> Result<&Header> {
Ok(self
.page(PageOffset::HEADER, PageBlockTypeHint::Header)
Expand Down Expand Up @@ -320,26 +299,27 @@ impl Operator {
}
}
async fn allocate_record_pages(&mut self, value: Bytes) -> Result<PageRange> {
// TODO: Get contiguous pages from free page stack

let record = Record::new(value);

let page_count = record.page_count();

let page_offset = self
.header_mut()
.await?
.next_page_offset
.fetch_increase(page_count as usize);
let page_range = if let Some(page_range) = self.try_pop_free_block(page_count).await? {
page_range
} else {
let page_offset = self
.header_mut()
.await?
.next_page_offset
.fetch_increase(page_count as usize);

let block_page_range = PageRange::data(page_offset, page_count);
PageRange::data(page_offset, page_count)
};

self.blocks_updated
.insert(block_page_range, PageBlock::Record(record));
.insert(page_range, PageBlock::Record(record));

Ok(block_page_range)
Ok(page_range)
}

async fn rollback_reserve_page_offset(&mut self, right_node_offset: PageOffset) -> Result<()> {
assert_eq!(
self.header().await?.next_page_offset.as_u32(),
Expand All @@ -348,6 +328,77 @@ impl Operator {
self.header_mut().await?.next_page_offset.decrease();
Ok(())
}
async fn free_stack_mut(&mut self, page_offset: PageOffset) -> Result<&mut FreeStackNode> {
Ok(self
.page_mut(page_offset, PageBlockTypeHint::FreeStackNode)
.await?
.as_free_stack_mut())
}
async fn try_pop_free_block(&mut self, page_count: u8) -> Result<Option<PageRange>> {
let mut parent_page_offset = PageOffset::NULL;
let mut page_offset = self.header().await?.free_stack_top_page_offset;

loop {
if page_offset.is_null() {
return Ok(None);
}
let free_stack = self.free_stack_mut(page_offset).await?;

if let Some(range) = free_stack.try_pop(page_count) {
if free_stack.is_empty() && !parent_page_offset.is_null() {
let next_page_offset = free_stack.next_page_offset;
let parent_node = self.free_stack_mut(parent_page_offset).await?;
if !parent_node.is_full() {
parent_node.next_page_offset = next_page_offset;
parent_node.push(PageRange::page(page_offset));
}
}

return Ok(Some(range));
}

parent_page_offset = page_offset;
page_offset = free_stack.next_page_offset;
}
}

async fn push_free_block(&mut self, page_range: PageRange) -> Result<()> {
enum Parent {
Header,
FreeStack(PageOffset),
}
let mut parent = Parent::Header;
let mut page_offset = self.header().await?.free_stack_top_page_offset;
loop {
if page_offset.is_null() {
let new_free_stack_offset = self.reserve_page_offset().await?;
let new_free_stack = FreeStackNode::new();
self.blocks_updated.insert(
PageRange::page(new_free_stack_offset),
PageBlock::Page(Page::FreeStackNode(new_free_stack)),
);
match parent {
Parent::Header => {
self.header_mut().await?.free_stack_top_page_offset = new_free_stack_offset;
}
Parent::FreeStack(page_offset) => {
self.free_stack_mut(page_offset).await?.next_page_offset =
new_free_stack_offset;
}
}
page_offset = new_free_stack_offset;
continue;
}

let free_stack = self.free_stack_mut(page_offset).await?;
if !free_stack.is_full() {
free_stack.push(page_range);
return Ok(());
}
parent = Parent::FreeStack(page_offset);
page_offset = free_stack.next_page_offset;
}
}
}

async fn read_block_from_file(
Expand Down Expand Up @@ -379,6 +430,12 @@ async fn read_block_from_file(
Node::Leaf(leaf_node) => Page::LeafNode(leaf_node),
}))
}
PageBlockTypeHint::FreeStackNode => {
assert_eq!(block_page_range.page_count(), 1);
assert_eq!(bytes.len(), PAGE_LEN);
let free_stack_node = FreeStackNode::from_slice(&bytes);
Ok(PageBlock::Page(Page::FreeStackNode(free_stack_node)))
}
}
}

Expand All @@ -391,6 +448,7 @@ pub struct Done {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum PageBlockTypeHint {
Header,
FreeStackNode,
Record,
Node,
}
Loading

0 comments on commit a75f2c2

Please sign in to comment.