diff --git a/elfo-configurer/src/lib.rs b/elfo-configurer/src/lib.rs
index 8b39d4af..d7b59e60 100644
--- a/elfo-configurer/src/lib.rs
+++ b/elfo-configurer/src/lib.rs
@@ -22,7 +22,7 @@ use elfo_core::{
     },
     msg, scope,
     signal::{Signal, SignalKind},
-    ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology,
+    ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology,
 };
 
 pub use self::protocol::*;
@@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint {
     let topology = topology.clone();
     ActorGroup::new()
         .stop_order(100)
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main())
 }
diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml
index e1d6a4bb..6c286821 100644
--- a/elfo-core/Cargo.toml
+++ b/elfo-core/Cargo.toml
@@ -50,6 +50,7 @@ regex = "1.6.0"
 thread_local = { version = "1.1.3", optional = true }
 unicycle = "0.9.3"
 rmp-serde = { version = "1.1.0", optional = true }
+humantime-serde = "1"
 
 [dev-dependencies]
 elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] }
diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs
index 95ee5e9c..a5641bf4 100644
--- a/elfo-core/src/actor.rs
+++ b/elfo-core/src/actor.rs
@@ -9,11 +9,12 @@ use tracing::{error, info, warn};
 use crate::{
     envelope::Envelope,
     errors::{SendError, TrySendError},
-    group::{RestartPolicy, TerminationPolicy},
+    group::TerminationPolicy,
     mailbox::{Mailbox, RecvResult},
     messages::{ActorStatusReport, Terminate},
     msg,
     request_table::RequestTable,
+    restarting::RestartPolicy,
     scope,
     subscription::SubscriptionManager,
     Addr,
@@ -117,6 +118,49 @@ impl ActorStatusKind {
     }
 }
 
+// === ActorStartInfo ===
+
+/// A struct holding information related to an actor start.
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[non_exhaustive]
+pub struct ActorStartInfo {
+    /// The cause for the actor start, indicating why the actor is being
+    /// initialized.
+    pub cause: ActorStartCause,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// An enum representing various causes for an actor to start.
+#[non_exhaustive]
+pub enum ActorStartCause {
+    /// The actor started because its group was mounted.
+    GroupMounted,
+    /// The actor started in response to a message.
+    OnMessage,
+    /// The actor started due to the restart policy.
+    Restarted,
+}
+
+impl ActorStartInfo {
+    pub(crate) fn on_group_mounted() -> Self {
+        Self {
+            cause: ActorStartCause::GroupMounted,
+        }
+    }
+
+    pub(crate) fn on_message() -> Self {
+        Self {
+            cause: ActorStartCause::OnMessage,
+        }
+    }
+
+    pub(crate) fn on_restart() -> Self {
+        Self {
+            cause: ActorStartCause::Restarted,
+        }
+    }
+}
+
 // === Actor ===
 
 pub(crate) struct Actor {
@@ -127,6 +171,7 @@ pub(crate) struct Actor {
     control: RwLock<ControlBlock>,
     finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
     status_subscription: Arc<SubscriptionManager>,
+    start_info: ActorStartInfo,
 }
 
 struct ControlBlock {
@@ -139,6 +184,7 @@ impl Actor {
     pub(crate) fn new(
         meta: Arc<ActorMeta>,
         addr: Addr,
+        start_info: ActorStartInfo,
         termination_policy: TerminationPolicy,
         status_subscription: Arc<SubscriptionManager>,
     ) -> Self {
@@ -153,6 +199,7 @@ impl Actor {
             }),
             finished: ManualResetEvent::new(false),
             status_subscription,
+            start_info,
         }
     }
 
@@ -209,6 +256,10 @@ impl Actor {
         &self.request_table
     }
 
+    pub(crate) fn start_info(&self) -> ActorStartInfo {
+        self.start_info.clone()
+    }
+
     pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
         self.control.read().restart_policy.clone()
     }
diff --git a/elfo-core/src/config.rs b/elfo-core/src/config.rs
index 98e906dc..063b5e8b 100644
--- a/elfo-core/src/config.rs
+++ b/elfo-core/src/config.rs
@@ -162,6 +162,7 @@ pub(crate) struct SystemConfig {
     pub(crate) logging: crate::logging::LoggingConfig,
     pub(crate) dumping: crate::dumping::DumpingConfig,
     pub(crate) telemetry: crate::telemetry::TelemetryConfig,
+    pub(crate) restarting: crate::restarting::RestartingConfig,
 }
 
 // === Secret ===
diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs
index 38900790..59dcf2b3 100644
--- a/elfo-core/src/context.rs
+++ b/elfo-core/src/context.rs
@@ -7,7 +7,7 @@ use tracing::{info, trace};
 use elfo_utils::unlikely;
 
 use crate::{
-    actor::{Actor, ActorStatus},
+    actor::{Actor, ActorStartInfo, ActorStatus},
     addr::Addr,
     address_book::AddressBook,
     config::AnyConfig,
@@ -15,12 +15,12 @@ use crate::{
     dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
     envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
     errors::{RequestError, SendError, TryRecvError, TrySendError},
-    group::RestartPolicy,
     mailbox::RecvResult,
     message::{Message, Request},
     messages, msg,
     object::ObjectArc,
     request_table::ResponseToken,
+    restarting::RestartPolicy,
     routers::Singleton,
     scope,
     source::{SourceHandle, Sources, UnattachedSource},
@@ -624,6 +624,41 @@ impl Context {
         }
     }
 
+    /// Retrieves information related to the start of the actor.
+    ///
+    /// # Panics
+    ///
+    /// This method will panic if the context is pruned, indicating that the
+    /// required information is no longer available.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use elfo_core as elfo;
+    /// # use elfo_core::{ActorStartCause, ActorStartInfo};
+    /// # async fn exec(mut ctx: elfo::Context) {
+    /// match ctx.start_info().cause {
+    ///     ActorStartCause::GroupMounted => {
+    ///         // The actor started because its group was mounted.
+    ///     }
+    ///     ActorStartCause::OnMessage => {
+    ///         // The actor started in response to a message.
+    ///     }
+    ///     ActorStartCause::Restarted => {
+    ///         // The actor started due to the restart policy.
+    ///     }
+    ///     _ => {}
+    /// }
+    /// # }
+    /// ```
+    pub fn start_info(&self) -> ActorStartInfo {
+        self.actor
+            .as_ref()
+            .and_then(|o| o.as_actor())
+            .map(|a| a.start_info())
+            .expect("pruned context is not supported")
+    }
+
     fn pre_recv(&mut self) {
         self.stats.on_recv();
diff --git a/elfo-core/src/group.rs b/elfo-core/src/group.rs
index 0bd8d892..a0dd8b16 100644
--- a/elfo-core/src/group.rs
+++ b/elfo-core/src/group.rs
@@ -8,6 +8,7 @@ use crate::{
     envelope::Envelope,
     exec::{Exec, ExecResult},
     object::{GroupHandle, GroupVisitor, Object},
+    restarting::RestartPolicy,
     routers::Router,
     runtime::RuntimeManager,
     supervisor::Supervisor,
@@ -47,7 +48,7 @@ impl ActorGroup {
     }
 
     /// The behaviour on actor termination.
-    /// `RestartPolicy::on_failures` is used by default.
+    /// `RestartPolicy::never` is used by default.
     pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
         self.restart_policy = policy;
         self
@@ -183,42 +184,3 @@ impl TerminationPolicy {
 
     // TODO: add `stop_spawning`?
 }
-
-/// The behaviour on actor termination.
-#[derive(Debug, Clone)]
-pub struct RestartPolicy {
-    pub(crate) mode: RestartMode,
-}
-
-impl Default for RestartPolicy {
-    fn default() -> Self {
-        Self::on_failures()
-    }
-}
-
-#[derive(Debug, Clone)]
-pub(crate) enum RestartMode {
-    Always,
-    OnFailures,
-    Never,
-}
-
-impl RestartPolicy {
-    pub fn always() -> Self {
-        Self {
-            mode: RestartMode::Always,
-        }
-    }
-
-    pub fn on_failures() -> Self {
-        Self {
-            mode: RestartMode::OnFailures,
-        }
-    }
-
-    pub fn never() -> Self {
-        Self {
-            mode: RestartMode::Never,
-        }
-    }
-}
diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs
index 06d75ef6..a483c734 100644
--- a/elfo-core/src/init.rs
+++ b/elfo-core/src/init.rs
@@ -13,7 +13,7 @@ use elfo_utils::time::Instant;
 use crate::{memory_tracker::MemoryTracker, time::Interval};
 
 use crate::{
-    actor::{Actor, ActorMeta, ActorStatus},
+    actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus},
     addr::{Addr, GroupNo},
     config::SystemConfig,
     context::Context,
@@ -173,6 +173,7 @@ pub async fn do_start(
     let actor = Actor::new(
         meta.clone(),
         addr,
+        ActorStartInfo::on_group_mounted(),
         Default::default(),
         Arc::new(SubscriptionManager::new(ctx.clone())),
     );
diff --git a/elfo-core/src/lib.rs b/elfo-core/src/lib.rs
index d6978044..e58a5def 100644
--- a/elfo-core/src/lib.rs
+++ b/elfo-core/src/lib.rs
@@ -11,15 +11,16 @@ extern crate self as elfo_core;
 
 // TODO: revise this list
 pub use crate::{
-    actor::{ActorMeta, ActorStatus, ActorStatusKind},
+    actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind},
     addr::Addr,
     config::Config,
     context::{Context, RequestBuilder},
     envelope::Envelope,
-    group::{ActorGroup, Blueprint, RestartPolicy, TerminationPolicy},
+    group::{ActorGroup, Blueprint, TerminationPolicy},
     local::{Local, MoveOwnership},
     message::{Message, Request},
     request_table::ResponseToken,
+    restarting::{RestartParams, RestartPolicy},
     source::{SourceHandle, UnattachedSource},
     topology::Topology,
 };
@@ -65,6 +66,7 @@ pub mod remote;
 #[cfg(all(feature = "network", not(feature = "unstable")))]
 mod remote;
 mod request_table;
+mod restarting;
 mod runtime;
 mod source;
 mod subscription;
diff --git a/elfo-core/src/node.rs b/elfo-core/src/node.rs
index ce518a6e..fa8cef28 100644
--- a/elfo-core/src/node.rs
+++ b/elfo-core/src/node.rs
@@ -19,6 +19,7 @@ impl From for NodeNo {
 
 static NODE_NO: AtomicU16 = AtomicU16::new(0);
 
+#[stability::unstable]
 /// Returns the current `node_no`.
 pub fn node_no() -> Option<crate::addr::NodeNo> {
     crate::addr::NodeNo::from_bits(NODE_NO.load(Ordering::Relaxed))
diff --git a/elfo-core/src/restarting/config.rs b/elfo-core/src/restarting/config.rs
new file mode 100644
index 00000000..4426f57b
--- /dev/null
+++ b/elfo-core/src/restarting/config.rs
@@ -0,0 +1,69 @@
+use std::{num::NonZeroU64, time::Duration};
+
+use serde::{Deserialize, Deserializer};
+
+use crate::restarting::restart_policy::{RestartParams, RestartPolicy};
+
+#[derive(Debug, Default)]
+pub(crate) struct RestartingConfig {
+    pub(crate) overriding_policy: Option<RestartPolicy>,
+}
+
+impl<'de> Deserialize<'de> for RestartingConfig {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        Option::<RestartPolicyConfig>::deserialize(deserializer).map(|p| RestartingConfig {
+            overriding_policy: p.map(Into::into),
+        })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct RestartParamsConfig {
+    #[serde(with = "humantime_serde")]
+    min_backoff: Duration,
+    #[serde(with = "humantime_serde")]
+    max_backoff: Duration,
+    #[serde(with = "humantime_serde", default)]
+    auto_reset: Option<Duration>,
+    max_retries: Option<NonZeroU64>,
+    #[serde(default)]
+    factor: Option<f32>,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(tag = "when")]
+enum RestartPolicyConfig {
+    Always(RestartParamsConfig),
+    OnFailure(RestartParamsConfig),
+    Never,
+}
+
+impl From<RestartPolicyConfig> for RestartPolicy {
+    fn from(p: RestartPolicyConfig) -> Self {
+        match p {
+            RestartPolicyConfig::Always(p) => RestartPolicy::always(p.into()),
+            RestartPolicyConfig::OnFailure(p) => RestartPolicy::on_failure(p.into()),
+            RestartPolicyConfig::Never => RestartPolicy::never(),
+        }
+    }
+}
+
+impl From<RestartParamsConfig> for RestartParams {
+    fn from(p: RestartParamsConfig) -> Self {
+        let restart_params = RestartParams::new(p.min_backoff, p.max_backoff);
+        let restart_params = p
+            .factor
+            .map(|f| restart_params.factor(f))
+            .unwrap_or(restart_params);
+        let restart_params = p
+            .max_retries
+            .map(|max_retries| restart_params.max_retries(max_retries))
+            .unwrap_or(restart_params);
+        p.auto_reset
+            .map(|auto_reset| restart_params.auto_reset(auto_reset))
+            .unwrap_or(restart_params)
+    }
+}
diff --git a/elfo-core/src/restarting/mod.rs b/elfo-core/src/restarting/mod.rs
new file mode 100644
index 00000000..b5281969
--- /dev/null
+++ b/elfo-core/src/restarting/mod.rs
@@ -0,0 +1,5 @@
+mod config;
+mod restart_policy;
+
+pub(crate) use self::config::RestartingConfig;
+pub use restart_policy::{RestartParams, RestartPolicy};
diff --git a/elfo-core/src/restarting/restart_policy.rs b/elfo-core/src/restarting/restart_policy.rs
new file mode 100644
index 00000000..762a4b92
--- /dev/null
+++ b/elfo-core/src/restarting/restart_policy.rs
@@ -0,0 +1,102 @@
+use std::{num::NonZeroU64, time::Duration};
+
+use crate::ActorStatus;
+
+/// The behaviour on actor termination.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RestartPolicy {
+    pub(crate) mode: RestartMode,
+}
+
+impl Default for RestartPolicy {
+    fn default() -> Self {
+        Self::never()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) enum RestartMode {
+    Always(RestartParams),
+    OnFailure(RestartParams),
+    Never,
+}
+
+impl RestartPolicy {
+    pub fn always(restart_params: RestartParams) -> Self {
+        Self {
+            mode: RestartMode::Always(restart_params),
+        }
+    }
+
+    pub fn on_failure(restart_params: RestartParams) -> Self {
+        Self {
+            mode: RestartMode::OnFailure(restart_params),
+        }
+    }
+
+    pub fn never() -> Self {
+        Self {
+            mode: RestartMode::Never,
+        }
+    }
+
+    pub(crate) fn restarting_allowed(&self, status: &ActorStatus) -> bool {
+        match &self.mode {
+            RestartMode::Always(_) => true,
+            RestartMode::OnFailure(_) => status.is_failed(),
+            _ => false,
+        }
+    }
+
+    pub(crate) fn restart_params(&self) -> Option<RestartParams> {
+        match &self.mode {
+            RestartMode::Always(params) | RestartMode::OnFailure(params) => Some(*params),
+            _ => None,
+        }
+    }
+}
+
+/// The restart params for the backoff strategy when an actor restarts.
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub struct RestartParams {
+    pub(crate) min_backoff: Duration,
+    pub(crate) max_backoff: Duration,
+    pub(crate) auto_reset: Duration,
+    pub(crate) max_retries: NonZeroU64,
+    pub(crate) factor: f64,
+}
+
+impl RestartParams {
+    /// Creates a new instance with the specified minimum and maximum backoff
+    /// durations.
+    pub fn new(min_backoff: Duration, max_backoff: Duration) -> Self {
+        RestartParams {
+            min_backoff,
+            max_backoff: min_backoff.max(max_backoff),
+            auto_reset: min_backoff,
+            max_retries: NonZeroU64::MAX,
+            factor: 2.0,
+        }
+    }
+
+    /// The duration after which the backoff strategy will automatically reset.
+    pub fn auto_reset(self, auto_reset: Duration) -> Self {
+        Self { auto_reset, ..self }
+    }
+
+    /// Sets the factor used to calculate the next backoff duration.
+    pub fn factor(self, factor: f32) -> Self {
+        Self {
+            factor: factor.into(),
+            ..self
+        }
+    }
+
+    /// Sets the maximum number of allowed retries.
+    pub fn max_retries(self, max_retries: NonZeroU64) -> Self {
+        Self {
+            max_retries,
+            ..self
+        }
+    }
+}
diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs
index 04ae1179..7854f220 100644
--- a/elfo-core/src/supervisor.rs
+++ b/elfo-core/src/supervisor.rs
@@ -13,15 +13,16 @@ use elfo_utils::CachePadded;
 use self::{backoff::Backoff, error_chain::ErrorChain, measure_poll::MeasurePoll};
 
 use crate::{
-    actor::{Actor, ActorMeta, ActorStatus},
+    actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus},
     config::{AnyConfig, Config, SystemConfig},
     context::Context,
     envelope::Envelope,
     exec::{Exec, ExecResult},
-    group::{RestartMode, RestartPolicy, TerminationPolicy},
+    group::TerminationPolicy,
     message::Request,
     messages, msg,
     object::{GroupVisitor, Object, ObjectArc},
+    restarting::RestartPolicy,
     routers::{Outcome, Router},
     runtime::RuntimeManager,
     scope::{self, Scope, ScopeGroupShared},
@@ -58,14 +59,14 @@ struct ControlBlock {
 
 /// Returns `None` if cannot be spawned.
 macro_rules! get_or_spawn {
-    ($this:ident, $key:expr) => {{
+    ($this:ident, $key:expr, $start_info:expr) => {{
         let key = $key;
         match $this.objects.get(&key) {
             Some(object) => Some(object),
             None => $this
                 .objects
                 .entry(key.clone())
-                .or_try_insert_with(|| $this.spawn(key, Default::default()).ok_or(()))
+                .or_try_insert_with(|| $this.spawn(key, $start_info, Default::default()).ok_or(()))
                 .map(|o| o.downgrade()) // FIXME: take an exclusive lock here.
                 .ok(),
         }
@@ -173,7 +174,7 @@ where
         let outcome = self.router.route(&envelope);
 
         if only_spawn {
-            self.spawn_by_outcome(outcome);
+            self.spawn_on_group_mounted(outcome);
             let token = extract_response_token::<messages::UpdateConfig>(envelope);
             self.context.respond(token, Ok(()));
             return visitor.done();
@@ -216,8 +217,9 @@ where
             }
         });
 
+        let start_info = ActorStartInfo::on_message();
         match outcome {
-            Outcome::Unicast(key) => match get_or_spawn!(self, key) {
+            Outcome::Unicast(key) => match get_or_spawn!(self, key, start_info) {
                 Some(object) => visitor.visit_last(&object, envelope),
                 None => visitor.empty(envelope),
             },
@@ -228,7 +230,7 @@ where
             Outcome::Multicast(list) => {
                 for key in list.iter() {
                     if !self.objects.contains_key(key) {
-                        get_or_spawn!(self, key.clone());
+                        get_or_spawn!(self, key.clone(), start_info.clone());
                    }
                 }
                 let iter = list.into_iter().filter_map(|key| self.objects.get(&key));
@@ -266,7 +268,12 @@ where
         }
     }
 
-    fn spawn(self: &Arc<Self>, key: R::Key, mut backoff: Backoff) -> Option<ObjectArc> {
+    fn spawn(
+        self: &Arc<Self>,
+        key: R::Key,
+        start_info: ActorStartInfo,
+        mut backoff: Backoff,
+    ) -> Option<ObjectArc> {
         let control = self.control.read();
         if control.stop_spawning {
             return None;
         }
@@ -285,6 +292,8 @@ where
         );
 
         let system_config = control.system_config.clone();
+        let config_restart_policy = system_config.restarting.overriding_policy.clone();
+
         let user_config = control
             .user_config
             .as_ref()
@@ -323,26 +332,33 @@ where
                 Err(panic) => ActorStatus::FAILED.with_details(panic_to_string(panic)),
             };
 
-            let should_restart = {
-                let object = sv.objects.get(&key).expect("where is the current actor?");
-                let actor = object.as_actor().expect("a supervisor stores only actors");
-
-                let rp_override = actor.restart_policy();
-                let restart_policy = rp_override.as_ref().unwrap_or(&sv.restart_policy);
-                let should_restart = match restart_policy.mode {
-                    RestartMode::Always => true,
-                    RestartMode::OnFailures => new_status.is_failed(),
-                    RestartMode::Never => false,
-                };
-
-                actor.set_status(new_status);
-                should_restart
-            };
-
-            let need_to_restart = should_restart && !sv.control.read().stop_spawning;
-            if need_to_restart {
-                let after = backoff.next();
-
+            let object = sv.objects.get(&key).expect("where is the current actor?");
+            let actor = object.as_actor().expect("a supervisor stores only actors");
+
+            let rp_override = actor.restart_policy();
+            // Select the restart policy with the next priority: ctx.set_restart_policy(..)
+            // > config > blueprint (sv.restart_policy)
+            let restart_policy = rp_override
+                .as_ref()
+                .unwrap_or(config_restart_policy.as_ref().unwrap_or(&sv.restart_policy))
+                .clone();
+
+            let restarting_allowed =
+                restart_policy.restarting_allowed(&new_status) && !sv.control.read().stop_spawning;
+            actor.set_status(new_status);
+
+            drop(object);
+
+            let restart_after = restarting_allowed
+                .then(|| {
+                    let restart_params = restart_policy
+                        .restart_params()
+                        .expect("restart params are set if actor can restart");
+                    backoff.next(&restart_params)
+                })
+                .flatten();
+
+            if let Some(after) = restart_after {
                 if after == Duration::ZERO {
                     debug!("actor will be restarted immediately");
                 } else {
@@ -357,7 +373,7 @@ where
                     scope::set_trace_id(TraceId::generate());
                     backoff.start();
 
-                    if let Some(object) = sv.spawn(key.clone(), backoff) {
+                    if let Some(object) = sv.spawn(key.clone(), ActorStartInfo::on_restart(), backoff) {
                         sv.objects.insert(key.clone(), object)
                     } else {
                         sv.objects.remove(&key).map(|(_, v)| v)
@@ -382,6 +398,7 @@ where
         let actor = Actor::new(
             meta.clone(),
             addr,
+            start_info,
             self.termination_policy.clone(),
             self.status_subscription.clone(),
         );
@@ -400,14 +417,15 @@ where
         Some(object)
     }
 
-    fn spawn_by_outcome(self: &Arc<Self>, outcome: Outcome<R::Key>) {
+    fn spawn_on_group_mounted(self: &Arc<Self>, outcome: Outcome<R::Key>) {
+        let start_info = ActorStartInfo::on_group_mounted();
         match outcome {
             Outcome::Unicast(key) => {
-                get_or_spawn!(self, key);
+                get_or_spawn!(self, key, start_info);
             }
             Outcome::Multicast(keys) => {
                 for key in keys {
-                    get_or_spawn!(self, key);
+                    get_or_spawn!(self, key, start_info.clone());
                 }
             }
             Outcome::GentleUnicast(_)
diff --git a/elfo-core/src/supervisor/backoff.rs b/elfo-core/src/supervisor/backoff.rs
index 043ef33b..cf52fbae 100644
--- a/elfo-core/src/supervisor/backoff.rs
+++ b/elfo-core/src/supervisor/backoff.rs
@@ -1,20 +1,20 @@
 use std::time::Duration;
 
+use crate::RestartParams;
 use elfo_utils::time::Instant;
 
-const BACKOFF_STEP: Duration = Duration::from_secs(5);
-const MAX_BACKOFF: Duration = Duration::from_secs(30);
-
 pub(crate) struct Backoff {
-    next_backoff: Duration,
+    next_backoff: Option<Duration>,
     start_time: Instant,
+    restart_count: u64,
 }
 
 impl Default for Backoff {
     fn default() -> Self {
         Self {
-            next_backoff: BACKOFF_STEP,
+            next_backoff: None,
             start_time: Instant::now(),
+            restart_count: 0,
         }
     }
 }
@@ -24,21 +24,53 @@ impl Backoff {
         self.start_time = Instant::now();
     }
 
-    pub(crate) fn next(&mut self) -> Duration {
+    pub(crate) fn next(&mut self, params: &RestartParams) -> Option<Duration> {
         // If an actor is alive enough time, reset the backoff.
-        if self.start_time.elapsed() >= BACKOFF_STEP {
-            self.next_backoff = Duration::ZERO;
+        if self.start_time.elapsed() >= params.auto_reset {
+            self.next_backoff = Some(Duration::ZERO);
+            self.restart_count = 0;
+        }
+        self.restart_count += 1;
+
+        if self.restart_count > params.max_retries.get() {
+            return None;
+        }
+
+        let next_backoff = self.next_backoff.unwrap_or(params.min_backoff);
+        if next_backoff.is_zero() {
+            self.next_backoff = Some(params.min_backoff);
+            return Some(Duration::ZERO);
         }
 
-        let backoff = self.next_backoff;
-        self.next_backoff = (self.next_backoff + BACKOFF_STEP).min(MAX_BACKOFF);
-        backoff
+        let current = next_backoff.min(params.max_backoff).max(params.min_backoff);
+        let current_nanos = duration_to_nanos(current);
+        let max_nanos = duration_to_nanos(params.max_backoff);
+        // Check for overflow, if overflow is detected set the current interval to the
+        // max interval.
+        self.next_backoff = if current_nanos >= max_nanos / params.factor {
+            Some(params.max_backoff)
+        } else {
+            let nanos = current_nanos * params.factor;
+            Some(nanos_to_duration(nanos).max(params.min_backoff))
+        };
+        Some(current)
     }
 }
 
+fn duration_to_nanos(d: Duration) -> f64 {
+    d.as_secs() as f64 * 1_000_000_000.0 + f64::from(d.subsec_nanos())
+}
+
+fn nanos_to_duration(nanos: f64) -> Duration {
+    let secs = nanos / 1_000_000_000.0;
+    let nanos = nanos as u64 % 1_000_000_000;
+    Duration::new(secs as u64, nanos as u32)
+}
+
 #[cfg(test)]
 mod tests {
     use elfo_utils::time;
+    use std::num::NonZeroU64;
 
     use super::*;
 
@@ -46,31 +78,71 @@ fn it_works() {
         time::with_instant_mock(|mock| {
             let mut backoff = Backoff::default();
-
+            let params = RestartParams::new(Duration::from_secs(5), Duration::from_secs(30))
+                .max_retries(NonZeroU64::new(3).unwrap());
             // Immediately failed.
-            assert_eq!(backoff.next(), BACKOFF_STEP);
-            mock.advance(BACKOFF_STEP);
+            assert_eq!(backoff.next(&params), Some(params.min_backoff));
+            mock.advance(params.min_backoff);
             backoff.start();
 
             // And again.
-            assert_eq!(backoff.next(), 2 * BACKOFF_STEP);
-            mock.advance(2 * BACKOFF_STEP);
+            assert_eq!(backoff.next(&params), Some(2 * params.min_backoff));
+            mock.advance(2 * params.min_backoff);
             backoff.start();
 
             // After some, not enough to reset the backoff, time.
-            mock.advance(BACKOFF_STEP * 2 / 3);
-            assert_eq!(backoff.next(), 3 * BACKOFF_STEP);
-            mock.advance(3 * BACKOFF_STEP);
+            mock.advance(params.min_backoff * 2 / 3);
+            assert_eq!(backoff.next(&params), Some(4 * params.min_backoff));
+            mock.advance(3 * params.min_backoff);
             backoff.start();
 
             // After some, enough to reset the backoff, time.
-            mock.advance(BACKOFF_STEP);
-            assert_eq!(backoff.next(), Duration::ZERO); // resetted
+            mock.advance(params.min_backoff);
+            // The first retry.
+            assert_eq!(backoff.next(&params), Some(Duration::ZERO)); // resetted
             backoff.start();
 
             // After some, not enough to reset the backoff, time.
-            mock.advance(BACKOFF_STEP * 2 / 3);
-            assert_eq!(backoff.next(), BACKOFF_STEP);
+            mock.advance(params.min_backoff * 2 / 3);
+            // The second retry.
+            assert_eq!(backoff.next(&params), Some(params.min_backoff));
+            // The third retry.
+            assert_eq!(backoff.next(&params), Some(2 * params.min_backoff));
+            // We reached the limit of retries.
+            assert_eq!(backoff.next(&params), None);
         });
     }
+
+    #[test]
+    fn correctness() {
+        let mut backoff = Backoff::default();
+        // Start with zero backoff duration.
+        let params = RestartParams::new(Duration::from_secs(0), Duration::from_secs(0));
+        assert_eq!(backoff.next(&params), Some(Duration::ZERO));
+        assert_eq!(backoff.next(&params), Some(Duration::ZERO));
+        assert_eq!(backoff.next(&params), Some(Duration::ZERO));
+
+        // Then check the transition from zero to nonzero backoff limits.
+        let params = RestartParams::new(Duration::from_secs(2), Duration::from_secs(16));
+        assert_eq!(backoff.next(&params), Some(Duration::ZERO));
+        assert_eq!(backoff.next(&params), Some(params.min_backoff));
+        assert_eq!(backoff.next(&params), Some(2 * params.min_backoff));
+        assert_eq!(backoff.next(&params), Some(4 * params.min_backoff));
+
+        // Decreasing the upper bound results in a reduced subsequent backoff.
+        let params = RestartParams::new(Duration::from_secs(3), Duration::from_secs(5));
+        assert_eq!(backoff.next(&params), Some(params.max_backoff));
+
+        // Increasing the lower bound raises the subsequent backoff.
+        let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30));
+        assert_eq!(backoff.next(&params), Some(params.min_backoff));
+
+        // Limiting the number of retry attempts kicks in.
+        let mut backoff = Backoff::default();
+        let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30))
+            .max_retries(NonZeroU64::new(2).unwrap());
+        assert_eq!(backoff.next(&params), Some(params.min_backoff));
+        assert_eq!(backoff.next(&params), Some(params.max_backoff));
+        assert_eq!(backoff.next(&params), None);
+    }
 }
diff --git a/elfo-dumper/src/actor.rs b/elfo-dumper/src/actor.rs
index 4f4a883b..9d0bc83c 100644
--- a/elfo-dumper/src/actor.rs
+++ b/elfo-dumper/src/actor.rs
@@ -1,4 +1,4 @@
-use std::{iter, panic, sync::Arc};
+use std::{iter, panic, sync::Arc, time::Duration};
 
 use eyre::{Result, WrapErr};
 use fxhash::FxHashSet;
@@ -15,7 +15,7 @@ use elfo_core::{
     scope::{self, SerdeMode},
     signal::{Signal, SignalKind},
     time::Interval,
-    ActorGroup, Blueprint, Context, TerminationPolicy,
+    ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy,
 };
 use elfo_utils::ward;
@@ -252,6 +252,10 @@ pub(crate) fn new(dump_storage: Arc<Mutex<DumpStorage>>) -> Blueprint {
     ActorGroup::new()
         .config::<Config>()
         .termination_policy(TerminationPolicy::manually())
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .stop_order(100)
         .router(MapRouter::new(move |envelope| {
             msg!(match envelope {
diff --git a/elfo-logger/src/actor.rs b/elfo-logger/src/actor.rs
index 118ee6c1..c8ca666c 100644
--- a/elfo-logger/src/actor.rs
+++ b/elfo-logger/src/actor.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use metrics::increment_counter;
 use tokio::{
@@ -12,7 +12,7 @@ use elfo_core::{
     messages::{ConfigUpdated, Terminate},
     msg,
     signal::{Signal, SignalKind},
-    ActorGroup, Blueprint, Context, TerminationPolicy,
+    ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy,
 };
 
 use crate::{
@@ -42,6 +42,10 @@ impl Logger {
         ActorGroup::new()
             .config::<Config>()
             .termination_policy(TerminationPolicy::manually())
+            .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+                Duration::from_secs(5),
+                Duration::from_secs(30),
+            )))
             .stop_order(105)
             .exec(move |ctx| Logger::new(ctx, shared.clone(), filtering_layer.clone()).main())
     }
diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs
index b4cc15f2..16da04d3 100644
--- a/elfo-network/src/discovery/mod.rs
+++ b/elfo-network/src/discovery/mod.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use eyre::{bail, eyre, Result, WrapErr};
 use futures::StreamExt;
@@ -9,7 +9,7 @@ use elfo_core::{
     _priv::{GroupNo, MessageKind},
     messages::ConfigUpdated,
     stream::Stream,
-    Topology,
+    RestartParams, Topology,
 };
 
 use crate::{
@@ -90,7 +90,11 @@ impl Discovery {
     pub(super) async fn main(mut self) -> Result<()> {
         // The default restart policy of this group is `never`, so override it.
-        self.ctx.set_restart_policy(RestartPolicy::on_failures());
+        self.ctx
+            .set_restart_policy(RestartPolicy::on_failure(RestartParams::new(
+                Duration::from_secs(5),
+                Duration::from_secs(30),
+            )));
 
         self.listen().await?;
         self.discover();
diff --git a/elfo-network/src/lib.rs b/elfo-network/src/lib.rs
index 512f1ac8..30d9bfeb 100644
--- a/elfo-network/src/lib.rs
+++ b/elfo-network/src/lib.rs
@@ -10,13 +10,14 @@ extern crate elfo_utils;
 use std::{
     fmt::{self, Display},
     hash::Hash,
+    time::Duration,
 };
 
 use elfo_core::{
     messages::UpdateConfig,
     msg,
     routers::{MapRouter, Outcome},
-    ActorGroup, Blueprint, Context, RestartPolicy, Topology,
+    ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, Topology,
 };
 
 use crate::{
@@ -66,6 +67,10 @@ pub fn new(topology: &Topology) -> Blueprint {
         // The restart policy is overrided by the discovery actor.
         .restart_policy(RestartPolicy::never())
         .stop_order(100)
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .router(MapRouter::new(|envelope| {
             msg!(match envelope {
                 // TODO: send to all connections.
diff --git a/elfo-pinger/src/lib.rs b/elfo-pinger/src/lib.rs
index 6b9f9b92..29d6eceb 100644
--- a/elfo-pinger/src/lib.rs
+++ b/elfo-pinger/src/lib.rs
@@ -1,6 +1,8 @@
 #![warn(rust_2018_idioms, unreachable_pub)]
 
-use elfo_core::{ActorGroup, Blueprint, Topology};
+use std::time::Duration;
+
+use elfo_core::{ActorGroup, Blueprint, RestartParams, RestartPolicy, Topology};
 
 mod actor;
 mod config;
@@ -9,6 +11,10 @@ pub fn new(topology: &Topology) -> Blueprint {
     let topology = topology.clone();
     ActorGroup::new()
         .config::<config::Config>()
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .stop_order(100)
         .exec(move |ctx| actor::exec(ctx, topology.clone()))
 }
diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs
index 21973940..ddd2502b 100644
--- a/elfo-telemeter/src/actor.rs
+++ b/elfo-telemeter/src/actor.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use metrics::gauge;
 use tokio::task::JoinHandle;
@@ -6,7 +6,7 @@ use tracing::{error, info};
 
 use elfo_core::{
     message, messages::ConfigUpdated, msg, scope, time::Interval, tracing::TraceId, ActorGroup,
-    Blueprint, Context, MoveOwnership,
+    Blueprint, Context, MoveOwnership, RestartParams, RestartPolicy,
 };
 
 use crate::{
@@ -39,6 +39,10 @@ struct ServerFailed(MoveOwnership);
 pub(crate) fn new(storage: Arc<Storage>) -> Blueprint {
     ActorGroup::new()
         .config::<Config>()
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .stop_order(100)
         .exec(move |ctx| Telemeter::new(ctx, storage.clone()).main())
 }
diff --git a/elfo/tests/restarting.rs b/elfo/tests/restarting.rs
index 004fd3ed..6771e1fb 100644
--- a/elfo/tests/restarting.rs
+++ b/elfo/tests/restarting.rs
@@ -1,23 +1,25 @@
 #![cfg(feature = "test-util")]
+#![allow(clippy::never_loop)]
 
 use std::{
-    panic::AssertUnwindSafe,
     sync::{Arc, Mutex},
     time::Duration,
 };
 
-use futures::FutureExt;
-
-use elfo::{prelude::*, RestartPolicy};
-
-#[message]
-struct Terminate;
-
-#[message]
-struct Terminated;
+use elfo::{
+    prelude::*,
+    routers::{MapRouter, Outcome, Singleton},
+    RestartParams, RestartPolicy,
+};
 
 #[tokio::test]
 async fn actor_restarts_explicitly() {
+    #[message]
+    struct Terminate;
+
+    #[message]
+    struct Terminated;
+
     let blueprint = ActorGroup::new().exec(move |mut ctx| async move {
         while let Some(envelope) = ctx.recv().await {
             msg!(match envelope {
@@ -38,27 +40,42 @@ async fn actor_restarts_explicitly() {
 
 #[tokio::test(start_paused = true)]
 async fn actor_restarts_with_timeout_after_failures() {
-    let blueprint = ActorGroup::new().exec(move |mut ctx| async move {
-        while let Some(envelope) = ctx.recv().await {
-            msg!(match envelope {
-                Terminate { .. } => panic!("boom!"),
-                _ => unreachable!(),
-            });
-        }
-    });
-
-    let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await;
+    #[message]
+    struct HealthCheck;
 
-    for i in 1..5 {
-        proxy.send(Terminate).await;
+    #[message]
+    struct Spawn;
+
+    let blueprint = ActorGroup::new()
+        .router(MapRouter::new(|e| {
+            msg!(match e {
+                Spawn => Outcome::Unicast(Singleton),
+                // HealthCheck should not spawn the actor.
+                HealthCheck => Outcome::GentleUnicast(Singleton),
+                _ => Outcome::Discard,
+            })
+        }))
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
+        .exec(move |mut ctx| async move {
+            while let Some(envelope) = ctx.recv().await {
+                msg!(match envelope {
+                    HealthCheck | Spawn => panic!("boom!"),
+                    _ => unreachable!(),
+                });
+            }
+        });
 
-        let r = AssertUnwindSafe(async { proxy.recv().await })
-            .catch_unwind()
-            .await;
-        assert!(r.is_err());
+    let proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await;
+    proxy.send(Spawn).await;
+    for i in 1..5 {
+        proxy.send(HealthCheck).await;
+        let delay = Duration::from_millis(((5000f64 * 2.0f64.powi(i)) as u64).min(30000));
         // https://github.com/tokio-rs/tokio/issues/3985
-        tokio::time::sleep(Duration::from_millis(5000 * i + 1)).await;
+        tokio::time::sleep(delay).await;
     }
 }
 
@@ -68,7 +85,10 @@ async fn actor_overrides_policy() {
     struct Started;
 
     let blueprint = ActorGroup::new().exec(move |ctx| async move {
-        ctx.set_restart_policy(RestartPolicy::always());
+        ctx.set_restart_policy(RestartPolicy::always(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        ))));
         let _ = ctx.send(Started).await;
     });
diff --git a/elfo/tests/start_info.rs b/elfo/tests/start_info.rs
new file mode 100644
index 00000000..6d907ab8
--- /dev/null
+++ b/elfo/tests/start_info.rs
@@ -0,0 +1,83 @@
+#![cfg(feature = "test-util")]
+
+use std::{num::NonZeroU64, time::Duration};
+
+use elfo::{
+    prelude::*,
+    routers::{MapRouter, Outcome, Singleton},
+    RestartParams, RestartPolicy,
+};
+use elfo_core::{messages::UpdateConfig, ActorStartCause, Local};
+
+#[tokio::test]
+async fn it_works() {
+    #[message]
+    struct Start;
+
+    #[message(part)]
+    #[derive(PartialEq)]
+    pub enum Cause {
+        /// The actor started because its group was mounted.
+        GroupMounted,
+        /// The actor started in response to a message.
+        OnMessage,
+        /// The actor restarted.
+        Restarted,
+        Unknown,
+    }
+
+    #[message]
+    #[derive(PartialEq)]
+    struct Info {
+        cause: Local<ActorStartCause>,
+    }
+
+    let blueprint = ActorGroup::new()
+        .router(MapRouter::new(|e| {
+            msg!(match e {
+                UpdateConfig | Start => Outcome::Unicast(Singleton),
+                _ => Outcome::Discard,
+            })
+        }))
+        .restart_policy(RestartPolicy::on_failure(
+            RestartParams::new(Duration::from_secs(0), Duration::from_secs(0))
+                .auto_reset(Duration::MAX)
+                .max_retries(NonZeroU64::new(1).unwrap()),
+        ))
+        .exec(move |ctx| async move {
+            let cause = ctx.start_info().cause;
+            let _ = ctx
+                .send(Info {
+                    cause: Local::from(cause),
+                })
+                .await;
+            panic!("boom!");
+        });
+
+    let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await;
+    assert_msg_eq!(
+        proxy.recv().await,
+        Info {
+            cause: ActorStartCause::GroupMounted.into()
+        }
+    );
+    assert_msg_eq!(
+        proxy.recv().await,
+        Info {
+            cause: ActorStartCause::Restarted.into()
+        }
+    );
+    proxy.send(Start).await;
+    assert_msg_eq!(
+        proxy.recv().await,
+        Info {
+            cause: ActorStartCause::OnMessage.into()
+        }
+    );
+    assert_msg_eq!(
+        proxy.recv().await,
+        Info {
+            cause: ActorStartCause::Restarted.into()
+        }
+    );
+}
diff --git a/elfo/tests/subscription_to_statuses.rs b/elfo/tests/subscription_to_statuses.rs
index 3c26e2b3..cd154d98 100644
--- a/elfo/tests/subscription_to_statuses.rs
+++ b/elfo/tests/subscription_to_statuses.rs
@@ -9,6 +9,7 @@ use elfo::{
     test::Proxy,
     ActorStatus, ActorStatusKind,
 };
+use elfo_core::{RestartParams, RestartPolicy};
 
 #[message]
 struct Start(u32);
@@ -27,6 +28,10 @@ async fn run_group() -> Proxy {
                 _ => Outcome::Default,
             })
         }))
+        .restart_policy(RestartPolicy::on_failure(RestartParams::new(
+            Duration::from_secs(5),
+            Duration::from_secs(30),
+        )))
         .exec(move |mut ctx| async move {
             while let Some(envelope) = ctx.recv().await {
                 msg!(match envelope {
diff --git a/elfo/tests/termination.rs b/elfo/tests/termination.rs
index d485ff34..609274ac 100644
--- a/elfo/tests/termination.rs
+++ b/elfo/tests/termination.rs
@@ -1,4 +1,5 @@
 #![cfg(feature = "test-util")]
+#![allow(clippy::never_loop)]
 
 use elfo::{messages::Terminate, prelude::*, TerminationPolicy};
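
Usage sketch (not part of the diff): after this change the parameterless `RestartPolicy::on_failures()` no longer exists; a policy is built from `RestartParams`, so downstream blueprints configure the backoff explicitly. A minimal illustration against the new API, where `worker::exec` and the concrete durations are placeholders, not values from this change:

use std::{num::NonZeroU64, time::Duration};

use elfo_core::{ActorGroup, Blueprint, RestartParams, RestartPolicy};

fn blueprint() -> Blueprint {
    ActorGroup::new()
        // Restart only failed actors, backing off from 1s up to 1m.
        .restart_policy(RestartPolicy::on_failure(
            RestartParams::new(Duration::from_secs(1), Duration::from_secs(60))
                // Reset the backoff once an actor stays alive for 5 minutes.
                .auto_reset(Duration::from_secs(300))
                // Stop restarting after 10 consecutive failed attempts.
                .max_retries(NonZeroU64::new(10).unwrap()),
        ))
        .exec(worker::exec)
}

The same policy can still be overridden at runtime with `ctx.set_restart_policy(..)` or supplied through the new `restarting` section of the system config; per the priority comment in `supervisor.rs`, the context override wins over the config, which wins over the blueprint value.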