Skip to content

Commit

Permalink
feat(core): the restarting policy can be overridden via configuration…
Browse files Browse the repository at this point in the history
… for each actor.

`RestartParams` have been added for `RestartPolicy::always(..)` and `RestartPolicy::on_failure(..)`. The linear backoff has been replaced with an exponential approach, with a configurable limit for restarts.

BREAKING CHANGE: The default RestartPolicy is set to `RestartPolicy::never()`.
  • Loading branch information
sargarass committed Dec 20, 2023
1 parent a710c52 commit 1c2bdba
Show file tree
Hide file tree
Showing 25 changed files with 701 additions and 201 deletions.
6 changes: 5 additions & 1 deletion elfo-configurer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use elfo_core::{
},
msg, scope,
signal::{Signal, SignalKind},
ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology,
ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology,
};

pub use self::protocol::*;
Expand All @@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint {
let topology = topology.clone();
ActorGroup::new()
.stop_order(100)
.restart_policy(RestartPolicy::on_failure(RestartParams::new(
Duration::from_secs(5),
Duration::from_secs(30),
)))
.exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main())
}

Expand Down
1 change: 1 addition & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ regex = "1.6.0"
thread_local = { version = "1.1.3", optional = true }
unicycle = "0.9.3"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"

[dev-dependencies]
elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] }
Expand Down
100 changes: 92 additions & 8 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};

use crate::{
config::SystemConfig,
envelope::Envelope,
errors::{SendError, TrySendError},
group::{RestartPolicy, TerminationPolicy},
group::TerminationPolicy,
mailbox::{Mailbox, RecvResult},
messages::{ActorStatusReport, Terminate},
messages::{ActorStatusReport, Terminate, UpdateConfig},
msg,
request_table::RequestTable,
restarting::RestartPolicy,
scope,
subscription::SubscriptionManager,
Addr,
Expand Down Expand Up @@ -117,11 +119,70 @@ impl ActorStatusKind {
}
}

// === ActorStartInfo ===

/// A struct holding information related to an actor start.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ActorStartInfo {
/// The cause for the actor start, indicating why the actor is being
/// initialized.
pub cause: ActorStartCause,
}

/// An enum representing various causes for an actor to start.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ActorStartCause {
/// The actor started because its group was mounted.
GroupMounted,
/// The actor started in response to a message.
OnMessage,
/// The actor started due to the restart policy.
Restarted,
}

impl ActorStartInfo {
pub(crate) fn on_group_mounted() -> Self {
Self {
cause: ActorStartCause::GroupMounted,
}
}

pub(crate) fn on_message() -> Self {
Self {
cause: ActorStartCause::OnMessage,
}
}

pub(crate) fn on_restart() -> Self {
Self {
cause: ActorStartCause::Restarted,
}
}
}

impl ActorStartCause {
pub fn is_group_mounted(&self) -> bool {
matches!(self, ActorStartCause::GroupMounted)
}

pub fn is_restarted(&self) -> bool {
matches!(self, ActorStartCause::Restarted)
}

pub fn is_on_message(&self) -> bool {
matches!(self, ActorStartCause::OnMessage)
}
}

// === Actor ===

