Circuit breaker #37

Open
wants to merge 21 commits into
base: master
from
1 change: 1 addition & 0 deletions Cargo.toml
@@ -6,4 +6,5 @@ members = [
"wavesexchange_topic",
"wavesexchange_loaders",
"wavesexchange_apis",
"wavesexchange_repos",
]
18 changes: 18 additions & 0 deletions wavesexchange_repos/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "wavesexchange_repos"
version = "0.1.0"
edition = "2021"
authors = ["Artem Sidorenko <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deadpool-diesel = "0.4.0"
derive_builder = "0.11.2"
diesel = { version = "2.0.2", features = ["postgres"] }
envy = "0.4.2"
futures = "0.3.25"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.37"
tokio = { version = "1", features = ["macros"] }
wavesexchange_log = { path = "../wavesexchange_log" }
34 changes: 34 additions & 0 deletions wavesexchange_repos/src/circuit_breaker/config.rs
@@ -0,0 +1,34 @@
use std::time::Duration;

use serde::Deserialize;

fn default_max_timespan_ms() -> u64 {
10000
}

fn default_max_err_count_per_timespan() -> usize {
5
}

#[derive(Deserialize)]
struct ConfigFlat {
#[serde(default = "default_max_timespan_ms")]
max_timespan_ms: u64,
#[serde(default = "default_max_err_count_per_timespan")]
max_err_count_per_timespan: usize,
}

#[derive(Debug, Clone)]
pub struct Config {
pub max_timespan: Duration,
pub max_err_count_per_timespan: usize,
}

pub fn load() -> Result<Config, envy::Error> {
let config_flat = envy::prefixed("CIRCUIT_BREAKER_").from_env::<ConfigFlat>()?;

Ok(Config {
max_timespan: Duration::from_millis(config_flat.max_timespan_ms),
max_err_count_per_timespan: config_flat.max_err_count_per_timespan,
})
}
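
As a rough illustration of how `load()` resolves its values: with the `CIRCUIT_BREAKER_` prefix, envy should map `CIRCUIT_BREAKER_MAX_TIMESPAN_MS` and `CIRCUIT_BREAKER_MAX_ERR_COUNT_PER_TIMESPAN` onto the two `ConfigFlat` fields, and missing variables fall back to 10000 ms and 5. The sketch below reproduces that behaviour with the standard library only. `load_with` and its `lookup` parameter are hypothetical and not part of the PR, and this `Config` is a local stand-in for the one in the diff.

```rust
use std::time::Duration;

/// Local stand-in for the `Config` struct from the diff.
#[derive(Debug, PartialEq)]
struct Config {
    max_timespan: Duration,
    max_err_count_per_timespan: usize,
}

/// Hypothetical helper: resolves the config through a lookup function
/// (for real use, `|k| std::env::var(k).ok()`), applying the same
/// defaults as `default_max_timespan_ms` and
/// `default_max_err_count_per_timespan` when a variable is absent.
fn load_with(lookup: impl Fn(&str) -> Option<String>) -> Result<Config, String> {
    let ms = match lookup("CIRCUIT_BREAKER_MAX_TIMESPAN_MS") {
        Some(v) => v
            .parse::<u64>()
            .map_err(|e| format!("invalid MAX_TIMESPAN_MS: {e}"))?,
        None => 10_000,
    };
    let count = match lookup("CIRCUIT_BREAKER_MAX_ERR_COUNT_PER_TIMESPAN") {
        Some(v) => v
            .parse::<usize>()
            .map_err(|e| format!("invalid MAX_ERR_COUNT_PER_TIMESPAN: {e}"))?,
        None => 5,
    };

    Ok(Config {
        max_timespan: Duration::from_millis(ms),
        max_err_count_per_timespan: count,
    })
}
```

Passing the lookup as a closure keeps the sketch testable without touching the process environment.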
299 changes: 299 additions & 0 deletions wavesexchange_repos/src/circuit_breaker/mod.rs
@@ -0,0 +1,299 @@
pub mod config;

pub use config::Config;
use wavesexchange_log::debug;

use std::{
future::Future,
mem::drop,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::RwLock;
Contributor

What is the justification for using an async mutex here? It seems unnecessary.
Also, what is the justification for using RwLock? It seems that every attempt to lock it requires write access, so a regular Mutex would do.

Contributor Author

@plazmoid plazmoid Jan 20, 2023

We only need to read from the RwLock if the current and previous queries were successful, so we don't have to obtain the write lock and reset the state every time.
See the comment above about the RwLock's purpose here.
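
The read-mostly path described in the reply can be sketched roughly as follows. This is a simplified, synchronous illustration using `std::sync::RwLock` rather than the `tokio::sync::RwLock` from the diff. `State` and `on_success` are hypothetical names, not part of the PR.

```rust
use std::sync::RwLock;

/// Hypothetical, reduced version of the breaker state: only the counter.
struct State {
    err_count: usize,
}

/// Called after a successful query. Returns `true` if the write lock
/// had to be taken to reset the counter.
fn on_success(state: &RwLock<State>) -> bool {
    // Shared check first: any number of callers can do this concurrently.
    // The read guard is a temporary and is released at the end of this statement.
    let needs_reset = state.read().unwrap().err_count > 0;

    if needs_reset {
        // Exclusive access is only requested when there is something to reset.
        state.write().unwrap().err_count = 0;
    }
    needs_reset
}
```

When errors are rare, the success path never asks for exclusive access, which is the case the reply argues for. A plain `Mutex` would serialize that path as well.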


/// A trait that represents a data source that can fail.
/// Must be implemented for any struct used with the circuit breaker.
pub trait FallibleDataSource {
type Error;

/// Returns whether the error should increment the CB's error counter
fn is_countable_err(err: &Self::Error) -> bool;

/// Defines the CB's behaviour once the maximum error limit is exceeded
fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error {
panic!(
"CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms"
)
}
}

pub trait DataSrcInitFn<S: FallibleDataSource>:
Fn() -> Result<S, S::Error> + Send + Sync + 'static
{
}

impl<T, S: FallibleDataSource> DataSrcInitFn<S> for T where
T: Fn() -> Result<S, S::Error> + Send + Sync + 'static
{
}

/// Counts failed attempts while querying a data source and performs reinitialization/fallback.
///
/// To use it with a type, implement `FallibleDataSource` for that type first.
///
/// Example:
/// ```rust
/// use std::time::Duration;
/// use wavesexchange_repos::circuit_breaker::{CircuitBreaker, FallibleDataSource};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
/// struct Repo;
/// #[derive(Debug)]
/// struct RepoError;
///
/// impl FallibleDataSource for Repo {
/// type Error = RepoError;
///
/// fn is_countable_err(_err: &Self::Error) -> bool {
/// true
/// }
/// }
///
/// let cb = CircuitBreaker::builder()
/// .with_max_timespan(Duration::from_secs(1))
/// .with_max_err_count_per_timespan(5)
/// .with_init_fn(|| Ok(Repo))
/// .build()
/// .unwrap();
Contributor

What is the purpose of this init_fn? Why not just create the "datasource" once?
What is a real-life example where this overcomplication is really necessary?

Contributor Author

deadpool_diesel stops working after a failed db query, so it needs to be reinitialized.

///
/// cb.query(|_src| async move { Err::<(), _>(RepoError) }).await.unwrap_err();
/// cb.query(|_src| async move { Ok(()) }).await.unwrap();
///
/// // see the CB test for a more verbose example
/// }
/// ```
pub struct CircuitBreaker<S: FallibleDataSource> {
/// Timespan within which errors are counted.
/// Once it has elapsed, the error counter is reset.
max_timespan: Duration,

/// Maximum error count per timespan. Example: 3 errors per 1 sec (max_timespan)
max_err_count_per_timespan: usize,

/// A function that may be called after each failed query to reinitialize the data source
init_fn: Box<dyn DataSrcInitFn<S>>,

/// Current state of CB
state: RwLock<CBState<S>>,
}

struct CBState<S: FallibleDataSource> {
data_source: Arc<S>,
err_count: usize,
first_err_ts: Option<Instant>,
}

impl<S: FallibleDataSource> CBState<S> {
fn inc(&mut self) {
self.err_count += 1;
}

fn reset(&mut self) {
self.err_count = 0;
self.first_err_ts = None;
}

fn reinit(&mut self, src: S) {
self.data_source = Arc::new(src);
}
}

pub struct CircuitBreakerBuilder<S: FallibleDataSource> {
max_timespan: Option<Duration>,
max_err_count_per_timespan: Option<usize>,
init_fn: Option<Box<dyn DataSrcInitFn<S>>>,
}

impl<S: FallibleDataSource> CircuitBreakerBuilder<S> {
pub fn new() -> CircuitBreakerBuilder<S> {
CircuitBreakerBuilder {
max_timespan: None,
max_err_count_per_timespan: None,
init_fn: None,
}
}

pub fn with_max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder<S> {
self.max_timespan = Some(ts);
self
}

pub fn with_max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder<S> {
self.max_err_count_per_timespan = Some(count);
self
}

pub fn with_init_fn(mut self, f: impl DataSrcInitFn<S>) -> CircuitBreakerBuilder<S> {
self.init_fn = Some(Box::new(f));
self
}

/// Build the circuit breaker.
/// Note: all 3 fields must be set on this builder, otherwise `build` will panic.
pub fn build(self) -> Result<CircuitBreaker<S>, S::Error> {
// there is probably a better way to force the use of all with_* methods on the builder
if self.max_err_count_per_timespan.is_none() {
panic!("max_err_count_per_timespan is not set");
}

if self.max_timespan.is_none() {
panic!("max_timespan is not set");
}

if self.init_fn.is_none() {
panic!("init_fn is not set");
}

let init_fn = self.init_fn.unwrap();

Ok(CircuitBreaker {
state: RwLock::new(CBState {
data_source: Arc::new(init_fn()?),
Contributor

What is the point of putting an Arc under a Mutex? It is thread-safe by design.

Contributor Author

See below.
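
One possible reading of why the `Arc` sits inside the lock (an interpretation, since the reply does not spell it out): the lock protects the slot so that a fresh data source can be swapped in after a failure, while the `Arc` lets queries that are already running keep their old handle. A minimal synchronous sketch of that swap, using `std::sync::RwLock` and a hypothetical `Reinit` holder that is not part of the PR:

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical holder that can rebuild its data source on demand.
struct Reinit<S, E> {
    init_fn: Box<dyn Fn() -> Result<S, E> + Send + Sync>,
    current: RwLock<Arc<S>>,
}

impl<S, E> Reinit<S, E> {
    fn new(init_fn: impl Fn() -> Result<S, E> + Send + Sync + 'static) -> Result<Self, E> {
        // Build the first instance eagerly, as `build()` does in the diff.
        let first = Arc::new(init_fn()?);
        Ok(Self {
            init_fn: Box::new(init_fn),
            current: RwLock::new(first),
        })
    }

    /// Hands out a shared handle. The handle stays valid across a later swap.
    fn get(&self) -> Arc<S> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Builds a fresh source and swaps it into the slot. The previous
    /// instance is dropped once the last outstanding handle goes away.
    fn reinit(&self) -> Result<(), E> {
        let fresh = Arc::new((self.init_fn)()?);
        *self.current.write().unwrap() = fresh;
        Ok(())
    }
}
```

Without the lock there would be no way to replace the `Arc` itself through `&self`. Without the `Arc`, a swap would have to wait for every in-flight user of the old source.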

err_count: 0,
first_err_ts: None,
}),
max_timespan: self.max_timespan.unwrap(),
max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(),
init_fn,
})
}
}

impl<S: FallibleDataSource> CircuitBreaker<S> {
pub fn builder() -> CircuitBreakerBuilder<S> {
CircuitBreakerBuilder::new()
}

pub fn builder_from_cfg(cfg: &Config) -> CircuitBreakerBuilder<S> {
Self::builder()
.with_max_err_count_per_timespan(cfg.max_err_count_per_timespan)
.with_max_timespan(cfg.max_timespan)
}

/// Query the data source. On success, the CB resets its internal error counter.
/// On a countable error, the counter is incremented and the data source is reinitialized.
/// Once more than `max_err_count_per_timespan` errors have accumulated, the CB falls back (panics by default).
/// If `max_timespan` elapses before enough errors accumulate to trigger the fallback, the error counter is reset.
pub async fn query<T, F, Fut>(&self, query_fn: F) -> Result<T, S::Error>
where
F: FnOnce(Arc<S>) -> Fut,
Fut: Future<Output = Result<T, S::Error>>,
{
let state_read_lock = self.state.read().await;
let result = query_fn(state_read_lock.data_source.clone()).await;
Contributor

Calling (and awaiting) an external async fn while holding the lock makes this query() effectively single-threaded, which is unacceptable in a real-world application.

Contributor Author

@plazmoid plazmoid Jan 20, 2023

From the Rust docs:
An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

So a read does not block, and a write will block the thread only if the query fn failed.
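
Both points can be addressed at once by copying what is needed out of the lock and releasing the guard before the query runs. Readers do share the lock, but tokio documents its `RwLock` as fair (write-preferring), so a writer queued behind a slow query would likely hold up later readers too. The sketch below is a synchronous approximation with `std::sync::RwLock`. `State` and this free-standing `query` are hypothetical simplifications of the code in the diff, and in the async version the call to `query_fn` would be followed by `.await`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical, reduced version of the breaker state.
struct State<S> {
    data_source: Arc<S>,
    err_count: usize,
}

fn query<S, T, E>(
    state: &RwLock<State<S>>,
    query_fn: impl FnOnce(Arc<S>) -> Result<T, E>,
) -> Result<T, E> {
    // Hold the read lock only long enough to copy out what is needed.
    let (src, old_err_count) = {
        let guard = state.read().unwrap();
        (Arc::clone(&guard.data_source), guard.err_count)
    }; // the read guard is dropped here

    // No lock is held while the (potentially slow) query runs.
    let result = query_fn(src);

    match &result {
        Err(_) => {
            state.write().unwrap().err_count += 1;
        }
        Ok(_) if old_err_count > 0 => {
            state.write().unwrap().err_count = 0;
        }
        Ok(_) => {}
    }
    result
}
```

The trade-off is that the counter snapshot may be stale by the time the query finishes, which the version in the diff also accepts, since it reads `err_count` before dropping the guard.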

let old_err_count = state_read_lock.err_count;

drop(state_read_lock);

if let Err(e) = &result {
if S::is_countable_err(e) {
let mut state = self.state.write().await;
state.inc();

debug!("err count: {}", state.err_count);

match state.first_err_ts {
Some(ts) => {
let elapsed = ts.elapsed();

if state.err_count <= self.max_err_count_per_timespan {
if elapsed > self.max_timespan {
state.reset();
}
} else {
return Err(state
.data_source
.fallback(elapsed.as_millis(), state.err_count));
}
}
None => state.first_err_ts = Some(Instant::now()),
}
state.reinit((self.init_fn)()?);
}
} else if old_err_count > 0 {
let mut state = self.state.write().await;
state.reset();
}
result
}
}

#[cfg(test)]
mod tests {
use super::*;

struct WildErrorGenerator;

impl WildErrorGenerator {
fn err(&self) -> Result<(), WildError> {
Err(WildError::Inner)
}
}

#[derive(Debug)]
enum WildError {
Inner,
CircuitBreakerTriggered,
}

impl FallibleDataSource for WildErrorGenerator {
type Error = WildError;

fn is_countable_err(err: &Self::Error) -> bool {
matches!(err, WildError::Inner)
}

fn fallback(&self, _elapsed_ms: u128, _err_count: usize) -> Self::Error {
WildError::CircuitBreakerTriggered
}
}

#[tokio::test]
async fn circuit_breaker() {
let cb = CircuitBreaker::builder()
.with_max_timespan(Duration::from_secs(1))
.with_max_err_count_per_timespan(2)
.with_init_fn(|| Ok(WildErrorGenerator))
.build()
.unwrap();

// trigger 2 errors in cb
assert!(matches!(
cb.query(|weg| async move { weg.err() }).await.unwrap_err(),
WildError::Inner
));

assert!(matches!(
cb.query(|weg| async move { weg.err() }).await.unwrap_err(),
WildError::Inner
));

// reset cb state with successful query
assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ());

// trigger 3 errors in cb (max errors limit exceeded)
assert!(matches!(
cb.query(|weg| async move { weg.err() }).await.unwrap_err(),
WildError::Inner
));

assert!(matches!(
cb.query(|weg| async move { weg.err() }).await.unwrap_err(),
WildError::Inner
));

// cb fallback
assert!(matches!(
cb.query(|weg| async move { weg.err() }).await.unwrap_err(),
WildError::CircuitBreakerTriggered
));

assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ());
}
}
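
The counting rules exercised by the test above can also be modelled as a small state machine with the clock passed in explicitly, which makes the timespan expiry testable without sleeping. This is a sketch under the assumption that it mirrors the error branch of `query()`. `Window`, `Outcome`, and their methods are hypothetical names and not part of the PR.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Outcome {
    /// The error was counted and the breaker stays closed.
    Counted,
    /// The timespan had elapsed, so the counter was reset.
    WindowExpired,
    /// More than `max_err_count` errors: the fallback would be returned.
    Fallback,
}

struct Window {
    max_timespan: Duration,
    max_err_count: usize,
    err_count: usize,
    first_err_ts: Option<Instant>,
}

impl Window {
    fn on_error(&mut self, now: Instant) -> Outcome {
        self.err_count += 1;
        let first = self.first_err_ts;
        match first {
            // First error since the last reset: start the timespan.
            None => {
                self.first_err_ts = Some(now);
                Outcome::Counted
            }
            Some(_) if self.err_count > self.max_err_count => Outcome::Fallback,
            Some(ts) if now.duration_since(ts) > self.max_timespan => {
                self.reset();
                Outcome::WindowExpired
            }
            Some(_) => Outcome::Counted,
        }
    }

    fn on_success(&mut self) {
        self.reset();
    }

    fn reset(&mut self) {
        self.err_count = 0;
        self.first_err_ts = None;
    }
}
```

With `max_err_count` set to 2, two errors are counted and the third triggers the fallback, matching the sequence in the `circuit_breaker` test.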
3 changes: 3 additions & 0 deletions wavesexchange_repos/src/lib.rs
@@ -0,0 +1,3 @@
pub mod circuit_breaker;

pub use circuit_breaker::CircuitBreaker;