diff --git a/elfo-configurer/src/lib.rs b/elfo-configurer/src/lib.rs index 8b39d4af..d7b59e60 100644 --- a/elfo-configurer/src/lib.rs +++ b/elfo-configurer/src/lib.rs @@ -22,7 +22,7 @@ use elfo_core::{ }, msg, scope, signal::{Signal, SignalKind}, - ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology, + ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology, }; pub use self::protocol::*; @@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint { let topology = topology.clone(); ActorGroup::new() .stop_order(100) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main()) } diff --git a/elfo-core/Cargo.toml b/elfo-core/Cargo.toml index e1d6a4bb..6c286821 100644 --- a/elfo-core/Cargo.toml +++ b/elfo-core/Cargo.toml @@ -50,6 +50,7 @@ regex = "1.6.0" thread_local = { version = "1.1.3", optional = true } unicycle = "0.9.3" rmp-serde = { version = "1.1.0", optional = true } +humantime-serde = "1" [dev-dependencies] elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] } diff --git a/elfo-core/src/actor.rs b/elfo-core/src/actor.rs index 95ee5e9c..9f2fe16c 100644 --- a/elfo-core/src/actor.rs +++ b/elfo-core/src/actor.rs @@ -7,13 +7,15 @@ use serde::{Deserialize, Serialize}; use tracing::{error, info, warn}; use crate::{ + config::SystemConfig, envelope::Envelope, errors::{SendError, TrySendError}, - group::{RestartPolicy, TerminationPolicy}, + group::TerminationPolicy, mailbox::{Mailbox, RecvResult}, - messages::{ActorStatusReport, Terminate}, + messages::{ActorStatusReport, Terminate, UpdateConfig}, msg, request_table::RequestTable, + restarting::RestartPolicy, scope, subscription::SubscriptionManager, Addr, @@ -117,11 +119,70 @@ impl ActorStatusKind { } } +// === ActorStartInfo === + +/// A struct holding information related to an actor start. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ActorStartInfo { + /// The cause for the actor start, indicating why the actor is being + /// initialized. + pub cause: ActorStartCause, +} + +/// An enum representing various causes for an actor to start. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum ActorStartCause { + /// The actor started because its group was mounted. + GroupMounted, + /// The actor started in response to a message. + OnMessage, + /// The actor started due to the restart policy. + Restarted, +} + +impl ActorStartInfo { + pub(crate) fn on_group_mounted() -> Self { + Self { + cause: ActorStartCause::GroupMounted, + } + } + + pub(crate) fn on_message() -> Self { + Self { + cause: ActorStartCause::OnMessage, + } + } + + pub(crate) fn on_restart() -> Self { + Self { + cause: ActorStartCause::Restarted, + } + } +} + +impl ActorStartCause { + pub fn is_group_mounted(&self) -> bool { + matches!(self, ActorStartCause::GroupMounted) + } + + pub fn is_restarted(&self) -> bool { + matches!(self, ActorStartCause::Restarted) + } + + pub fn is_on_message(&self) -> bool { + matches!(self, ActorStartCause::OnMessage) + } +} + // === Actor === pub(crate) struct Actor { meta: Arc, termination_policy: TerminationPolicy, + // blueprint restart policy + default_restart_policy: RestartPolicy, mailbox: Mailbox, request_table: RequestTable, control: RwLock, @@ -131,25 +192,29 @@ pub(crate) struct Actor { struct ControlBlock { status: ActorStatus, - /// If `None`, a group's policy will be used. 
- restart_policy: Option, + /// If `None`, `group_restart_policy will be used. + rp_override: Option, + group_restart_policy: RestartPolicy, } impl Actor { pub(crate) fn new( meta: Arc, addr: Addr, + restart_policy: RestartPolicy, termination_policy: TerminationPolicy, status_subscription: Arc, ) -> Self { Actor { meta, + default_restart_policy: restart_policy.clone(), termination_policy, mailbox: Mailbox::new(), request_table: RequestTable::new(addr), control: RwLock::new(ControlBlock { status: ActorStatus::INITIALIZING, - restart_policy: None, + rp_override: None, + group_restart_policy: restart_policy, }), finished: ManualResetEvent::new(false), status_subscription, @@ -167,6 +232,9 @@ impl Actor { pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError> { msg!(match &envelope { + UpdateConfig { config } => { + self.update_config(config.get_system()); + } Terminate { closing } => { if *closing || self.termination_policy.close_mailbox { if self.close() { @@ -181,8 +249,20 @@ impl Actor { self.mailbox.try_send(envelope) } + pub(crate) fn update_config(&self, config: &SystemConfig) { + let restart_policy = config + .restart_policy + .make_policy() + .unwrap_or(self.default_restart_policy.clone()); + + self.control.write().group_restart_policy = restart_policy; + } + pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError> { msg!(match &envelope { + UpdateConfig { config } => { + self.update_config(config.get_system()); + } Terminate { closing } => { if *closing || self.termination_policy.close_mailbox { if self.close() { @@ -209,12 +289,16 @@ impl Actor { &self.request_table } - pub(crate) fn restart_policy(&self) -> Option { - self.control.read().restart_policy.clone() + pub(crate) fn restart_policy(&self) -> RestartPolicy { + let control = self.control.read(); + control + .rp_override + .clone() + .unwrap_or(control.group_restart_policy.clone()) } pub(crate) fn set_restart_policy(&self, policy: Option) { - self.control.write().restart_policy = policy; + self.control.write().rp_override = policy; } // Note that this method should be called inside a right scope. 
diff --git a/elfo-core/src/config.rs b/elfo-core/src/config.rs index 98e906dc..423bb52d 100644 --- a/elfo-core/src/config.rs +++ b/elfo-core/src/config.rs @@ -162,6 +162,7 @@ pub(crate) struct SystemConfig { pub(crate) logging: crate::logging::LoggingConfig, pub(crate) dumping: crate::dumping::DumpingConfig, pub(crate) telemetry: crate::telemetry::TelemetryConfig, + pub(crate) restart_policy: crate::restarting::RestartPolicyConfig, } // === Secret === diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 38900790..ac1e8e0a 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -7,7 +7,7 @@ use tracing::{info, trace}; use elfo_utils::unlikely; use crate::{ - actor::{Actor, ActorStatus}, + actor::{Actor, ActorStartInfo, ActorStatus}, addr::Addr, address_book::AddressBook, config::AnyConfig, @@ -15,12 +15,12 @@ use crate::{ dumping::{Direction, Dump, Dumper, INTERNAL_CLASS}, envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind}, errors::{RequestError, SendError, TryRecvError, TrySendError}, - group::RestartPolicy, mailbox::RecvResult, message::{Message, Request}, messages, msg, object::ObjectArc, request_table::ResponseToken, + restarting::RestartPolicy, routers::Singleton, scope, source::{SourceHandle, Sources, UnattachedSource}, @@ -38,6 +38,7 @@ pub struct Context { book: AddressBook, actor: Option, // `None` for group's and pruned context. actor_addr: Addr, + actor_start_info: Option, // `None` for group's context, group_addr: Addr, demux: Demux, config: Arc, @@ -624,6 +625,39 @@ impl Context { } } + /// Retrieves information related to the start of the actor. + /// + /// # Panics + /// + /// This method will panic if the context is pruned, indicating that the + /// required information is no longer available. + /// + /// # Example + /// + /// ``` + /// # use elfo_core as elfo; + /// # use elfo_core::{ActorStartCause, ActorStartInfo}; + /// # async fn exec(mut ctx: elfo::Context) { + /// match ctx.start_info().cause { + /// ActorStartCause::GroupMounted => { + /// // The actor started because its group was mounted. + /// } + /// ActorStartCause::OnMessage => { + /// // The actor started in response to a message. + /// } + /// ActorStartCause::Restarted => { + /// // The actor started due to the restart policy. 
+ /// } + /// _ => {} + /// } + /// # } + /// ``` + pub fn start_info(&self) -> &ActorStartInfo { + self.actor_start_info + .as_ref() + .expect("start_info is not available for a group context") + } + fn pre_recv(&mut self) { self.stats.on_recv(); @@ -718,6 +752,7 @@ impl Context { book: self.book.clone(), actor: None, actor_addr: self.actor_addr, + actor_start_info: self.actor_start_info.clone(), group_addr: self.group_addr, demux: self.demux.clone(), config: Arc::new(()), @@ -740,6 +775,7 @@ impl Context { book: self.book, actor: self.actor, actor_addr: self.actor_addr, + actor_start_info: self.actor_start_info, group_addr: self.group_addr, demux: self.demux, config, @@ -764,11 +800,17 @@ impl Context { self } + pub(crate) fn with_start_info(mut self, actor_start_info: ActorStartInfo) -> Self { + self.actor_start_info = Some(actor_start_info); + self + } + pub(crate) fn with_key(self, key: K1) -> Context { Context { book: self.book, actor: self.actor, actor_addr: self.actor_addr, + actor_start_info: self.actor_start_info, group_addr: self.group_addr, demux: self.demux, config: self.config, @@ -832,6 +874,7 @@ impl Context { actor: None, actor_addr: Addr::NULL, group_addr: Addr::NULL, + actor_start_info: None, demux, config: Arc::new(()), key: Singleton, @@ -850,6 +893,7 @@ impl Clone for Context { book: self.book.clone(), actor: self.book.get_owned(self.actor_addr), actor_addr: self.actor_addr, + actor_start_info: self.actor_start_info.clone(), group_addr: self.group_addr, demux: self.demux.clone(), config: self.config.clone(), diff --git a/elfo-core/src/group.rs b/elfo-core/src/group.rs index 0bd8d892..a39c928a 100644 --- a/elfo-core/src/group.rs +++ b/elfo-core/src/group.rs @@ -8,6 +8,7 @@ use crate::{ envelope::Envelope, exec::{Exec, ExecResult}, object::{GroupHandle, GroupVisitor, Object}, + restarting::RestartPolicy, routers::Router, runtime::RuntimeManager, supervisor::Supervisor, @@ -47,13 +48,15 @@ impl ActorGroup { } /// The behaviour on actor termination. - /// `RestartPolicy::on_failures` is used by default. + /// + /// `RestartPolicy::never` is used by default. pub fn restart_policy(mut self, policy: RestartPolicy) -> Self { self.restart_policy = policy; self } /// The behaviour on the `Terminate` message. + /// /// `TerminationPolicy::closing` is used by default. pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self { self.termination_policy = policy; @@ -183,42 +186,3 @@ impl TerminationPolicy { // TODO: add `stop_spawning`? } - -/// The behaviour on actor termination. 
-#[derive(Debug, Clone)] -pub struct RestartPolicy { - pub(crate) mode: RestartMode, -} - -impl Default for RestartPolicy { - fn default() -> Self { - Self::on_failures() - } -} - -#[derive(Debug, Clone)] -pub(crate) enum RestartMode { - Always, - OnFailures, - Never, -} - -impl RestartPolicy { - pub fn always() -> Self { - Self { - mode: RestartMode::Always, - } - } - - pub fn on_failures() -> Self { - Self { - mode: RestartMode::OnFailures, - } - } - - pub fn never() -> Self { - Self { - mode: RestartMode::Never, - } - } -} diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 06d75ef6..796f2f1a 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -13,7 +13,7 @@ use elfo_utils::time::Instant; use crate::{memory_tracker::MemoryTracker, time::Interval}; use crate::{ - actor::{Actor, ActorMeta, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, addr::{Addr, GroupNo}, config::SystemConfig, context::Context, @@ -174,6 +174,7 @@ pub async fn do_start( meta.clone(), addr, Default::default(), + Default::default(), Arc::new(SubscriptionManager::new(ctx.clone())), ); @@ -187,7 +188,9 @@ pub async fn do_start( entry.insert(Object::new(addr, actor)); // It must be called after `entry.insert()`. - let ctx = ctx.with_addr(addr); + let ctx = ctx + .with_addr(addr) + .with_start_info(ActorStartInfo::on_group_mounted()); let init = async move { start_entrypoints(&ctx, &topology, is_check_only).await?; diff --git a/elfo-core/src/lib.rs b/elfo-core/src/lib.rs index d6978044..e58a5def 100644 --- a/elfo-core/src/lib.rs +++ b/elfo-core/src/lib.rs @@ -11,15 +11,16 @@ extern crate self as elfo_core; // TODO: revise this list pub use crate::{ - actor::{ActorMeta, ActorStatus, ActorStatusKind}, + actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind}, addr::Addr, config::Config, context::{Context, RequestBuilder}, envelope::Envelope, - group::{ActorGroup, Blueprint, RestartPolicy, TerminationPolicy}, + group::{ActorGroup, Blueprint, TerminationPolicy}, local::{Local, MoveOwnership}, message::{Message, Request}, request_table::ResponseToken, + restarting::{RestartParams, RestartPolicy}, source::{SourceHandle, UnattachedSource}, topology::Topology, }; @@ -65,6 +66,7 @@ pub mod remote; #[cfg(all(feature = "network", not(feature = "unstable")))] mod remote; mod request_table; +mod restarting; mod runtime; mod source; mod subscription; diff --git a/elfo-core/src/node.rs b/elfo-core/src/node.rs index ce518a6e..fa8cef28 100644 --- a/elfo-core/src/node.rs +++ b/elfo-core/src/node.rs @@ -19,6 +19,7 @@ impl From for NodeNo { static NODE_NO: AtomicU16 = AtomicU16::new(0); +#[stability::unstable] /// Returns the current `node_no`. 
pub fn node_no() -> Option { crate::addr::NodeNo::from_bits(NODE_NO.load(Ordering::Relaxed)) diff --git a/elfo-core/src/restarting/backoff.rs b/elfo-core/src/restarting/backoff.rs new file mode 100644 index 00000000..21e3374c --- /dev/null +++ b/elfo-core/src/restarting/backoff.rs @@ -0,0 +1,135 @@ +use std::time::Duration; + +use crate::RestartParams; +use elfo_utils::time::Instant; + +pub(crate) struct RestartBackoff { + start_time: Instant, + restart_count: u64, + power: u64, +} + +impl Default for RestartBackoff { + fn default() -> Self { + Self { + start_time: Instant::now(), + restart_count: 0, + power: 0, + } + } +} + +impl RestartBackoff { + pub(crate) fn start(&mut self) { + self.start_time = Instant::now(); + } + + pub(crate) fn next(&mut self, params: &RestartParams) -> Option { + // If an actor is alive enough time, reset the backoff. + if self.start_time.elapsed() >= params.auto_reset { + self.restart_count = 1; + self.power = 0; + return Some(Duration::ZERO); + } + self.restart_count += 1; + + if self.restart_count > params.max_retries.get() { + return None; + } + + let delay = (params.min_backoff.as_secs_f64() * params.factor.powf(self.power as f64)) + .clamp( + params.min_backoff.as_secs_f64(), + params.max_backoff.as_secs_f64(), + ); + + self.power += 1; + // Check for overflow, if overflow is detected set the current delay to maximal + let delay = if delay.is_finite() { + Duration::from_secs_f64(delay) + } else { + params.max_backoff + }; + + Some(delay) + } +} + +#[cfg(test)] +mod tests { + use elfo_utils::time; + use std::num::NonZeroU64; + + use super::*; + + #[test] + fn it_works() { + time::with_instant_mock(|mock| { + let mut backoff = RestartBackoff::default(); + let params = RestartParams::new(Duration::from_secs(5), Duration::from_secs(30)) + .max_retries(NonZeroU64::new(3).unwrap()); + // Immediately failed. + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + mock.advance(params.min_backoff); + backoff.start(); + + // And again. + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + mock.advance(2 * params.min_backoff); + backoff.start(); + + // After some, not enough to reset the backoff, time. + mock.advance(params.min_backoff * 2 / 3); + assert_eq!(backoff.next(¶ms), Some(4 * params.min_backoff)); + mock.advance(3 * params.min_backoff); + backoff.start(); + + // After some, enough to reset the backoff, time. + mock.advance(params.min_backoff); + // The first retry. + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); // resetted + backoff.start(); + + // After some, not enough to reset the backoff, time. + mock.advance(params.min_backoff * 2 / 3); + // The second retry. + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + // The third retry. + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + // We reached the limit of reties. + assert_eq!(backoff.next(¶ms), None); + }); + } + + #[test] + fn correctness() { + let mut backoff = RestartBackoff::default(); + // Start with zero backoff duration. + let params = RestartParams::new(Duration::from_secs(0), Duration::from_secs(0)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + assert_eq!(backoff.next(¶ms), Some(Duration::ZERO)); + + // Then check the transition from zero to nonzero backoff limits. 
+ let params = RestartParams::new(Duration::from_secs(2), Duration::from_secs(16)); + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(2 * params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(4 * params.min_backoff)); + + // Decreasing the upper bound results in a reduced subsequent backoff. + let params = RestartParams::new(Duration::from_secs(3), Duration::from_secs(5)); + assert_eq!(backoff.next(¶ms), Some(params.max_backoff)); + + // Increasing the lower bound raises the subsequent backoff. + let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30)); + assert_eq!(backoff.next(¶ms), Some(params.max_backoff)); + + // Limiting the number of retry attempts kicks in. + let mut backoff = RestartBackoff::default(); + let params = RestartParams::new(Duration::from_secs(20), Duration::from_secs(30)) + .max_retries(NonZeroU64::new(2).unwrap()); + assert_eq!(backoff.next(¶ms), Some(params.min_backoff)); + assert_eq!(backoff.next(¶ms), Some(params.max_backoff)); + assert_eq!(backoff.next(¶ms), None); + } +} diff --git a/elfo-core/src/restarting/config.rs b/elfo-core/src/restarting/config.rs new file mode 100644 index 00000000..bf58a147 --- /dev/null +++ b/elfo-core/src/restarting/config.rs @@ -0,0 +1,47 @@ +use std::{num::NonZeroU64, time::Duration}; + +use serde::Deserialize; + +use crate::restarting::restart_policy::{RestartParams, RestartPolicy}; + +#[derive(Debug, Clone, Default, Deserialize)] +pub(crate) struct RestartPolicyConfig(Option); + +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "when")] +enum WhenConfig { + Always(RestartParamsConfig), + OnFailure(RestartParamsConfig), + Never, +} + +#[derive(Debug, Clone, Deserialize)] +struct RestartParamsConfig { + #[serde(with = "humantime_serde")] + min_backoff: Duration, + #[serde(with = "humantime_serde")] + max_backoff: Duration, + #[serde(with = "humantime_serde", default)] + auto_reset: Option, + max_retries: Option, + factor: Option, +} + +impl RestartPolicyConfig { + pub(crate) fn make_policy(&self) -> Option { + self.0.as_ref().map(|cfg| match cfg { + WhenConfig::Always(rp_cfg) => RestartPolicy::always(rp_cfg.make_params()), + WhenConfig::OnFailure(rp_cfg) => RestartPolicy::on_failure(rp_cfg.make_params()), + WhenConfig::Never => RestartPolicy::never(), + }) + } +} + +impl RestartParamsConfig { + fn make_params(&self) -> RestartParams { + RestartParams::new(self.min_backoff, self.max_backoff) + .factor(self.factor) + .auto_reset(self.auto_reset) + .max_retries(self.max_retries) + } +} diff --git a/elfo-core/src/restarting/mod.rs b/elfo-core/src/restarting/mod.rs new file mode 100644 index 00000000..0e38d8d6 --- /dev/null +++ b/elfo-core/src/restarting/mod.rs @@ -0,0 +1,6 @@ +mod backoff; +mod config; +mod restart_policy; + +pub(crate) use self::{backoff::RestartBackoff, config::RestartPolicyConfig}; +pub use restart_policy::{RestartParams, RestartPolicy}; diff --git a/elfo-core/src/restarting/restart_policy.rs b/elfo-core/src/restarting/restart_policy.rs new file mode 100644 index 00000000..764b01ad --- /dev/null +++ b/elfo-core/src/restarting/restart_policy.rs @@ -0,0 +1,144 @@ +use std::{num::NonZeroU64, time::Duration}; + +use tracing::warn; + +use crate::ActorStatus; + +/// The behaviour on actor termination. 
+#[derive(Debug, Clone, PartialEq)] +pub struct RestartPolicy { + pub(crate) mode: RestartMode, +} + +impl Default for RestartPolicy { + fn default() -> Self { + Self::never() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum RestartMode { + Always(RestartParams), + OnFailure(RestartParams), + Never, +} + +impl RestartPolicy { + pub fn always(restart_params: RestartParams) -> Self { + Self { + mode: RestartMode::Always(restart_params), + } + } + + pub fn on_failure(restart_params: RestartParams) -> Self { + Self { + mode: RestartMode::OnFailure(restart_params), + } + } + + pub fn never() -> Self { + Self { + mode: RestartMode::Never, + } + } + + pub(crate) fn restarting_allowed(&self, status: &ActorStatus) -> bool { + match &self.mode { + RestartMode::Always(_) => true, + RestartMode::OnFailure(_) => status.is_failed(), + _ => false, + } + } + + pub(crate) fn restart_params(&self) -> Option { + match &self.mode { + RestartMode::Always(params) | RestartMode::OnFailure(params) => Some(*params), + _ => None, + } + } +} + +/// Restart parameters for the backoff strategy when an actor restarts based on +/// the [RestartPolicy]. +#[derive(Debug, Copy, Clone, PartialEq)] +pub struct RestartParams { + pub(crate) min_backoff: Duration, + pub(crate) max_backoff: Duration, + pub(crate) auto_reset: Duration, + pub(crate) max_retries: NonZeroU64, + pub(crate) factor: f64, +} + +impl RestartParams { + /// Creates a new instance with the specified minimum and maximum backoff + /// durations. The default values for `auto_reset`, `max_retries`, and + /// `factor` are set as follows: + /// - `auto_reset = min_backoff` + /// - `max_retries = NonZeroU64::MAX` + /// - `factor = 2.0` + pub fn new(min_backoff: Duration, max_backoff: Duration) -> Self { + RestartParams { + min_backoff, + max_backoff: min_backoff.max(max_backoff), + auto_reset: min_backoff, + max_retries: NonZeroU64::MAX, + factor: 2.0, + } + } + + /// Sets the duration deemed sufficient to consider an actor healthy. Once + /// this duration elapses, the backoff strategy automatically resets, + /// including retry counting, effectively treating the next attempt as the + /// first retry. Therefore, setting the `auto_reset` to small values, + /// such as [Duration::ZERO], can result in the absence of a limit on + /// the maximum number of retries. After the backoff strategy resets, the + /// actor will restart immediately. + /// + /// `None` does not change the `auto_reset` setting. + /// + /// If the function isn't used, `auto_reset = min_backoff` is used by + /// default. + pub fn auto_reset(self, auto_reset: impl Into>) -> Self { + Self { + auto_reset: auto_reset.into().unwrap_or(self.auto_reset), + ..self + } + } + + /// Sets the factor used to calculate the next backoff duration. + /// The factor should be a finite value and should not be negative; otherwise, a warning will be emitted. + /// + /// `None` value does not change the `factor` setting. + /// + /// If the function isn't used, `factor = 2.0` is used by default. + pub fn factor(self, factor: impl Into>) -> Self { + let factor = factor.into().unwrap_or(self.factor); + + let factor = if !factor.is_finite() || factor.is_sign_negative() { + warn!("factor should be a finite value and should not be negative"); + 0.0 + } else { + factor + }; + + Self { factor, ..self } + } + + /// Sets the maximum number of allowed retries. Each time the actor + /// restarts, it counts as a retry. If the retries reach the specified + /// max_retries, the actor stops restarting. 
If the actor lives long + /// enough to be considered healthy (see [RestartParams::auto_reset]), the + /// restart count goes back to zero, and the next restart is considered + /// the first retry again. + /// + /// `None` does not change the `max_retries` setting. + /// + /// If the function isn't used, `max_retries = NonZeroU64::MAX` is used by + /// default. + pub fn max_retries(self, max_retries: impl Into>) -> Self { + Self { + max_retries: max_retries.into().unwrap_or(self.max_retries), + ..self + } + } +} diff --git a/elfo-core/src/supervisor.rs b/elfo-core/src/supervisor.rs index 04ae1179..43dbeee5 100644 --- a/elfo-core/src/supervisor.rs +++ b/elfo-core/src/supervisor.rs @@ -11,17 +11,18 @@ use tracing::{debug, error, error_span, info, warn, Instrument, Span}; use elfo_utils::CachePadded; -use self::{backoff::Backoff, error_chain::ErrorChain, measure_poll::MeasurePoll}; +use self::{error_chain::ErrorChain, measure_poll::MeasurePoll}; use crate::{ - actor::{Actor, ActorMeta, ActorStatus}, + actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus}, config::{AnyConfig, Config, SystemConfig}, context::Context, envelope::Envelope, exec::{Exec, ExecResult}, - group::{RestartMode, RestartPolicy, TerminationPolicy}, + group::TerminationPolicy, message::Request, messages, msg, object::{GroupVisitor, Object, ObjectArc}, + restarting::{RestartBackoff, RestartPolicy}, routers::{Outcome, Router}, runtime::RuntimeManager, scope::{self, Scope, ScopeGroupShared}, @@ -30,7 +31,6 @@ use crate::{ Addr, ResponseToken, }; -mod backoff; mod error_chain; mod measure_poll; @@ -58,14 +58,14 @@ struct ControlBlock { /// Returns `None` if cannot be spawned. macro_rules! get_or_spawn { - ($this:ident, $key:expr) => {{ + ($this:ident, $key:expr, $start_info:expr) => {{ let key = $key; match $this.objects.get(&key) { Some(object) => Some(object), None => $this .objects .entry(key.clone()) - .or_try_insert_with(|| $this.spawn(key, Default::default()).ok_or(())) + .or_try_insert_with(|| $this.spawn(key, $start_info, Default::default()).ok_or(())) .map(|o| o.downgrade()) // FIXME: take an exclusive lock here. .ok(), } @@ -173,7 +173,7 @@ where let outcome = self.router.route(&envelope); if only_spawn { - self.spawn_by_outcome(outcome); + self.spawn_on_group_mounted(outcome); let token = extract_response_token::(envelope); self.context.respond(token, Ok(())); return visitor.done(); @@ -216,8 +216,9 @@ where } }); + let start_info = ActorStartInfo::on_message(); match outcome { - Outcome::Unicast(key) => match get_or_spawn!(self, key) { + Outcome::Unicast(key) => match get_or_spawn!(self, key, start_info) { Some(object) => visitor.visit_last(&object, envelope), None => visitor.empty(envelope), }, @@ -228,7 +229,7 @@ where Outcome::Multicast(list) => { for key in list.iter() { if !self.objects.contains_key(key) { - get_or_spawn!(self, key.clone()); + get_or_spawn!(self, key.clone(), start_info.clone()); } } let iter = list.into_iter().filter_map(|key| self.objects.get(&key)); @@ -266,7 +267,12 @@ where } } - fn spawn(self: &Arc, key: R::Key, mut backoff: Backoff) -> Option { + fn spawn( + self: &Arc, + key: R::Key, + start_info: ActorStartInfo, + mut backoff: RestartBackoff, + ) -> Option { let control = self.control.read(); if control.stop_spawning { return None; @@ -285,6 +291,7 @@ where ); let system_config = control.system_config.clone(); + let user_config = control .user_config .as_ref() @@ -315,7 +322,7 @@ where .on_start(); // It must be called after `entry.insert()`. 
- let ctx = ctx.with_addr(addr); + let ctx = ctx.with_addr(addr).with_start_info(start_info); let fut = AssertUnwindSafe(async { sv.exec.exec(ctx).await.unify() }).catch_unwind(); let new_status = match fut.await { Ok(Ok(())) => ActorStatus::TERMINATED, @@ -323,26 +330,26 @@ where Err(panic) => ActorStatus::FAILED.with_details(panic_to_string(panic)), }; - let should_restart = { + let restart_after = { let object = sv.objects.get(&key).expect("where is the current actor?"); let actor = object.as_actor().expect("a supervisor stores only actors"); - let rp_override = actor.restart_policy(); - let restart_policy = rp_override.as_ref().unwrap_or(&sv.restart_policy); - let should_restart = match restart_policy.mode { - RestartMode::Always => true, - RestartMode::OnFailures => new_status.is_failed(), - RestartMode::Never => false, - }; + let restart_policy = actor.restart_policy(); + let restarting_allowed = restart_policy.restarting_allowed(&new_status) + && !sv.control.read().stop_spawning; actor.set_status(new_status); - should_restart - }; - let need_to_restart = should_restart && !sv.control.read().stop_spawning; - if need_to_restart { - let after = backoff.next(); + restarting_allowed + .then(|| { + restart_policy + .restart_params() + .and_then(|p| backoff.next(&p)) + }) + .flatten() + }; + if let Some(after) = restart_after { if after == Duration::ZERO { debug!("actor will be restarted immediately"); } else { @@ -357,7 +364,7 @@ where scope::set_trace_id(TraceId::generate()); backoff.start(); - if let Some(object) = sv.spawn(key.clone(), backoff) { + if let Some(object) = sv.spawn(key.clone(), ActorStartInfo::on_restart(), backoff) { sv.objects.insert(key.clone(), object) } else { sv.objects.remove(&key).map(|(_, v)| v) @@ -382,9 +389,11 @@ where let actor = Actor::new( meta.clone(), addr, + self.restart_policy.clone(), self.termination_policy.clone(), self.status_subscription.clone(), ); + actor.update_config(&system_config); entry.insert(Object::new(addr, actor)); let scope = Scope::new(scope::trace_id(), addr, meta, self.scope_shared.clone()) @@ -400,14 +409,15 @@ where Some(object) } - fn spawn_by_outcome(self: &Arc, outcome: Outcome) { + fn spawn_on_group_mounted(self: &Arc, outcome: Outcome) { + let start_info = ActorStartInfo::on_group_mounted(); match outcome { Outcome::Unicast(key) => { - get_or_spawn!(self, key); + get_or_spawn!(self, key, start_info); } Outcome::Multicast(keys) => { for key in keys { - get_or_spawn!(self, key); + get_or_spawn!(self, key, start_info.clone()); } } Outcome::GentleUnicast(_) @@ -425,6 +435,7 @@ where // Update user's config. 
control.system_config = config.get_system().clone(); control.user_config = Some(config.get_user::().clone()); + self.router .update(control.user_config.as_ref().expect("just saved")); diff --git a/elfo-core/src/supervisor/backoff.rs b/elfo-core/src/supervisor/backoff.rs deleted file mode 100644 index 043ef33b..00000000 --- a/elfo-core/src/supervisor/backoff.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::time::Duration; - -use elfo_utils::time::Instant; - -const BACKOFF_STEP: Duration = Duration::from_secs(5); -const MAX_BACKOFF: Duration = Duration::from_secs(30); - -pub(crate) struct Backoff { - next_backoff: Duration, - start_time: Instant, -} - -impl Default for Backoff { - fn default() -> Self { - Self { - next_backoff: BACKOFF_STEP, - start_time: Instant::now(), - } - } -} - -impl Backoff { - pub(crate) fn start(&mut self) { - self.start_time = Instant::now(); - } - - pub(crate) fn next(&mut self) -> Duration { - // If an actor is alive enough time, reset the backoff. - if self.start_time.elapsed() >= BACKOFF_STEP { - self.next_backoff = Duration::ZERO; - } - - let backoff = self.next_backoff; - self.next_backoff = (self.next_backoff + BACKOFF_STEP).min(MAX_BACKOFF); - backoff - } -} - -#[cfg(test)] -mod tests { - use elfo_utils::time; - - use super::*; - - #[test] - fn it_works() { - time::with_instant_mock(|mock| { - let mut backoff = Backoff::default(); - - // Immediately failed. - assert_eq!(backoff.next(), BACKOFF_STEP); - mock.advance(BACKOFF_STEP); - backoff.start(); - - // And again. - assert_eq!(backoff.next(), 2 * BACKOFF_STEP); - mock.advance(2 * BACKOFF_STEP); - backoff.start(); - - // After some, not enough to reset the backoff, time. - mock.advance(BACKOFF_STEP * 2 / 3); - assert_eq!(backoff.next(), 3 * BACKOFF_STEP); - mock.advance(3 * BACKOFF_STEP); - backoff.start(); - - // After some, enough to reset the backoff, time. - mock.advance(BACKOFF_STEP); - assert_eq!(backoff.next(), Duration::ZERO); // resetted - backoff.start(); - - // After some, not enough to reset the backoff, time. 
- mock.advance(BACKOFF_STEP * 2 / 3); - assert_eq!(backoff.next(), BACKOFF_STEP); - }); - } -} diff --git a/elfo-dumper/src/actor.rs b/elfo-dumper/src/actor.rs index 4f4a883b..9d0bc83c 100644 --- a/elfo-dumper/src/actor.rs +++ b/elfo-dumper/src/actor.rs @@ -1,4 +1,4 @@ -use std::{iter, panic, sync::Arc}; +use std::{iter, panic, sync::Arc, time::Duration}; use eyre::{Result, WrapErr}; use fxhash::FxHashSet; @@ -15,7 +15,7 @@ use elfo_core::{ scope::{self, SerdeMode}, signal::{Signal, SignalKind}, time::Interval, - ActorGroup, Blueprint, Context, TerminationPolicy, + ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy, }; use elfo_utils::ward; @@ -252,6 +252,10 @@ pub(crate) fn new(dump_storage: Arc>) -> Blueprint { ActorGroup::new() .config::() .termination_policy(TerminationPolicy::manually()) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .router(MapRouter::new(move |envelope| { msg!(match envelope { diff --git a/elfo-logger/src/actor.rs b/elfo-logger/src/actor.rs index 118ee6c1..c8ca666c 100644 --- a/elfo-logger/src/actor.rs +++ b/elfo-logger/src/actor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use metrics::increment_counter; use tokio::{ @@ -12,7 +12,7 @@ use elfo_core::{ messages::{ConfigUpdated, Terminate}, msg, signal::{Signal, SignalKind}, - ActorGroup, Blueprint, Context, TerminationPolicy, + ActorGroup, Blueprint, Context, RestartParams, RestartPolicy, TerminationPolicy, }; use crate::{ @@ -42,6 +42,10 @@ impl Logger { ActorGroup::new() .config::() .termination_policy(TerminationPolicy::manually()) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(105) .exec(move |ctx| Logger::new(ctx, shared.clone(), filtering_layer.clone()).main()) } diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index b4cc15f2..16da04d3 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use eyre::{bail, eyre, Result, WrapErr}; use futures::StreamExt; @@ -9,7 +9,7 @@ use elfo_core::{ _priv::{GroupNo, MessageKind}, messages::ConfigUpdated, stream::Stream, - Topology, + RestartParams, Topology, }; use crate::{ @@ -90,7 +90,11 @@ impl Discovery { pub(super) async fn main(mut self) -> Result<()> { // The default restart policy of this group is `never`, so override it. - self.ctx.set_restart_policy(RestartPolicy::on_failures()); + self.ctx + .set_restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))); self.listen().await?; self.discover(); diff --git a/elfo-network/src/lib.rs b/elfo-network/src/lib.rs index 512f1ac8..bb8f2896 100644 --- a/elfo-network/src/lib.rs +++ b/elfo-network/src/lib.rs @@ -16,7 +16,7 @@ use elfo_core::{ messages::UpdateConfig, msg, routers::{MapRouter, Outcome}, - ActorGroup, Blueprint, Context, RestartPolicy, Topology, + ActorGroup, Blueprint, Context, Topology, }; use crate::{ @@ -63,8 +63,6 @@ pub fn new(topology: &Topology) -> Blueprint { ActorGroup::new() .config::() - // The restart policy is overrided by the discovery actor. 
- .restart_policy(RestartPolicy::never()) .stop_order(100) .router(MapRouter::new(|envelope| { msg!(match envelope { diff --git a/elfo-pinger/src/lib.rs b/elfo-pinger/src/lib.rs index 6b9f9b92..29d6eceb 100644 --- a/elfo-pinger/src/lib.rs +++ b/elfo-pinger/src/lib.rs @@ -1,6 +1,8 @@ #![warn(rust_2018_idioms, unreachable_pub)] -use elfo_core::{ActorGroup, Blueprint, Topology}; +use std::time::Duration; + +use elfo_core::{ActorGroup, Blueprint, RestartParams, RestartPolicy, Topology}; mod actor; mod config; @@ -9,6 +11,10 @@ pub fn new(topology: &Topology) -> Blueprint { let topology = topology.clone(); ActorGroup::new() .config::() + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .exec(move |ctx| actor::exec(ctx, topology.clone())) } diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs index 8bda21d6..315d2851 100644 --- a/elfo-telemeter/src/actor.rs +++ b/elfo-telemeter/src/actor.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use metrics::gauge; use tracing::{error, info}; use elfo_core::{ message, messages::ConfigUpdated, msg, stream::Stream, time::Interval, ActorGroup, Blueprint, - Context, SourceHandle, + Context, RestartParams, RestartPolicy, SourceHandle, }; use crate::{ @@ -31,6 +31,10 @@ struct CompactionTick; pub(crate) fn new(storage: Arc) -> Blueprint { ActorGroup::new() .config::() + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .stop_order(100) .exec(move |ctx| Telemeter::new(ctx, storage.clone()).main()) } diff --git a/elfo/tests/restarting.rs b/elfo/tests/restarting.rs index 004fd3ed..1a3a784d 100644 --- a/elfo/tests/restarting.rs +++ b/elfo/tests/restarting.rs @@ -1,23 +1,27 @@ #![cfg(feature = "test-util")] +#![allow(clippy::never_loop)] use std::{ - panic::AssertUnwindSafe, sync::{Arc, Mutex}, time::Duration, }; -use futures::FutureExt; +use toml::toml; -use elfo::{prelude::*, RestartPolicy}; - -#[message] -struct Terminate; - -#[message] -struct Terminated; +use elfo::{ + prelude::*, + routers::{MapRouter, Outcome, Singleton}, + RestartParams, RestartPolicy, +}; #[tokio::test] async fn actor_restarts_explicitly() { + #[message] + struct Terminate; + + #[message] + struct Terminated; + let blueprint = ActorGroup::new().exec(move |mut ctx| async move { while let Some(envelope) = ctx.recv().await { msg!(match envelope { @@ -38,42 +42,86 @@ async fn actor_restarts_explicitly() { #[tokio::test(start_paused = true)] async fn actor_restarts_with_timeout_after_failures() { - let blueprint = ActorGroup::new().exec(move |mut ctx| async move { - while let Some(envelope) = ctx.recv().await { - msg!(match envelope { - Terminate { .. } => panic!("boom!"), - _ => unreachable!(), - }); - } - }); - - let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + #[message] + struct HealthCheck; - for i in 1..5 { - proxy.send(Terminate).await; + #[message] + struct Spawn; + + let blueprint = ActorGroup::new() + .router(MapRouter::new(|e| { + msg!(match e { + Spawn => Outcome::Unicast(Singleton), + // HealthCheck should not spawn the actor. 
+ HealthCheck => Outcome::GentleUnicast(Singleton), + _ => Outcome::Discard, + }) + })) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) + .exec(move |mut ctx| async move { + while let Some(envelope) = ctx.recv().await { + msg!(match envelope { + HealthCheck | Spawn => panic!("boom!"), + _ => unreachable!(), + }); + } + }); - let r = AssertUnwindSafe(async { proxy.recv().await }) - .catch_unwind() - .await; - assert!(r.is_err()); + let proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + proxy.send(Spawn).await; + for i in 1..5 { + proxy.send(HealthCheck).await; + let delay = Duration::from_millis(((5000f64 * 2.0f64.powi(i)) as u64).min(30000)); // https://github.com/tokio-rs/tokio/issues/3985 - tokio::time::sleep(Duration::from_millis(5000 * i + 1)).await; + tokio::time::sleep(delay).await; } } #[tokio::test(start_paused = true)] -async fn actor_overrides_policy() { +async fn restart_policy_overriding() { #[message] struct Started; + let config = toml! { + [system.restart_policy] + when = "Always" + min_backoff = "10s" + max_backoff = "30s" + }; + + // The config overrides the default group policy. The default policy here is set + // to RestartPolicy::never(). let blueprint = ActorGroup::new().exec(move |ctx| async move { - ctx.set_restart_policy(RestartPolicy::always()); let _ = ctx.send(Started).await; }); - let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + let mut proxy = elfo::test::proxy(blueprint, config.clone()).await; + assert_msg!(proxy.recv().await, Started); + assert_msg!(proxy.recv().await, Started); + + // The actor overrides config policy. + let blueprint = ActorGroup::new().exec(move |ctx| async move { + ctx.set_restart_policy(RestartPolicy::never()); + let _ = ctx.send(Started).await; + }); + let mut proxy = elfo::test::proxy(blueprint, config.clone()).await; + assert_msg!(proxy.recv().await, Started); + proxy.sync().await; + assert!(proxy.try_recv().await.is_none()); + + // if the actor resets the restart policy with `None`, the restart policy is + // reverted back to the group policy. 
+ let blueprint = ActorGroup::new().exec(move |ctx| async move { + ctx.set_restart_policy(RestartPolicy::never()); + ctx.set_restart_policy(None); + let _ = ctx.send(Started).await; + }); + let mut proxy = elfo::test::proxy(blueprint, config).await; assert_msg!(proxy.recv().await, Started); assert_msg!(proxy.recv().await, Started); } diff --git a/elfo/tests/start_info.rs b/elfo/tests/start_info.rs new file mode 100644 index 00000000..f73a76b6 --- /dev/null +++ b/elfo/tests/start_info.rs @@ -0,0 +1,55 @@ +#![cfg(feature = "test-util")] + +use std::{num::NonZeroU64, time::Duration}; + +use elfo::{ + messages::UpdateConfig, + prelude::*, + routers::{MapRouter, Outcome, Singleton}, + RestartParams, RestartPolicy, +}; + +#[tokio::test] +async fn it_works() { + #[message] + struct Start; + + #[message] + struct Restarted; + #[message] + struct GroupMounted; + #[message] + struct OnMessage; + + let blueprint = ActorGroup::new() + .router(MapRouter::new(|e| { + msg!(match e { + UpdateConfig | Start => Outcome::Unicast(Singleton), + _ => Outcome::Discard, + }) + })) + .restart_policy(RestartPolicy::on_failure( + RestartParams::new(Duration::from_secs(0), Duration::from_secs(0)) + .auto_reset(Duration::MAX) + .max_retries(NonZeroU64::new(1).unwrap()), + )) + .exec(move |ctx| async move { + if ctx.start_info().cause.is_group_mounted() { + let _ = ctx.send(GroupMounted).await; + } + if ctx.start_info().cause.is_restarted() { + let _ = ctx.send(Restarted).await; + } + if ctx.start_info().cause.is_on_message() { + let _ = ctx.send(OnMessage).await; + } + panic!("boom!"); + }); + + let mut proxy = elfo::test::proxy(blueprint, elfo::config::AnyConfig::default()).await; + assert_msg!(proxy.recv().await, GroupMounted); + assert_msg!(proxy.recv().await, Restarted); + proxy.send(Start).await; + assert_msg!(proxy.recv().await, OnMessage); + assert_msg!(proxy.recv().await, Restarted); +} diff --git a/elfo/tests/subscription_to_statuses.rs b/elfo/tests/subscription_to_statuses.rs index 3c26e2b3..cd154d98 100644 --- a/elfo/tests/subscription_to_statuses.rs +++ b/elfo/tests/subscription_to_statuses.rs @@ -9,6 +9,7 @@ use elfo::{ test::Proxy, ActorStatus, ActorStatusKind, }; +use elfo_core::{RestartParams, RestartPolicy}; #[message] struct Start(u32); @@ -27,6 +28,10 @@ async fn run_group() -> Proxy { _ => Outcome::Default, }) })) + .restart_policy(RestartPolicy::on_failure(RestartParams::new( + Duration::from_secs(5), + Duration::from_secs(30), + ))) .exec(move |mut ctx| async move { while let Some(envelope) = ctx.recv().await { msg!(match envelope { diff --git a/elfo/tests/termination.rs b/elfo/tests/termination.rs index d485ff34..609274ac 100644 --- a/elfo/tests/termination.rs +++ b/elfo/tests/termination.rs @@ -1,4 +1,5 @@ #![cfg(feature = "test-util")] +#![allow(clippy::never_loop)] use elfo::{messages::Terminate, prelude::*, TerminationPolicy};