pub(crate) struct Actor {
meta: Arc<ActorMeta>,
termination_policy: TerminationPolicy,
// blueprint restart policy
default_restart_policy: RestartPolicy,
mailbox: Mailbox,
request_table: RequestTable,
control: RwLock<ControlBlock>,
Expand All @@ -131,25 +192,29 @@ pub(crate) struct Actor {

struct ControlBlock {
status: ActorStatus,
/// If `None`, a group's policy will be used.
restart_policy: Option<RestartPolicy>,
/// If `None`, `group_restart_policy will be used.
rp_override: Option<RestartPolicy>,
group_restart_policy: RestartPolicy,
}

impl Actor {
pub(crate) fn new(
meta: Arc<ActorMeta>,
addr: Addr,
restart_policy: RestartPolicy,
termination_policy: TerminationPolicy,
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Actor {
meta,
default_restart_policy: restart_policy.clone(),
termination_policy,
mailbox: Mailbox::new(),
request_table: RequestTable::new(addr),
control: RwLock::new(ControlBlock {
status: ActorStatus::INITIALIZING,
restart_policy: None,
rp_override: None,
group_restart_policy: restart_policy,
}),
finished: ManualResetEvent::new(false),
status_subscription,
Expand All @@ -167,6 +232,9 @@ impl Actor {

pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
msg!(match &envelope {
UpdateConfig { config } => {
self.update_config(config.get_system());
}
Terminate { closing } => {
if *closing || self.termination_policy.close_mailbox {
if self.close() {
Expand All @@ -181,8 +249,20 @@ impl Actor {
self.mailbox.try_send(envelope)
}

pub(crate) fn update_config(&self, config: &SystemConfig) {
let restart_policy = config
.restart_policy
.make_policy()
.unwrap_or(self.default_restart_policy.clone());

self.control.write().group_restart_policy = restart_policy;
}

pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
msg!(match &envelope {
UpdateConfig { config } => {
self.update_config(config.get_system());
}
Terminate { closing } => {
if *closing || self.termination_policy.close_mailbox {
if self.close() {
Expand All @@ -209,12 +289,16 @@ impl Actor {
&self.request_table
}

pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
self.control.read().restart_policy.clone()
pub(crate) fn restart_policy(&self) -> RestartPolicy {
let control = self.control.read();
control
.rp_override
.clone()
.unwrap_or(control.group_restart_policy.clone())
}

pub(crate) fn set_restart_policy(&self, policy: Option<RestartPolicy>) {
self.control.write().restart_policy = policy;
self.control.write().rp_override = policy;
}

// Note that this method should be called inside a right scope.
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub(crate) struct SystemConfig {
pub(crate) logging: crate::logging::LoggingConfig,
pub(crate) dumping: crate::dumping::DumpingConfig,
pub(crate) telemetry: crate::telemetry::TelemetryConfig,
pub(crate) restart_policy: crate::restarting::RestartPolicyConfig,
}

// === Secret ===
Expand Down
48 changes: 46 additions & 2 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use tracing::{info, trace};
use elfo_utils::unlikely;

use crate::{
actor::{Actor, ActorStatus},
actor::{Actor, ActorStartInfo, ActorStatus},
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
group::RestartPolicy,
mailbox::RecvResult,
message::{Message, Request},
messages, msg,
object::ObjectArc,
request_table::ResponseToken,
restarting::RestartPolicy,
routers::Singleton,
scope,
source::{SourceHandle, Sources, UnattachedSource},
Expand All @@ -38,6 +38,7 @@ pub struct Context<C = (), K = Singleton> {
book: AddressBook,
actor: Option<ObjectArc>, // `None` for group's and pruned context.
actor_addr: Addr,
actor_start_info: Option<ActorStartInfo>, // `None` for group's context,
group_addr: Addr,
demux: Demux,
config: Arc<C>,
Expand Down Expand Up @@ -624,6 +625,39 @@ impl<C, K> Context<C, K> {
}
}

/// Retrieves information related to the start of the actor.
///
/// # Panics
///
/// This method will panic if the context is pruned, indicating that the
/// required information is no longer available.
///
/// # Example
///
/// ```
/// # use elfo_core as elfo;
/// # use elfo_core::{ActorStartCause, ActorStartInfo};
/// # async fn exec(mut ctx: elfo::Context) {
/// match ctx.start_info().cause {
/// ActorStartCause::GroupMounted => {
/// // The actor started because its group was mounted.
/// }
/// ActorStartCause::OnMessage => {
/// // The actor started in response to a message.
/// }
/// ActorStartCause::Restarted => {
/// // The actor started due to the restart policy.
/// }
/// _ => {}
/// }
/// # }
/// ```
pub fn start_info(&self) -> &ActorStartInfo {
self.actor_start_info
.as_ref()
.expect("start_info is not available for a group context")
}

fn pre_recv(&mut self) {
self.stats.on_recv();

Expand Down Expand Up @@ -718,6 +752,7 @@ impl<C, K> Context<C, K> {
book: self.book.clone(),
actor: None,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info.clone(),
group_addr: self.group_addr,
demux: self.demux.clone(),
config: Arc::new(()),
Expand All @@ -740,6 +775,7 @@ impl<C, K> Context<C, K> {
book: self.book,
actor: self.actor,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info,
group_addr: self.group_addr,
demux: self.demux,
config,
Expand All @@ -764,11 +800,17 @@ impl<C, K> Context<C, K> {
self
}

pub(crate) fn with_start_info(mut self, actor_start_info: ActorStartInfo) -> Self {
self.actor_start_info = Some(actor_start_info);
self
}

pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1> {
Context {
book: self.book,
actor: self.actor,
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info,
group_addr: self.group_addr,
demux: self.demux,
config: self.config,
Expand Down Expand Up @@ -832,6 +874,7 @@ impl Context {
actor: None,
actor_addr: Addr::NULL,
group_addr: Addr::NULL,
actor_start_info: None,
demux,
config: Arc::new(()),
key: Singleton,
Expand All @@ -850,6 +893,7 @@ impl<C, K: Clone> Clone for Context<C, K> {
book: self.book.clone(),
actor: self.book.get_owned(self.actor_addr),
actor_addr: self.actor_addr,
actor_start_info: self.actor_start_info.clone(),
group_addr: self.group_addr,
demux: self.demux.clone(),
config: self.config.clone(),
Expand Down
44 changes: 4 additions & 40 deletions elfo-core/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
envelope::Envelope,
exec::{Exec, ExecResult},
object::{GroupHandle, GroupVisitor, Object},
restarting::RestartPolicy,
routers::Router,
runtime::RuntimeManager,
supervisor::Supervisor,
Expand Down Expand Up @@ -47,13 +48,15 @@ impl<R, C> ActorGroup<R, C> {
}

/// The behaviour on actor termination.
/// `RestartPolicy::on_failures` is used by default.
///
/// `RestartPolicy::never` is used by default.
pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
self.restart_policy = policy;
self
}

/// The behaviour on the `Terminate` message.
///
/// `TerminationPolicy::closing` is used by default.
pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
self.termination_policy = policy;
Expand Down Expand Up @@ -183,42 +186,3 @@ impl TerminationPolicy {

// TODO: add `stop_spawning`?
}

/// The behaviour on actor termination.
#[derive(Debug, Clone)]
pub struct RestartPolicy {
pub(crate) mode: RestartMode,
}

impl Default for RestartPolicy {
fn default() -> Self {
Self::on_failures()
}
}

#[derive(Debug, Clone)]
pub(crate) enum RestartMode {
Always,
OnFailures,
Never,
}

impl RestartPolicy {
pub fn always() -> Self {
Self {
mode: RestartMode::Always,
}
}

pub fn on_failures() -> Self {
Self {
mode: RestartMode::OnFailures,
}
}

pub fn never() -> Self {
Self {
mode: RestartMode::Never,
}
}
}
Loading

0 comments on commit 1c2bdba

Please sign in to comment.