Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker #37

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ members = [
"wavesexchange_topic",
"wavesexchange_loaders",
"wavesexchange_apis",
"wavesexchange_repos",
]
18 changes: 18 additions & 0 deletions wavesexchange_repos/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "wavesexchange_repos"
version = "0.1.0"
edition = "2021"
authors = ["Artem Sidorenko <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deadpool-diesel = "0.4.0"
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
derive_builder = "0.11.2"
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
diesel = { version = "2.0.2", features = ["postgres"] }
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
envy = "0.4.2"
futures = "0.3.25"
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.37"
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1", features = ["macros"] }
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
wavesexchange_log = { path = "../wavesexchange_log" }
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 34 additions & 0 deletions wavesexchange_repos/src/circuit_breaker/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::Duration;

use serde::Deserialize;

/// Serde fallback for `max_timespan_ms` when the setting is not provided (10 seconds,
/// expressed in milliseconds).
fn default_max_timespan_ms() -> u64 {
    const DEFAULT_TIMESPAN_MS: u64 = 10_000;
    DEFAULT_TIMESPAN_MS
}

/// Serde fallback for `max_err_count_per_timespan` when the setting is not provided.
fn default_max_err_count_per_timespan() -> usize {
    const DEFAULT_ERR_COUNT: usize = 5;
    DEFAULT_ERR_COUNT
}

/// Raw circuit-breaker settings in the shape they are deserialized in, before `load`
/// converts them into a `Config`.
///
/// Each field falls back to the matching `default_*` function when it is missing from the input.
#[derive(Deserialize)]
struct ConfigFlat {
/// Timespan in milliseconds; `load` turns it into `Config::max_timespan`.
#[serde(default = "default_max_timespan_ms")]
max_timespan_ms: u64,
/// Error-count limit; copied unchanged into `Config::max_err_count_per_timespan`.
#[serde(default = "default_max_err_count_per_timespan")]
max_err_count_per_timespan: usize,
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
}

/// Circuit-breaker settings consumed by `CircuitBreaker::builder_from_cfg`.
#[derive(Debug, Clone)]
pub struct Config {
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
/// Timespan, measured from the first recorded error, after which `CircuitBreaker::query`
/// resets its error counter as long as the count limit has not been exceeded.
pub max_timespan: Duration,
/// Number of countable errors that may accumulate; once the counter exceeds this value,
/// `CircuitBreaker::query` returns the data source's fallback error.
pub max_err_count_per_timespan: usize,
}

/// Loads the circuit-breaker `Config` from environment variables that carry the
/// `CIRCUIT_BREAKER_` prefix.
///
/// The environment is first deserialized into `ConfigFlat` (which supplies defaults for
/// missing values); the millisecond timespan is then converted into a `Duration`.
///
/// # Errors
/// Returns the `envy::Error` reported when deserializing the environment fails.
pub fn load() -> Result<Config, envy::Error> {
let config_flat = envy::prefixed("CIRCUIT_BREAKER_").from_env::<ConfigFlat>()?;
plazmoid marked this conversation as resolved.
Show resolved Hide resolved

Ok(Config {
max_timespan: Duration::from_millis(config_flat.max_timespan_ms),
max_err_count_per_timespan: config_flat.max_err_count_per_timespan,
})
}
249 changes: 249 additions & 0 deletions wavesexchange_repos/src/circuit_breaker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
pub mod config;

pub use config::Config;
use wavesexchange_log::debug;

use std::{
future::Future,
mem::drop,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::RwLock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the justification of using async mutex here? It seems that it is unnecessary.
Also, what is the justification to use RwLock? It seems that every attempt to lock this mutex requires write access, so a regular Mutex would do.

Copy link
Contributor Author

@plazmoid plazmoid Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need a read lock on the RwLock if the current and previous queries were successful, so we don't have to obtain a write lock and reset the state every time.
See the comment above about the RwLock's purpose here.


/// A data source that `CircuitBreaker` can wrap: it declares its error type, which errors
/// count towards tripping the breaker, and what to return once the breaker has tripped.
pub trait FallibleDataSource {
/// Error type returned by queries against this data source and by its init function.
type Error;

/// Returns `true` if `err` should increment the circuit breaker's error counter.
///
/// Errors for which this returns `false` are passed through by `CircuitBreaker::query`
/// without touching the breaker state.
fn is_countable_err(err: &Self::Error) -> bool;

/// Produces the error that `CircuitBreaker::query` returns once the error counter has
/// exceeded the configured limit.
///
/// `elapsed_ms` is the time since the first recorded error, in milliseconds, and
/// `err_count` is the current value of the error counter.
///
/// # Panics
/// The default implementation panics instead of returning an error.
fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error {
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
panic!(
"CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms"
)
}
}

/// Factory for a data source `S`: a `Send + Sync + 'static` callable that takes no arguments
/// and returns `Result<S, S::Error>`.
///
/// `CircuitBreaker` keeps one boxed instance and calls it to create the initial data source
/// and to re-create the data source after a countable error.
pub trait DataSrcInitFn<S: FallibleDataSource>:
Fn() -> Result<S, S::Error> + Send + Sync + 'static
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
{
}

/// Blanket implementation: any `Send + Sync + 'static` callable that returns
/// `Result<S, S::Error>` is a `DataSrcInitFn<S>`.
impl<F, S> DataSrcInitFn<S> for F
where
    S: FallibleDataSource,
    F: Fn() -> Result<S, S::Error> + Send + Sync + 'static,
{
}

/// Wraps a fallible data source, counts its countable errors and re-creates it through
/// `init_fn` after each of them.
pub struct CircuitBreaker<S>
where
    S: FallibleDataSource,
{
    /// Timespan since the first recorded error after which the error counter is reset.
    max_timespan: Duration,
    /// Error count above which `query` returns the data source's fallback error.
    max_err_count_per_timespan: usize,
    /// Factory used to create and re-create the data source.
    init_fn: Box<dyn DataSrcInitFn<S>>,
    /// Current data source together with the error counter and first-error timestamp.
    state: RwLock<CBState<S>>,
}

/// Mutable part of a `CircuitBreaker`, kept behind its lock.
struct CBState<S>
where
    S: FallibleDataSource,
{
    /// Data source handed (as a cloned `Arc`) to query functions.
    data_source: Arc<S>,
    /// Number of countable errors recorded since the last reset.
    err_count: usize,
    /// When the first error after the last reset was recorded, if any.
    first_err_ts: Option<Instant>,
}

impl<S> CBState<S>
where
    S: FallibleDataSource,
{
    /// Records one more countable error.
    fn inc(&mut self) {
        self.err_count += 1;
    }

    /// Clears the first-error timestamp and sets the error counter back to zero.
    fn reset(&mut self) {
        self.first_err_ts = None;
        self.err_count = 0;
    }

    /// Swaps in a freshly created data source; the counters are left as they are.
    fn reinit(&mut self, src: S) {
        let fresh_source = Arc::new(src);
        self.data_source = fresh_source;
    }
}

/// Step-by-step builder for `CircuitBreaker`; every option starts out unset.
pub struct CircuitBreakerBuilder<S>
where
    S: FallibleDataSource,
{
    /// Value for `CircuitBreaker::max_timespan`, once provided.
    max_timespan: Option<Duration>,
    /// Value for `CircuitBreaker::max_err_count_per_timespan`, once provided.
    max_err_count_per_timespan: Option<usize>,
    /// Data source factory, once provided.
    init_fn: Option<Box<dyn DataSrcInitFn<S>>>,
}

impl<S: FallibleDataSource> CircuitBreakerBuilder<S> {
/// Creates a builder on which no option has been set yet.
pub fn new() -> CircuitBreakerBuilder<S> {
    CircuitBreakerBuilder::<S> {
        init_fn: None,
        max_err_count_per_timespan: None,
        max_timespan: None,
    }
}

/// Sets the `max_timespan` option to `ts` and returns the builder for further chaining.
pub fn with_max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder<S> {
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
self.max_timespan = Some(ts);
self
}

pub fn with_max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder<S> {
self.max_err_count_per_timespan = Some(count);
self
}

pub fn with_init_fn(mut self, f: impl DataSrcInitFn<S>) -> CircuitBreakerBuilder<S> {
self.init_fn = Some(Box::new(f));
self
}

/// Builds the `CircuitBreaker`, calling `init_fn` once to create the initial data source.
///
/// The new breaker starts with an error count of zero and no first-error timestamp.
///
/// # Panics
/// Panics if `max_err_count_per_timespan`, `max_timespan` or `init_fn` has not been set
/// (checked in that order).
///
/// # Errors
/// Returns the error produced by `init_fn` when creating the initial data source fails.
pub fn build(self) -> Result<CircuitBreaker<S>, S::Error> {
// probably there is a better way to force use all with_* methods on builder
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
if self.max_err_count_per_timespan.is_none() {
panic!("max_err_count_per_timespan is not set");
}

if self.max_timespan.is_none() {
panic!("max_timespan is not set");
}

if self.init_fn.is_none() {
panic!("init_fn is not set");
}

// Cannot panic: presence of `init_fn` was checked just above.
let init_fn = self.init_fn.unwrap();

Ok(CircuitBreaker {
state: RwLock::new(CBState {
// The factory runs here; `?` returns its error to the caller of `build`.
data_source: Arc::new(init_fn()?),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of putting Arc under Mutex? It is thread-safe by design.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see below

err_count: 0,
first_err_ts: None,
}),
// Both unwraps cannot panic: the options were checked at the top of this function.
max_timespan: self.max_timespan.unwrap(),
max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(),
init_fn,
})
}
}

impl<S: FallibleDataSource> CircuitBreaker<S> {
/// Returns an empty `CircuitBreakerBuilder` for this data source type.
pub fn builder() -> CircuitBreakerBuilder<S> {
    CircuitBreakerBuilder::<S>::new()
}

/// Returns a builder with `max_timespan` and `max_err_count_per_timespan` taken from `cfg`.
///
/// The init function is not part of `Config` and still has to be set on the returned builder.
pub fn builder_from_cfg(cfg: &Config) -> CircuitBreakerBuilder<S> {
    let timespan = cfg.max_timespan;
    let err_limit = cfg.max_err_count_per_timespan;

    Self::builder()
        .with_max_timespan(timespan)
        .with_max_err_count_per_timespan(err_limit)
}

/// Runs `query_fn` against the current data source and updates the error-tracking state.
///
/// `query_fn` receives a clone of the `Arc` that holds the data source and returns a future,
/// which is awaited here.
///
/// * On `Ok`, the error counter and the first-error timestamp are reset.
/// * On an error for which `S::is_countable_err` returns `false`, the state is left untouched.
/// * On a countable error, the counter is incremented and, unless the fallback path returns
///   early, the data source is re-created with `init_fn`.
///
/// # Errors
/// Returns the error from `query_fn`; the value of `fallback` once the counter exceeds
/// `max_err_count_per_timespan` while a first-error timestamp is recorded; or the error
/// from `init_fn` if re-creating the data source fails.
pub async fn query<T, F, Fut>(&self, query_fn: F) -> Result<T, S::Error>
where
F: FnOnce(Arc<S>) -> Fut,
Fut: Future<Output = Result<T, S::Error>>,
{
// NOTE(review): this read guard lives until the explicit `drop` further down, so it is held
// across the `.await` of the caller-supplied future. A task waiting for the write lock
// presumably has to wait for every in-flight query - confirm this is intended.
let state_read_lock = self.state.read().await;
let result = query_fn(state_read_lock.data_source.clone()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling (and awaiting) an external async fn while holding lock makes this query() effectively single-threaded which is unacceptable in a real-world application.

Copy link
Contributor Author

@plazmoid plazmoid Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rust docs:
An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

So, read is not blocking, and write will block thread only if query fn failed


drop(state_read_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks silly - the thread-safe dasta structure such as Arc was put under mutex, even async mutex, and now we need to lock that mutex just to clone the Arc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CB is designed to interfere client's code as less as possible. RwLock is used here to provide interior state mutability, without it, the end client will suffer while refactoring &self to &mut self.
Arc is used to pass repo to query function. I'm not using & due to lifetime problems (query_fn is async)


if let Err(e) = &result {
// Only errors the data source declares countable touch the breaker state.
if S::is_countable_err(e) {
let mut state = self.state.write().await;
state.inc();

debug!("err count: {}", state.err_count);

match state.first_err_ts {
Some(ts) => {
let elapsed = ts.elapsed();

if state.err_count <= self.max_err_count_per_timespan {
// Still within the count limit: clear the counters if the first error is
// older than `max_timespan`.
if elapsed > self.max_timespan {
state.reset();
}
} else {
// Limit exceeded: return the fallback error instead of the query error.
// This early return also skips the re-initialization below.
return Err(state
.data_source
.fallback(elapsed.as_millis(), state.err_count));
plazmoid marked this conversation as resolved.
Show resolved Hide resolved
}
}
// First error since the last reset: remember when it happened.
None => state.first_err_ts = Some(Instant::now()),
}
// Replace the data source with a fresh one; if `init_fn` fails, its error is returned
// in place of `result`.
state.reinit((self.init_fn)()?);
}
} else {
// Success: clear the counters.
// NOTE(review): this acquires the write lock on every successful query.
let mut state = self.state.write().await;
state.reset();
}
result
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Data source stub whose only operation always fails with `WildError::Inner`.
    struct WildErrorGenerator;

    impl WildErrorGenerator {
        fn err(&self) -> Result<(), WildError> {
            Err(WildError::Inner)
        }
    }

    #[derive(Debug)]
    enum WildError {
        Inner,
        CircuitBreakerTriggered,
    }

    impl FallibleDataSource for WildErrorGenerator {
        type Error = WildError;

        fn is_countable_err(err: &Self::Error) -> bool {
            match err {
                WildError::Inner => true,
                WildError::CircuitBreakerTriggered => false,
            }
        }

        fn fallback(&self, _elapsed_ms: u128, _err_count: usize) -> Self::Error {
            WildError::CircuitBreakerTriggered
        }
    }

    type TestBreaker = CircuitBreaker<WildErrorGenerator>;

    /// Runs a query that always fails and hands back the error the breaker reported.
    async fn failed_query(cb: &TestBreaker) -> WildError {
        cb.query(|src| async move { src.err() }).await.unwrap_err()
    }

    /// Runs a query that always succeeds; panics if the breaker reports an error.
    async fn successful_query(cb: &TestBreaker) {
        cb.query(|_src| async move { Ok(()) }).await.unwrap()
    }

    #[tokio::test]
    async fn circuit_breaker() {
        let cb: TestBreaker = CircuitBreaker::builder()
            .with_max_timespan(Duration::from_secs(1))
            .with_max_err_count_per_timespan(2)
            .with_init_fn(|| Ok(WildErrorGenerator))
            .build()
            .unwrap();

        // trigger 2 errors in cb
        for _ in 0..2 {
            assert!(matches!(failed_query(&cb).await, WildError::Inner));
        }

        // reset cb state with successful query
        successful_query(&cb).await;

        // trigger 3 errors in cb (max errors limit exceeded)
        for _ in 0..2 {
            assert!(matches!(failed_query(&cb).await, WildError::Inner));
        }

        // cb fallback
        assert!(matches!(
            failed_query(&cb).await,
            WildError::CircuitBreakerTriggered
        ));

        successful_query(&cb).await;
    }
}
3 changes: 3 additions & 0 deletions wavesexchange_repos/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod circuit_breaker;

pub use circuit_breaker::CircuitBreaker;
plazmoid marked this conversation as resolved.
Show resolved Hide resolved