Skip to content

Commit

Permalink
Merge pull request #225 from superfly/consul-hash
Browse files Browse the repository at this point in the history
Insert missing hashes of consul services at start of consul sync
  • Loading branch information
somtochiama authored Jun 26, 2024
2 parents c0ba909 + 4f8b519 commit b07c13d
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/consul-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ hyper = { workspace = true }
hyper-rustls = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rusqlite = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
Expand Down
18 changes: 18 additions & 0 deletions crates/consul-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use rustls::{Certificate, PrivateKey, RootCertStore};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};

use rusqlite::types::{FromSql, FromSqlError, ValueRef};

pub mod config;
pub use config::Config;

Expand Down Expand Up @@ -191,6 +193,22 @@ impl ConsulCheckStatus {
}
}

impl FromSql for ConsulCheckStatus {
fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() {
"passing" => Self::Passing,
"warning" => Self::Warning,
"critical" => Self::Critical,
_ => {
return Err(FromSqlError::InvalidType);
}
}),
_ => Err(FromSqlError::InvalidType),
}
}
}

impl fmt::Display for ConsulCheckStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.as_str().fmt(f)
Expand Down
172 changes: 164 additions & 8 deletions crates/corrosion/src/command/consul/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn run<P: AsRef<Path>>(
consul_checks.insert(row.get::<_, String>(0)?, u64::from_be_bytes(row.get(1)?));
}
}
update_hashes(&corrosion, node, &mut consul_services, &mut consul_checks).await?;

let mut pull_interval = interval(CONSUL_PULL_INTERVAL);

Expand Down Expand Up @@ -195,6 +196,115 @@ async fn setup(
Ok(())
}

async fn update_hashes(
corrosion: &CorrosionClient,
nodename: &str,
service_hashes: &mut HashMap<String, u64>,
check_hashes: &mut HashMap<String, u64>,
) -> eyre::Result<()> {
let (services, checks) = {
let conn = corrosion.pool().get().await?;

let services = conn
.prepare(
"SELECT id, name, tags, meta, port, address FROM consul_services WHERE node = ?",
)?
.query_map([nodename], |row| {
let tag_json: String = row.get(2)?;
let meta_json: String = row.get(3)?;
Ok(AgentService {
id: row.get(0)?,
name: row.get(1)?,
tags: serde_json::from_str(&tag_json).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(e),
)
})?,
meta: serde_json::from_str(&meta_json).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(e),
)
})?,
port: row.get(4)?,
address: row.get(5)?,
})
})
.map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?
.collect::<Result<Vec<_>, _>>()?;

let checks = conn.prepare("SELECT id, name, status, output, service_id, service_name FROM consul_checks WHERE node = ?")?
.query_map([nodename], |row| {
Ok(AgentCheck{
id: row.get(0)?,
name: row.get(1)?,
status: row.get(2)?,
output: row.get(3)?,
service_id: row.get(4)?,
service_name: row.get(5)?,
notes: None,
})
})
.map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?
.collect::<Result<Vec<_>, _>>()?;

(services, checks)
};

let mut statements = Vec::with_capacity(services.len() + checks.len());

let mut svc_insert = vec![];
let mut checks_insert = vec![];

for svc in services {
if service_hashes.get(&svc.id).is_none() {
let hash = hash_service(&svc);
statements.push(Statement::WithParams(
"INSERT INTO __corro_consul_services ( id, hash ) VALUES (?, ?);".into(),
vec![svc.id.clone().into(), hash.to_be_bytes().to_vec().into()],
));
svc_insert.push((svc.id, hash));
}
}

for check in checks {
if check_hashes.get(&check.id).is_none() {
let hash = hash_check(&check);
statements.push(Statement::WithParams(
"INSERT OR REPLACE INTO __corro_consul_checks ( id, hash ) VALUES (?, ?)".into(),
vec![check.id.clone().into(), hash.to_be_bytes().to_vec().into()],
));
checks_insert.push((check.id, hash));
}
}

if !statements.is_empty() {
if let Some(e) = corrosion
.execute(&statements)
.await?
.results
.into_iter()
.find_map(|res| match res {
corro_api_types::ExecResult::Execute { .. } => None,
corro_api_types::ExecResult::Error { error } => Some(error),
})
{
return Err(ExecuteError::Sqlite(e).into());
}
}

for (id, hash) in svc_insert {
service_hashes.insert(id, hash);
}
for (id, hash) in checks_insert {
check_hashes.insert(id, hash);
}
Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
struct ConsulCheckNotesDirectives {
hash_include: Vec<ConsulCheckField>,
Expand Down Expand Up @@ -637,6 +747,53 @@ mod tests {
)
.await?;

let mut svc_hashes = HashMap::new();
let mut check_hashes = HashMap::new();

// insert some services in the database
let svc0 = AgentService {
id: "service-id0".into(),
name: "service-name0".into(),
tags: vec![],
meta: vec![("app_id".to_string(), "1233".to_string())]
.into_iter()
.collect(),
port: 1337,
address: "127.0.0.2".into(),
};

// let conn = ta1_client.pool().get().await?;
let svc0_clone = svc0.clone();
ta1_client
.execute(&[Statement::WithParams(
"INSERT INTO consul_services ( node, id, name, tags, meta, port, address)
VALUES (?,?,?,?,?,?,?)"
.into(),
vec![
"node-1".into(),
svc0_clone.id.into(),
svc0_clone.name.into(),
serde_json::to_string(&svc0_clone.tags).unwrap().into(),
serde_json::to_string(&svc0_clone.meta).unwrap().into(),
svc0_clone.port.into(),
svc0_clone.address.into(),
],
)])
.await?;

update_hashes(&ta1_client, "node-1", &mut svc_hashes, &mut check_hashes).await?;

assert!(!svc_hashes.is_empty());
{
let conn = ta1_client.pool().get().await?;
let hash: Vec<u8> = conn.query_row(
"SELECT hash FROM __corro_consul_services WHERE id = 'service-id0'",
(),
|row| row.get(0),
)?;
assert_eq!(hash, hash_service(&svc0).to_be_bytes().to_vec());
}

let mut services = HashMap::new();

let svc = AgentService {
Expand All @@ -651,9 +808,7 @@ mod tests {
};

services.insert("service-id".into(), svc.clone());

let mut svc_hashes = HashMap::new();
let mut check_hashes = HashMap::new();
services.insert("service-id0".into(), svc0.clone());

let (applied, check_applied) = execute("node-1", &ta1_client, update_services(services.clone(), &svc_hashes, false), &mut svc_hashes, Default::default(), &mut check_hashes).await?;

Expand Down Expand Up @@ -698,10 +853,11 @@ mod tests {

{
let conn = ta2_client.pool().get().await?;
let app_id: i64 =
conn.query_row("SELECT app_id FROM consul_services LIMIT 1", (), |row| {
row.get(0)
})?;
let app_id: i64 = conn.query_row(
"SELECT app_id FROM consul_services WHERE id = 'service-id'",
(),
|row| row.get(0),
)?;
assert_eq!(app_id, 123);
}

Expand All @@ -710,7 +866,7 @@ mod tests {
assert!(check_applied.is_zero());

assert_eq!(applied.upserted, 0);
assert_eq!(applied.deleted, 1);
assert_eq!(applied.deleted, 2);

assert_eq!(svc_hashes.get("service-id"), None);

Expand Down

0 comments on commit b07c13d

Please sign in to comment.