Skip to content

Commit

Permalink
chore: address in-person review
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Feb 28, 2024
1 parent 1274d64 commit d0cd81c
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
2 changes: 1 addition & 1 deletion homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ async fn handle_swarm_event<DB: Database>(
debug!(
subject = "libp2p.kad.get_record",
category = "handle_swarm_event",
receipt_cid = receipt.cid().to_string(),
cid = receipt.cid().to_string(),
instruction_cid = receipt.instruction().cid().to_string(),
"found receipt record published by {}",
match peer_id {
Expand Down
20 changes: 14 additions & 6 deletions homestar-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub(crate) struct TaskScheduler<'a> {
pub(crate) run: Schedule<'a>,

/// Set of Cids to possibly fetch from the DHT.
pub(crate) promises_to_resolve: FnvHashSet<Cid>,
pub(crate) promises_to_resolve: Arc<FnvHashSet<Cid>>,

/// Step/batch to resume from.
pub(crate) resume_step: Option<usize>,
Expand Down Expand Up @@ -111,7 +111,10 @@ impl<'a> TaskScheduler<'a> {
let mut_graph = Arc::make_mut(&mut graph);
let schedule = &mut mut_graph.schedule;
let schedule_length = schedule.len();

// Gather all CIDs to resolve
let mut cids_to_resolve = Vec::new();
// Gather all resources to fetch
let mut resources_to_fetch = Vec::new();
let mut linkmap = LinkMap::<task::Result<Arg>>::default();

Expand All @@ -125,7 +128,6 @@ impl<'a> TaskScheduler<'a> {
for rsc in resource.iter() {
resources_to_fetch.push((cid, rsc.clone()));
}
cids_to_resolve.push(cid);
} else {
return Err(anyhow!("Resource not found for instruction {cid}"));
}
Expand All @@ -137,7 +139,6 @@ impl<'a> TaskScheduler<'a> {
if let Ok(found) = Db::find_instruction_pointers(&pointers, conn) {
for receipt in found.iter() {
resources_to_fetch.retain(|(cid, _)| *cid != receipt.instruction().cid());
cids_to_resolve.retain(|cid| *cid != receipt.instruction().cid());
linkmap.insert(receipt.instruction().cid(), receipt.output_as_arg());
}

Expand All @@ -151,12 +152,17 @@ impl<'a> TaskScheduler<'a> {
}
}

// Fetch resources from the DHT.
// Add all CIDs not resolved to the list of CIDs to resolve.
cids_to_resolve.extend(resources_to_fetch.iter().map(|(cid, _)| *cid));

// Fetch resources from the DHT as a unique set.
let resources_to_fetch: FnvHashSet<Resource> =
resources_to_fetch.into_iter().map(|(_, rsc)| rsc).collect();
let fetched_resources = fetch_fn(resources_to_fetch).await?;

// Store awaits outside of the workflow in our In-memory cache for resolving.
// Filter out promises/awaits outside of the workflow that
// have been already resolved and store them in our in-memory
// cache (linkmap).
let promises_as_pointers =
mut_graph
.awaiting
Expand All @@ -177,6 +183,8 @@ impl<'a> TaskScheduler<'a> {
linkmap.insert(receipt.instruction().cid(), receipt.output_as_arg());
}
}

// Convert the list of CIDs to resolve into a unique set.
let promises_to_resolve: FnvHashSet<Cid> = cids_to_resolve.into_iter().collect();

let (ran, run, resume_step) = if last_idx > 0 {
Expand All @@ -193,7 +201,7 @@ impl<'a> TaskScheduler<'a> {
Ok(SchedulerContext {
scheduler: Self {
linkmap: Arc::new(RwLock::new(linkmap)),
promises_to_resolve,
promises_to_resolve: Arc::new(promises_to_resolve),
ran,
run,
resume_step,
Expand Down
5 changes: 2 additions & 3 deletions homestar-runtime/src/worker/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ impl Resolver for Cid {

/// A resolver for CIDs that may be available on the DHT.
pub(crate) struct DHTResolver {
cids: FnvHashSet<Cid>,
cids: Arc<FnvHashSet<Cid>>,
p2p_receipt_timeout: Duration,
workflow_cid: Cid,
}

impl DHTResolver {
/// Create a new [DHTResolver].
pub(crate) fn new(
cids: FnvHashSet<Cid>,
cids: Arc<FnvHashSet<Cid>>,
p2p_receipt_timeout: Duration,
workflow_cid: Cid,
) -> Self {
Expand All @@ -104,7 +104,6 @@ where
DB: Database,
{
async fn poll(&self, ctx: &Poller<DB>) -> Result<()> {
println!("polling DHTResolver: {:?}", self.cids);
for cid in self.cids.iter() {
let (tx, rx) = AsyncChannel::oneshot();

Expand Down
1 change: 0 additions & 1 deletion homestar-runtime/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl Promises {
}

/// Return an iterator over the [Promises] in-flow and out-flow [Cid]s.
#[allow(dead_code)]
pub(crate) fn iter(&self) -> impl Iterator<Item = (Origin, &Cid)> {
let in_iter = self.in_flow.iter().map(|cid| (Origin::InFlow, cid));
let out_iter = self.out_flow.iter().map(|cid| (Origin::OutFlow, cid));
Expand Down
3 changes: 3 additions & 0 deletions homestar-runtime/tests/network/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ fn test_libp2p_dht_records_integration() -> Result<()> {
assert!(retrieved_workflow_info_logged);
assert!(retrieved_receipt_info_logged);
assert!(committed_receipt);

let stored_receipt = Db::find_receipt_by_cid(put_receipt_cid, &mut db.conn().unwrap());
assert!(stored_receipt.is_ok());
});

Ok(())
Expand Down

0 comments on commit d0cd81c

Please sign in to comment.