diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index b10bd00f..264e851e 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -55,7 +55,7 @@ impl DecisionOutboxService { .await .map_err(|insert_error| SystemServiceError { kind: SystemServiceErrorKind::DBError, - reason: insert_error.reason, + reason: format!("Datastore error kind = {:?} and reason = {}", insert_error.kind, insert_error.reason), data: insert_error.data, service: "Decision Outbox Service".to_string(), })?; @@ -114,11 +114,12 @@ impl SystemService for DecisionOutboxService { message: decision_message, } = decision_channel_message; tokio::spawn({ - let decision_headers = DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new( - DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message - self.system.name.clone(), - None, - )); + let decision_headers: DecisionHeaderBuilder = + DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new( + DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message + self.system.name.clone(), + None, + )); async move { match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { diff --git a/packages/talos_certifier_adapters/src/postgres/config.rs b/packages/talos_certifier_adapters/src/postgres/config.rs index 37ce1616..d4c3eb47 100644 --- a/packages/talos_certifier_adapters/src/postgres/config.rs +++ b/packages/talos_certifier_adapters/src/postgres/config.rs @@ -9,12 +9,14 @@ pub struct PgConfig { pub port: String, pub database: String, pub pool_size: Option, + pub max_retries: Option, } impl PgConfig { pub fn from_env() -> PgConfig { let pool_size = env_var_with_defaults!("PG_POOL_SIZE", Option::); debug!("Pool size used... {pool_size:?}"); + let max_retries = env_var_with_defaults!("PG_MAX_RETRIES", Option::); PgConfig { user: env_var!("PG_USER"), password: env_var!("PG_PASSWORD"), @@ -22,6 +24,7 @@ impl PgConfig { port: env_var!("PG_PORT"), database: env_var!("PG_DATABASE"), pool_size, + max_retries, } } pub fn get_base_connection_string(&self) -> String { diff --git a/packages/talos_certifier_adapters/src/postgres/pg.rs b/packages/talos_certifier_adapters/src/postgres/pg.rs index c1df2865..76bf590c 100644 --- a/packages/talos_certifier_adapters/src/postgres/pg.rs +++ b/packages/talos_certifier_adapters/src/postgres/pg.rs @@ -2,7 +2,7 @@ use std::time::Duration; use async_trait::async_trait; use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime}; -use log::warn; +use log::{debug, error, warn}; use serde_json::{json, Value}; use talos_certifier::{ model::DecisionMessage, @@ -12,6 +12,7 @@ use talos_certifier::{ DecisionStore, }, }; + use tokio_postgres::NoTls; use crate::{PgConfig, PgError}; @@ -21,6 +22,8 @@ use super::utils::{get_uuid_key, parse_json_column}; #[derive(Clone)] pub struct Pg { pub pool: Pool, + /// The max times to retry any db operation. Defaults to 5. + pub max_retries: u32, } // create postgres client impl Pg { @@ -49,7 +52,10 @@ impl Pg { //test connection let _ = pool.get().await.map_err(PgError::GetClientFromPool)?; - Ok(Pg { pool }) + Ok(Pg { + pool, + max_retries: pg_config.max_retries.unwrap_or(5_u32), + }) } pub async fn get_client(&self) -> Result { @@ -111,58 +117,86 @@ impl DecisionStore for Pg { } async fn insert_decision(&self, key: String, decision: Self::Decision) -> Result { - let client = self.get_client_with_retry().await.map_err(|e| DecisionStoreError { - kind: DecisionStoreErrorKind::ClientError, - reason: e.to_string(), - data: None, - })?; - - let key_uuid = get_uuid_key(&key)?; + let mut current_count = 0; - let stmt = client - .prepare_cached( - "WITH ins AS ( - INSERT INTO xdb(xid, decision) - VALUES ($1, $2) - ON CONFLICT DO NOTHING - RETURNING xid, decision - ) - SELECT * from ins - UNION - SELECT xid, decision from xdb where xid = $1", - ) - .await - .map_err(|e| DecisionStoreError { - kind: DecisionStoreErrorKind::InsertDecision, - reason: e.to_string(), - data: Some(key.clone()), - })?; - - // Execute insert returning the row. If duplicate is found, return the existing row in table. - let result = client.query_one(&stmt, &[&key_uuid, &json!(decision)]).await; - match result { - Ok(row) => { - let decision = match row.get::<&str, Option>("decision") { - Some(value) => Ok(parse_json_column(&key, value)?), - _ => Err(DecisionStoreError { - kind: DecisionStoreErrorKind::NoRowReturned, - reason: "Insert did not return rows".to_owned(), + let mut result_f: Result = Err(DecisionStoreError { + kind: DecisionStoreErrorKind::InsertDecision, + reason: format!("Max retries exhausted for key={key} to insert decision to XDB"), + data: Some(format!("{:?}", decision)), + }); + while current_count <= self.max_retries { + let client_result = self.get_client_with_retry().await.map_err(|e| DecisionStoreError { + kind: DecisionStoreErrorKind::ClientError, + reason: format!("Failed to get client with error {}", e), + data: None, + }); + + if let Ok(client) = client_result { + let key_uuid = get_uuid_key(&key)?; + + let stmt = client + .prepare_cached( + "WITH ins AS ( + INSERT INTO xdb(xid, decision) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + RETURNING xid, decision + ) + SELECT * from ins + UNION + SELECT xid, decision from xdb where xid = $1", + ) + .await + .map_err(|e| DecisionStoreError { + kind: DecisionStoreErrorKind::InsertDecision, + reason: format!("Failed to prepare the insert statement to XDB {}", e), data: Some(key.clone()), - }), + })?; + + // Execute insert returning the row. If duplicate is found, return the existing row in table. + let result = client.query_one(&stmt, &[&key_uuid, &json!(decision)]).await; + match result { + Ok(row) => { + let decision = match row.get::<&str, Option>("decision") { + Some(value) => Ok(parse_json_column(&key, value)?), + _ => Err(DecisionStoreError { + kind: DecisionStoreErrorKind::NoRowReturned, + reason: "Insert did not return rows".to_owned(), + data: Some(key.clone()), + }), + }; + + result_f = decision; + debug!("Exiting from okay handle of decision insert result"); + break; + } + Err(e) => { + result_f = Err(DecisionStoreError { + kind: DecisionStoreErrorKind::InsertDecision, + reason: format!("Failed to insert decision into XDB with error {}", e), + data: Some(key.clone()), + }); + + error!("{result_f:#?}"); + } }; - - return decision; - } - Err(e) => { - return Err(DecisionStoreError { - kind: DecisionStoreErrorKind::InsertDecision, - reason: e.to_string(), - data: Some(key.clone()), - }); + } else if let Some(client_err) = client_result.err() { + warn!( + "Error getting connection from pool prior to inserting decision to XDB with reason {}", + client_err.to_string() + ); + result_f = Err(client_err); } + + // 10 milliseconds multiplied with 2^current_count. + // eg. if max_retries = 5, then sleep_duration_ms would be 10, 20, 40, 80, 160 + let sleep_duration_ms = 10 * 2u64.pow(current_count); + warn!("Retrying inserting to XDB after waiting for {sleep_duration_ms}"); + tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await; + current_count += 1; } - // Ok(()) + result_f } }