From 905a21c9198175b3f386ee226a5c6c5514075ef5 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Mon, 21 Nov 2022 08:39:09 +0300 Subject: [PATCH 01/21] initial circuit_breaker --- Cargo.toml | 1 + wavesexchange_repos/Cargo.toml | 14 ++++ wavesexchange_repos/src/circuit_breaker.rs | 88 ++++++++++++++++++++++ wavesexchange_repos/src/lib.rs | 4 + 4 files changed, 107 insertions(+) create mode 100644 wavesexchange_repos/Cargo.toml create mode 100644 wavesexchange_repos/src/circuit_breaker.rs create mode 100644 wavesexchange_repos/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index bff7e01..9394714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,5 @@ members = [ "wavesexchange_topic", "wavesexchange_loaders", "wavesexchange_apis", + "wavesexchange_repos", ] diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml new file mode 100644 index 0000000..06720e2 --- /dev/null +++ b/wavesexchange_repos/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "wavesexchange_repos" +version = "0.1.0" +edition = "2021" +authors = ["Artem Sidorenko "] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.58" +deadpool-diesel = "0.4.0" +derive_builder = "0.11.2" +diesel = { version = "2.0.2", features = ["postgres"] } +thiserror = "1.0.37" \ No newline at end of file diff --git a/wavesexchange_repos/src/circuit_breaker.rs b/wavesexchange_repos/src/circuit_breaker.rs new file mode 100644 index 0000000..a3d2a48 --- /dev/null +++ b/wavesexchange_repos/src/circuit_breaker.rs @@ -0,0 +1,88 @@ +/* +послать запрос к удалённому ресурсу +если пришла ошибка соединения, то зафиксировать и вернуть ошибку в Result +если разрывы соединений продолжаются, выкинуть панику (или вызвать соотв обработчик) +разрывы соединений: + разные бд (pg, redis), разные пулы (bb8, deadpool, r2d2), одиночный запрос, возможность расширения +*/ +use derive_builder::Builder; +use std::{ + future::Future, + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +#[derive(Debug, Builder)] +#[builder(pattern = "owned")] +pub struct CircuitBreaker { + repo: Repo, + #[builder(setter(skip))] + err_count: usize, // current errors count + #[builder(setter(skip))] + first_err_ts: Option, + max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки + #[builder(setter(custom))] + max_err_count_per_timespan: NonZeroUsize, +} + +impl CircuitBreakerBuilder { + pub fn max_err_count_per_timespan(&mut self, ts: usize) { + self.max_err_count_per_timespan = NonZeroUsize::new(ts) + } +} + +impl CircuitBreaker { + pub fn builder() -> CircuitBreakerBuilder { + CircuitBreakerBuilder::default() + } + + pub async fn query(&mut self, query_fn: F) -> Result + where + F: Fn(&Repo) -> Fut, + Fut: Future>, + { + let result = query_fn(&self.repo).await; + + if let Err(e) = &result { + if Repo::is_countable_err(e) { + self.err_count += 1; + match self.first_err_ts { + Some(ts) if ts.elapsed() <= self.max_timespan => { + if self.err_count > self.max_err_count_per_timespan.get() { + return Err(self.repo.fallback()); + } + } + None => self.first_err_ts = Some(Instant::now()), + _ => {} + } + } + } else { + self.err_count = 0; + self.first_err_ts = None; + } + result + } +} + +#[async_trait] +pub trait FallibleRepo { + type Error; + + async fn init(cfg: &C) -> Self; + + fn is_countable_err(err: &Self::Error) -> bool; + + fn fallback(&self) -> Self::Error { + panic!("Я ГОВОРЮ НЕ БЫЛО РАЗРЫВОВ СВЯЗИ! С НОЯБРЯ ПРОШЛОГО ГОДА! А СЕЙЧАС ЦЕЛЫХ ЧЕТЫРЕ РАЗРЫВА БЫЛО!") + } +} + +pub mod impls { + use super::*; +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("CircuitBreakerBuilderError: {0}")] + BuilderError(String), +} diff --git a/wavesexchange_repos/src/lib.rs b/wavesexchange_repos/src/lib.rs new file mode 100644 index 0000000..e8fd4e2 --- /dev/null +++ b/wavesexchange_repos/src/lib.rs @@ -0,0 +1,4 @@ +#[macro_use] +extern crate async_trait; + +pub mod circuit_breaker; From 7a721065a8d0ace890e82453d20844060fe82da8 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Mon, 21 Nov 2022 08:46:30 +0300 Subject: [PATCH 02/21] owned max_err_count --- wavesexchange_repos/src/circuit_breaker.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker.rs b/wavesexchange_repos/src/circuit_breaker.rs index a3d2a48..df67d8b 100644 --- a/wavesexchange_repos/src/circuit_breaker.rs +++ b/wavesexchange_repos/src/circuit_breaker.rs @@ -26,8 +26,9 @@ pub struct CircuitBreaker { } impl CircuitBreakerBuilder { - pub fn max_err_count_per_timespan(&mut self, ts: usize) { - self.max_err_count_per_timespan = NonZeroUsize::new(ts) + pub fn max_err_count_per_timespan(mut self, ts: usize) -> CircuitBreakerBuilder { + self.max_err_count_per_timespan = NonZeroUsize::new(ts); + self } } From 7b1557f5d71d438dc451b7a89b4841e741a22621 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 01:03:42 +0300 Subject: [PATCH 03/21] rest --- wavesexchange_repos/src/circuit_breaker.rs | 106 ++++++++++++++++----- 1 file changed, 84 insertions(+), 22 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker.rs b/wavesexchange_repos/src/circuit_breaker.rs index df67d8b..1074019 100644 --- a/wavesexchange_repos/src/circuit_breaker.rs +++ b/wavesexchange_repos/src/circuit_breaker.rs @@ -5,57 +5,106 @@ разрывы соединений: разные бд (pg, redis), разные пулы (bb8, deadpool, r2d2), одиночный запрос, возможность расширения */ -use derive_builder::Builder; use std::{ future::Future, num::NonZeroUsize, time::{Duration, Instant}, }; -#[derive(Debug, Builder)] -#[builder(pattern = "owned")] -pub struct CircuitBreaker { - repo: Repo, - #[builder(setter(skip))] +pub struct CircuitBreaker { + data_source: S, err_count: usize, // current errors count - #[builder(setter(skip))] first_err_ts: Option, max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки - #[builder(setter(custom))] max_err_count_per_timespan: NonZeroUsize, + init_fn: Box S>, } -impl CircuitBreakerBuilder { - pub fn max_err_count_per_timespan(mut self, ts: usize) -> CircuitBreakerBuilder { +pub struct CircuitBreakerBuilder { + max_timespan: Option, + max_err_count_per_timespan: Option, + init_fn: Option S>>, +} + +impl CircuitBreakerBuilder { + pub fn new() -> CircuitBreakerBuilder { + CircuitBreakerBuilder { + max_timespan: None, + max_err_count_per_timespan: None, + init_fn: None, + } + } + + pub fn max_timespan(mut self, ts: usize) -> CircuitBreakerBuilder { self.max_err_count_per_timespan = NonZeroUsize::new(ts); self } + + pub fn max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder { + self.max_err_count_per_timespan = NonZeroUsize::new(count); + self + } + + pub fn init_fn(mut self, f: impl Fn() -> S + 'static) -> CircuitBreakerBuilder { + self.init_fn = Some(Box::new(f)); + self + } + + pub fn build(self) -> Result, Error> { + let build_err = |s: &str| Err(Error::BuilderError(s.to_string())); + + if self.max_err_count_per_timespan.is_none() { + return build_err("max_err_count_per_timespan is not set"); + } + + if self.max_timespan.is_none() { + return build_err("max_timespan is not set"); + } + + if self.init_fn.is_none() { + return build_err("init_fn is not set"); + } + + let init_fn = self.init_fn.unwrap(); + + Ok(CircuitBreaker { + data_source: init_fn(), + err_count: 0, + first_err_ts: None, + max_timespan: self.max_timespan.unwrap(), + max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(), + init_fn, + }) + } } -impl CircuitBreaker { - pub fn builder() -> CircuitBreakerBuilder { - CircuitBreakerBuilder::default() +impl CircuitBreaker { + pub fn builder() -> CircuitBreakerBuilder { + CircuitBreakerBuilder::new() } - pub async fn query(&mut self, query_fn: F) -> Result + pub async fn query(&mut self, query_fn: F) -> Result where - F: Fn(&Repo) -> Fut, - Fut: Future>, + F: Fn(&S) -> Fut, + Fut: Future>, { - let result = query_fn(&self.repo).await; + let result = query_fn(&self.data_source).await; if let Err(e) = &result { - if Repo::is_countable_err(e) { + if S::is_countable_err(e) { self.err_count += 1; match self.first_err_ts { Some(ts) if ts.elapsed() <= self.max_timespan => { if self.err_count > self.max_err_count_per_timespan.get() { - return Err(self.repo.fallback()); + return Err(self.data_source.fallback()); } } None => self.first_err_ts = Some(Instant::now()), _ => {} } + if S::REINIT_ON_FAIL { + self.data_source = (self.init_fn)(); + } } } else { self.err_count = 0; @@ -66,11 +115,10 @@ impl CircuitBreaker { } #[async_trait] -pub trait FallibleRepo { +pub trait FallibleDataSource { + const REINIT_ON_FAIL: bool; type Error; - async fn init(cfg: &C) -> Self; - fn is_countable_err(err: &Self::Error) -> bool; fn fallback(&self) -> Self::Error { @@ -80,6 +128,20 @@ pub trait FallibleRepo { pub mod impls { use super::*; + use deadpool_diesel::{Manager, Pool}; + use diesel::pg::PgConnection; + use diesel::result::Error as DslError; + + pub struct DeadpoolPgBreaker(Pool>); + + impl FallibleDataSource for DeadpoolPgBreaker { + const REINIT_ON_FAIL: bool = true; + type Error = DslError; + + fn is_countable_err(err: &Self::Error) -> bool { + err.to_string().contains("no connection to the server") + } + } } #[derive(Debug, thiserror::Error)] From 72053e7cc11daa45f8608bf51e86d84e27edaa27 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 01:29:36 +0300 Subject: [PATCH 04/21] split into submodules --- wavesexchange_repos/Cargo.toml | 5 +-- .../src/circuit_breaker/config.rs | 36 +++++++++++++++++++ .../src/circuit_breaker/error.rs | 8 +++++ .../src/circuit_breaker/impls.rs | 16 +++++++++ .../mod.rs} | 30 +++------------- wavesexchange_repos/src/lib.rs | 3 -- 6 files changed, 68 insertions(+), 30 deletions(-) create mode 100644 wavesexchange_repos/src/circuit_breaker/config.rs create mode 100644 wavesexchange_repos/src/circuit_breaker/error.rs create mode 100644 wavesexchange_repos/src/circuit_breaker/impls.rs rename wavesexchange_repos/src/{circuit_breaker.rs => circuit_breaker/mod.rs} (87%) diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 06720e2..c987c02 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -7,8 +7,9 @@ authors = ["Artem Sidorenko "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.58" deadpool-diesel = "0.4.0" derive_builder = "0.11.2" diesel = { version = "2.0.2", features = ["postgres"] } -thiserror = "1.0.37" \ No newline at end of file +envy = "0.4.2" +serde = { version = "1.0.147", features = ["derive"] } +thiserror = "1.0.37" diff --git a/wavesexchange_repos/src/circuit_breaker/config.rs b/wavesexchange_repos/src/circuit_breaker/config.rs new file mode 100644 index 0000000..7b4a921 --- /dev/null +++ b/wavesexchange_repos/src/circuit_breaker/config.rs @@ -0,0 +1,36 @@ +use std::time::Duration; + +use serde::Deserialize; + +use super::error::Error; + +fn default_max_timespan_ms() -> u64 { + 10000 +} + +fn default_max_err_count_per_timespan() -> usize { + 5 +} + +#[derive(Deserialize)] +struct ConfigFlat { + #[serde(default = "default_max_timespan_ms")] + max_timespan_ms: u64, + #[serde(default = "default_max_err_count_per_timespan")] + max_err_count_per_timespan: usize, +} + +#[derive(Debug, Clone)] +pub struct Config { + pub max_timespan: Duration, + pub max_err_count_per_timespan: usize, +} + +pub fn load() -> Result { + let config_flat = envy::prefixed("CIRCUIT_BREAKER_").from_env::()?; + + Ok(Config { + max_timespan: Duration::from_millis(config_flat.max_timespan_ms), + max_err_count_per_timespan: config_flat.max_err_count_per_timespan, + }) +} diff --git a/wavesexchange_repos/src/circuit_breaker/error.rs b/wavesexchange_repos/src/circuit_breaker/error.rs new file mode 100644 index 0000000..1ccb3ee --- /dev/null +++ b/wavesexchange_repos/src/circuit_breaker/error.rs @@ -0,0 +1,8 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("CircuitBreaker BuilderError: {0}")] + BuilderError(String), + + #[error("CircuitBreaker ConfigLoadError: {0}")] + ConfigLoadError(#[from] envy::Error), +} diff --git a/wavesexchange_repos/src/circuit_breaker/impls.rs b/wavesexchange_repos/src/circuit_breaker/impls.rs new file mode 100644 index 0000000..961d583 --- /dev/null +++ b/wavesexchange_repos/src/circuit_breaker/impls.rs @@ -0,0 +1,16 @@ + +use super::*; +use deadpool_diesel::{Manager, Pool}; +use diesel::pg::PgConnection; +use diesel::result::Error as DslError; + +pub struct DeadpoolPgBreaker(Pool>); + +impl FallibleDataSource for DeadpoolPgBreaker { + const REINIT_ON_FAIL: bool = true; + type Error = DslError; + + fn is_countable_err(err: &Self::Error) -> bool { + err.to_string().contains("no connection to the server") + } +} diff --git a/wavesexchange_repos/src/circuit_breaker.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs similarity index 87% rename from wavesexchange_repos/src/circuit_breaker.rs rename to wavesexchange_repos/src/circuit_breaker/mod.rs index 1074019..557d5a4 100644 --- a/wavesexchange_repos/src/circuit_breaker.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -5,6 +5,11 @@ разрывы соединений: разные бд (pg, redis), разные пулы (bb8, deadpool, r2d2), одиночный запрос, возможность расширения */ +pub mod config; +pub mod error; +pub mod impls; + +use error::Error; use std::{ future::Future, num::NonZeroUsize, @@ -114,7 +119,6 @@ impl CircuitBreaker { } } -#[async_trait] pub trait FallibleDataSource { const REINIT_ON_FAIL: bool; type Error; @@ -125,27 +129,3 @@ pub trait FallibleDataSource { panic!("Я ГОВОРЮ НЕ БЫЛО РАЗРЫВОВ СВЯЗИ! С НОЯБРЯ ПРОШЛОГО ГОДА! А СЕЙЧАС ЦЕЛЫХ ЧЕТЫРЕ РАЗРЫВА БЫЛО!") } } - -pub mod impls { - use super::*; - use deadpool_diesel::{Manager, Pool}; - use diesel::pg::PgConnection; - use diesel::result::Error as DslError; - - pub struct DeadpoolPgBreaker(Pool>); - - impl FallibleDataSource for DeadpoolPgBreaker { - const REINIT_ON_FAIL: bool = true; - type Error = DslError; - - fn is_countable_err(err: &Self::Error) -> bool { - err.to_string().contains("no connection to the server") - } - } -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("CircuitBreakerBuilderError: {0}")] - BuilderError(String), -} diff --git a/wavesexchange_repos/src/lib.rs b/wavesexchange_repos/src/lib.rs index e8fd4e2..f120e4c 100644 --- a/wavesexchange_repos/src/lib.rs +++ b/wavesexchange_repos/src/lib.rs @@ -1,4 +1 @@ -#[macro_use] -extern crate async_trait; - pub mod circuit_breaker; From 808f27b9b6e4be8e3c4bf2fccb30451ca968f2e8 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 01:41:22 +0300 Subject: [PATCH 05/21] pub uses --- wavesexchange_repos/src/circuit_breaker/config.rs | 4 +--- wavesexchange_repos/src/circuit_breaker/error.rs | 3 --- wavesexchange_repos/src/circuit_breaker/mod.rs | 5 ++++- wavesexchange_repos/src/lib.rs | 2 ++ 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/config.rs b/wavesexchange_repos/src/circuit_breaker/config.rs index 7b4a921..9fa9d48 100644 --- a/wavesexchange_repos/src/circuit_breaker/config.rs +++ b/wavesexchange_repos/src/circuit_breaker/config.rs @@ -2,8 +2,6 @@ use std::time::Duration; use serde::Deserialize; -use super::error::Error; - fn default_max_timespan_ms() -> u64 { 10000 } @@ -26,7 +24,7 @@ pub struct Config { pub max_err_count_per_timespan: usize, } -pub fn load() -> Result { +pub fn load() -> Result { let config_flat = envy::prefixed("CIRCUIT_BREAKER_").from_env::()?; Ok(Config { diff --git a/wavesexchange_repos/src/circuit_breaker/error.rs b/wavesexchange_repos/src/circuit_breaker/error.rs index 1ccb3ee..75e3342 100644 --- a/wavesexchange_repos/src/circuit_breaker/error.rs +++ b/wavesexchange_repos/src/circuit_breaker/error.rs @@ -2,7 +2,4 @@ pub enum Error { #[error("CircuitBreaker BuilderError: {0}")] BuilderError(String), - - #[error("CircuitBreaker ConfigLoadError: {0}")] - ConfigLoadError(#[from] envy::Error), } diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 557d5a4..5a424a8 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -9,7 +9,10 @@ pub mod config; pub mod error; pub mod impls; -use error::Error; +pub use config::Config; +pub use error::Error; +pub use impls::*; + use std::{ future::Future, num::NonZeroUsize, diff --git a/wavesexchange_repos/src/lib.rs b/wavesexchange_repos/src/lib.rs index f120e4c..f2cf33c 100644 --- a/wavesexchange_repos/src/lib.rs +++ b/wavesexchange_repos/src/lib.rs @@ -1 +1,3 @@ pub mod circuit_breaker; + +pub use circuit_breaker::CircuitBreaker; From 88506a1d0d56b4c974f85b34c3c5610ecb4e1d8c Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 01:48:48 +0300 Subject: [PATCH 06/21] pass cfg directly in builder --- wavesexchange_repos/src/circuit_breaker/impls.rs | 3 +-- wavesexchange_repos/src/circuit_breaker/mod.rs | 10 ++++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/impls.rs b/wavesexchange_repos/src/circuit_breaker/impls.rs index 961d583..e38b503 100644 --- a/wavesexchange_repos/src/circuit_breaker/impls.rs +++ b/wavesexchange_repos/src/circuit_breaker/impls.rs @@ -1,10 +1,9 @@ - use super::*; use deadpool_diesel::{Manager, Pool}; use diesel::pg::PgConnection; use diesel::result::Error as DslError; -pub struct DeadpoolPgBreaker(Pool>); +pub struct DeadpoolPgBreaker(pub Pool>); impl FallibleDataSource for DeadpoolPgBreaker { const REINIT_ON_FAIL: bool = true; diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 5a424a8..370a553 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -43,8 +43,8 @@ impl CircuitBreakerBuilder { } } - pub fn max_timespan(mut self, ts: usize) -> CircuitBreakerBuilder { - self.max_err_count_per_timespan = NonZeroUsize::new(ts); + pub fn max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder { + self.max_timespan = Some(ts); self } @@ -91,6 +91,12 @@ impl CircuitBreaker { CircuitBreakerBuilder::new() } + pub fn builder_from_cfg(cfg: &Config) -> CircuitBreakerBuilder { + Self::builder() + .max_err_count_per_timespan(cfg.max_err_count_per_timespan) + .max_timespan(cfg.max_timespan) + } + pub async fn query(&mut self, query_fn: F) -> Result where F: Fn(&S) -> Fut, From 1872fc6a1fd62b3bd69711ecca57395455d02c2c Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 02:17:12 +0300 Subject: [PATCH 07/21] add send sync --- wavesexchange_repos/src/circuit_breaker/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 370a553..a1f859b 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -19,19 +19,22 @@ use std::{ time::{Duration, Instant}, }; +pub trait SharedFn: Fn() -> S + Send + Sync + 'static {} +impl SharedFn for T where T: Fn() -> S + Send + Sync + 'static {} + pub struct CircuitBreaker { data_source: S, err_count: usize, // current errors count first_err_ts: Option, max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки max_err_count_per_timespan: NonZeroUsize, - init_fn: Box S>, + init_fn: Box>, } pub struct CircuitBreakerBuilder { max_timespan: Option, max_err_count_per_timespan: Option, - init_fn: Option S>>, + init_fn: Option>>, } impl CircuitBreakerBuilder { @@ -53,7 +56,7 @@ impl CircuitBreakerBuilder { self } - pub fn init_fn(mut self, f: impl Fn() -> S + 'static) -> CircuitBreakerBuilder { + pub fn init_fn(mut self, f: impl SharedFn) -> CircuitBreakerBuilder { self.init_fn = Some(Box::new(f)); self } From 14a286f77206d6d8fa1c2c14c1203b4f81166acc Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 02:59:56 +0300 Subject: [PATCH 08/21] add lockable state --- wavesexchange_repos/Cargo.toml | 2 + .../src/circuit_breaker/impls.rs | 15 ------ .../src/circuit_breaker/mod.rs | 47 ++++++++++++------- 3 files changed, 31 insertions(+), 33 deletions(-) delete mode 100644 wavesexchange_repos/src/circuit_breaker/impls.rs diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index c987c02..08e8b25 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -11,5 +11,7 @@ deadpool-diesel = "0.4.0" derive_builder = "0.11.2" diesel = { version = "2.0.2", features = ["postgres"] } envy = "0.4.2" +futures = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } thiserror = "1.0.37" +tokio = "1.22.0" diff --git a/wavesexchange_repos/src/circuit_breaker/impls.rs b/wavesexchange_repos/src/circuit_breaker/impls.rs deleted file mode 100644 index e38b503..0000000 --- a/wavesexchange_repos/src/circuit_breaker/impls.rs +++ /dev/null @@ -1,15 +0,0 @@ -use super::*; -use deadpool_diesel::{Manager, Pool}; -use diesel::pg::PgConnection; -use diesel::result::Error as DslError; - -pub struct DeadpoolPgBreaker(pub Pool>); - -impl FallibleDataSource for DeadpoolPgBreaker { - const REINIT_ON_FAIL: bool = true; - type Error = DslError; - - fn is_countable_err(err: &Self::Error) -> bool { - err.to_string().contains("no connection to the server") - } -} diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index a1f859b..cfef5b9 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -7,28 +7,32 @@ */ pub mod config; pub mod error; -pub mod impls; pub use config::Config; pub use error::Error; -pub use impls::*; use std::{ future::Future, + mem::drop, num::NonZeroUsize, time::{Duration, Instant}, }; +use tokio::sync::RwLock; pub trait SharedFn: Fn() -> S + Send + Sync + 'static {} impl SharedFn for T where T: Fn() -> S + Send + Sync + 'static {} pub struct CircuitBreaker { - data_source: S, - err_count: usize, // current errors count - first_err_ts: Option, max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки max_err_count_per_timespan: NonZeroUsize, init_fn: Box>, + state: RwLock>, +} + +pub struct CBState { + data_source: S, + err_count: usize, // current errors count + first_err_ts: Option, } pub struct CircuitBreakerBuilder { @@ -79,9 +83,11 @@ impl CircuitBreakerBuilder { let init_fn = self.init_fn.unwrap(); Ok(CircuitBreaker { - data_source: init_fn(), - err_count: 0, - first_err_ts: None, + state: RwLock::new(CBState { + data_source: init_fn(), + err_count: 0, + first_err_ts: None, + }), max_timespan: self.max_timespan.unwrap(), max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(), init_fn, @@ -100,32 +106,37 @@ impl CircuitBreaker { .max_timespan(cfg.max_timespan) } - pub async fn query(&mut self, query_fn: F) -> Result + pub async fn query(&self, query_fn: F) -> Result where F: Fn(&S) -> Fut, Fut: Future>, { - let result = query_fn(&self.data_source).await; + let data_src_read_lock = &self.state.read().await.data_source; + let result = query_fn(&data_src_read_lock).await; if let Err(e) = &result { if S::is_countable_err(e) { - self.err_count += 1; - match self.first_err_ts { + drop(data_src_read_lock); + let mut state = self.state.write().await; + state.err_count += 1; + match state.first_err_ts { Some(ts) if ts.elapsed() <= self.max_timespan => { - if self.err_count > self.max_err_count_per_timespan.get() { - return Err(self.data_source.fallback()); + if state.err_count > self.max_err_count_per_timespan.get() { + return Err(state.data_source.fallback()); } } - None => self.first_err_ts = Some(Instant::now()), + None => state.first_err_ts = Some(Instant::now()), _ => {} } if S::REINIT_ON_FAIL { - self.data_source = (self.init_fn)(); + state.data_source = (self.init_fn)(); } } } else { - self.err_count = 0; - self.first_err_ts = None; + drop(data_src_read_lock); + let mut state = self.state.write().await; + state.err_count = 0; + state.first_err_ts = None; } result } From f5f72d7f2a02d3b9a0ea3bf6a5422755821ef127 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 04:57:39 +0300 Subject: [PATCH 09/21] use arc-crutch to deal with borrowck --- wavesexchange_repos/Cargo.toml | 1 + .../src/circuit_breaker/mod.rs | 32 ++++++++++++------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 08e8b25..8dd2626 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -15,3 +15,4 @@ futures = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } thiserror = "1.0.37" tokio = "1.22.0" +wavesexchange_log = { path = "../wavesexchange_log" } diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index cfef5b9..4ed0fad 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -10,11 +10,13 @@ pub mod error; pub use config::Config; pub use error::Error; +use wavesexchange_log::debug; use std::{ future::Future, mem::drop, num::NonZeroUsize, + sync::Arc, time::{Duration, Instant}, }; use tokio::sync::RwLock; @@ -30,7 +32,7 @@ pub struct CircuitBreaker { } pub struct CBState { - data_source: S, + data_source: Arc, err_count: usize, // current errors count first_err_ts: Option, } @@ -84,7 +86,7 @@ impl CircuitBreakerBuilder { Ok(CircuitBreaker { state: RwLock::new(CBState { - data_source: init_fn(), + data_source: Arc::new(init_fn()), err_count: 0, first_err_ts: None, }), @@ -108,32 +110,36 @@ impl CircuitBreaker { pub async fn query(&self, query_fn: F) -> Result where - F: Fn(&S) -> Fut, + //todo: figure out how to FnOnce(&S) + F: FnOnce(Arc) -> Fut, Fut: Future>, { - let data_src_read_lock = &self.state.read().await.data_source; - let result = query_fn(&data_src_read_lock).await; + let state_read_lock = self.state.read().await; + let result = query_fn(state_read_lock.data_source.clone()).await; if let Err(e) = &result { if S::is_countable_err(e) { - drop(data_src_read_lock); + drop(state_read_lock); let mut state = self.state.write().await; state.err_count += 1; + debug!("err count: {}", state.err_count); match state.first_err_ts { - Some(ts) if ts.elapsed() <= self.max_timespan => { + Some(ts) if ts.elapsed() > self.max_timespan => { if state.err_count > self.max_err_count_per_timespan.get() { - return Err(state.data_source.fallback()); + return Err(state + .data_source + .fallback(ts.elapsed().as_millis(), state.err_count)); } } None => state.first_err_ts = Some(Instant::now()), _ => {} } if S::REINIT_ON_FAIL { - state.data_source = (self.init_fn)(); + state.data_source = Arc::new((self.init_fn)()); } } } else { - drop(data_src_read_lock); + drop(state_read_lock); let mut state = self.state.write().await; state.err_count = 0; state.first_err_ts = None; @@ -148,7 +154,9 @@ pub trait FallibleDataSource { fn is_countable_err(err: &Self::Error) -> bool; - fn fallback(&self) -> Self::Error { - panic!("Я ГОВОРЮ НЕ БЫЛО РАЗРЫВОВ СВЯЗИ! С НОЯБРЯ ПРОШЛОГО ГОДА! А СЕЙЧАС ЦЕЛЫХ ЧЕТЫРЕ РАЗРЫВА БЫЛО!") + fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { + panic!( + "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" + ) } } From 4e3f0f6fecace5e37198c212113a6561b8400c5b Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 05:11:49 +0300 Subject: [PATCH 10/21] remove comms --- wavesexchange_repos/src/circuit_breaker/mod.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 4ed0fad..947f0ca 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -1,10 +1,3 @@ -/* -послать запрос к удалённому ресурсу -если пришла ошибка соединения, то зафиксировать и вернуть ошибку в Result -если разрывы соединений продолжаются, выкинуть панику (или вызвать соотв обработчик) -разрывы соединений: - разные бд (pg, redis), разные пулы (bb8, deadpool, r2d2), одиночный запрос, возможность расширения -*/ pub mod config; pub mod error; @@ -25,7 +18,7 @@ pub trait SharedFn: Fn() -> S + Send + Sync + 'static {} impl SharedFn for T where T: Fn() -> S + Send + Sync + 'static {} pub struct CircuitBreaker { - max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки + max_timespan: Duration, max_err_count_per_timespan: NonZeroUsize, init_fn: Box>, state: RwLock>, @@ -33,7 +26,7 @@ pub struct CircuitBreaker { pub struct CBState { data_source: Arc, - err_count: usize, // current errors count + err_count: usize, first_err_ts: Option, } @@ -110,7 +103,6 @@ impl CircuitBreaker { pub async fn query(&self, query_fn: F) -> Result where - //todo: figure out how to FnOnce(&S) F: FnOnce(Arc) -> Fut, Fut: Future>, { From 61ff55c1ccff3703583c76183c966c2210b09654 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 04:54:52 +0500 Subject: [PATCH 11/21] improve circuit breaker & add tests --- wavesexchange_repos/Cargo.toml | 4 +- .../src/circuit_breaker/error.rs | 5 - .../src/circuit_breaker/mod.rs | 180 ++++++++++++++---- 3 files changed, 142 insertions(+), 47 deletions(-) delete mode 100644 wavesexchange_repos/src/circuit_breaker/error.rs diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 8dd2626..529d283 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -14,5 +14,5 @@ envy = "0.4.2" futures = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } thiserror = "1.0.37" -tokio = "1.22.0" -wavesexchange_log = { path = "../wavesexchange_log" } +tokio = { version = "1", features = ["macros"] } +wavesexchange_log = { path = "../wavesexchange_log" } \ No newline at end of file diff --git a/wavesexchange_repos/src/circuit_breaker/error.rs b/wavesexchange_repos/src/circuit_breaker/error.rs deleted file mode 100644 index 75e3342..0000000 --- a/wavesexchange_repos/src/circuit_breaker/error.rs +++ /dev/null @@ -1,5 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("CircuitBreaker BuilderError: {0}")] - BuilderError(String), -} diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 947f0ca..8394c49 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -1,26 +1,43 @@ pub mod config; -pub mod error; pub use config::Config; -pub use error::Error; use wavesexchange_log::debug; use std::{ future::Future, mem::drop, - num::NonZeroUsize, sync::Arc, time::{Duration, Instant}, }; use tokio::sync::RwLock; -pub trait SharedFn: Fn() -> S + Send + Sync + 'static {} -impl SharedFn for T where T: Fn() -> S + Send + Sync + 'static {} +pub trait FallibleDataSource { + const REINIT_ON_FAIL: bool; + type Error; + + fn is_countable_err(err: &Self::Error) -> bool; + + fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { + panic!( + "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" + ) + } +} + +pub trait DataSrcInitFn: + Fn() -> Result + Send + Sync + 'static +{ +} + +impl DataSrcInitFn for T where + T: Fn() -> Result + Send + Sync + 'static +{ +} pub struct CircuitBreaker { max_timespan: Duration, - max_err_count_per_timespan: NonZeroUsize, - init_fn: Box>, + max_err_count_per_timespan: usize, + init_fn: Box>, state: RwLock>, } @@ -30,10 +47,25 @@ pub struct CBState { first_err_ts: Option, } +impl CBState { + fn inc(&mut self) { + self.err_count += 1; + } + + fn reset(&mut self) { + self.err_count = 0; + self.first_err_ts = None; + } + + fn reinit(&mut self, src: S) { + self.data_source = Arc::new(src); + } +} + pub struct CircuitBreakerBuilder { max_timespan: Option, - max_err_count_per_timespan: Option, - init_fn: Option>>, + max_err_count_per_timespan: Option, + init_fn: Option>>, } impl CircuitBreakerBuilder { @@ -45,41 +77,40 @@ impl CircuitBreakerBuilder { } } - pub fn max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder { + pub fn with_max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder { self.max_timespan = Some(ts); self } - pub fn max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder { - self.max_err_count_per_timespan = NonZeroUsize::new(count); + pub fn with_max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder { + self.max_err_count_per_timespan = Some(count); self } - pub fn init_fn(mut self, f: impl SharedFn) -> CircuitBreakerBuilder { + pub fn with_init_fn(mut self, f: impl DataSrcInitFn) -> CircuitBreakerBuilder { self.init_fn = Some(Box::new(f)); self } - pub fn build(self) -> Result, Error> { - let build_err = |s: &str| Err(Error::BuilderError(s.to_string())); - + pub fn build(self) -> Result, S::Error> { + // probably there is a better way to force use all with_* methods on builder if self.max_err_count_per_timespan.is_none() { - return build_err("max_err_count_per_timespan is not set"); + panic!("max_err_count_per_timespan is not set"); } if self.max_timespan.is_none() { - return build_err("max_timespan is not set"); + panic!("max_timespan is not set"); } if self.init_fn.is_none() { - return build_err("init_fn is not set"); + panic!("init_fn is not set"); } let init_fn = self.init_fn.unwrap(); Ok(CircuitBreaker { state: RwLock::new(CBState { - data_source: Arc::new(init_fn()), + data_source: Arc::new(init_fn()?), err_count: 0, first_err_ts: None, }), @@ -97,8 +128,8 @@ impl CircuitBreaker { pub fn builder_from_cfg(cfg: &Config) -> CircuitBreakerBuilder { Self::builder() - .max_err_count_per_timespan(cfg.max_err_count_per_timespan) - .max_timespan(cfg.max_timespan) + .with_max_err_count_per_timespan(cfg.max_err_count_per_timespan) + .with_max_timespan(cfg.max_timespan) } pub async fn query(&self, query_fn: F) -> Result @@ -109,46 +140,115 @@ impl CircuitBreaker { let state_read_lock = self.state.read().await; let result = query_fn(state_read_lock.data_source.clone()).await; + drop(state_read_lock); + if let Err(e) = &result { if S::is_countable_err(e) { - drop(state_read_lock); let mut state = self.state.write().await; - state.err_count += 1; + state.inc(); + debug!("err count: {}", state.err_count); + match state.first_err_ts { - Some(ts) if ts.elapsed() > self.max_timespan => { - if state.err_count > self.max_err_count_per_timespan.get() { + Some(ts) => { + let elapsed = ts.elapsed(); + + if state.err_count <= self.max_err_count_per_timespan { + if elapsed > self.max_timespan { + state.reset(); + } + } else { return Err(state .data_source - .fallback(ts.elapsed().as_millis(), state.err_count)); + .fallback(elapsed.as_millis(), state.err_count)); } } None => state.first_err_ts = Some(Instant::now()), - _ => {} } if S::REINIT_ON_FAIL { - state.data_source = Arc::new((self.init_fn)()); + state.reinit((self.init_fn)()?); } } } else { - drop(state_read_lock); let mut state = self.state.write().await; - state.err_count = 0; - state.first_err_ts = None; + state.reset(); } result } } -pub trait FallibleDataSource { - const REINIT_ON_FAIL: bool; - type Error; +#[cfg(test)] +mod tests { + use super::*; - fn is_countable_err(err: &Self::Error) -> bool; + struct WildErrorGenerator; - fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { - panic!( - "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" - ) + impl WildErrorGenerator { + fn err(&self) -> Result<(), WildError> { + Err(WildError::Inner) + } + } + + #[derive(Debug)] + enum WildError { + Inner, + CircuitBreakerTriggered, + } + + impl FallibleDataSource for WildErrorGenerator { + const REINIT_ON_FAIL: bool = true; + + type Error = WildError; + + fn is_countable_err(err: &Self::Error) -> bool { + matches!(err, WildError::Inner) + } + + fn fallback(&self, _elapsed_ms: u128, _err_count: usize) -> Self::Error { + WildError::CircuitBreakerTriggered + } + } + + #[tokio::test] + async fn circuit_breaker() { + let cb = CircuitBreaker::builder() + .with_max_timespan(Duration::from_secs(1)) + .with_max_err_count_per_timespan(2) + .with_init_fn(|| Ok(WildErrorGenerator)) + .build() + .unwrap(); + + // trigger 2 errors in cb + assert!(matches!( + cb.query(|weg| async move { weg.err() }).await.unwrap_err(), + WildError::Inner + )); + + assert!(matches!( + cb.query(|weg| async move { weg.err() }).await.unwrap_err(), + WildError::Inner + )); + + // reset cb state with successful query + assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ()); + + // trigger 3 errors in cb (max errors limit exceeded) + assert!(matches!( + cb.query(|weg| async move { weg.err() }).await.unwrap_err(), + WildError::Inner + )); + + assert!(matches!( + cb.query(|weg| async move { weg.err() }).await.unwrap_err(), + WildError::Inner + )); + + // cb fallback + assert!(matches!( + cb.query(|weg| async move { weg.err() }).await.unwrap_err(), + WildError::CircuitBreakerTriggered + )); + + assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ()); } } From a94cca27c6c427dabace6ef08d244ea70d32bf7e Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 05:05:49 +0500 Subject: [PATCH 12/21] remove REINIT_ON_FAIL --- wavesexchange_repos/src/circuit_breaker/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 8394c49..d035ef0 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -12,7 +12,6 @@ use std::{ use tokio::sync::RwLock; pub trait FallibleDataSource { - const REINIT_ON_FAIL: bool; type Error; fn is_countable_err(err: &Self::Error) -> bool; @@ -165,9 +164,7 @@ impl CircuitBreaker { } None => state.first_err_ts = Some(Instant::now()), } - if S::REINIT_ON_FAIL { - state.reinit((self.init_fn)()?); - } + state.reinit((self.init_fn)()?); } } else { let mut state = self.state.write().await; @@ -196,8 +193,6 @@ mod tests { } impl FallibleDataSource for WildErrorGenerator { - const REINIT_ON_FAIL: bool = true; - type Error = WildError; fn is_countable_err(err: &Self::Error) -> bool { From dbc0eaaee7136c7f6e3054f98c5007628c7018d3 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 05:08:07 +0500 Subject: [PATCH 13/21] remove pub from cb state --- wavesexchange_repos/src/circuit_breaker/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index d035ef0..7f78c25 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -40,7 +40,7 @@ pub struct CircuitBreaker { state: RwLock>, } -pub struct CBState { +struct CBState { data_source: Arc, err_count: usize, first_err_ts: Option, From df2138a1a7e772b386a082a383eaf849417e8333 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 13:35:01 +0500 Subject: [PATCH 14/21] docs --- .../src/circuit_breaker/mod.rs | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 7f78c25..0a124e3 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -11,11 +11,15 @@ use std::{ }; use tokio::sync::RwLock; +/// A trait represents some data source that can fail. +/// Must be implemented for a struct that used in circuit breaker. pub trait FallibleDataSource { type Error; + /// Shows if error will increase CB's counter fn is_countable_err(err: &Self::Error) -> bool; + /// Set up CB's behaviour after maximum errors limit exceeded fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { panic!( "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" @@ -33,10 +37,47 @@ impl DataSrcInitFn for T where { } +/// Count erroneous attempts while quering some data source and perform reinitialization/fallback. +/// +/// To use within an object, you must implement `FallibleDataSource` first. +/// +/// Example: +/// ```rust +/// fn main() { +/// struct Repo; +/// struct RepoError; +/// +/// impl FallibleDataSource for Repo { +/// type Error = RepoError; +/// +/// fn is_countable_err(err: &Self::Error) -> bool { +/// true +/// } +/// } +/// +/// let cb = CircuitBreaker::builder() +/// .with_max_timespan(Duration::from_secs(1)) +/// .with_max_err_count_per_timespan(5) +/// .with_init_fn(|| Ok(Repo)); +/// +/// cb.query(|src| async move { Err(RepoError) }).unwrap_err() +/// cb.query(|src| async move { Ok(()) }).unwrap() +/// +/// // see CB test for more verbose example +/// } +/// ``` pub struct CircuitBreaker { + /// Timespan that errors will be counted in. + /// After it elapsed, error counter will be resetted. max_timespan: Duration, + + /// Maximum error count per timespan. Example: 3 errors per 1 sec (max_timespan) max_err_count_per_timespan: usize, + + /// A function that may be called on every fail to reinitialize data source init_fn: Box>, + + /// Current state of CB state: RwLock>, } @@ -91,6 +132,8 @@ impl CircuitBreakerBuilder { self } + /// Build the circuit breaker. + /// Note: you should set up all 3 fields within this builder, either it will panic pub fn build(self) -> Result, S::Error> { // probably there is a better way to force use all with_* methods on builder if self.max_err_count_per_timespan.is_none() { @@ -131,6 +174,10 @@ impl CircuitBreaker { .with_max_timespan(cfg.max_timespan) } + /// Query the data source. If succeeded, CB resets internal error counter. + /// If error returned, counter increases. + /// If (N > max_err_count_per_timespan) errors appeared, CB is falling back (panic as default). + /// If not enough errors in a timespan appeared to trigger CB's fallback, error counter will be reset. pub async fn query(&self, query_fn: F) -> Result where F: FnOnce(Arc) -> Fut, From f701c5b957ddc187f4edeb79a7af51bf52f3fb82 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 14:44:01 +0500 Subject: [PATCH 15/21] don't reset counter on every ok --- wavesexchange_repos/src/circuit_breaker/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 0a124e3..a12037c 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -185,6 +185,7 @@ impl CircuitBreaker { { let state_read_lock = self.state.read().await; let result = query_fn(state_read_lock.data_source.clone()).await; + let old_err_count = state_read_lock.err_count; drop(state_read_lock); @@ -214,8 +215,10 @@ impl CircuitBreaker { state.reinit((self.init_fn)()?); } } else { - let mut state = self.state.write().await; - state.reset(); + if old_err_count > 0 { + let mut state = self.state.write().await; + state.reset(); + } } result } From 4e9a1cafc4b30781e5240c0a75ab9580f2339dc7 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 17:50:13 +0500 Subject: [PATCH 16/21] resolve issues --- wavesexchange_repos/Cargo.toml | 11 +- .../src/circuit_breaker/config.rs | 20 +- .../src/circuit_breaker/error.rs | 7 + .../src/circuit_breaker/mod.rs | 237 ++++++------------ wavesexchange_repos/src/lib.rs | 2 - 5 files changed, 106 insertions(+), 171 deletions(-) create mode 100644 wavesexchange_repos/src/circuit_breaker/error.rs diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 529d283..53fcee7 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -7,12 +7,11 @@ authors = ["Artem Sidorenko "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -deadpool-diesel = "0.4.0" -derive_builder = "0.11.2" -diesel = { version = "2.0.2", features = ["postgres"] } envy = "0.4.2" -futures = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } -thiserror = "1.0.37" +thiserror = "1.0.38" +tokio = { version = "1", default-features = false, features = ["sync", "rt-multi-thread"] } +wavesexchange_log = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_log/0.5.1" } + +[dev-dependencies] tokio = { version = "1", features = ["macros"] } -wavesexchange_log = { path = "../wavesexchange_log" } \ No newline at end of file diff --git a/wavesexchange_repos/src/circuit_breaker/config.rs b/wavesexchange_repos/src/circuit_breaker/config.rs index 9fa9d48..2401e04 100644 --- a/wavesexchange_repos/src/circuit_breaker/config.rs +++ b/wavesexchange_repos/src/circuit_breaker/config.rs @@ -6,7 +6,7 @@ fn default_max_timespan_ms() -> u64 { 10000 } -fn default_max_err_count_per_timespan() -> usize { +fn default_max_err_count_per_timespan() -> u16 { 5 } @@ -15,20 +15,22 @@ struct ConfigFlat { #[serde(default = "default_max_timespan_ms")] max_timespan_ms: u64, #[serde(default = "default_max_err_count_per_timespan")] - max_err_count_per_timespan: usize, + max_err_count_per_timespan: u16, } #[derive(Debug, Clone)] pub struct Config { pub max_timespan: Duration, - pub max_err_count_per_timespan: usize, + pub max_err_count_per_timespan: u16, } -pub fn load() -> Result { - let config_flat = envy::prefixed("CIRCUIT_BREAKER_").from_env::()?; +impl Config { + pub fn load() -> Result { + let config_flat = envy::prefixed("CIRCUIT_BREAKER__").from_env::()?; - Ok(Config { - max_timespan: Duration::from_millis(config_flat.max_timespan_ms), - max_err_count_per_timespan: config_flat.max_err_count_per_timespan, - }) + Ok(Config { + max_timespan: Duration::from_millis(config_flat.max_timespan_ms), + max_err_count_per_timespan: config_flat.max_err_count_per_timespan, + }) + } } diff --git a/wavesexchange_repos/src/circuit_breaker/error.rs b/wavesexchange_repos/src/circuit_breaker/error.rs new file mode 100644 index 0000000..ea0cbff --- /dev/null +++ b/wavesexchange_repos/src/circuit_breaker/error.rs @@ -0,0 +1,7 @@ +use std::time::Duration; + +#[derive(Debug)] +pub enum CBError { + CircuitBroke { err_count: u16, elapsed: Duration }, + Inner(E), +} diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index a12037c..cc0260a 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -1,6 +1,8 @@ -pub mod config; +mod config; +mod error; pub use config::Config; +pub use error::CBError; use wavesexchange_log::debug; use std::{ @@ -11,31 +13,9 @@ use std::{ }; use tokio::sync::RwLock; -/// A trait represents some data source that can fail. -/// Must be implemented for a struct that used in circuit breaker. -pub trait FallibleDataSource { - type Error; +pub trait DataSrcInitFn: FnMut() -> Result + Send + Sync + 'static {} - /// Shows if error will increase CB's counter - fn is_countable_err(err: &Self::Error) -> bool; - - /// Set up CB's behaviour after maximum errors limit exceeded - fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { - panic!( - "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" - ) - } -} - -pub trait DataSrcInitFn: - Fn() -> Result + Send + Sync + 'static -{ -} - -impl DataSrcInitFn for T where - T: Fn() -> Result + Send + Sync + 'static -{ -} +impl DataSrcInitFn for T where T: FnMut() -> Result + Send + Sync + 'static {} /// Count erroneous attempts while quering some data source and perform reinitialization/fallback. /// @@ -43,8 +23,14 @@ impl DataSrcInitFn for T where /// /// Example: /// ```rust -/// fn main() { +/// use wavesexchange_repos::circuit_breaker::{FallibleDataSource, CircuitBreakerBuilder}; +/// use std::time::Duration; +/// +/// #[tokio::main] +/// async fn main() { /// struct Repo; +/// +/// #[derive(Debug)] /// struct RepoError; /// /// impl FallibleDataSource for Repo { @@ -55,39 +41,40 @@ impl DataSrcInitFn for T where /// } /// } /// -/// let cb = CircuitBreaker::builder() -/// .with_max_timespan(Duration::from_secs(1)) -/// .with_max_err_count_per_timespan(5) -/// .with_init_fn(|| Ok(Repo)); +/// let cb = CircuitBreakerBuilder { +/// max_timespan: Duration::from_secs(1), +/// max_err_count_per_timespan: 5, +/// init_fn: Box::new(|| Ok(Repo)) +/// }.build().unwrap(); /// -/// cb.query(|src| async move { Err(RepoError) }).unwrap_err() -/// cb.query(|src| async move { Ok(()) }).unwrap() +/// cb.query(|src| async move { Err::<(), _>(RepoError) }).await.unwrap_err(); +/// cb.query(|src| async move { Ok(()) }).await.unwrap() /// /// // see CB test for more verbose example /// } /// ``` -pub struct CircuitBreaker { +pub struct CircuitBreaker { /// Timespan that errors will be counted in. /// After it elapsed, error counter will be resetted. max_timespan: Duration, /// Maximum error count per timespan. Example: 3 errors per 1 sec (max_timespan) - max_err_count_per_timespan: usize, - - /// A function that may be called on every fail to reinitialize data source - init_fn: Box>, + max_err_count_per_timespan: u16, /// Current state of CB - state: RwLock>, + state: RwLock>, } -struct CBState { +struct CBState { data_source: Arc, - err_count: usize, + err_count: u16, first_err_ts: Option, + + /// A function that may be called on every fail to reinitialize data source + init_fn: Box>, } -impl CBState { +impl CBState { fn inc(&mut self) { self.err_count += 1; } @@ -97,91 +84,48 @@ impl CBState { self.first_err_ts = None; } - fn reinit(&mut self, src: S) { - self.data_source = Arc::new(src); + fn reinit(&mut self) -> Result<(), E> { + self.data_source = Arc::new((self.init_fn)()?); + Ok(()) } } -pub struct CircuitBreakerBuilder { - max_timespan: Option, - max_err_count_per_timespan: Option, - init_fn: Option>>, +pub struct CircuitBreakerBuilder { + pub max_timespan: Duration, + pub max_err_count_per_timespan: u16, + pub init_fn: Box>, } -impl CircuitBreakerBuilder { - pub fn new() -> CircuitBreakerBuilder { - CircuitBreakerBuilder { - max_timespan: None, - max_err_count_per_timespan: None, - init_fn: None, - } - } - - pub fn with_max_timespan(mut self, ts: Duration) -> CircuitBreakerBuilder { - self.max_timespan = Some(ts); - self - } - - pub fn with_max_err_count_per_timespan(mut self, count: usize) -> CircuitBreakerBuilder { - self.max_err_count_per_timespan = Some(count); - self - } - - pub fn with_init_fn(mut self, f: impl DataSrcInitFn) -> CircuitBreakerBuilder { - self.init_fn = Some(Box::new(f)); - self - } - - /// Build the circuit breaker. - /// Note: you should set up all 3 fields within this builder, either it will panic - pub fn build(self) -> Result, S::Error> { - // probably there is a better way to force use all with_* methods on builder - if self.max_err_count_per_timespan.is_none() { - panic!("max_err_count_per_timespan is not set"); - } - - if self.max_timespan.is_none() { - panic!("max_timespan is not set"); - } - - if self.init_fn.is_none() { - panic!("init_fn is not set"); - } - - let init_fn = self.init_fn.unwrap(); +impl CircuitBreakerBuilder { + pub fn build(self) -> Result, E> { + let Self { + max_timespan, + max_err_count_per_timespan, + mut init_fn, + } = self; Ok(CircuitBreaker { state: RwLock::new(CBState { data_source: Arc::new(init_fn()?), err_count: 0, first_err_ts: None, + init_fn, }), - max_timespan: self.max_timespan.unwrap(), - max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(), - init_fn, + max_timespan, + max_err_count_per_timespan, }) } } -impl CircuitBreaker { - pub fn builder() -> CircuitBreakerBuilder { - CircuitBreakerBuilder::new() - } - - pub fn builder_from_cfg(cfg: &Config) -> CircuitBreakerBuilder { - Self::builder() - .with_max_err_count_per_timespan(cfg.max_err_count_per_timespan) - .with_max_timespan(cfg.max_timespan) - } - +impl CircuitBreaker { /// Query the data source. If succeeded, CB resets internal error counter. /// If error returned, counter increases. /// If (N > max_err_count_per_timespan) errors appeared, CB is falling back (panic as default). /// If not enough errors in a timespan appeared to trigger CB's fallback, error counter will be reset. - pub async fn query(&self, query_fn: F) -> Result + pub async fn query(&self, query_fn: F) -> Result> where F: FnOnce(Arc) -> Fut, - Fut: Future>, + Fut: Future>, { let state_read_lock = self.state.read().await; let result = query_fn(state_read_lock.data_source.clone()).await; @@ -189,38 +133,37 @@ impl CircuitBreaker { drop(state_read_lock); - if let Err(e) = &result { - if S::is_countable_err(e) { - let mut state = self.state.write().await; - state.inc(); - - debug!("err count: {}", state.err_count); - - match state.first_err_ts { - Some(ts) => { - let elapsed = ts.elapsed(); - - if state.err_count <= self.max_err_count_per_timespan { - if elapsed > self.max_timespan { - state.reset(); - } - } else { - return Err(state - .data_source - .fallback(elapsed.as_millis(), state.err_count)); + if let Err(_) = &result { + let mut state = self.state.write().await; + state.inc(); + + debug!("err count: {}", state.err_count); + + match state.first_err_ts { + Some(ts) => { + let elapsed = ts.elapsed(); + + if state.err_count <= self.max_err_count_per_timespan { + if elapsed > self.max_timespan { + state.reset(); } + } else { + return Err(CBError::CircuitBroke { + err_count: state.err_count, + elapsed, + }); } - None => state.first_err_ts = Some(Instant::now()), } - state.reinit((self.init_fn)()?); + None => state.first_err_ts = Some(Instant::now()), } + state.reinit().map_err(CBError::Inner)?; } else { if old_err_count > 0 { let mut state = self.state.write().await; state.reset(); } } - result + result.map_err(CBError::Inner) } } @@ -232,46 +175,32 @@ mod tests { impl WildErrorGenerator { fn err(&self) -> Result<(), WildError> { - Err(WildError::Inner) + Err(WildError) } } #[derive(Debug)] - enum WildError { - Inner, - CircuitBreakerTriggered, - } - - impl FallibleDataSource for WildErrorGenerator { - type Error = WildError; - - fn is_countable_err(err: &Self::Error) -> bool { - matches!(err, WildError::Inner) - } - - fn fallback(&self, _elapsed_ms: u128, _err_count: usize) -> Self::Error { - WildError::CircuitBreakerTriggered - } - } + struct WildError; #[tokio::test] async fn circuit_breaker() { - let cb = CircuitBreaker::builder() - .with_max_timespan(Duration::from_secs(1)) - .with_max_err_count_per_timespan(2) - .with_init_fn(|| Ok(WildErrorGenerator)) - .build() - .unwrap(); + let cb = CircuitBreakerBuilder { + max_timespan: Duration::from_secs(1), + max_err_count_per_timespan: 2, + init_fn: Box::new(|| Ok(WildErrorGenerator)), + } + .build() + .unwrap(); // trigger 2 errors in cb assert!(matches!( cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - WildError::Inner + CBError::Inner(WildError) )); assert!(matches!( cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - WildError::Inner + CBError::Inner(WildError) )); // reset cb state with successful query @@ -280,18 +209,18 @@ mod tests { // trigger 3 errors in cb (max errors limit exceeded) assert!(matches!( cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - WildError::Inner + CBError::Inner(WildError) )); assert!(matches!( cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - WildError::Inner + CBError::Inner(WildError) )); // cb fallback assert!(matches!( cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - WildError::CircuitBreakerTriggered + CBError::CircuitBroke { .. } )); assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ()); diff --git a/wavesexchange_repos/src/lib.rs b/wavesexchange_repos/src/lib.rs index f2cf33c..f120e4c 100644 --- a/wavesexchange_repos/src/lib.rs +++ b/wavesexchange_repos/src/lib.rs @@ -1,3 +1 @@ pub mod circuit_breaker; - -pub use circuit_breaker::CircuitBreaker; From 8dd797dfab80dd847a9efedc0ad6a41807931ead Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 17:53:22 +0500 Subject: [PATCH 17/21] fix doctest --- wavesexchange_repos/src/circuit_breaker/mod.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index cc0260a..edfbcc8 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -23,7 +23,7 @@ impl DataSrcInitFn for T where T: FnMut() -> Result + Send /// /// Example: /// ```rust -/// use wavesexchange_repos::circuit_breaker::{FallibleDataSource, CircuitBreakerBuilder}; +/// use wavesexchange_repos::circuit_breaker::CircuitBreakerBuilder; /// use std::time::Duration; /// /// #[tokio::main] @@ -33,14 +33,6 @@ impl DataSrcInitFn for T where T: FnMut() -> Result + Send /// #[derive(Debug)] /// struct RepoError; /// -/// impl FallibleDataSource for Repo { -/// type Error = RepoError; -/// -/// fn is_countable_err(err: &Self::Error) -> bool { -/// true -/// } -/// } -/// /// let cb = CircuitBreakerBuilder { /// max_timespan: Duration::from_secs(1), /// max_err_count_per_timespan: 5, From f5db3958cd92702e9f38c9b9e75bbade0738a681 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 18:00:25 +0500 Subject: [PATCH 18/21] add docs --- wavesexchange_repos/src/circuit_breaker/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index edfbcc8..df89249 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -82,6 +82,15 @@ impl CBState { } } +/// A circuit breaker builder. As all fields are mandatory, use struct creation syntax to init. +/// Example: +/// ```no_compile +/// CircuitBreakerBuilder { +/// max_timespan: Duration::from_secs(1), +/// max_err_count_per_timespan: 5, +/// init_fn: Box::new(|| Ok(Repo)) +/// }.build().unwrap() +/// ``` pub struct CircuitBreakerBuilder { pub max_timespan: Duration, pub max_err_count_per_timespan: u16, From 07ac0cf0e3c9b3107832fb870903252f672708a9 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 18:04:55 +0500 Subject: [PATCH 19/21] move optional configuration under the feature --- wavesexchange_repos/Cargo.toml | 3 +++ wavesexchange_repos/src/circuit_breaker/mod.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 53fcee7..77d5d07 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -15,3 +15,6 @@ wavesexchange_log = { git = "https://github.com/waves-exchange/wavesexchange-rs" [dev-dependencies] tokio = { version = "1", features = ["macros"] } + +[features] +config = [] \ No newline at end of file diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index df89249..5c69530 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -1,6 +1,8 @@ +#[cfg(feature = "config")] mod config; mod error; +#[cfg(feature = "config")] pub use config::Config; pub use error::CBError; use wavesexchange_log::debug; From d815b771c552f6f5d9f940dad73c491864352921 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Fri, 20 Jan 2023 18:33:11 +0500 Subject: [PATCH 20/21] Arc -> Rc --- wavesexchange_repos/src/circuit_breaker/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index 5c69530..f65bfd6 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -10,7 +10,7 @@ use wavesexchange_log::debug; use std::{ future::Future, mem::drop, - sync::Arc, + rc::Rc, time::{Duration, Instant}, }; use tokio::sync::RwLock; @@ -60,7 +60,7 @@ pub struct CircuitBreaker { } struct CBState { - data_source: Arc, + data_source: Rc, err_count: u16, first_err_ts: Option, @@ -79,7 +79,7 @@ impl CBState { } fn reinit(&mut self) -> Result<(), E> { - self.data_source = Arc::new((self.init_fn)()?); + self.data_source = Rc::new((self.init_fn)()?); Ok(()) } } @@ -109,7 +109,7 @@ impl CircuitBreakerBuilder { Ok(CircuitBreaker { state: RwLock::new(CBState { - data_source: Arc::new(init_fn()?), + data_source: Rc::new(init_fn()?), err_count: 0, first_err_ts: None, init_fn, @@ -127,7 +127,7 @@ impl CircuitBreaker { /// If not enough errors in a timespan appeared to trigger CB's fallback, error counter will be reset. pub async fn query(&self, query_fn: F) -> Result> where - F: FnOnce(Arc) -> Fut, + F: FnOnce(Rc) -> Fut, Fut: Future>, { let state_read_lock = self.state.read().await; From b6e7a063af80a219ae4b29fc5cbce1424313e017 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Mon, 20 Mar 2023 11:04:58 +0300 Subject: [PATCH 21/21] fix issues --- Cargo.toml | 2 +- .../src/circuit_breaker/config.rs | 36 --- .../src/circuit_breaker/mod.rs | 231 ------------------ .../Cargo.toml | 10 +- .../src/circuit_breaker/error.rs | 0 .../src/circuit_breaker/mod.rs | 191 +++++++++++++++ .../src/lib.rs | 0 7 files changed, 194 insertions(+), 276 deletions(-) delete mode 100644 wavesexchange_repos/src/circuit_breaker/config.rs delete mode 100644 wavesexchange_repos/src/circuit_breaker/mod.rs rename {wavesexchange_repos => wavesexchange_utils}/Cargo.toml (58%) rename {wavesexchange_repos => wavesexchange_utils}/src/circuit_breaker/error.rs (100%) create mode 100644 wavesexchange_utils/src/circuit_breaker/mod.rs rename {wavesexchange_repos => wavesexchange_utils}/src/lib.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 9394714..4e94ef1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,5 @@ members = [ "wavesexchange_topic", "wavesexchange_loaders", "wavesexchange_apis", - "wavesexchange_repos", + "wavesexchange_utils", ] diff --git a/wavesexchange_repos/src/circuit_breaker/config.rs b/wavesexchange_repos/src/circuit_breaker/config.rs deleted file mode 100644 index 2401e04..0000000 --- a/wavesexchange_repos/src/circuit_breaker/config.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::time::Duration; - -use serde::Deserialize; - -fn default_max_timespan_ms() -> u64 { - 10000 -} - -fn default_max_err_count_per_timespan() -> u16 { - 5 -} - -#[derive(Deserialize)] -struct ConfigFlat { - #[serde(default = "default_max_timespan_ms")] - max_timespan_ms: u64, - #[serde(default = "default_max_err_count_per_timespan")] - max_err_count_per_timespan: u16, -} - -#[derive(Debug, Clone)] -pub struct Config { - pub max_timespan: Duration, - pub max_err_count_per_timespan: u16, -} - -impl Config { - pub fn load() -> Result { - let config_flat = envy::prefixed("CIRCUIT_BREAKER__").from_env::()?; - - Ok(Config { - max_timespan: Duration::from_millis(config_flat.max_timespan_ms), - max_err_count_per_timespan: config_flat.max_err_count_per_timespan, - }) - } -} diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs deleted file mode 100644 index f65bfd6..0000000 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ /dev/null @@ -1,231 +0,0 @@ -#[cfg(feature = "config")] -mod config; -mod error; - -#[cfg(feature = "config")] -pub use config::Config; -pub use error::CBError; -use wavesexchange_log::debug; - -use std::{ - future::Future, - mem::drop, - rc::Rc, - time::{Duration, Instant}, -}; -use tokio::sync::RwLock; - -pub trait DataSrcInitFn: FnMut() -> Result + Send + Sync + 'static {} - -impl DataSrcInitFn for T where T: FnMut() -> Result + Send + Sync + 'static {} - -/// Count erroneous attempts while quering some data source and perform reinitialization/fallback. -/// -/// To use within an object, you must implement `FallibleDataSource` first. -/// -/// Example: -/// ```rust -/// use wavesexchange_repos::circuit_breaker::CircuitBreakerBuilder; -/// use std::time::Duration; -/// -/// #[tokio::main] -/// async fn main() { -/// struct Repo; -/// -/// #[derive(Debug)] -/// struct RepoError; -/// -/// let cb = CircuitBreakerBuilder { -/// max_timespan: Duration::from_secs(1), -/// max_err_count_per_timespan: 5, -/// init_fn: Box::new(|| Ok(Repo)) -/// }.build().unwrap(); -/// -/// cb.query(|src| async move { Err::<(), _>(RepoError) }).await.unwrap_err(); -/// cb.query(|src| async move { Ok(()) }).await.unwrap() -/// -/// // see CB test for more verbose example -/// } -/// ``` -pub struct CircuitBreaker { - /// Timespan that errors will be counted in. - /// After it elapsed, error counter will be resetted. - max_timespan: Duration, - - /// Maximum error count per timespan. Example: 3 errors per 1 sec (max_timespan) - max_err_count_per_timespan: u16, - - /// Current state of CB - state: RwLock>, -} - -struct CBState { - data_source: Rc, - err_count: u16, - first_err_ts: Option, - - /// A function that may be called on every fail to reinitialize data source - init_fn: Box>, -} - -impl CBState { - fn inc(&mut self) { - self.err_count += 1; - } - - fn reset(&mut self) { - self.err_count = 0; - self.first_err_ts = None; - } - - fn reinit(&mut self) -> Result<(), E> { - self.data_source = Rc::new((self.init_fn)()?); - Ok(()) - } -} - -/// A circuit breaker builder. As all fields are mandatory, use struct creation syntax to init. -/// Example: -/// ```no_compile -/// CircuitBreakerBuilder { -/// max_timespan: Duration::from_secs(1), -/// max_err_count_per_timespan: 5, -/// init_fn: Box::new(|| Ok(Repo)) -/// }.build().unwrap() -/// ``` -pub struct CircuitBreakerBuilder { - pub max_timespan: Duration, - pub max_err_count_per_timespan: u16, - pub init_fn: Box>, -} - -impl CircuitBreakerBuilder { - pub fn build(self) -> Result, E> { - let Self { - max_timespan, - max_err_count_per_timespan, - mut init_fn, - } = self; - - Ok(CircuitBreaker { - state: RwLock::new(CBState { - data_source: Rc::new(init_fn()?), - err_count: 0, - first_err_ts: None, - init_fn, - }), - max_timespan, - max_err_count_per_timespan, - }) - } -} - -impl CircuitBreaker { - /// Query the data source. If succeeded, CB resets internal error counter. - /// If error returned, counter increases. - /// If (N > max_err_count_per_timespan) errors appeared, CB is falling back (panic as default). - /// If not enough errors in a timespan appeared to trigger CB's fallback, error counter will be reset. - pub async fn query(&self, query_fn: F) -> Result> - where - F: FnOnce(Rc) -> Fut, - Fut: Future>, - { - let state_read_lock = self.state.read().await; - let result = query_fn(state_read_lock.data_source.clone()).await; - let old_err_count = state_read_lock.err_count; - - drop(state_read_lock); - - if let Err(_) = &result { - let mut state = self.state.write().await; - state.inc(); - - debug!("err count: {}", state.err_count); - - match state.first_err_ts { - Some(ts) => { - let elapsed = ts.elapsed(); - - if state.err_count <= self.max_err_count_per_timespan { - if elapsed > self.max_timespan { - state.reset(); - } - } else { - return Err(CBError::CircuitBroke { - err_count: state.err_count, - elapsed, - }); - } - } - None => state.first_err_ts = Some(Instant::now()), - } - state.reinit().map_err(CBError::Inner)?; - } else { - if old_err_count > 0 { - let mut state = self.state.write().await; - state.reset(); - } - } - result.map_err(CBError::Inner) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - struct WildErrorGenerator; - - impl WildErrorGenerator { - fn err(&self) -> Result<(), WildError> { - Err(WildError) - } - } - - #[derive(Debug)] - struct WildError; - - #[tokio::test] - async fn circuit_breaker() { - let cb = CircuitBreakerBuilder { - max_timespan: Duration::from_secs(1), - max_err_count_per_timespan: 2, - init_fn: Box::new(|| Ok(WildErrorGenerator)), - } - .build() - .unwrap(); - - // trigger 2 errors in cb - assert!(matches!( - cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - CBError::Inner(WildError) - )); - - assert!(matches!( - cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - CBError::Inner(WildError) - )); - - // reset cb state with successful query - assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ()); - - // trigger 3 errors in cb (max errors limit exceeded) - assert!(matches!( - cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - CBError::Inner(WildError) - )); - - assert!(matches!( - cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - CBError::Inner(WildError) - )); - - // cb fallback - assert!(matches!( - cb.query(|weg| async move { weg.err() }).await.unwrap_err(), - CBError::CircuitBroke { .. } - )); - - assert_eq!(cb.query(|_weg| async move { Ok(()) }).await.unwrap(), ()); - } -} diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_utils/Cargo.toml similarity index 58% rename from wavesexchange_repos/Cargo.toml rename to wavesexchange_utils/Cargo.toml index 77d5d07..aa7e5df 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_utils/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "wavesexchange_repos" +name = "wavesexchange_utils" version = "0.1.0" edition = "2021" authors = ["Artem Sidorenko "] @@ -7,14 +7,8 @@ authors = ["Artem Sidorenko "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -envy = "0.4.2" -serde = { version = "1.0.147", features = ["derive"] } thiserror = "1.0.38" -tokio = { version = "1", default-features = false, features = ["sync", "rt-multi-thread"] } wavesexchange_log = { git = "https://github.com/waves-exchange/wavesexchange-rs", tag = "wavesexchange_log/0.5.1" } [dev-dependencies] -tokio = { version = "1", features = ["macros"] } - -[features] -config = [] \ No newline at end of file +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/wavesexchange_repos/src/circuit_breaker/error.rs b/wavesexchange_utils/src/circuit_breaker/error.rs similarity index 100% rename from wavesexchange_repos/src/circuit_breaker/error.rs rename to wavesexchange_utils/src/circuit_breaker/error.rs diff --git a/wavesexchange_utils/src/circuit_breaker/mod.rs b/wavesexchange_utils/src/circuit_breaker/mod.rs new file mode 100644 index 0000000..a92be25 --- /dev/null +++ b/wavesexchange_utils/src/circuit_breaker/mod.rs @@ -0,0 +1,191 @@ +mod error; + +pub use error::CBError; +use wavesexchange_log::debug; + +use std::{ + future::Future, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +/// Count erroneous attempts while quering some data source. +/// +/// Example: +/// ```rust +/// use wavesexchange_utils::circuit_breaker::CircuitBreaker; +/// use std::time::Duration; +/// +/// #[tokio::main] +/// async fn main() { +/// struct Repo; +/// +/// #[derive(Debug)] +/// struct RepoError; +/// +/// let cb = CircuitBreaker::new( +/// Duration::from_secs(1), +/// 5, +/// Repo +/// ); +/// +/// cb.access(|src| async move { Err::<(), _>(RepoError) }).await.unwrap_err(); +/// cb.access(|src| async move { Ok::<_, ()>(()) }).await.unwrap() +/// +/// // see CB test below for more verbose example +/// } +/// ``` +pub struct CircuitBreaker { + /// Timespan that errors will be counted in. + /// After it elapsed, error counter will be resetted. + max_timespan: Duration, + + /// Maximum error count per timespan. Example: 3 errors per 1 sec (max_timespan) + max_err_count_per_timespan: u16, + + data_source: Arc, + + /// Current state of CB + state: Mutex, +} + +impl CircuitBreaker { + pub fn new(max_timespan: Duration, max_err_count_per_timespan: u16, data_source: S) -> Self { + Self { + max_timespan, + max_err_count_per_timespan, + data_source: Arc::new(data_source), + state: Mutex::new(CBState::default()), + } + } +} + +#[derive(Default)] +struct CBState { + err_count: u16, + first_err_ts: Option, +} + +impl CBState { + fn inc(&mut self) { + self.err_count += 1; + } + + fn reset(&mut self) { + self.err_count = 0; + self.first_err_ts = None; + } +} + +impl CircuitBreaker { + /// Access the data source. If succeeded, CB resets internal error counter. + /// If error returned, counter is increased. + /// If (N > max_err_count_per_timespan) errors appeared, CB breaks a circuit, + /// otherwise error counter will be reset. + pub async fn access(&self, query_fn: F) -> Result> + where + F: FnOnce(Arc) -> Fut, + Fut: Future>, + { + let result = query_fn(self.data_source.clone()).await; + self.handle_result(result) + } + + /// Sync version of `access` method. + pub fn access_blocking(&self, query_fn: F) -> Result> + where + F: FnOnce(Arc) -> Result, + { + let result = query_fn(self.data_source.clone()); + self.handle_result(result) + } + + fn handle_result(&self, result: Result) -> Result> { + let mut state = self.state.lock().unwrap(); + + if let Err(_) = &result { + state.inc(); + + debug!("CircuitBreaker: err count: {}", state.err_count); + + match state.first_err_ts { + Some(ts) => { + let elapsed = ts.elapsed(); + + if state.err_count <= self.max_err_count_per_timespan { + if elapsed > self.max_timespan { + state.reset(); + } + } else { + return Err(CBError::CircuitBroke { + err_count: state.err_count, + elapsed, + }); + } + } + None => state.first_err_ts = Some(Instant::now()), + } + } else { + if state.err_count > 0 { + state.reset(); + } + } + result.map_err(CBError::Inner) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct WildErrorGenerator; + + impl WildErrorGenerator { + fn err(&self) -> Result<(), WildError> { + Err(WildError) + } + } + + const EMPTY_OK: Result<(), ()> = Ok(()); + + #[derive(Debug)] + struct WildError; + + #[tokio::test] + async fn circuit_breaker() { + let cb = CircuitBreaker::new(Duration::from_secs(1), 2, WildErrorGenerator); + + // trigger 2 errors in cb + assert!(matches!( + cb.access(|weg| async move { weg.err() }).await.unwrap_err(), + CBError::Inner(WildError) + )); + + assert!(matches!( + cb.access(|weg| async move { weg.err() }).await.unwrap_err(), + CBError::Inner(WildError) + )); + + // reset cb state with successful query + assert_eq!(cb.access(|_weg| async move { EMPTY_OK }).await.unwrap(), ()); + + // trigger 3 errors in cb (max errors limit exceeded) + assert!(matches!( + cb.access(|weg| async move { weg.err() }).await.unwrap_err(), + CBError::Inner(WildError) + )); + + assert!(matches!( + cb.access(|weg| async move { weg.err() }).await.unwrap_err(), + CBError::Inner(WildError) + )); + + // break circuit + assert!(matches!( + cb.access(|weg| async move { weg.err() }).await.unwrap_err(), + CBError::CircuitBroke { .. } + )); + + assert_eq!(cb.access(|_weg| async move { EMPTY_OK }).await.unwrap(), ()); + } +} diff --git a/wavesexchange_repos/src/lib.rs b/wavesexchange_utils/src/lib.rs similarity index 100% rename from wavesexchange_repos/src/lib.rs rename to wavesexchange_utils/src/lib.rs