Skip to content

Commit

Permalink
fix(core/supervisor): remove faulty optimization leading to deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
sargarass authored and GoldsteinE committed Aug 11, 2023
1 parent 9c41531 commit 57da181
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions elfo-core/src/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::{
any::Any, future::Future, mem, ops::Deref, panic::AssertUnwindSafe, sync::Arc, time::Duration,
};
use std::{any::Any, future::Future, mem, panic::AssertUnwindSafe, sync::Arc, time::Duration};

use dashmap::DashMap;
use futures::{future::BoxFuture, FutureExt};
Expand Down Expand Up @@ -226,14 +224,24 @@ where
None => visitor.empty(envelope),
},
Outcome::Multicast(list) => {
let iter = list.into_iter().filter_map(|key| get_or_spawn!(self, key));
self.visit_multiple(envelope, visitor, iter);
let keys = list
.into_iter()
.filter(|key| get_or_spawn!(self, key.clone()).is_some())
.collect();
self.visit_multiple(envelope, visitor, keys);
}
Outcome::GentleMulticast(list) => {
let iter = list.into_iter().filter_map(|key| self.objects.get(&key));
self.visit_multiple(envelope, visitor, iter);
let keys = list
.into_iter()
.filter(|key| self.objects.get(key).is_some())
.collect();
self.visit_multiple(envelope, visitor, keys);
}
Outcome::Broadcast => self.visit_multiple(envelope, visitor, self.objects.iter()),
Outcome::Broadcast => self.visit_multiple(
envelope,
visitor,
self.objects.iter().map(|o| o.key().clone()).collect(),
),
Outcome::Discard => visitor.empty(envelope),
Outcome::Default => unreachable!("must be altered earlier"),
}
Expand All @@ -243,22 +251,22 @@ where
&self,
envelope: Envelope,
visitor: &mut dyn GroupVisitor,
iter: impl Iterator<Item = impl Deref<Target = ObjectArc>>,
// TODO: get rid of this allocation
keys: Vec<R::Key>,
) {
let mut iter = iter.peekable();

if iter.peek().is_none() {
return visitor.empty(envelope);
}

loop {
let object = iter.next().unwrap();
if iter.peek().is_none() {
return visitor.visit_last(&object, envelope);
let len = keys.len();
for (index, key) in keys.into_iter().enumerate() {
let item = self.objects.get(&key).unwrap();
if index + 1 == len {
visitor.visit_last(&item, envelope);
return;
} else {
visitor.visit(&object, &envelope);
visitor.visit(&item, &envelope);
}
}
if len == 0 {
visitor.empty(envelope);
}
}

fn spawn(self: &Arc<Self>, key: R::Key, mut backoff: Backoff) -> Option<ObjectArc> {
Expand Down

0 comments on commit 57da181

Please sign in to comment.