Skip to content

Commit

Permalink
Tweak error handling for libertem_dectris
Browse files Browse the repository at this point in the history
* In case of an error in the background thread, log the error, raise an
  exception on the Python side and try to reconnect
* Drain: also drain away messages that are not of type dheader-1.0
  • Loading branch information
sk1p committed Mar 15, 2023
1 parent 617507d commit ae3ca10
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 92 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libertem_asi_tpx3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libertem-asi-tpx3"
authors = ["Alexander Clausen <[email protected]>"]
license = "MIT"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
readme = "README.md"

Expand Down
2 changes: 1 addition & 1 deletion libertem_dectris/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libertem-dectris"
authors = ["Alexander Clausen <[email protected]>"]
license = "MIT"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
readme = "README.md"

Expand Down
3 changes: 2 additions & 1 deletion libertem_dectris/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use zmq::{Context, Message, Socket, SocketEvent};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[pyclass]
pub struct DSeriesOnly {
pub struct DSeriesAndType {
pub series: u64,
pub htype: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
190 changes: 103 additions & 87 deletions libertem_dectris/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::{

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError};
use ipc_test::{SHMHandle, SharedSlabAllocator};
use log::{debug, info, warn};
use log::{debug, info, warn, error};
use zmq::{Message, Socket};

use crate::{
common::{
setup_monitor, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DSeriesOnly, DetectorConfig,
setup_monitor, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DSeriesAndType, DetectorConfig,
},
frame_stack::{FrameStackForWriting, FrameStackHandle},
};
Expand Down Expand Up @@ -386,11 +386,11 @@ fn drain_if_mismatch(
control_channel: &Receiver<ControlMsg>,
) -> Result<(), AcquisitionError> {
loop {
let series_res: Result<DSeriesOnly, _> = serde_json::from_str(msg.as_str().unwrap());
let series_res: Result<DSeriesAndType, _> = serde_json::from_str(msg.as_str().unwrap());

if let Ok(recvd_series) = series_res {
// everything is ok, we can go ahead:
if recvd_series.series == series {
if recvd_series.series == series && recvd_series.htype == "dheader-1.0" {
return Ok(());
}
}
Expand Down Expand Up @@ -424,94 +424,110 @@ fn background_thread(
frame_stack_size: usize,
mut shm: SharedSlabAllocator,
) -> Result<(), AcquisitionError> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PULL).unwrap();
socket.set_rcvtimeo(1000).unwrap();
socket.connect(&uri).unwrap();
socket.set_rcvhwm(4 * 1024).unwrap();

setup_monitor(ctx, "DectrisReceiver".to_string(), &socket);

loop {
// control: main threads tells us to quit
let control = to_thread_r.recv_timeout(Duration::from_millis(100));
match control {
Ok(ControlMsg::StartAcquisitionPassive) => {
match passive_acquisition(
to_thread_r,
from_thread_s,
&socket,
frame_stack_size,
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
}
e => {
return e;
'outer: loop {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PULL).unwrap();
socket.set_rcvtimeo(1000).unwrap();
socket.connect(&uri).unwrap();
socket.set_rcvhwm(4 * 1024).unwrap();

setup_monitor(ctx, "DectrisReceiver".to_string(), &socket);

loop {
// control: main threads tells us to quit
let control = to_thread_r.recv_timeout(Duration::from_millis(100));
match control {
Ok(ControlMsg::StartAcquisitionPassive) => {
match passive_acquisition(
to_thread_r,
from_thread_s,
&socket,
frame_stack_size,
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
}
Err(e) => {
let msg = format!("passive_acquisition error: {}", e);
from_thread_s
.send(ResultMsg::Error {
msg,
})
.unwrap();
error!("background_thread: error: {}; re-connecting", e);
continue 'outer;
}
}
}
}
Ok(ControlMsg::StartAcquisition { series }) => {
let mut msg: Message = Message::new();
recv_part(&mut msg, &socket, to_thread_r)?;

drain_if_mismatch(&mut msg, &socket, series, to_thread_r)?;

let dheader_res: Result<DHeader, _> = serde_json::from_str(msg.as_str().unwrap());
let dheader: DHeader = match dheader_res {
Ok(header) => header,
Err(err) => {
from_thread_s
.send(ResultMsg::SerdeError {
msg: err.to_string(),
recvd_msg: msg
.as_str()
.map_or_else(|| "".to_string(), |m| m.to_string()),
})
.unwrap();
log::error!(
"background_thread: serialization error: {}",
err.to_string()
);
break;
}
};
debug!("dheader: {dheader:?}");

// second message: the header itself
recv_part(&mut msg, &socket, to_thread_r)?;
let detector_config: DetectorConfig =
serde_json::from_str(msg.as_str().unwrap()).unwrap();

match acquisition(
detector_config,
to_thread_r,
from_thread_s,
&socket,
series,
frame_stack_size,
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
}
e => {
return e;
Ok(ControlMsg::StartAcquisition { series }) => {
let mut msg: Message = Message::new();
recv_part(&mut msg, &socket, to_thread_r)?;

drain_if_mismatch(&mut msg, &socket, series, to_thread_r)?;

let dheader_res: Result<DHeader, _> = serde_json::from_str(msg.as_str().unwrap());
let dheader: DHeader = match dheader_res {
Ok(header) => header,
Err(err) => {
from_thread_s
.send(ResultMsg::SerdeError {
msg: err.to_string(),
recvd_msg: msg
.as_str()
.map_or_else(|| "".to_string(), |m| m.to_string()),
})
.unwrap();
log::error!(
"background_thread: serialization error: {}",
err.to_string()
);
break;
}
};
debug!("dheader: {dheader:?}");

// second message: the header itself
recv_part(&mut msg, &socket, to_thread_r)?;
let detector_config: DetectorConfig =
serde_json::from_str(msg.as_str().unwrap()).unwrap();

match acquisition(
detector_config,
to_thread_r,
from_thread_s,
&socket,
series,
frame_stack_size,
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
}
Err(e) => {
let msg = format!("acquisition error: {}", e);
from_thread_s
.send(ResultMsg::Error {
msg,
})
.unwrap();
error!("background_thread: error: {}; re-connecting", e);
continue 'outer;
}
}
}
Ok(ControlMsg::StopThread) => {
debug!("background_thread: got a StopThread message");
break 'outer;
}
Err(RecvTimeoutError::Disconnected) => {
debug!("background_thread: control channel has disconnected");
break 'outer;
}
Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do
}
Ok(ControlMsg::StopThread) => {
debug!("background_thread: got a StopThread message");
break;
}
Err(RecvTimeoutError::Disconnected) => {
debug!("background_thread: control channel has disconnected");
break;
}
Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do
}
}
debug!("background_thread: is done");
Expand Down

0 comments on commit ae3ca10

Please sign in to comment.