From 5843e4093cf4654a0b79c4868bb1f669acc99a42 Mon Sep 17 00:00:00 2001 From: Sargarass Date: Thu, 14 Dec 2023 20:33:18 +0300 Subject: [PATCH] feat(core): the restarting policy can be overridden via configuration for each actor. `RestartParams` have been added for `RestartPolicy::always(..)` and `RestartPolicy::on_failure(..)`. The linear backoff has been replaced with an exponential approach, with a configurable limit for restarts. BREAKING CHANGE: The default RestartPolicy is set to `RestartPolicy::never()`. --- elfo-configurer/src/lib.rs | 6 +- elfo-core/Cargo.toml | 1 + elfo-core/src/actor.rs | 66 +++++++++- elfo-core/src/config.rs | 1 + elfo-core/src/context.rs | 39 +++++- elfo-core/src/group.rs | 44 +------ elfo-core/src/init.rs | 3 +- elfo-core/src/lib.rs | 6 +- elfo-core/src/node.rs | 1 + elfo-core/src/restarting/backoff.rs | 138 +++++++++++++++++++++ elfo-core/src/restarting/config.rs | 61 +++++++++ elfo-core/src/restarting/mod.rs | 6 + elfo-core/src/restarting/restart_policy.rs | 123 ++++++++++++++++++ elfo-core/src/supervisor.rs | 86 ++++++++----- elfo-core/src/supervisor/backoff.rs | 76 ------------ elfo-dumper/src/actor.rs | 8 +- elfo-logger/src/actor.rs | 8 +- elfo-network/src/discovery/mod.rs | 10 +- elfo-network/src/lib.rs | 4 +- elfo-pinger/src/lib.rs | 8 +- elfo-telemeter/src/actor.rs | 8 +- elfo/tests/restarting.rs | 76 +++++++----- elfo/tests/start_info.rs | 56 +++++++++ elfo/tests/subscription_to_statuses.rs | 5 + elfo/tests/termination.rs | 1 + 25 files changed, 643 insertions(+), 198 deletions(-) create mode 100644 elfo-core/src/restarting/backoff.rs create mode 100644 elfo-core/src/restarting/config.rs create mode 100644 elfo-core/src/restarting/mod.rs create mode 100644 elfo-core/src/restarting/restart_policy.rs delete mode 100644 elfo-core/src/supervisor/backoff.rs create mode 100644 elfo/tests/start_info.rs diff --git a/elfo-configurer/src/lib.rs b/elfo-configurer/src/lib.rs index 8b39d4af..d7b59e60 100644 --- a/elfo-configurer/src/lib.rs +++ b/elfo-configurer/src/lib.rs @@ -22,7 +22,7 @@ use elfo_core::{ }, msg, scope, signal::{Signal, SignalKind}, - ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology, + ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology, }; pub use self::protocol::*; @@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint { let topology = topology.clone(); ActorGroup::new() .stop_order(100) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main()) } diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml index e1d6a4bb..6c286821 100644 --- a/elfo-core/Cargo.toml +++ b/elfo-core/Cargo.toml @@ -50,6 +50,7 @@ regex = "1.6.0" thread_local = { version = "1.1.3", optional = true } unicycle = "0.9.3" rmp-serde = { version = "1.1.0", optional = true } +humantime-serde = "1" [dev-dependencies] elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] } diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs index 95ee5e9c..ddb2a4f3 100644 --- a/elfo-core/src/actor.rs +++ b/elfo-core/src/actor.rs @@ -9,11 +9,12 @@ use tracing::{error, info, warn}; use crate::{ envelope::Envelope, errors::{SendError, TrySendError}, - group::{RestartPolicy, TerminationPolicy}, + group::TerminationPolicy, mailbox::{Mailbox, RecvResult}, messages::{ActorStatusReport, Terminate}, msg, request_table::RequestTable, + 
restarting::RestartPolicy, scope, subscription::SubscriptionManager, Addr, @@ -117,6 +118,62 @@ impl ActorStatusKind { } } +// === ActorStartInfo === + +/// A struct holding information related to an actor start. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ActorStartInfo { + /// The cause for the actor start, indicating why the actor is being + /// initialized. + pub cause: ActorStartCause, +} + +/// An enum representing various causes for an actor to start. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum ActorStartCause { + /// The actor started because its group was mounted. + GroupMounted, + /// The actor started in response to a message. + OnMessage, + /// The actor started due to the restart policy. + Restarted, +} + +impl ActorStartInfo { + pub(crate) fn on_group_mounted() -> Self { + Self { + cause: ActorStartCause::GroupMounted, + } + } + + pub(crate) fn on_message() -> Self { + Self { + cause: ActorStartCause::OnMessage, + } + } + + pub(crate) fn on_restart() -> Self { + Self { + cause: ActorStartCause::Restarted, + } + } +} + +impl ActorStartCause { + pub fn is_group_mounted(&self) -> bool { + matches!(self, ActorStartCause::GroupMounted) + } + pub fn is_restarted(&self) -> bool { + matches!(self, ActorStartCause::Restarted) + } + + pub fn is_on_message(&self) -> bool { + matches!(self, ActorStartCause::OnMessage) + } +} + // === Actor === pub(crate) struct Actor { @@ -127,6 +184,7 @@ pub(crate) struct Actor { control: RwLock, finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`? status_subscription: Arc, + start_info: ActorStartInfo, } struct ControlBlock { @@ -139,6 +197,7 @@ impl Actor { pub(crate) fn new( meta: Arc, addr: Addr, + start_info: ActorStartInfo, termination_policy: TerminationPolicy, status_subscription: Arc, ) -> Self { @@ -153,6 +212,7 @@ impl Actor { }), finished: ManualResetEvent::new(false), status_subscription, + start_info, } } @@ -209,6 +269,10 @@ impl Actor { &self.request_table } + pub(crate) fn start_info(&self) -> ActorStartInfo { + self.start_info.clone() + } + pub(crate) fn restart_policy(&self) -> Option { self.control.read().restart_policy.clone() } diff --git a/elfo-core/src/config.rs b/elfo-core/src/config.rs index 98e906dc..063b5e8b 100644 --- a/elfo-core/src/config.rs +++ b/elfo-core/src/config.rs @@ -162,6 +162,7 @@ pub(crate) struct SystemConfig { pub(crate) logging: crate::logging::LoggingConfig, pub(crate) dumping: crate::dumping::DumpingConfig, pub(crate) telemetry: crate::telemetry::TelemetryConfig, + pub(crate) restarting: crate::restarting::RestartingConfig, } // === Secret === diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 38900790..d7c5d935 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -7,7 +7,7 @@ use tracing::{info, trace}; use elfo_utils::unlikely; use crate::{ - actor::{Actor, ActorStatus}, + actor::{Actor, ActorStartInfo, ActorStatus}, addr::Addr, address_book::AddressBook, config::AnyConfig, @@ -15,12 +15,12 @@ use crate::{ dumping::{Direction, Dump, Dumper, INTERNAL_CLASS}, envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind}, errors::{RequestError, SendError, TryRecvError, TrySendError}, - group::RestartPolicy, mailbox::RecvResult, message::{Message, Request}, messages, msg, object::ObjectArc, request_table::ResponseToken, + restarting::RestartPolicy, routers::Singleton, scope, source::{SourceHandle, Sources, UnattachedSource}, @@ -624,6 +624,41 @@ impl Context { } } + /// Retrieves information related 
to the start of the actor. + /// + /// # Panics + /// + /// This method will panic if the context is pruned, indicating that the + /// required information is no longer available. + /// + /// # Example + /// + /// ``` + /// # use elfo_core as elfo; + /// # use elfo_core::{ActorStartCause, ActorStartInfo}; + /// # async fn exec(mut ctx: elfo::Context) { + /// match ctx.start_info().cause { + /// ActorStartCause::GroupMounted => { + /// // The actor started because its group was mounted. + /// } + /// ActorStartCause::OnMessage => { + /// // The actor started in response to a message. + /// } + /// ActorStartCause::Restarted => { + /// // The actor started due to the restart policy. + /// } + /// _ => {} + /// } + /// # } + /// ``` + pub fn start_info(&self) -> ActorStartInfo { + self.actor + .as_ref() + .and_then(|o| o.as_actor()) + .map(|a| a.start_info()) + .expect("pruned context is not supported") + } + fn pre_recv(&mut self) { self.stats.on_recv(); diff --git a/elfo-core/src/group.rs b/elfo-core/src/group.rs index 0bd8d892..a39c928a 100644 --- a/elfo-core/src/group.rs +++ b/elfo-core/src/group.rs @@ -8,6 +8,7 @@ use crate::{ envelope::Envelope, exec::{Exec, ExecResult}, object::{GroupHandle, GroupVisitor, Object}, + restarting::RestartPolicy, routers::Router, runtime::RuntimeManager, supervisor::Supervisor, @@ -47,13 +48,15 @@ impl ActorGroup { } /// The behaviour on actor termination. - /// `RestartPolicy::on_failures` is used by default. + /// + /// `RestartPolicy::never` is used by default. pub fn restart_policy(mut self, policy: RestartPolicy) -> Self { self.restart_policy = policy; self } /// The behaviour on the `Terminate` message. + /// /// `TerminationPolicy::closing` is used by default. pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self { self.termination_policy = policy; @@ -183,42 +186,3 @@ impl TerminationPolicy { // TODO: add `stop_spawning`? } - -/// The behaviour on actor termination. 
-#[derive(Debug, Clone)] -pub struct RestartPolicy { - pub(crate) mode: RestartMode, -} - -impl Default for RestartPolicy { - fn default() -> Self { - Self::on_failures() - } -} - -#[derive(Debug, Clone)] -pub(crate) enum RestartMode { - Always, - OnFailures, - Never, -} - -impl RestartPolicy { - pub fn always() -> Self { - Self { - mode: RestartMode::Always, - } - } - - pub fn on_failures() -> Self { - Self { - mode: RestartMode::OnFailures, - } - } - - pub fn never() -> Self { - Self { - mode: RestartMode::Never, - } - } -} diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 06d75ef6..a483c734 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -13,7 +13,7 @@ use elfo_utils::time::Instant; use crate::{memory_tracker::MemoryTracker, time::Interval}; use crate::{ - actor::{Actor, ActorMeta, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, addr::{Addr, GroupNo}, config::SystemConfig, context::Context, @@ -173,6 +173,7 @@ pub async fn do_start( let actor = Actor::new( meta.clone(), addr, + ActorStartInfo::on_group_mounted(), Default::default(), Arc::new(SubscriptionManager::new(ctx.clone())), ); diff --git a/elfo-core/src/lib.rs b/elfo-core/src/lib.rs index d6978044..e58a5def 100644 --- a/elfo-core/src/lib.rs +++ b/elfo-core/src/lib.rs @@ -11,15 +11,16 @@ extern crate self as elfo_core; // TODO: revise this list pub use crate::{ - actor::{ActorMeta, ActorStatus, ActorStatusKind}, + actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind}, addr::Addr, config::Config, context::{Context, RequestBuilder}, envelope::Envelope, - group::{ActorGroup, Blueprint, RestartPolicy, TerminationPolicy}, + group::{ActorGroup, Blueprint, TerminationPolicy}, local::{Local, MoveOwnership}, message::{Message, Request}, request_table::ResponseToken, + restarting::{RestartParams, RestartPolicy}, source::{SourceHandle, UnattachedSource}, topology::Topology, }; @@ -65,6 +66,7 @@ pub mod remote; #[cfg(all(feature = "network", not(feature = "unstable")))] mod remote; mod request_table; +mod restarting; mod runtime; mod source; mod subscription; diff --git a/elfo-core/src/node.rs b/elfo-core/src/node.rs index ce518a6e..fa8cef28 100644 --- a/elfo-core/src/node.rs +++ b/elfo-core/src/node.rs @@ -19,6 +19,7 @@ impl From for NodeNo { static NODE_NO: AtomicU16 = AtomicU16::new(0); +#[stability::unstable] /// Returns the current `node_no`. pub fn node_no() -> Option { crate::addr::NodeNo::from_bits(NODE_NO.load(Ordering::Relaxed)) diff --git a/elfo-core/src/restarting/backoff.rs b/elfo-core/src/restarting/backoff.rs new file mode 100644 index 00000000..f30cf164 --- /dev/null +++ b/elfo-core/src/restarting/backoff.rs @@ -0,0 +1,138 @@ +use std::time::Duration; + +use crate::RestartParams; +use elfo_utils::time::Instant; + +pub(crate) struct RestartBackoff { + next_backoff: Option, + start_time: Instant, + restart_count: u64, +} + +impl Default for RestartBackoff { + fn default() -> Self { + Self { + next_backoff: None, + start_time: Instant::now(), + restart_count: 0, + } + } +} + +impl RestartBackoff { + pub(crate) fn start(&mut self) { + self.start_time = Instant::now(); + } + + pub(crate) fn next(&mut self, params: &RestartParams) -> Option { + // If an actor is alive enough time, reset the backoff. 
+ if self.start_time.elapsed() >= params.auto_reset { + self.next_backoff = Some(Duration::ZERO); + self.restart_count = 0; + } + self.restart_count += 1; + + if self.restart_count > params.max_retries.get() { + return None; + } + + let next_backoff = self.next_backoff.unwrap_or(params.min_backoff); + if next_backoff.is_zero() { + self.next_backoff = Some(params.min_backoff); + return Some(Duration::ZERO); + } + + let current = next_backoff.min(params.max_backoff).max(params.min_backoff); + let current_nanos = current.as_secs_f64(); + let max_nanos = params.max_backoff.as_secs_f64(); + // Check for overflow, if overflow is detected set the current interval to the + // max interval. + self.next_backoff = if current_nanos >= max_nanos / params.factor { + Some(params.max_backoff) + } else { + let nanos = current_nanos * params.factor; + Some(Duration::from_secs_f64(nanos).max(params.min_backoff)) + }; + Some(current) + } +} + +#[cfg(test)] +mod tests { + use elfo_utils::time; + use std::num::NonZeroU64; + + use super::*; + + #[test] + fn it_works() { + time::with_instant_mock(|mock| { + let mut backoff = RestartBackoff::default(); + let params = RestartParams::new(Duration::from_secs(5), Duration::from_secs(30)) + .max_retries(NonZeroU64::new(3).unwrap()); + // Immediately failed. + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + mock.advance(params.min_backoff); + backoff.start(); + + // And again. + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + mock.advance(2 * params.min_backoff); + backoff.start(); + + // After some, not enough to reset the backoff, time. + mock.advance(params.min_backoff * 2 / 3); + assert_eq!(backoff.next(¶ms), Some(4 * params.min_backoff)); + mock.advance(3 * params.min_backoff); + backoff.start(); + + // After some, enough to reset the backoff, time. + mock.advance(params.min_backoff); + // The first retry. + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); // resetted + backoff.start(); + + // After some, not enough to reset the backoff, time. + mock.advance(params.min_backoff * 2 / 3); + // The second retry. + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + // The third retry. + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + // We reached the limit of reties. + assert_eq!(backoff.next(¶ms), None); + }); + } + + #[test] + fn correctness() { + let mut backoff = RestartBackoff::default(); + // Start with zero backoff duration. + let params = RestartParams::new(Duration::from_secs(0), Duration::from_secs(0)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + + // Then check the transition from zero to nonzero backoff limits. + let params = RestartParams::new(Duration::from_secs(2), Duration::from_secs(16)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(4 * params.min_backoff)); + + // Decreasing the upper bound results in a reduced subsequent backoff. + let params = RestartParams::new(Duration::from_secs(3), Duration::from_secs(5)); + assert_eq!(backoff.next(¶ms), Some(params.max_backoff)); + + // Increasing the lower bound raises the subsequent backoff. 
+ let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30)); + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + + // Limiting the number of retry attempts kicks in. + let mut backoff = RestartBackoff::default(); + let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30)) + .max_retries(NonZeroU64::new(2).unwrap()); + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(params.max_backoff)); + assert_eq!(backoff.next(¶ms), None); + } +} diff --git a/elfo-core/src/restarting/config.rs b/elfo-core/src/restarting/config.rs new file mode 100644 index 00000000..efcf8bfb --- /dev/null +++ b/elfo-core/src/restarting/config.rs @@ -0,0 +1,61 @@ +use std::{num::NonZeroU64, time::Duration}; + +use serde::{Deserialize, Deserializer}; + +use crate::restarting::restart_policy::{RestartParams, RestartPolicy}; + +#[derive(Debug, Default)] +pub(crate) struct RestartingConfig { + pub(crate) overriding_policy: Option, +} + +impl<'de> Deserialize<'de> for RestartingConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Option::::deserialize(deserializer).map(|p| RestartingConfig { + overriding_policy: p.map(Into::into), + }) + } +} + +#[derive(Debug, Deserialize)] +struct RestartParamsConfig { + #[serde(with = "humantime_serde")] + min_backoff: Duration, + #[serde(with = "humantime_serde")] + max_backoff: Duration, + #[serde(with = "humantime_serde", default)] + auto_reset: Option, + max_retries: Option, + #[serde(default)] + factor: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "when")] +enum RestartPolicyConfig { + Always(RestartParamsConfig), + OnFailure(RestartParamsConfig), + Never, +} + +impl From for RestartPolicy { + fn from(p: RestartPolicyConfig) -> Self { + match p { + RestartPolicyConfig::Always(p) => RestartPolicy::always(p.into()), + RestartPolicyConfig::OnFailure(p) => RestartPolicy::on_failure(p.into()), + RestartPolicyConfig::Never => RestartPolicy::never(), + } + } +} + +impl From for RestartParams { + fn from(p: RestartParamsConfig) -> Self { + RestartParams::new(p.min_backoff, p.max_backoff) + .factor(p.factor) + .auto_reset(p.auto_reset) + .max_retries(p.max_retries) + } +} diff --git a/elfo-core/src/restarting/mod.rs b/elfo-core/src/restarting/mod.rs new file mode 100644 index 00000000..1b20186f --- /dev/null +++ b/elfo-core/src/restarting/mod.rs @@ -0,0 +1,6 @@ +mod backoff; +mod config; +mod restart_policy; + +pub(crate) use self::{backoff::RestartBackoff, config::RestartingConfig}; +pub use restart_policy::{RestartParams, RestartPolicy}; diff --git a/elfo-core/src/restarting/restart_policy.rs b/elfo-core/src/restarting/restart_policy.rs new file mode 100644 index 00000000..465d5004 --- /dev/null +++ b/elfo-core/src/restarting/restart_policy.rs @@ -0,0 +1,123 @@ +use std::{num::NonZeroU64, time::Duration}; + +use crate::ActorStatus; + +/// The behaviour on actor termination. 
+#[derive(Debug, Clone, PartialEq)] +pub struct RestartPolicy { + pub(crate) mode: RestartMode, +} + +impl Default for RestartPolicy { + fn default() -> Self { + Self::never() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum RestartMode { + Always(RestartParams), + OnFailure(RestartParams), + Never, +} + +impl RestartPolicy { + pub fn always(restart_params: RestartParams) -> Self { + Self { + mode: RestartMode::Always(restart_params), + } + } + + pub fn on_failure(restart_params: RestartParams) -> Self { + Self { + mode: RestartMode::OnFailure(restart_params), + } + } + + pub fn never() -> Self { + Self { + mode: RestartMode::Never, + } + } + + pub(crate) fn restarting_allowed(&self, status: &ActorStatus) -> bool { + match &self.mode { + RestartMode::Always(_) => true, + RestartMode::OnFailure(_) => status.is_failed(), + _ => false, + } + } + + pub(crate) fn restart_params(&self) -> Option { + match &self.mode { + RestartMode::Always(params) | RestartMode::OnFailure(params) => Some(*params), + _ => None, + } + } +} + +/// The restart params for the backoff strategy when an actor restarts due to [RestartPolicy]. +#[derive(Debug, Copy, Clone, PartialEq)] +pub struct RestartParams { + pub(crate) min_backoff: Duration, + pub(crate) max_backoff: Duration, + pub(crate) auto_reset: Duration, + pub(crate) max_retries: NonZeroU64, + pub(crate) factor: f64, +} + +impl RestartParams { + /// Creates a new instance with the specified minimum and maximum backoff + /// durations and `auto_reset`, `max_retries`, `factor` are set to their + /// default values: + /// `auto_reset = min_backoff` + /// `max_retries = NonZeroU64::MAX` + /// `factor = 2.0` + pub fn new(min_backoff: Duration, max_backoff: Duration) -> Self { + RestartParams { + min_backoff, + max_backoff: min_backoff.max(max_backoff), + auto_reset: min_backoff, + max_retries: NonZeroU64::MAX, + factor: 2.0, + } + } + + /// The duration considered sufficiently long-running for an actor. After + /// this, the backoff strategy automatically resets, including retry + /// counting (the next attempt becomes the first retry). + /// Therefore, setting the `auto_reset` to small values, such as + /// `Duration::ZERO`, can result in the absence of a limit on the maximum + /// number of retries. + /// `None` does not change the `auto_reset` setting. + /// + /// `min_backoff` is used by default. + pub fn auto_reset(self, auto_reset: impl Into>) -> Self { + Self { + auto_reset: auto_reset.into().unwrap_or(self.auto_reset), + ..self + } + } + + /// Sets the factor used to calculate the next backoff duration. + /// `None` does not change the `factor` setting. + /// + /// Factor of `2.0` is used by default. + pub fn factor(self, factor: impl Into>) -> Self { + Self { + factor: factor.into().unwrap_or(self.factor), + ..self + } + } + + /// Sets the maximum number of allowed retries. + /// `None` does not change the `max_retries` setting. + /// + /// `NonZeroU64::MAX` is used by default. 
+ pub fn max_retries(self, max_retries: impl Into>) -> Self { + Self { + max_retries: max_retries.into().unwrap_or(self.max_retries), + ..self + } + } +} diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index 04ae1179..2654de8e 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -11,17 +11,19 @@ use tracing::{debug, error, error_span, info, warn, Instrument, Span}; use elfo_utils::CachePadded; -use self::{backoff::Backoff, error_chain::ErrorChain, measure_poll::MeasurePoll}; +use self::{error_chain::ErrorChain, measure_poll::MeasurePoll}; use crate::{ - actor::{Actor, ActorMeta, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, config::{AnyConfig, Config, SystemConfig}, context::Context, envelope::Envelope, exec::{Exec, ExecResult}, - group::{RestartMode, RestartPolicy, TerminationPolicy}, + group::TerminationPolicy, message::Request, messages, msg, object::{GroupVisitor, Object, ObjectArc}, + restarting::RestartBackoff, + restarting::RestartPolicy, routers::{Outcome, Router}, runtime::RuntimeManager, scope::{self, Scope, ScopeGroupShared}, @@ -30,7 +32,6 @@ use crate::{ Addr, ResponseToken, }; -mod backoff; mod error_chain; mod measure_poll; @@ -58,14 +59,14 @@ struct ControlBlock { /// Returns `None` if cannot be spawned. macro_rules! get_or_spawn { - ($this:ident, $key:expr) => {{ + ($this:ident, $key:expr, $start_info:expr) => {{ let key = $key; match $this.objects.get(&key) { Some(object) => Some(object), None => $this .objects .entry(key.clone()) - .or_try_insert_with(|| $this.spawn(key, Default::default()).ok_or(())) + .or_try_insert_with(|| $this.spawn(key, $start_info, Default::default()).ok_or(())) .map(|o| o.downgrade()) // FIXME: take an exclusive lock here. .ok(), } @@ -173,7 +174,7 @@ where let outcome = self.router.route(&envelope); if only_spawn { - self.spawn_by_outcome(outcome); + self.spawn_on_group_mounted(outcome); let token = extract_response_token::(envelope); self.context.respond(token, Ok(())); return visitor.done(); @@ -216,8 +217,9 @@ where } }); + let start_info = ActorStartInfo::on_message(); match outcome { - Outcome::Unicast(key) => match get_or_spawn!(self, key) { + Outcome::Unicast(key) => match get_or_spawn!(self, key, start_info) { Some(object) => visitor.visit_last(&object, envelope), None => visitor.empty(envelope), }, @@ -228,7 +230,7 @@ where Outcome::Multicast(list) => { for key in list.iter() { if !self.objects.contains_key(key) { - get_or_spawn!(self, key.clone()); + get_or_spawn!(self, key.clone(), start_info.clone()); } } let iter = list.into_iter().filter_map(|key| self.objects.get(&key)); @@ -266,7 +268,12 @@ where } } - fn spawn(self: &Arc, key: R::Key, mut backoff: Backoff) -> Option { + fn spawn( + self: &Arc, + key: R::Key, + start_info: ActorStartInfo, + mut backoff: RestartBackoff, + ) -> Option { let control = self.control.read(); if control.stop_spawning { return None; @@ -285,6 +292,8 @@ where ); let system_config = control.system_config.clone(); + let config_restart_policy = system_config.restarting.overriding_policy.clone(); + let user_config = control .user_config .as_ref() @@ -323,26 +332,33 @@ where Err(panic) => ActorStatus::FAILED.with_details(panic_to_string(panic)), }; - let should_restart = { - let object = sv.objects.get(&key).expect("where is the current actor?"); - let actor = object.as_actor().expect("a supervisor stores only actors"); - - let rp_override = actor.restart_policy(); - let restart_policy = 
rp_override.as_ref().unwrap_or(&sv.restart_policy); - let should_restart = match restart_policy.mode { - RestartMode::Always => true, - RestartMode::OnFailures => new_status.is_failed(), - RestartMode::Never => false, - }; - - actor.set_status(new_status); - should_restart - }; - - let need_to_restart = should_restart && !sv.control.read().stop_spawning; - if need_to_restart { - let after = backoff.next(); - + let object = sv.objects.get(&key).expect("where is the current actor?"); + let actor = object.as_actor().expect("a supervisor stores only actors"); + + let rp_override = actor.restart_policy(); + // Select the restart policy with the next priority: + // ctx.set_restart_policy(..) > config > blueprint (sv.restart_policy). + let restart_policy = rp_override + .as_ref() + .unwrap_or(config_restart_policy.as_ref().unwrap_or(&sv.restart_policy)) + .clone(); + + let restarting_allowed = + restart_policy.restarting_allowed(&new_status) && !sv.control.read().stop_spawning; + actor.set_status(new_status); + + drop(object); + + let restart_after = restarting_allowed + .then(|| { + let restart_params = restart_policy + .restart_params() + .expect("restart params are set if actor can restart"); + backoff.next(&restart_params) + }) + .flatten(); + + if let Some(after) = restart_after { if after == Duration::ZERO { debug!("actor will be restarted immediately"); } else { @@ -357,7 +373,7 @@ where scope::set_trace_id(TraceId::generate()); backoff.start(); - if let Some(object) = sv.spawn(key.clone(), backoff) { + if let Some(object) = sv.spawn(key.clone(), ActorStartInfo::on_restart(), backoff) { sv.objects.insert(key.clone(), object) } else { sv.objects.remove(&key).map(|(_, v)| v) @@ -382,6 +398,7 @@ where let actor = Actor::new( meta.clone(), addr, + start_info, self.termination_policy.clone(), self.status_subscription.clone(), ); @@ -400,14 +417,15 @@ where Some(object) } - fn spawn_by_outcome(self: &Arc, outcome: Outcome) { + fn spawn_on_group_mounted(self: &Arc, outcome: Outcome) { + let start_info = ActorStartInfo::on_group_mounted(); match outcome { Outcome::Unicast(key) => { - get_or_spawn!(self, key); + get_or_spawn!(self, key, start_info); } Outcome::Multicast(keys) => { for key in keys { - get_or_spawn!(self, key); + get_or_spawn!(self, key, start_info.clone()); } } Outcome::GentleUnicast(_) diff --git a/elfo-core/src/supervisor/backoff.rs b/elfo-core/src/supervisor/backoff.rs deleted file mode 100644 index 043ef33b..00000000 --- a/elfo-core/src/supervisor/backoff.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::time::Duration; - -use elfo_utils::time::Instant; - -const BACKOFF_STEP: Duration = Duration::from_secs(5); -const MAX_BACKOFF: Duration = Duration::from_secs(30); - -pub(crate) struct Backoff { - next_backoff: Duration, - start_time: Instant, -} - -impl Default for Backoff { - fn default() -> Self { - Self { - next_backoff: BACKOFF_STEP, - start_time: Instant::now(), - } - } -} - -impl Backoff { - pub(crate) fn start(&mut self) { - self.start_time = Instant::now(); - } - - pub(crate) fn next(&mut self) -> Duration { - // If an actor is alive enough time, reset the backoff. 
- if self.start_time.elapsed() >= BACKOFF_STEP { - self.next_backoff = Duration::ZERO; - } - - let backoff = self.next_backoff; - self.next_backoff = (self.next_backoff + BACKOFF_STEP).min(MAX_BACKOFF); - backoff - } -} - -#[cfg(test)] -mod tests { - use elfo_utils::time; - - use super::*; - - #[test] - fn it_works() { - time::with_instant_mock(|mock| { - let mut backoff = Backoff::default(); - - // Immediately failed. - assert_eq!(backoff.next(), BACKOFF_STEP); - mock.advance(BACKOFF_STEP); - backoff.start(); - - // And again. - assert_eq!(backoff.next(), 2 * BACKOFF_STEP); - mock.advance(2 * BACKOFF_STEP); - backoff.start(); - - // After some, not enough to reset the backoff, time. - mock.advance(BACKOFF_STEP * 2 / 3); - assert_eq!(backoff.next(), 3 * BACKOFF_STEP); - mock.advance(3 * BACKOFF_STEP); - backoff.start(); - - // After some, enough to reset the backoff, time. - mock.advance(BACKOFF_STEP); - assert_eq!(backoff.next(), Duration::ZERO); // resetted - backoff.start(); - - // After some, not enough to reset the backoff, time. - mock.advance(BACKOFF_STEP * 2 / 3); - assert_eq!(backoff.next(), BACKOFF_STEP); - }); - } -} diff --git a/elfo-dumper/src/actor.rs b/elfo-dumper/src/actor.rs index 4f4a883b..9d0bc83c 100644 --- a/elfo-dumper/src/actor.rs +++ b/elfo-dumper/src/actor.rs @@ -1,4 +1,4 @@ -use std::{iter, panic, sync::Arc}; +use std::{iter, panic, sync::Arc, time::Duration}; use eyre::{Result, WrapErr}; use fxhash::FxHashSet; @@ -15,7 +15,7 @@ use elfo_core::{ scope::{self, SerdeMode}, signal::{Signal, SignalKind}, time::Interval, - ActorGroup, Blueprint, Context, TerminationPolicy, + ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy, }; use elfo_utils::ward; @@ -252,6 +252,10 @@ pub(crate) fn new(dump_storage: Arc>) -> Blueprint { ActorGroup::new() .config::() .termination_policy(TerminationPolicy::manually()) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .router(MapRouter::new(move |envelope| { msg!(match envelope { diff --git a/elfo-logger/src/actor.rs b/elfo-logger/src/actor.rs index 118ee6c1..c8ca666c 100644 --- a/elfo-logger/src/actor.rs +++ b/elfo-logger/src/actor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use metrics::increment_counter; use tokio::{ @@ -12,7 +12,7 @@ use elfo_core::{ messages::{ConfigUpdated, Terminate}, msg, signal::{Signal, SignalKind}, - ActorGroup, Blueprint, Context, TerminationPolicy, + ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy, }; use crate::{ @@ -42,6 +42,10 @@ impl Logger { ActorGroup::new() .config::() .termination_policy(TerminationPolicy::manually()) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(105) .exec(move |ctx| Logger::new(ctx, shared.clone(), filtering_layer.clone()).main()) } diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index b4cc15f2..16da04d3 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use eyre::{bail, eyre, Result, WrapErr}; use futures::StreamExt; @@ -9,7 +9,7 @@ use elfo_core::{ _priv::{GroupNo, MessageKind}, messages::ConfigUpdated, stream::Stream, - Topology, + RestartParams, Topology, }; use crate::{ @@ -90,7 +90,11 @@ impl Discovery { pub(super) async fn main(mut self) -> Result<()> { // 
The default restart policy of this group is `never`, so override it. - self.ctx.set_restart_policy(RestartPolicy::on_failures()); + self.ctx + .set_restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))); self.listen().await?; self.discover(); diff --git a/elfo-network/src/lib.rs b/elfo-network/src/lib.rs index 512f1ac8..bb8f2896 100644 --- a/elfo-network/src/lib.rs +++ b/elfo-network/src/lib.rs @@ -16,7 +16,7 @@ use elfo_core::{ messages::UpdateConfig, msg, routers::{MapRouter, Outcome}, - ActorGroup, Blueprint, Context, RestartPolicy, Topology, + ActorGroup, Blueprint, Context, Topology, }; use crate::{ @@ -63,8 +63,6 @@ pub fn new(topology: &Topology) -> Blueprint { ActorGroup::new() .config::() - // The restart policy is overrided by the discovery actor. - .restart_policy(RestartPolicy::never()) .stop_order(100) .router(MapRouter::new(|envelope| { msg!(match envelope { diff --git a/elfo-pinger/src/lib.rs b/elfo-pinger/src/lib.rs index 6b9f9b92..29d6eceb 100644 --- a/elfo-pinger/src/lib.rs +++ b/elfo-pinger/src/lib.rs @@ -1,6 +1,8 @@ #![warn(rust_2018_idioms, unreachable_pub)] -use elfo_core::{ActorGroup, Blueprint, Topology}; +use std::time::Duration; + +use elfo_core::{ActorGroup, Blueprint, RestartParams, RestartPolicy, Topology}; mod actor; mod config; @@ -9,6 +11,10 @@ pub fn new(topology: &Topology) -> Blueprint { let topology = topology.clone(); ActorGroup::new() .config::() + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .exec(move |ctx| actor::exec(ctx, topology.clone())) } diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs index 21973940..ddd2502b 100644 --- a/elfo-telemeter/src/actor.rs +++ b/elfo-telemeter/src/actor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use metrics::gauge; use tokio::task::JoinHandle; @@ -6,7 +6,7 @@ use tracing::{error, info}; use elfo_core::{ message, messages::ConfigUpdated, msg, scope, time::Interval, tracing::TraceId, ActorGroup, - Blueprint, Context, MoveOwnership, + Blueprint, Context, MoveOwnership, RestartParams, RestartPolicy, }; use crate::{ @@ -39,6 +39,10 @@ struct ServerFailed(MoveOwnership); pub(crate) fn new(storage: Arc) -> Blueprint { ActorGroup::new() .config::() + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .exec(move |ctx| Telemeter::new(ctx, storage.clone()).main()) } diff --git a/elfo/tests/restarting.rs b/elfo/tests/restarting.rs index 004fd3ed..6771e1fb 100644 --- a/elfo/tests/restarting.rs +++ b/elfo/tests/restarting.rs @@ -1,23 +1,25 @@ #![cfg(feature = "test-util")] +#![allow(clippy::never_loop)] use std::{ - panic::AssertUnwindSafe, sync::{Arc, Mutex}, time::Duration, }; -use futures::FutureExt; - -use elfo::{prelude::*, RestartPolicy}; - -#[message] -struct Terminate; - -#[message] -struct Terminated; +use elfo::{ + prelude::*, + routers::{MapRouter, Outcome, Singleton}, + RestartParams, RestartPolicy, +}; #[tokio::test] async fn actor_restarts_explicitly() { + #[message] + struct Terminate; + + #[message] + struct Terminated; + let blueprint = ActorGroup::new().exec(move |mut ctx| async move { while let Some(envelope) = ctx.recv().await { msg!(match envelope { @@ -38,27 +40,42 @@ async fn actor_restarts_explicitly() { #[tokio::test(start_paused = true)] async fn actor_restarts_with_timeout_after_failures() { - let 
blueprint = ActorGroup::new().exec(move |mut ctx| async move { - while let Some(envelope) = ctx.recv().await { - msg!(match envelope { - Terminate { .. } => panic!("boom!"), - _ => unreachable!(), - }); - } - }); - - let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + #[message] + struct HealthCheck; - for i in 1..5 { - proxy.send(Terminate).await; + #[message] + struct Spawn; + + let blueprint = ActorGroup::new() + .router(MapRouter::new(|e| { + msg!(match e { + Spawn => Outcome::Unicast(Singleton), + // HealthCheck should not spawn the actor. + HealthCheck => Outcome::GentleUnicast(Singleton), + _ => Outcome::Discard, + }) + })) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) + .exec(move |mut ctx| async move { + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + HealthCheck | Spawn => panic!("boom!"), + _ => unreachable!(), + }); + } + }); - let r = AssertUnwindSafe(async { proxy.recv().await }) - .catch_unwind() - .await; - assert!(r.is_err()); + let proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + proxy.send(Spawn).await; + for i in 1..5 { + proxy.send(HealthCheck).await; + let delay = Duration::from_millis(((5000f64 * 2.0f64.powi(i)) as u64).min(30000)); // https://github.com/tokio-rs/tokio/issues/3985 - tokio::time::sleep(Duration::from_millis(5000 * i + 1)).await; + tokio::time::sleep(delay).await; } } @@ -68,7 +85,10 @@ async fn actor_overrides_policy() { struct Started; let blueprint = ActorGroup::new().exec(move |ctx| async move { - ctx.set_restart_policy(RestartPolicy::always()); + ctx.set_restart_policy(RestartPolicy::always(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))); let _ = ctx.send(Started).await; }); diff --git a/elfo/tests/start_info.rs b/elfo/tests/start_info.rs new file mode 100644 index 00000000..bb6c4412 --- /dev/null +++ b/elfo/tests/start_info.rs @@ -0,0 +1,56 @@ +#![cfg(feature = "test-util")] + +use std::{num::NonZeroU64, time::Duration}; + +use elfo::{ + messages::UpdateConfig, + prelude::*, + routers::{MapRouter, Outcome, Singleton}, + RestartParams, RestartPolicy, +}; + +#[tokio::test] +async fn it_works() { + #[message] + struct Start; + + #[message] + struct Restarted; + #[message] + struct GroupMounted; + #[message] + struct OnMessage; + + let blueprint = ActorGroup::new() + .router(MapRouter::new(|e| { + msg!(match e { + UpdateConfig | Start => Outcome::Unicast(Singleton), + _ => Outcome::Discard, + }) + })) + .restart_policy(RestartPolicy::on_failure( + RestartParams::new(Duration::from_secs(0), Duration::from_secs(0)) + .auto_reset(Duration::MAX) + .max_retries(NonZeroU64::new(1).unwrap()), + )) + .exec(move |ctx| async move { + let cause = ctx.start_info().cause; + if cause.is_group_mounted() { + let _ = ctx.send(GroupMounted).await; + } + if cause.is_restarted() { + let _ = ctx.send(Restarted).await; + } + if cause.is_group_mounted() { + let _ = ctx.send(OnMessage).await; + } + panic!("boom!"); + }); + + let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + assert_msg!(proxy.recv().await, GroupMounted); + assert_msg!(proxy.recv().await, Restarted); + proxy.send(Start).await; + assert_msg!(proxy.recv().await, OnMessage); + assert_msg!(proxy.recv().await, Restarted); +} diff --git a/elfo/tests/subscription_to_statuses.rs b/elfo/tests/subscription_to_statuses.rs index 3c26e2b3..cd154d98 100644 --- 
a/elfo/tests/subscription_to_statuses.rs +++ b/elfo/tests/subscription_to_statuses.rs @@ -9,6 +9,7 @@ use elfo::{ test::Proxy, ActorStatus, ActorStatusKind, }; +use elfo_core::{RestartParams, RestartPolicy}; #[message] struct Start(u32); @@ -27,6 +28,10 @@ async fn run_group() -> Proxy { _ => Outcome::Default, }) })) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .exec(move |mut ctx| async move { while let Some(envelope) = ctx.recv().await { msg!(match envelope { diff --git a/elfo/tests/termination.rs b/elfo/tests/termination.rs index d485ff34..609274ac 100644 --- a/elfo/tests/termination.rs +++ b/elfo/tests/termination.rs @@ -1,4 +1,5 @@ #![cfg(feature = "test-util")] +#![allow(clippy::never_loop)] use elfo::{messages::Terminate, prelude::*, TerminationPolicy};
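
Note: below is a minimal usage sketch of the API introduced by this patch — a blueprint-level policy, the per-actor override, and start-cause inspection. The message type `SomeEvent`, the `blueprint()` helper, and the concrete durations, factor, and retry count are illustrative only; the calls themselves (`ActorGroup::restart_policy`, `RestartPolicy::on_failure`/`always`/`never`, `RestartParams::new` with its `factor`/`auto_reset`/`max_retries` builders, `Context::set_restart_policy`, `Context::start_info`) are the ones added or re-exported in this change.

    use std::{num::NonZeroU64, time::Duration};

    use elfo::{prelude::*, Blueprint, RestartParams, RestartPolicy};

    #[message]
    struct SomeEvent;

    fn blueprint() -> Blueprint {
        ActorGroup::new()
            // Restart only after failures, with an exponential backoff
            // between 5s and 30s (the default factor is 2.0).
            .restart_policy(RestartPolicy::on_failure(RestartParams::new(
                Duration::from_secs(5),
                Duration::from_secs(30),
            )))
            .exec(move |mut ctx| async move {
                // Per-actor override: takes priority over both the config
                // and the blueprint policy.
                ctx.set_restart_policy(RestartPolicy::always(
                    RestartParams::new(Duration::from_secs(1), Duration::from_secs(10))
                        .factor(3.0)
                        .max_retries(NonZeroU64::new(5).unwrap()),
                ));

                // The start cause distinguishes a fresh start from a restart.
                if ctx.start_info().cause.is_restarted() {
                    // e.g. skip a warm-up that only a fresh instance needs
                }

                while let Some(envelope) = ctx.recv().await {
                    msg!(match envelope {
                        SomeEvent => { /* handle */ }
                        _ => {}
                    });
                }
            })
    }

The same policy can also be overridden per group via configuration: the new `RestartingConfig` hangs off `SystemConfig`, so it is presumably exposed under the group's `system.restarting` section as a table tagged by `when = "Always" | "OnFailure" | "Never"` with humantime durations (e.g. `min_backoff = "5s"`) and optional `auto_reset`, `max_retries`, and `factor` keys — the exact layout follows the serde definitions in `elfo-core/src/restarting/config.rs`. As the supervisor comment states, the effective policy is chosen as `ctx.set_restart_policy(..)` > config > blueprint.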