From 4230bb491cb7acda0fb430aba5b2295a17118385 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Thu, 6 Feb 2025 20:29:47 -0500 Subject: [PATCH] Remove unnecessary reference counting from completion io callbacks --- bindings/wasm/lib.rs | 24 ++++++++++++------------ core/io/io_uring.rs | 21 ++++++++------------- core/io/memory.rs | 10 +++++----- core/io/mod.rs | 6 +++--- core/io/unix.rs | 23 +++++++++-------------- core/io/windows.rs | 15 +++++---------- core/lib.rs | 2 +- core/storage/database.rs | 22 +++++++++------------- core/storage/sqlite3_ondisk.rs | 24 ++++++++++++------------ core/storage/wal.rs | 2 +- simulator/runner/file.rs | 6 +++--- 11 files changed, 68 insertions(+), 87 deletions(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 26f877199..f25f762ed 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -228,9 +228,9 @@ impl limbo_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { - let r = match &*c { - limbo_core::Completion::Read(r) => r, + fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> { + let r = match &c { + limbo_core::Completion::Read(ref r) => r, _ => unreachable!(), }; { @@ -247,10 +247,10 @@ impl limbo_core::File for File { &self, pos: usize, buffer: Rc>, - c: Rc, + c: limbo_core::Completion, ) -> Result<()> { - let w = match &*c { - limbo_core::Completion::Write(w) => w, + let w = match &c { + limbo_core::Completion::Write(ref w) => w, _ => unreachable!(), }; let buf = buffer.borrow(); @@ -260,7 +260,7 @@ impl limbo_core::File for File { Ok(()) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: limbo_core::Completion) -> Result<()> { self.vfs.sync(self.fd); c.complete(0); Ok(()) @@ -331,9 +331,9 @@ impl DatabaseStorage { } impl limbo_core::DatabaseStorage for DatabaseStorage { - fn read_page(&self, page_idx: usize, c: Rc) -> Result<()> { - let r = match c.as_ref() { - limbo_core::Completion::Read(r) => r, + fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> Result<()> { + let r = match c { + limbo_core::Completion::Read(ref r) => r, _ => unreachable!(), }; let size = r.buf().len(); @@ -350,7 +350,7 @@ impl limbo_core::DatabaseStorage for DatabaseStorage { &self, page_idx: usize, buffer: Rc>, - c: Rc, + c: limbo_core::Completion, ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; @@ -358,7 +358,7 @@ impl limbo_core::DatabaseStorage for DatabaseStorage { Ok(()) } - fn sync(&self, _c: Rc) -> Result<()> { + fn sync(&self, _c: limbo_core::Completion) -> Result<()> { todo!() } } diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 1598debfa..9b6979c79 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -40,7 +40,7 @@ pub struct UringIO { struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, - pub pending: HashMap>, + pub pending: HashMap, key: u64, } @@ -90,7 +90,7 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Completion) { trace!("submit_entry({:?})", entry); self.pending.insert(entry.get_user_data(), c); unsafe { @@ -175,7 +175,7 @@ impl IO for UringIO { ))); } { - let c = ring.pending.get(&cqe.user_data()).unwrap().clone(); + let c = ring.pending.get(&cqe.user_data()).unwrap(); c.complete(cqe.result()); } ring.pending.remove(&cqe.user_data()); @@ -237,9 +237,9 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { - let r = match c.as_ref() { - Completion::Read(r) => r, + fn pread(&self, pos: usize, c: Completion) -> Result<()> { + let r = match c { + Completion::Read(ref r) => r, _ => unreachable!(), }; trace!("pread(pos = {}, length = {})", pos, r.buf().len()); @@ -259,12 +259,7 @@ impl File for UringFile { Ok(()) } - fn pwrite( - &self, - pos: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Rc>, c: Completion) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { @@ -280,7 +275,7 @@ impl File for UringFile { Ok(()) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: Completion) -> Result<()> { let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); trace!("sync()"); diff --git a/core/io/memory.rs b/core/io/memory.rs index 18decf78a..164268d5e 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -78,9 +78,9 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { - let r = match &*c { - Completion::Read(r) => r, + fn pread(&self, pos: usize, c: Completion) -> Result<()> { + let r = match &c { + Completion::Read(ref r) => r, _ => unreachable!(), }; let buf_len = r.buf().len(); @@ -122,7 +122,7 @@ impl File for MemoryFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Rc>, c: Completion) -> Result<()> { let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -159,7 +159,7 @@ impl File for MemoryFile { Ok(()) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: Completion) -> Result<()> { // no-op c.complete(0); Ok(()) diff --git a/core/io/mod.rs b/core/io/mod.rs index f88a5d554..fa36e95be 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -12,9 +12,9 @@ use std::{ pub trait File { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Rc) -> Result<()>; - fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; - fn sync(&self, c: Rc) -> Result<()>; + fn pread(&self, pos: usize, c: Completion) -> Result<()>; + fn pwrite(&self, pos: usize, buffer: Rc>, c: Completion) -> Result<()>; + fn sync(&self, c: Completion) -> Result<()>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index effd94bf5..110e4546e 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -118,10 +118,10 @@ impl IO for UnixIO { } enum CompletionCallback { - Read(Rc>, Rc, usize), + Read(Rc>, Completion, usize), Write( Rc>, - Rc, + Completion, Rc>, usize, ), @@ -173,11 +173,11 @@ impl File for UnixFile { Ok(()) } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { - let r = match c.as_ref() { - Completion::Read(r) => r, + let r = match c { + Completion::Read(ref r) => r, _ => unreachable!(), }; let mut buf = r.buf_mut(); @@ -201,7 +201,7 @@ impl File for UnixFile { } self.callbacks.borrow_mut().insert( fd as usize, - CompletionCallback::Read(self.file.clone(), c.clone(), pos), + CompletionCallback::Read(self.file.clone(), c, pos), ); Ok(()) } @@ -209,12 +209,7 @@ impl File for UnixFile { } } - fn pwrite( - &self, - pos: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Rc>, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); @@ -238,7 +233,7 @@ impl File for UnixFile { } self.callbacks.borrow_mut().insert( fd as usize, - CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos), + CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), ); Ok(()) } @@ -246,7 +241,7 @@ impl File for UnixFile { } } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/io/windows.rs b/core/io/windows.rs index 50acfcb50..d359c4575 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -54,12 +54,12 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c.as_ref() { - Completion::Read(r) => r, + let r = match c { + Completion::Read(ref r) => r, _ => unreachable!(), }; let mut buf = r.buf_mut(); @@ -70,12 +70,7 @@ impl File for WindowsFile { Ok(()) } - fn pwrite( - &self, - pos: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Rc>, c: Completion) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -85,7 +80,7 @@ impl File for WindowsFile { Ok(()) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: Completion) -> Result<()> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); diff --git a/core/lib.rs b/core/lib.rs index 8fde24402..46ae86182 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -238,7 +238,7 @@ pub fn maybe_init_database_file(file: &Rc, io: &Arc) -> Result let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| { *flag_complete.borrow_mut() = true; }))); - file.pwrite(0, contents.buffer.clone(), Rc::new(completion))?; + file.pwrite(0, contents.buffer.clone(), completion)?; } let mut limit = 100; loop { diff --git a/core/storage/database.rs b/core/storage/database.rs index e59519f38..f9c20b972 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -7,14 +7,10 @@ use std::{cell::RefCell, rc::Rc}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage { - fn read_page(&self, page_idx: usize, c: Rc) -> Result<()>; - fn write_page( - &self, - page_idx: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()>; - fn sync(&self, c: Rc) -> Result<()>; + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>; + fn write_page(&self, page_idx: usize, buffer: Rc>, c: Completion) + -> Result<()>; + fn sync(&self, c: Completion) -> Result<()>; } #[cfg(feature = "fs")] @@ -24,9 +20,9 @@ pub struct FileStorage { #[cfg(feature = "fs")] impl DatabaseStorage for FileStorage { - fn read_page(&self, page_idx: usize, c: Rc) -> Result<()> { - let r = match c.as_ref() { - Completion::Read(r) => r, + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { + let r = match c { + Completion::Read(ref r) => r, _ => unreachable!(), }; let size = r.buf().len(); @@ -43,7 +39,7 @@ impl DatabaseStorage for FileStorage { &self, page_idx: usize, buffer: Rc>, - c: Rc, + c: Completion, ) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); @@ -54,7 +50,7 @@ impl DatabaseStorage for FileStorage { Ok(()) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: Completion) -> Result<()> { self.file.sync(c) } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 56bc959f9..7073e8b5a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -253,8 +253,8 @@ pub fn begin_read_database_header( let header = header.clone(); finish_read_database_header(buf, header).unwrap(); }); - let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); - page_io.read_page(1, c.clone())?; + let c = Completion::Read(ReadCompletion::new(buf, complete)); + page_io.read_page(1, c)?; Ok(result) } @@ -313,7 +313,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let c = Rc::new(Completion::Read(ReadCompletion::new(buf, read_complete))); + let c = Completion::Read(ReadCompletion::new(buf, read_complete)); page_source.read_page(1, c)?; // run get header block pager.io.run_once()?; @@ -327,7 +327,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re // finish_read_database_header(buf, header).unwrap(); }); - let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + let c = Completion::Write(WriteCompletion::new(write_complete)); page_source.write_page(0, buffer_to_copy, c)?; Ok(()) @@ -675,8 +675,8 @@ pub fn begin_read_page( page.set_error(); } }); - let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); - page_io.read_page(page_idx, c.clone())?; + let c = Completion::Read(ReadCompletion::new(buf, complete)); + page_io.read_page(page_idx, c)?; Ok(()) } @@ -733,7 +733,7 @@ pub fn begin_write_btree_page( } }) }; - let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + let c = Completion::Write(WriteCompletion::new(write_complete)); page_source.write_page(page_id, buffer.clone(), c)?; Ok(()) } @@ -746,7 +746,7 @@ pub fn begin_sync(page_io: Rc, syncing: Rc>) *syncing.borrow_mut() = false; }), }); - page_io.sync(Rc::new(completion))?; + page_io.sync(completion)?; Ok(()) } @@ -1145,7 +1145,7 @@ pub fn begin_read_wal_header(io: &Rc) -> Result> let header = header.clone(); finish_read_wal_header(buf, header).unwrap(); }); - let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); + let c = Completion::Read(ReadCompletion::new(buf, complete)); io.pread(0, c)?; Ok(result) } @@ -1187,7 +1187,7 @@ pub fn begin_read_wal_frame( let frame = frame.clone(); finish_read_page(2, buf, frame).unwrap(); }); - let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); + let c = Completion::Read(ReadCompletion::new(buf, complete)); io.pread(offset, c)?; Ok(()) } @@ -1262,7 +1262,7 @@ pub fn begin_write_wal_frame( } }) }; - let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + let c = Completion::Write(WriteCompletion::new(write_complete)); io.pwrite(offset, buffer.clone(), c)?; Ok(checksums) } @@ -1295,7 +1295,7 @@ pub fn begin_write_wal_header(io: &Rc, header: &WalHeader) -> Result<( } }) }; - let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + let c = Completion::Write(WriteCompletion::new(write_complete)); io.pwrite(0, buffer.clone(), c)?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e42fde2b6..fc5f68845 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -630,7 +630,7 @@ impl Wal for WalFile { *syncing.borrow_mut() = false; }), }); - shared.file.sync(Rc::new(completion))?; + shared.file.sync(completion)?; } self.sync_state.replace(SyncState::Syncing); Ok(CheckpointStatus::IO) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 7a514ea1e..6d73af505 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -74,7 +74,7 @@ impl File for SimulatorFile { self.inner.unlock_file() } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { + fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> { *self.nr_pread_calls.borrow_mut() += 1; if *self.fault.borrow() { *self.nr_pread_faults.borrow_mut() += 1; @@ -89,7 +89,7 @@ impl File for SimulatorFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: limbo_core::Completion, ) -> Result<()> { *self.nr_pwrite_calls.borrow_mut() += 1; if *self.fault.borrow() { @@ -101,7 +101,7 @@ impl File for SimulatorFile { self.inner.pwrite(pos, buffer, c) } - fn sync(&self, c: Rc) -> Result<()> { + fn sync(&self, c: limbo_core::Completion) -> Result<()> { *self.nr_sync_calls.borrow_mut() += 1; self.inner.sync(c) }