Skip to content

Commit

Permalink
use arc-crutch to deal with borrowck
Browse files Browse the repository at this point in the history
  • Loading branch information
Artyom Sidorenko authored and plazmoid committed Jan 19, 2023
1 parent 14a286f commit f5f72d7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
1 change: 1 addition & 0 deletions wavesexchange_repos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ futures = "0.3.25"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.37"
tokio = "1.22.0"
wavesexchange_log = { path = "../wavesexchange_log" }
32 changes: 20 additions & 12 deletions wavesexchange_repos/src/circuit_breaker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ pub mod error;

pub use config::Config;
pub use error::Error;
use wavesexchange_log::debug;

use std::{
future::Future,
mem::drop,
num::NonZeroUsize,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::RwLock;
Expand All @@ -30,7 +32,7 @@ pub struct CircuitBreaker<S: FallibleDataSource> {
}

pub struct CBState<S: FallibleDataSource> {
data_source: S,
data_source: Arc<S>,
err_count: usize, // current errors count
first_err_ts: Option<Instant>,
}
Expand Down Expand Up @@ -84,7 +86,7 @@ impl<S: FallibleDataSource> CircuitBreakerBuilder<S> {

Ok(CircuitBreaker {
state: RwLock::new(CBState {
data_source: init_fn(),
data_source: Arc::new(init_fn()),
err_count: 0,
first_err_ts: None,
}),
Expand All @@ -108,32 +110,36 @@ impl<S: FallibleDataSource> CircuitBreaker<S> {

pub async fn query<T, F, Fut>(&self, query_fn: F) -> Result<T, S::Error>
where
F: Fn(&S) -> Fut,
//todo: figure out how to FnOnce(&S)
F: FnOnce(Arc<S>) -> Fut,
Fut: Future<Output = Result<T, S::Error>>,
{
let data_src_read_lock = &self.state.read().await.data_source;
let result = query_fn(&data_src_read_lock).await;
let state_read_lock = self.state.read().await;
let result = query_fn(state_read_lock.data_source.clone()).await;

if let Err(e) = &result {
if S::is_countable_err(e) {
drop(data_src_read_lock);
drop(state_read_lock);
let mut state = self.state.write().await;
state.err_count += 1;
debug!("err count: {}", state.err_count);
match state.first_err_ts {
Some(ts) if ts.elapsed() <= self.max_timespan => {
Some(ts) if ts.elapsed() > self.max_timespan => {
if state.err_count > self.max_err_count_per_timespan.get() {
return Err(state.data_source.fallback());
return Err(state
.data_source
.fallback(ts.elapsed().as_millis(), state.err_count));
}
}
None => state.first_err_ts = Some(Instant::now()),
_ => {}
}
if S::REINIT_ON_FAIL {
state.data_source = (self.init_fn)();
state.data_source = Arc::new((self.init_fn)());
}
}
} else {
drop(data_src_read_lock);
drop(state_read_lock);
let mut state = self.state.write().await;
state.err_count = 0;
state.first_err_ts = None;
Expand All @@ -148,7 +154,9 @@ pub trait FallibleDataSource {

fn is_countable_err(err: &Self::Error) -> bool;

fn fallback(&self) -> Self::Error {
panic!("Я ГОВОРЮ НЕ БЫЛО РАЗРЫВОВ СВЯЗИ! С НОЯБРЯ ПРОШЛОГО ГОДА! А СЕЙЧАС ЦЕЛЫХ ЧЕТЫРЕ РАЗРЫВА БЫЛО!")
fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error {
panic!(
"CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms"
)
}
}

0 comments on commit f5f72d7

Please sign in to comment.