From fe799491229d601bf856a1fcfed9cb335f1fd3f9 Mon Sep 17 00:00:00 2001 From: Goldstein Date: Thu, 10 Aug 2023 15:27:06 +0300 Subject: [PATCH 1/2] fix(core/supervisor): remove faulty optimization leading to deadlock --- elfo-core/src/supervisor.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index e398fe1d..a3d62290 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -251,13 +251,8 @@ where return visitor.empty(envelope); } - loop { - let object = iter.next().unwrap(); - if iter.peek().is_none() { - return visitor.visit_last(&object, envelope); - } else { - visitor.visit(&object, &envelope); - } + while let Some(object) = iter.next() { + visitor.visit(&object, &envelope); } } From 130535206d5000ce232aed3b207a6f162244c876 Mon Sep 17 00:00:00 2001 From: Sargarass Date: Thu, 10 Aug 2023 16:13:28 +0300 Subject: [PATCH 2/2] fix(core/supervisor): mark last token as received MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ​ --- elfo-core/src/object.rs | 20 ++++---------------- elfo-core/src/supervisor.rs | 13 ++++++++----- elfo-network/src/worker/mod.rs | 8 +------- 3 files changed, 13 insertions(+), 28 deletions(-) diff --git a/elfo-core/src/object.rs b/elfo-core/src/object.rs index f66afa06..63b48390 100644 --- a/elfo-core/src/object.rs +++ b/elfo-core/src/object.rs @@ -147,13 +147,11 @@ pub(crate) trait GroupHandle: Send + Sync + 'static { /// Possible sequences of calls: /// * `done()`, if handled by a supervisor /// * `empty()`, if no relevant actors in a group -/// * `visit_last()`, if only one relevant actor in a group -/// * `visit()`, `visit()`, .., `visit_last()` +/// * `visit()`, otherwise pub trait GroupVisitor { fn done(&mut self); fn empty(&mut self, envelope: Envelope); - fn visit(&mut self, object: &ObjectArc, envelope: &Envelope); - fn visit_last(&mut self, object: &ObjectArc, envelope: Envelope); + fn visit(&mut self, object: &ObjectArc, envelope: Envelope); } // === SendGroupVisitor === @@ -266,12 +264,7 @@ impl GroupVisitor for SendGroupVisitor<'_> { self.extra = Some(envelope); } - fn visit(&mut self, object: &ObjectArc, envelope: &Envelope) { - let envelope = self.extra.take().unwrap_or_else(|| envelope.duplicate()); - self.try_send(object, envelope); - } - - fn visit_last(&mut self, object: &ObjectArc, envelope: Envelope) { + fn visit(&mut self, object: &ObjectArc, envelope: Envelope) { self.try_send(object, envelope); } } @@ -328,12 +321,7 @@ impl GroupVisitor for TrySendGroupVisitor { self.extra = Some(envelope); } - fn visit(&mut self, object: &ObjectArc, envelope: &Envelope) { - let envelope = self.extra.take().unwrap_or_else(|| envelope.duplicate()); - self.try_send(object, envelope); - } - - fn visit_last(&mut self, object: &ObjectArc, envelope: Envelope) { + fn visit(&mut self, object: &ObjectArc, envelope: Envelope) { self.try_send(object, envelope); } } diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index a3d62290..ee27f5ad 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -16,7 +16,7 @@ use crate::{ actor::{Actor, ActorMeta, ActorStatus}, config::{AnyConfig, Config, SystemConfig}, context::Context, - envelope::Envelope, + envelope::{Envelope, EnvelopeOwned}, exec::{Exec, ExecResult}, group::{RestartMode, RestartPolicy, TerminationPolicy}, message::Request, @@ -218,11 +218,11 @@ where match outcome { Outcome::Unicast(key) => match get_or_spawn!(self, key) { - Some(object) => visitor.visit_last(&object, envelope), + Some(object) => visitor.visit(&object, envelope), None => visitor.empty(envelope), }, Outcome::GentleUnicast(key) => match self.objects.get(&key) { - Some(object) => visitor.visit_last(&object, envelope), + Some(object) => visitor.visit(&object, envelope), None => visitor.empty(envelope), }, Outcome::Multicast(list) => { @@ -251,9 +251,12 @@ where return visitor.empty(envelope); } - while let Some(object) = iter.next() { - visitor.visit(&object, &envelope); + for item in iter { + visitor.visit(&item, envelope.duplicate()); } + + let (_, token) = envelope.unpack_request(); + token.forget(); } fn spawn(self: &Arc, key: R::Key, mut backoff: Backoff) -> Option { diff --git a/elfo-network/src/worker/mod.rs b/elfo-network/src/worker/mod.rs index 3c1f3f06..36ebb1e5 100644 --- a/elfo-network/src/worker/mod.rs +++ b/elfo-network/src/worker/mod.rs @@ -585,13 +585,7 @@ impl SocketReader { // TODO: maybe emit some metric? } - fn visit(&mut self, object: &ObjectArc, envelope: &Envelope) { - let envelope = envelope.duplicate(); - self.this - .do_handle_message(self.flows, object, envelope, true); - } - - fn visit_last(&mut self, object: &ObjectArc, envelope: Envelope) { + fn visit(&mut self, object: &ObjectArc, envelope: Envelope) { self.this .do_handle_message(self.flows, object, envelope, true); }