Skip to content

Commit

Permalink
Make DmaFile and OwnedDmaFile clonable (#657)
Browse files Browse the repository at this point in the history
The current mechanism for sharing an FD with a background thread
requires `dup` which can limit how many references to a file can exist
concurrently in background threads. The use-case is a file server that
responds to requests by handing back an FD and a region to respond with
- using `dup` limits how many concurrent requests can be referencing the
file at once due to kernel limits on simultaneous open FDs. However, by
letting DmaFile be clonable, it lets us send the FD to any thread
without risking race conditions about the FD closing while references
exist.

This does have a new consequence that close requests may fail if clones
exist, but this only impacts those who actually try to clone in the
first place, so should have no impact on existing users.

Co-authored-by: Glauber Costa <[email protected]>
  • Loading branch information
vlovich and Glauber Costa authored Apr 24, 2024
1 parent f920e80 commit 812fe6f
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 38 deletions.
2 changes: 1 addition & 1 deletion glommio/src/io/buffered_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl BufferedFile {
self.file.path()
}

pub(crate) fn discard(self) -> (RawFd, Option<PathBuf>) {
pub(crate) fn discard(self) -> (Option<RawFd>, Option<PathBuf>) {
self.file.discard()
}
}
Expand Down
7 changes: 6 additions & 1 deletion glommio/src/io/buffered_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,12 @@ impl StreamWriter {
}
Some(source) => {
let _ = source.result().unwrap();
self.file.take().unwrap().discard();
let file = self.file.take().unwrap();
let fd = file.as_raw_fd();
let (last_fd, _) = file.discard();
// This is really bad and shouldn't happen - we're handling the close event for the FD
// even though a reference is held onto it somewhere else. That means fd reuse is possible.
assert!(last_fd.is_some(), "Handling inner close for fd {fd} but it's still owned - fd reuse is a real risk");
Poll::Ready(Ok(()))
}
}
Expand Down
202 changes: 195 additions & 7 deletions glommio/src/io/dma_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(crate) fn align_down(v: u64, align: u64) -> u64 {
v & !(align - 1)
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// An asynchronously accessed Direct Memory Access (DMA) file.
///
/// All access uses Direct I/O, and all operations including open and close are
Expand Down Expand Up @@ -601,6 +601,27 @@ impl DmaFile {
self.file.statx().await.map(Into::into)
}

/// Attempt to confirm no other clones of this file exist. If no clones exist,
/// Ok(self) is returned. If clones remain, Err(self) is returned. Do not use
/// this unless you are implementing a polling mechanism to determine when it's
/// safe to invoke close - dropping the error case is equivalent to just invoking
/// close and ignoring the result.
/// NOTE: Clones are allowed to exist on any thread and all share the same underlying
/// fd safely. try_take_last_clone is also safe to invoke from any thread and will
/// behave correctly with respect to clones on other threads.
pub fn try_take_last_clone(mut self) -> std::result::Result<Self, Self> {
match self.file.try_take_last_clone() {
Ok(took) => {
self.file = took;
Ok(self)
}
Err(still_cloned) => {
self.file = still_cloned;
Err(self)
}
}
}

/// Closes this DMA file.
pub async fn close(self) -> Result<()> {
self.file.close().await
Expand Down Expand Up @@ -640,7 +661,7 @@ impl DmaFile {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// Takes ownership of internal state of the [`DmaFile`] that can be sent to an executor running on a different thread.
/// In the other thread, you'd convert this back into a [`DmaFile`].
///
Expand Down Expand Up @@ -721,18 +742,19 @@ pub struct OwnedDmaFile {
pollable: PollableStatus,
}

unsafe impl Send for OwnedDmaFile {}

impl OwnedDmaFile {
/// Creates a duplicate instance pointing to the same file descriptor as self.
/// See [`DmaFile::dup`](struct.DmaFile.html#method.dup) for more info.
/// See [`DmaFile::dup`](struct.DmaFile.html#method.dup) for more info. This is
/// very different from cloning which simply adds a reference to the underlying
/// fd already open and should be preferred unless you are trying to intentionally
/// decouple lifetime of the file descriptors.
pub fn dup(&self) -> Result<Self> {
Ok(Self {
file: enhanced_try!(
self.file.dup(),
"Duplicating",
self.file.path.as_ref(),
self.file.fd
self.file.fd.as_ref().map(|fd| fd.as_raw_fd())
)?,
o_direct_alignment: self.o_direct_alignment,
max_sectors_size: self.max_sectors_size,
Expand Down Expand Up @@ -776,7 +798,8 @@ impl AsRawFd for OwnedDmaFile {
pub(crate) mod test {
use super::*;
use crate::{
enclose, test_utils::make_test_directories, ByteSliceMutExt, Latency, LocalExecutor, Shares,
enclose, test_utils::make_test_directories, ByteSliceMutExt, GlommioError, Latency,
LocalExecutor, ResourceType, Shares,
};
use futures::join;
use futures_lite::{stream, StreamExt};
Expand Down Expand Up @@ -1661,4 +1684,169 @@ pub(crate) mod test {
assert_eq!(read.len(), buffer_len);
assert_eq!(original_write_buffer.as_slice(), &read[..]);
});

dma_file_test!(close_fails_if_clone_exists, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let cloned_file = file.clone();
assert_eq!(cloned_file.as_raw_fd(), file.as_raw_fd());

let cloned_file = cloned_file
.try_take_last_clone()
.expect_err("Can't take when a clone still exists");

match file.close().await {
Err(GlommioError::CanNotBeClosed(ResourceType::File(failed_path), reason)) => {
assert_eq!(Path::new(&failed_path), path);
assert_eq!(
reason,
"Another clone of this file exists somewhere - cannot close fd"
);
}
Ok(()) => panic!("Cannot close file while a clone exists"),
Err(e) => panic!("File close failed with an unexpected error {e:?}"),
}

cloned_file
.close()
.await
.expect("Last reference to file should close successfully");
});

dma_file_test!(close_succeeds_on_background_thread, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let cloned_file: OwnedDmaFile = file.clone().into();
assert_eq!(cloned_file.as_raw_fd(), file.as_raw_fd());

let (clone_registered_on_background_thread, waiting_for_registration) =
tokio::sync::oneshot::channel();
let (test_thread_dropped_file, waiting_for_file_drop) = tokio::sync::oneshot::channel();

let test_thread = std::thread::current().id();

let spawned_task = crate::executor()
.spawn_local(async move {
crate::executor()
.spawn_blocking(move || {
let local_ex = crate::executor::LocalExecutorBuilder::new(
crate::executor::Placement::Unbound,
)
.record_io_latencies(true)
.make()
.unwrap();
local_ex.run(async move {
assert_ne!(
std::thread::current().id(),
test_thread,
"Blocking thread and test thread shouldn't be the same thread"
);

let cloned_file: DmaFile = cloned_file.into();

clone_registered_on_background_thread.send(()).unwrap();
waiting_for_file_drop.await.unwrap();

cloned_file
.try_take_last_clone()
.expect("No more clones should exist")
.close()
.await
.expect("Close on background thread should succeed");
})
})
.await
})
.detach();

waiting_for_registration.await.unwrap();
let _ = file
.try_take_last_clone()
.expect_err("A clone is expected to exist");
test_thread_dropped_file.send(()).unwrap();
spawned_task.await.expect("Task wasn't cancelled");
});

dma_file_test!(share_file_between_threads, path, _k, {
let file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.tmpfile(true)
.dma_open(&path)
.await
.unwrap();

let cloned_file = file.clone();
assert_eq!(cloned_file.as_raw_fd(), file.as_raw_fd());

let owned_clone: OwnedDmaFile = cloned_file.into();

let test_thread = std::thread::current().id();

let expected_buffer: Vec<u8> = crate::executor()
.spawn_blocking(move || {
let local_ex =
crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
.record_io_latencies(true)
.make()
.unwrap();
local_ex.run(async move {
assert_ne!(
std::thread::current().id(),
test_thread,
"Blocking thread and test thread shouldn't be the same thread"
);

let cloned_file: DmaFile = owned_clone.into();

let buffer_len = cloned_file.alignment().max(4096) as usize;
let mut buffer = cloned_file.alloc_dma_buffer(buffer_len);
buffer.as_bytes_mut().fill(0);
for i in 0..10u8 {
buffer.as_bytes_mut()[i as usize] = i;
}
let original_write_buffer = buffer.as_bytes_mut().to_vec();

cloned_file
.write_at(buffer, cloned_file.alignment())
.await
.expect("File should be written from background thread successfully");

// See close_fails_if_clone_exists - error should be returned by close if a clone
// exists & that includes clones materialized on other threads.
if let GlommioError::CanNotBeClosed(ResourceType::File(_), _) =
cloned_file.close().await.expect_err(
"File shouldn't close because clone still exists back on main thread",
)
{
} else {
panic!("Unexpected error closing file");
}

original_write_buffer
})
})
.await;

let found_buffer = file
.read_at_aligned(file.alignment(), expected_buffer.len())
.await
.expect("File should be read from foreground thread successfully");
assert_eq!(&*found_buffer, expected_buffer.as_slice());
});
}
Loading

0 comments on commit 812fe6f

Please sign in to comment.