Skip to content

Commit

Permalink
More explicit error handling; bump to v0.2.14
Browse files Browse the repository at this point in the history
Handle cases where constructing the `Mutex` object can fail, and push
the related error through the system. Replace manual `Display`
implementations with `thiserror` usage.
  • Loading branch information
sk1p committed Oct 1, 2024
1 parent a7ca811 commit 8da6ffc
Show file tree
Hide file tree
Showing 28 changed files with 303 additions and 266 deletions.
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "common"
authors = ["Alexander Clausen <[email protected]>"]
license = "MIT"
version = "0.2.13"
version = "0.2.14"
edition = "2021"
rust-version = "1.71"

Expand Down Expand Up @@ -34,3 +34,6 @@ criterion = "0.5.1"
[[bench]]
name = "casting"
harness = false

[lints.rust]
unused_must_use = "deny"
33 changes: 21 additions & 12 deletions common/src/frame_stack.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use ipc_test::{SharedSlabAllocator, SlotForWriting};
use ipc_test::{slab::ShmError, SharedSlabAllocator, SlotForWriting};
use log::{error, warn};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
Expand Down Expand Up @@ -45,6 +45,9 @@ pub enum FrameStackWriteError {

#[error("too small")]
TooSmall,

#[error("SHM access error: {0}")]
ShmAccessError(#[from] ShmError),
}

impl From<FrameStackWriteError> for PyErr {
Expand All @@ -56,6 +59,7 @@ impl From<FrameStackWriteError> for PyErr {
FrameStackWriteError::TooSmall => {
PyValueError::new_err("frame stack too small to handle single frame")
}
FrameStackWriteError::ShmAccessError(e) => PyValueError::new_err(e.to_string()),
}
}
}
Expand All @@ -64,6 +68,9 @@ impl From<FrameStackWriteError> for PyErr {
pub enum SplitError<M: FrameMeta> {
#[error("shm full")]
ShmFull(FrameStackHandle<M>),

#[error("shm access error: {0}")]
AccessError(#[from] ShmError),
}

pub struct FrameStackForWriting<M>
Expand Down Expand Up @@ -160,7 +167,7 @@ where
) -> Result<FrameStackHandle<M>, FrameStackWriteError> {
if self.is_empty() {
let slot_info = shm.writing_done(self.slot);
shm.free_idx(slot_info.slot_idx);
shm.free_idx(slot_info.slot_idx)?;
return Err(FrameStackWriteError::Empty);
}

Expand All @@ -180,7 +187,7 @@ where
) -> Result<(), FrameStackWriteError> {
if self.is_empty() {
let slot_info = shm.writing_done(self.slot);
shm.free_idx(slot_info.slot_idx);
shm.free_idx(slot_info.slot_idx)?;
Ok(())
} else {
Err(FrameStackWriteError::NonEmpty)
Expand Down Expand Up @@ -296,15 +303,17 @@ mod inner {
let mut slot_left = match shm.try_get_mut() {
Ok(s) => s,
Err(ShmError::NoSlotAvailable) => return Err(SplitError::ShmFull(self)),
Err(e @ ShmError::MutexError(_)) => return Err(e.into()),
};
let mut slot_right = match shm.try_get_mut() {
Ok(s) => s,
Err(ShmError::NoSlotAvailable) => {
// don't leak the left slot!
let l = shm.writing_done(slot_left);
shm.free_idx(l.slot_idx);
shm.free_idx(l.slot_idx)?;
return Err(SplitError::ShmFull(self));
}
Err(e @ ShmError::MutexError(_)) => return Err(e.into()),
};

let slice_left = slot_left.as_slice_mut();
Expand All @@ -316,7 +325,7 @@ mod inner {
let left = shm.writing_done(slot_left);
let right = shm.writing_done(slot_right);

shm.free_idx(self.slot.slot_idx);
shm.free_idx(self.slot.slot_idx)?;

(left, right)
};
Expand Down Expand Up @@ -351,8 +360,8 @@ mod inner {
f(&slot_r)
}

pub fn free_slot(self, shm: &mut SharedSlabAllocator) {
shm.free_idx(self.slot.slot_idx);
pub fn free_slot(self, shm: &mut SharedSlabAllocator) -> Result<(), ShmError> {
shm.free_idx(self.slot.slot_idx)
}
}

Expand Down Expand Up @@ -493,7 +502,7 @@ impl<'b, M: FrameMeta> Drop for WriteGuard<'b, M> {
match frame_stack.writing_done(self.shm) {
Ok(frame_stack) => {
warn!("discarding non-empty frame stack as result of previous errors");
frame_stack.free_slot(self.shm);
let _ = frame_stack.free_slot(self.shm);
}
Err(e) => error!("WriteGuard::drop failed: {e:?}"),
}
Expand Down Expand Up @@ -616,12 +625,12 @@ mod tests {
assert_eq!(a.offsets.len() + b.offsets.len(), 2);

// when the split is done, there should be one free shm slot:
assert_eq!(shm.num_slots_free(), 1);
assert_eq!(shm.num_slots_free().unwrap(), 1);

// and we can free them again:
shm.free_idx(a.slot.slot_idx);
shm.free_idx(b.slot.slot_idx);
shm.free_idx(a.slot.slot_idx).unwrap();
shm.free_idx(b.slot.slot_idx).unwrap();

assert_eq!(shm.num_slots_free(), 3);
assert_eq!(shm.num_slots_free().unwrap(), 3);
}
}
10 changes: 8 additions & 2 deletions common/src/generic_cam_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Debug;

use ipc_test::{slab::SlabInitError, SharedSlabAllocator};
use ipc_test::{
slab::{ShmError, SlabInitError},
SharedSlabAllocator,
};
use multiversion::multiversion;
use ndarray::ArrayViewMut3;
use num::cast::AsPrimitive;
Expand All @@ -18,6 +21,9 @@ pub enum CamClientError {
error: SlabInitError,
},

#[error("failed to access SHM: {0}")]
ShmError(#[from] ShmError),

#[error("operation on closed client")]
Closed,

Expand Down Expand Up @@ -166,7 +172,7 @@ where
M: FrameMeta,
{
let shm = self.get_shm_mut()?;
handle.free_slot(shm);
handle.free_slot(shm)?;
Ok(())
}

Expand Down
22 changes: 15 additions & 7 deletions common/src/generic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
time::{Duration, Instant},
};

use ipc_test::{slab::SlabInitError, SharedSlabAllocator};
use ipc_test::{
slab::{ShmError, SlabInitError},
SharedSlabAllocator,
};
use log::{debug, info, trace, warn};
use stats::Stats;

Expand Down Expand Up @@ -53,6 +56,9 @@ pub enum ConnectionError {
#[error("could not connect to SHM area: {0}")]
ShmConnectError(#[from] SlabInitError),

#[error("could not access SHM")]
ShmAccessError(#[from] ShmError),

#[error("background thread is dead")]
Disconnected,

Expand Down Expand Up @@ -117,6 +123,7 @@ where
frame_stack = old_frame_stack;
continue;
}
Err(SplitError::AccessError(e)) => return Err(e.into()),
};
}
}
Expand Down Expand Up @@ -314,13 +321,13 @@ where
return Err(ConnectionError::FatalError(error))
}
ReceiverMsg::FrameStack { frame_stack } => {
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
return Err(ConnectionError::UnexpectedMessage(
"ReceiverMsg::FrameStack in wait_for_arm".to_owned(),
));
}
ReceiverMsg::Finished { frame_stack } => {
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
return Err(ConnectionError::UnexpectedMessage(
"ReceiverMsg::Finished in wait_for_arm".to_owned(),
));
Expand Down Expand Up @@ -406,11 +413,11 @@ where
match res {
ReceiverMsg::FrameStack { frame_stack } => {
trace!("wait_for_status: ignoring received FrameStackHandle");
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
}
ReceiverMsg::Finished { frame_stack } => {
warn!("wait_for_status: ignoring FrameStackHandle received in ReceiverMsg::Finished message");
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
}
ReceiverMsg::FatalError { error } => {
return Err(ConnectionError::FatalError(error));
Expand Down Expand Up @@ -568,12 +575,13 @@ where
self.wait_for_status(ConnectionStatus::Idle, *timeout, periodic_callback)
}

pub fn log_shm_stats(&self) {
pub fn log_shm_stats(&self) -> Result<(), ConnectionError> {
let shm = &self.shm;
let free = shm.num_slots_free();
let free = shm.num_slots_free()?;
let total = shm.num_slots_total();
self.stats.log_stats();
info!("shm stats free/total: {}/{}", free, total);
Ok(())
}

pub fn reset_stats(&mut self) {
Expand Down
8 changes: 6 additions & 2 deletions common/src/py_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ macro_rules! impl_py_connection {
let _trace_guard = span_from_py(py, &format!("{}::close", stringify!($name)))?;

if let Some(mut conn_impl) = self.conn_impl.take() {
conn_impl.log_shm_stats();
conn_impl
.log_shm_stats()
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;
conn_impl.reset_stats();
conn_impl.close();
Ok(())
Expand Down Expand Up @@ -261,7 +263,9 @@ macro_rules! impl_py_connection {

pub fn log_shm_stats(&self) -> PyResult<()> {
let conn_impl = self.get_conn()?;
conn_impl.log_shm_stats();
conn_impl
.log_shm_stats()
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions ipc_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ sendfd = "0.4.3"

[target.'cfg(not(windows))'.dependencies]
nix = { version = "0.29.0", features = ["poll"] }

[lints.rust]
unused_must_use = "deny"
4 changes: 2 additions & 2 deletions ipc_test/examples/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ fn main() {
// some additional "work":
//std::thread::sleep(Duration::from_micros(1));

ssa.free_idx(slot_info.slot_idx);
ssa.free_idx(slot_info.slot_idx).unwrap();

sum += sum_part.0 as f64;
bytes_processed += SLOT_SIZE_BYTES;

if t0.elapsed() > Duration::from_secs(1) {
let slots_free = ssa.num_slots_free();
let slots_free = ssa.num_slots_free().unwrap();
println!(
"idx: {idx:5}, sum: {sum_part}, throughput: {:7.2} MiB/s, slots free: {slots_free}",
bytes_processed as f32 / 1024.0 / 1024.0
Expand Down
2 changes: 1 addition & 1 deletion ipc_test/examples/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn handle_connection(
}

println!("done sending {} items", send_num_items);
while ssa.num_slots_free() < ssa.num_slots_total() {
while ssa.num_slots_free().unwrap() < ssa.num_slots_total() {
thread::sleep(Duration::from_millis(100));
}
println!("done!")
Expand Down
2 changes: 1 addition & 1 deletion ipc_test/src/backend_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl SharedMemory {
impl Drop for SharedMemory {
fn drop(&mut self) {
if self.is_owner {
remove_file(&self.handle_path).unwrap();
let _ = remove_file(&self.handle_path);
}
}
}
Expand Down
Loading

0 comments on commit 8da6ffc

Please sign in to comment.