Skip to content

Commit

Permalink
Remove unnecessary reference counting from completion io callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
PThorpe92 committed Feb 7, 2025
1 parent 098da07 commit 4230bb4
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 87 deletions.
24 changes: 12 additions & 12 deletions bindings/wasm/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ impl limbo_core::File for File {
Ok(())
}

fn pread(&self, pos: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
let r = match &*c {
limbo_core::Completion::Read(r) => r,
fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> {
let r = match &c {
limbo_core::Completion::Read(ref r) => r,
_ => unreachable!(),
};
{
Expand All @@ -247,10 +247,10 @@ impl limbo_core::File for File {
&self,
pos: usize,
buffer: Rc<std::cell::RefCell<limbo_core::Buffer>>,
c: Rc<limbo_core::Completion>,
c: limbo_core::Completion,
) -> Result<()> {
let w = match &*c {
limbo_core::Completion::Write(w) => w,
let w = match &c {
limbo_core::Completion::Write(ref w) => w,
_ => unreachable!(),
};
let buf = buffer.borrow();
Expand All @@ -260,7 +260,7 @@ impl limbo_core::File for File {
Ok(())
}

fn sync(&self, c: Rc<limbo_core::Completion>) -> Result<()> {
fn sync(&self, c: limbo_core::Completion) -> Result<()> {
self.vfs.sync(self.fd);
c.complete(0);
Ok(())
Expand Down Expand Up @@ -331,9 +331,9 @@ impl DatabaseStorage {
}

impl limbo_core::DatabaseStorage for DatabaseStorage {
fn read_page(&self, page_idx: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
let r = match c.as_ref() {
limbo_core::Completion::Read(r) => r,
fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> Result<()> {
let r = match c {
limbo_core::Completion::Read(ref r) => r,
_ => unreachable!(),
};
let size = r.buf().len();
Expand All @@ -350,15 +350,15 @@ impl limbo_core::DatabaseStorage for DatabaseStorage {
&self,
page_idx: usize,
buffer: Rc<std::cell::RefCell<limbo_core::Buffer>>,
c: Rc<limbo_core::Completion>,
c: limbo_core::Completion,
) -> Result<()> {
let size = buffer.borrow().len();
let pos = (page_idx - 1) * size;
self.file.pwrite(pos, buffer, c)?;
Ok(())
}

fn sync(&self, _c: Rc<limbo_core::Completion>) -> Result<()> {
fn sync(&self, _c: limbo_core::Completion) -> Result<()> {
todo!()
}
}
Expand Down
21 changes: 8 additions & 13 deletions core/io/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct UringIO {
struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
pub pending: HashMap<u64, Rc<Completion>>,
pub pending: HashMap<u64, Completion>,
key: u64,
}

Expand Down Expand Up @@ -90,7 +90,7 @@ impl InnerUringIO {
}

impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc<Completion>) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Completion) {
trace!("submit_entry({:?})", entry);
self.pending.insert(entry.get_user_data(), c);
unsafe {
Expand Down Expand Up @@ -175,7 +175,7 @@ impl IO for UringIO {
)));
}
{
let c = ring.pending.get(&cqe.user_data()).unwrap().clone();
let c = ring.pending.get(&cqe.user_data()).unwrap();
c.complete(cqe.result());
}
ring.pending.remove(&cqe.user_data());
Expand Down Expand Up @@ -237,9 +237,9 @@ impl File for UringFile {
Ok(())
}

fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let r = match c.as_ref() {
Completion::Read(r) => r,
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let r = match c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
Expand All @@ -259,12 +259,7 @@ impl File for UringFile {
Ok(())
}

fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<Completion>,
) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
let mut io = self.io.borrow_mut();
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let write = {
Expand All @@ -280,7 +275,7 @@ impl File for UringFile {
Ok(())
}

fn sync(&self, c: Rc<Completion>) -> Result<()> {
fn sync(&self, c: Completion) -> Result<()> {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let mut io = self.io.borrow_mut();
trace!("sync()");
Expand Down
10 changes: 5 additions & 5 deletions core/io/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ impl File for MemoryFile {
Ok(())
}

fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let r = match &*c {
Completion::Read(r) => r,
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let r = match &c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
let buf_len = r.buf().len();
Expand Down Expand Up @@ -122,7 +122,7 @@ impl File for MemoryFile {
Ok(())
}

fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Completion) -> Result<()> {
let buf = buffer.borrow();
let buf_len = buf.len();
if buf_len == 0 {
Expand Down Expand Up @@ -159,7 +159,7 @@ impl File for MemoryFile {
Ok(())
}

fn sync(&self, c: Rc<Completion>) -> Result<()> {
fn sync(&self, c: Completion) -> Result<()> {
// no-op
c.complete(0);
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions core/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::{
pub trait File {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
fn sync(&self, c: Rc<Completion>) -> Result<()>;
fn pread(&self, pos: usize, c: Completion) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Completion) -> Result<()>;
fn sync(&self, c: Completion) -> Result<()>;
fn size(&self) -> Result<u64>;
}

Expand Down
23 changes: 9 additions & 14 deletions core/io/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ impl IO for UnixIO {
}

enum CompletionCallback {
Read(Rc<RefCell<std::fs::File>>, Rc<Completion>, usize),
Read(Rc<RefCell<std::fs::File>>, Completion, usize),
Write(
Rc<RefCell<std::fs::File>>,
Rc<Completion>,
Completion,
Rc<RefCell<crate::Buffer>>,
usize,
),
Expand Down Expand Up @@ -173,11 +173,11 @@ impl File for UnixFile {
Ok(())
}

fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let file = self.file.borrow();
let result = {
let r = match c.as_ref() {
Completion::Read(r) => r,
let r = match c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
let mut buf = r.buf_mut();
Expand All @@ -201,20 +201,15 @@ impl File for UnixFile {
}
self.callbacks.borrow_mut().insert(
fd as usize,
CompletionCallback::Read(self.file.clone(), c.clone(), pos),
CompletionCallback::Read(self.file.clone(), c, pos),
);
Ok(())
}
Err(e) => Err(e.into()),
}
}

fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<Completion>,
) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
let file = self.file.borrow();
let result = {
let buf = buffer.borrow();
Expand All @@ -238,15 +233,15 @@ impl File for UnixFile {
}
self.callbacks.borrow_mut().insert(
fd as usize,
CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos),
CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos),
);
Ok(())
}
Err(e) => Err(e.into()),
}
}

fn sync(&self, c: Rc<Completion>) -> Result<()> {
fn sync(&self, c: Completion) -> Result<()> {
let file = self.file.borrow();
let result = fs::fsync(file.as_fd());
match result {
Expand Down
15 changes: 5 additions & 10 deletions core/io/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ impl File for WindowsFile {
unimplemented!()
}

fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
let r = match c.as_ref() {
Completion::Read(r) => r,
let r = match c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
let mut buf = r.buf_mut();
Expand All @@ -70,12 +70,7 @@ impl File for WindowsFile {
Ok(())
}

fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<Completion>,
) -> Result<()> {
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<crate::Buffer>>, c: Completion) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
Expand All @@ -85,7 +80,7 @@ impl File for WindowsFile {
Ok(())
}

fn sync(&self, c: Rc<Completion>) -> Result<()> {
fn sync(&self, c: Completion) -> Result<()> {
let file = self.file.borrow_mut();
file.sync_all().map_err(LimboError::IOError)?;
c.complete(0);
Expand Down
2 changes: 1 addition & 1 deletion core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub fn maybe_init_database_file(file: &Rc<dyn File>, io: &Arc<dyn IO>) -> Result
let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| {
*flag_complete.borrow_mut() = true;
})));
file.pwrite(0, contents.buffer.clone(), Rc::new(completion))?;
file.pwrite(0, contents.buffer.clone(), completion)?;
}
let mut limit = 100;
loop {
Expand Down
22 changes: 9 additions & 13 deletions core/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@ use std::{cell::RefCell, rc::Rc};
/// the storage medium. A database can either be a file on disk, like in SQLite,
/// or something like a remote page server service.
pub trait DatabaseStorage {
fn read_page(&self, page_idx: usize, c: Rc<Completion>) -> Result<()>;
fn write_page(
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<Completion>,
) -> Result<()>;
fn sync(&self, c: Rc<Completion>) -> Result<()>;
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>;
fn write_page(&self, page_idx: usize, buffer: Rc<RefCell<Buffer>>, c: Completion)
-> Result<()>;
fn sync(&self, c: Completion) -> Result<()>;
}

#[cfg(feature = "fs")]
Expand All @@ -24,9 +20,9 @@ pub struct FileStorage {

#[cfg(feature = "fs")]
impl DatabaseStorage for FileStorage {
fn read_page(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
let r = match c.as_ref() {
Completion::Read(r) => r,
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
let r = match c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
let size = r.buf().len();
Expand All @@ -43,7 +39,7 @@ impl DatabaseStorage for FileStorage {
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<Completion>,
c: Completion,
) -> Result<()> {
let buffer_size = buffer.borrow().len();
assert!(buffer_size >= 512);
Expand All @@ -54,7 +50,7 @@ impl DatabaseStorage for FileStorage {
Ok(())
}

fn sync(&self, c: Rc<Completion>) -> Result<()> {
fn sync(&self, c: Completion) -> Result<()> {
self.file.sync(c)
}
}
Expand Down
Loading

0 comments on commit 4230bb4

Please sign in to comment.