From f5f72d7f2a02d3b9a0ea3bf6a5422755821ef127 Mon Sep 17 00:00:00 2001 From: Artyom Sidorenko Date: Wed, 23 Nov 2022 04:57:39 +0300 Subject: [PATCH] use arc-crutch to deal with borrowck --- wavesexchange_repos/Cargo.toml | 1 + .../src/circuit_breaker/mod.rs | 32 ++++++++++++------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/wavesexchange_repos/Cargo.toml b/wavesexchange_repos/Cargo.toml index 08e8b25..8dd2626 100644 --- a/wavesexchange_repos/Cargo.toml +++ b/wavesexchange_repos/Cargo.toml @@ -15,3 +15,4 @@ futures = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } thiserror = "1.0.37" tokio = "1.22.0" +wavesexchange_log = { path = "../wavesexchange_log" } diff --git a/wavesexchange_repos/src/circuit_breaker/mod.rs b/wavesexchange_repos/src/circuit_breaker/mod.rs index cfef5b9..4ed0fad 100644 --- a/wavesexchange_repos/src/circuit_breaker/mod.rs +++ b/wavesexchange_repos/src/circuit_breaker/mod.rs @@ -10,11 +10,13 @@ pub mod error; pub use config::Config; pub use error::Error; +use wavesexchange_log::debug; use std::{ future::Future, mem::drop, num::NonZeroUsize, + sync::Arc, time::{Duration, Instant}, }; use tokio::sync::RwLock; @@ -30,7 +32,7 @@ pub struct CircuitBreaker { } pub struct CBState { - data_source: S, + data_source: Arc, err_count: usize, // current errors count first_err_ts: Option, } @@ -84,7 +86,7 @@ impl CircuitBreakerBuilder { Ok(CircuitBreaker { state: RwLock::new(CBState { - data_source: init_fn(), + data_source: Arc::new(init_fn()), err_count: 0, first_err_ts: None, }), @@ -108,32 +110,36 @@ impl CircuitBreaker { pub async fn query(&self, query_fn: F) -> Result where - F: Fn(&S) -> Fut, + //todo: figure out how to FnOnce(&S) + F: FnOnce(Arc) -> Fut, Fut: Future>, { - let data_src_read_lock = &self.state.read().await.data_source; - let result = query_fn(&data_src_read_lock).await; + let state_read_lock = self.state.read().await; + let result = query_fn(state_read_lock.data_source.clone()).await; if let Err(e) = &result { if S::is_countable_err(e) { - drop(data_src_read_lock); + drop(state_read_lock); let mut state = self.state.write().await; state.err_count += 1; + debug!("err count: {}", state.err_count); match state.first_err_ts { - Some(ts) if ts.elapsed() <= self.max_timespan => { + Some(ts) if ts.elapsed() > self.max_timespan => { if state.err_count > self.max_err_count_per_timespan.get() { - return Err(state.data_source.fallback()); + return Err(state + .data_source + .fallback(ts.elapsed().as_millis(), state.err_count)); } } None => state.first_err_ts = Some(Instant::now()), _ => {} } if S::REINIT_ON_FAIL { - state.data_source = (self.init_fn)(); + state.data_source = Arc::new((self.init_fn)()); } } } else { - drop(data_src_read_lock); + drop(state_read_lock); let mut state = self.state.write().await; state.err_count = 0; state.first_err_ts = None; @@ -148,7 +154,9 @@ pub trait FallibleDataSource { fn is_countable_err(err: &Self::Error) -> bool; - fn fallback(&self) -> Self::Error { - panic!("Я ГОВОРЮ НЕ БЫЛО РАЗРЫВОВ СВЯЗИ! С НОЯБРЯ ПРОШЛОГО ГОДА! А СЕЙЧАС ЦЕЛЫХ ЧЕТЫРЕ РАЗРЫВА БЫЛО!") + fn fallback(&self, elapsed_ms: u128, err_count: usize) -> Self::Error { + panic!( + "CircuitBreaker panicked after {err_count} errors in a row happened in {elapsed_ms}ms" + ) } }