Skip to content

Commit

Permalink
add lockable state
Browse files Browse the repository at this point in the history
  • Loading branch information
Artyom Sidorenko authored and plazmoid committed Jan 19, 2023
1 parent 1872fc6 commit 14a286f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 33 deletions.
2 changes: 2 additions & 0 deletions wavesexchange_repos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ deadpool-diesel = "0.4.0"
derive_builder = "0.11.2"
diesel = { version = "2.0.2", features = ["postgres"] }
envy = "0.4.2"
futures = "0.3.25"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.37"
tokio = "1.22.0"
15 changes: 0 additions & 15 deletions wavesexchange_repos/src/circuit_breaker/impls.rs

This file was deleted.

47 changes: 29 additions & 18 deletions wavesexchange_repos/src/circuit_breaker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,32 @@
*/
pub mod config;
pub mod error;
pub mod impls;

pub use config::Config;
pub use error::Error;
pub use impls::*;

use std::{
future::Future,
mem::drop,
num::NonZeroUsize,
time::{Duration, Instant},
};
use tokio::sync::RwLock;

pub trait SharedFn<S>: Fn() -> S + Send + Sync + 'static {}
impl<T, S> SharedFn<S> for T where T: Fn() -> S + Send + Sync + 'static {}

pub struct CircuitBreaker<S: FallibleDataSource> {
data_source: S,
err_count: usize, // current errors count
first_err_ts: Option<Instant>,
max_timespan: Duration, // максимальный временной промежуток, в котором будут считаться ошибки
max_err_count_per_timespan: NonZeroUsize,
init_fn: Box<dyn SharedFn<S>>,
state: RwLock<CBState<S>>,
}

pub struct CBState<S: FallibleDataSource> {
data_source: S,
err_count: usize, // current errors count
first_err_ts: Option<Instant>,
}

pub struct CircuitBreakerBuilder<S: FallibleDataSource> {
Expand Down Expand Up @@ -79,9 +83,11 @@ impl<S: FallibleDataSource> CircuitBreakerBuilder<S> {
let init_fn = self.init_fn.unwrap();

Ok(CircuitBreaker {
data_source: init_fn(),
err_count: 0,
first_err_ts: None,
state: RwLock::new(CBState {
data_source: init_fn(),
err_count: 0,
first_err_ts: None,
}),
max_timespan: self.max_timespan.unwrap(),
max_err_count_per_timespan: self.max_err_count_per_timespan.unwrap(),
init_fn,
Expand All @@ -100,32 +106,37 @@ impl<S: FallibleDataSource> CircuitBreaker<S> {
.max_timespan(cfg.max_timespan)
}

pub async fn query<T, F, Fut>(&mut self, query_fn: F) -> Result<T, S::Error>
pub async fn query<T, F, Fut>(&self, query_fn: F) -> Result<T, S::Error>
where
F: Fn(&S) -> Fut,
Fut: Future<Output = Result<T, S::Error>>,
{
let result = query_fn(&self.data_source).await;
let data_src_read_lock = &self.state.read().await.data_source;
let result = query_fn(&data_src_read_lock).await;

if let Err(e) = &result {
if S::is_countable_err(e) {
self.err_count += 1;
match self.first_err_ts {
drop(data_src_read_lock);
let mut state = self.state.write().await;
state.err_count += 1;
match state.first_err_ts {
Some(ts) if ts.elapsed() <= self.max_timespan => {
if self.err_count > self.max_err_count_per_timespan.get() {
return Err(self.data_source.fallback());
if state.err_count > self.max_err_count_per_timespan.get() {
return Err(state.data_source.fallback());
}
}
None => self.first_err_ts = Some(Instant::now()),
None => state.first_err_ts = Some(Instant::now()),
_ => {}
}
if S::REINIT_ON_FAIL {
self.data_source = (self.init_fn)();
state.data_source = (self.init_fn)();
}
}
} else {
self.err_count = 0;
self.first_err_ts = None;
drop(data_src_read_lock);
let mut state = self.state.write().await;
state.err_count = 0;
state.first_err_ts = None;
}
result
}
Expand Down

0 comments on commit 14a286f

Please sign in to comment.