Skip to content

Commit

Permalink
We need to suspend when we can't make progress!
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Sep 23, 2024
1 parent 8282cfb commit 8aba0b8
Showing 1 changed file with 39 additions and 12 deletions.
51 changes: 39 additions & 12 deletions src/vm/transitions/combinators.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::service_protocol::messages::CombinatorEntryMessage;
use crate::service_protocol::messages::{CombinatorEntryMessage, SuspensionMessage};
use crate::vm::context::Context;
use crate::vm::errors::{UnexpectedStateError, BAD_COMBINATOR_ENTRY};
use crate::vm::transitions::{PopJournalEntry, TransitionAndReturn};
Expand All @@ -14,7 +14,8 @@ use std::vec::IntoIter;
pub(crate) enum AsyncResultAccessTrackerInner {
Processing {
known_results: HashMap<AsyncResultHandle, AsyncResultState>,
tracked_access: Vec<AsyncResultHandle>,
tracked_access_to_completed_results: Vec<AsyncResultHandle>,
tracked_access_to_uncompleted_results: Vec<AsyncResultHandle>,
},
Replaying {
replay_combinators: Peekable<IntoIter<(AsyncResultHandle, AsyncResultState)>>,
Expand All @@ -26,15 +27,17 @@ impl AsyncResultAccessTrackerInner {
match self {
AsyncResultAccessTrackerInner::Processing {
known_results,
tracked_access,
tracked_access_to_completed_results,
tracked_access_to_uncompleted_results,
} => {
// Record if a known result is available
if let Some(res) = known_results.get(&handle) {
tracked_access.push(handle);
return *res;
tracked_access_to_completed_results.push(handle);
*res
} else {
tracked_access_to_uncompleted_results.push(handle);
AsyncResultState::NotReady
}

AsyncResultState::NotReady
}
AsyncResultAccessTrackerInner::Replaying {
replay_combinators: replay_status,
Expand Down Expand Up @@ -75,7 +78,8 @@ where
let mut async_result_tracker =
AsyncResultAccessTracker(AsyncResultAccessTrackerInner::Processing {
known_results: async_results.get_ready_results_state(),
tracked_access: vec![],
tracked_access_to_completed_results: vec![],
tracked_access_to_uncompleted_results: vec![],
});

if let Some(combinator_result) = combinator.try_complete(&mut async_result_tracker)
Expand All @@ -84,9 +88,10 @@ where

// Prepare the message to write out
let completed_entries_order = match async_result_tracker.0 {
AsyncResultAccessTrackerInner::Processing { tracked_access, .. } => {
tracked_access
}
AsyncResultAccessTrackerInner::Processing {
tracked_access_to_completed_results,
..
} => tracked_access_to_completed_results,
_ => unreachable!(),
};
let message = CombinatorEntryMessage {
Expand All @@ -113,7 +118,29 @@ where
Ok((self, Some(AsyncResultHandle(current_journal_index))))
} else {
// --- The combinator is not ready yet! Let's wait for more completions to come.
Ok((self, None))

if context.input_is_closed {
let uncompleted_entries_order = match async_result_tracker.0 {
AsyncResultAccessTrackerInner::Processing {
tracked_access_to_uncompleted_results,
..
} => tracked_access_to_uncompleted_results,
_ => unreachable!(),
};

// We can't do progress anymore, let's suspend
context.output.send(&SuspensionMessage {
entry_indexes: uncompleted_entries_order
.into_iter()
.map(Into::into)
.collect(),
});
context.output.send_eof();

Ok((State::Suspended, None))
} else {
Ok((self, None))
}
}
}
s => {
Expand Down

0 comments on commit 8aba0b8

Please sign in to comment.