diff --git a/src/receive/decoding.rs b/src/receive/decoding.rs index 0322bbb..91f8555 100644 --- a/src/receive/decoding.rs +++ b/src/receive/decoding.rs @@ -2,7 +2,10 @@ use crate::{protocol, receive}; -pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::Error> { +pub(crate) fn start( + receiver: &receive::Receiver, + nb_decoding_threads: u8, +) -> Result<(), receive::Error> { let encoding_block_size = receiver.object_transmission_info.transfer_length(); loop { @@ -21,20 +24,34 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E match decoder.decode(packets) { None => { - log::warn!("lost block {block_id}"); + log::error!("lost block {block_id}, synchronization lost"); continue; } Some(block) => { - log::trace!("block {} decoded with {} bytes!", block_id, block.len()); + log::trace!("block {block_id} decoded with {} bytes!", block.len()); + + let mut retried = 0; loop { let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock"); + if *to_receive == block_id { + // The decoded block is the expected one, dispatching it receiver .to_dispatch .send(protocol::Message::deserialize(block))?; *to_receive = to_receive.wrapping_add(1); break; + } else { + // The decoded block is not the expected one + // Retrying until all decoding threads had one chance to dispatch their block + if nb_decoding_threads < retried { + // All decoding threads should have had one chance to dispatch their block + log::warn!("dropping block {block_id} after trying to dispatch it {retried} times"); + + break; + } + retried += 1; } } } diff --git a/src/receive/mod.rs b/src/receive/mod.rs index e2e8b3e..6e078b1 100644 --- a/src/receive/mod.rs +++ b/src/receive/mod.rs @@ -276,7 +276,9 @@ where for i in 0..self.config.nb_decoding_threads { thread::Builder::new() .name(format!("decoding_{i}")) - .spawn_scoped(scope, || decoding::start(self))?; + .spawn_scoped(scope, || { + decoding::start(self, self.config.nb_decoding_threads) + })?; } thread::Builder::new()