Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry inserting to XDB when connection closed #113

Merged
merged 2 commits into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions packages/talos_certifier/src/services/decision_outbox_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl DecisionOutboxService {
.await
.map_err(|insert_error| SystemServiceError {
kind: SystemServiceErrorKind::DBError,
reason: insert_error.reason,
reason: format!("Datastore error kind = {:?} and reason = {}", insert_error.kind, insert_error.reason),
data: insert_error.data,
service: "Decision Outbox Service".to_string(),
})?;
Expand Down Expand Up @@ -114,11 +114,12 @@ impl SystemService for DecisionOutboxService {
message: decision_message,
} = decision_channel_message;
tokio::spawn({
let decision_headers = DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new(
DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message
self.system.name.clone(),
None,
));
let decision_headers: DecisionHeaderBuilder<crate::model::decision_headers::MetaHeaders, crate::model::decision_headers::NoCertHeaders> =
DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new(
DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message
self.system.name.clone(),
None,
));

async move {
match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
Expand Down
3 changes: 3 additions & 0 deletions packages/talos_certifier_adapters/src/postgres/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ pub struct PgConfig {
pub port: String,
pub database: String,
pub pool_size: Option<u32>,
pub max_retries: Option<u32>,
}

impl PgConfig {
pub fn from_env() -> PgConfig {
let pool_size = env_var_with_defaults!("PG_POOL_SIZE", Option::<u32>);
debug!("Pool size used... {pool_size:?}");
let max_retries = env_var_with_defaults!("PG_MAX_RETRIES", Option::<u32>);
PgConfig {
user: env_var!("PG_USER"),
password: env_var!("PG_PASSWORD"),
host: env_var!("PG_HOST"),
port: env_var!("PG_PORT"),
database: env_var!("PG_DATABASE"),
pool_size,
max_retries,
}
}
pub fn get_base_connection_string(&self) -> String {
Expand Down
130 changes: 82 additions & 48 deletions packages/talos_certifier_adapters/src/postgres/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use async_trait::async_trait;
use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime};
use log::warn;
use log::{debug, error, warn};
use serde_json::{json, Value};
use talos_certifier::{
model::DecisionMessage,
Expand All @@ -12,6 +12,7 @@ use talos_certifier::{
DecisionStore,
},
};

use tokio_postgres::NoTls;

use crate::{PgConfig, PgError};
Expand All @@ -21,6 +22,8 @@ use super::utils::{get_uuid_key, parse_json_column};
/// Postgres handle used by the decision store: a connection pool plus the retry budget
/// applied when inserting decisions.
#[derive(Clone)]
pub struct Pg {
    /// Connection pool (`deadpool_postgres::Pool`) that database clients are obtained from.
    pub pool: Pool,
    /// The max times to retry a db operation after a failed attempt.
    /// Taken from `PgConfig::max_retries`; defaults to 5 when that value is not set.
    pub max_retries: u32,
}
// create postgres client
impl Pg {
Expand Down Expand Up @@ -49,7 +52,10 @@ impl Pg {
//test connection
let _ = pool.get().await.map_err(PgError::GetClientFromPool)?;

Ok(Pg { pool })
Ok(Pg {
pool,
max_retries: pg_config.max_retries.unwrap_or(5_u32),
})
}

pub async fn get_client(&self) -> Result<Object, PgError> {
Expand Down Expand Up @@ -111,58 +117,86 @@ impl DecisionStore for Pg {
}

async fn insert_decision(&self, key: String, decision: Self::Decision) -> Result<Self::Decision, DecisionStoreError> {
let client = self.get_client_with_retry().await.map_err(|e| DecisionStoreError {
kind: DecisionStoreErrorKind::ClientError,
reason: e.to_string(),
data: None,
})?;

let key_uuid = get_uuid_key(&key)?;
let mut current_count = 0;

let stmt = client
.prepare_cached(
"WITH ins AS (
INSERT INTO xdb(xid, decision)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
RETURNING xid, decision
)
SELECT * from ins
UNION
SELECT xid, decision from xdb where xid = $1",
)
.await
.map_err(|e| DecisionStoreError {
kind: DecisionStoreErrorKind::InsertDecision,
reason: e.to_string(),
data: Some(key.clone()),
})?;

// Execute insert returning the row. If duplicate is found, return the existing row in table.
let result = client.query_one(&stmt, &[&key_uuid, &json!(decision)]).await;
match result {
Ok(row) => {
let decision = match row.get::<&str, Option<Value>>("decision") {
Some(value) => Ok(parse_json_column(&key, value)?),
_ => Err(DecisionStoreError {
kind: DecisionStoreErrorKind::NoRowReturned,
reason: "Insert did not return rows".to_owned(),
let mut result_f: Result<DecisionMessage, DecisionStoreError> = Err(DecisionStoreError {
kind: DecisionStoreErrorKind::InsertDecision,
reason: format!("Max retries exhausted for key={key} to insert decision to XDB"),
data: Some(format!("{:?}", decision)),
});
while current_count <= self.max_retries {
let client_result = self.get_client_with_retry().await.map_err(|e| DecisionStoreError {
kind: DecisionStoreErrorKind::ClientError,
reason: format!("Failed to get client with error {}", e),
data: None,
});

if let Ok(client) = client_result {
let key_uuid = get_uuid_key(&key)?;

let stmt = client
.prepare_cached(
"WITH ins AS (
INSERT INTO xdb(xid, decision)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
RETURNING xid, decision
)
SELECT * from ins
UNION
SELECT xid, decision from xdb where xid = $1",
)
.await
.map_err(|e| DecisionStoreError {
kind: DecisionStoreErrorKind::InsertDecision,
reason: format!("Failed to prepare the insert statement to XDB {}", e),
data: Some(key.clone()),
}),
})?;

// Execute insert returning the row. If duplicate is found, return the existing row in table.
let result = client.query_one(&stmt, &[&key_uuid, &json!(decision)]).await;
match result {
Ok(row) => {
let decision = match row.get::<&str, Option<Value>>("decision") {
Some(value) => Ok(parse_json_column(&key, value)?),
_ => Err(DecisionStoreError {
kind: DecisionStoreErrorKind::NoRowReturned,
reason: "Insert did not return rows".to_owned(),
data: Some(key.clone()),
}),
};

result_f = decision;
debug!("Exiting from okay handle of decision insert result");
break;
}
Err(e) => {
result_f = Err(DecisionStoreError {
kind: DecisionStoreErrorKind::InsertDecision,
reason: format!("Failed to insert decision into XDB with error {}", e),
data: Some(key.clone()),
});

error!("{result_f:#?}");
}
};

return decision;
}
Err(e) => {
return Err(DecisionStoreError {
kind: DecisionStoreErrorKind::InsertDecision,
reason: e.to_string(),
data: Some(key.clone()),
});
} else if let Some(client_err) = client_result.err() {
warn!(
"Error getting connection from pool prior to inserting decision to XDB with reason {}",
client_err.to_string()
);
result_f = Err(client_err);
}

// 10 milliseconds multiplied with 2^current_count.
// eg. if max_retries = 5, then sleep_duration_ms would be 10, 20, 40, 80, 160
let sleep_duration_ms = 10 * 2u64.pow(current_count);
warn!("Retrying inserting to XDB after waiting for {sleep_duration_ms}");
tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await;
current_count += 1;
}

// Ok(())
result_f
}
}

Expand Down
Loading