diff --git a/Cargo.lock b/Cargo.lock index ad3df51f..bb4b64d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -689,6 +689,7 @@ dependencies = [ "camino", "hyper", "hyper-rustls", + "rusqlite", "rustls", "rustls-pemfile", "serde", diff --git a/crates/consul-client/Cargo.toml b/crates/consul-client/Cargo.toml index 0c7b5d55..4167b5b7 100644 --- a/crates/consul-client/Cargo.toml +++ b/crates/consul-client/Cargo.toml @@ -13,6 +13,7 @@ hyper = { workspace = true } hyper-rustls = { workspace = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } +rusqlite = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } diff --git a/crates/consul-client/src/lib.rs b/crates/consul-client/src/lib.rs index e7614652..bfa425f9 100644 --- a/crates/consul-client/src/lib.rs +++ b/crates/consul-client/src/lib.rs @@ -12,6 +12,8 @@ use rustls::{Certificate, PrivateKey, RootCertStore}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_with::{serde_as, NoneAsEmptyString}; +use rusqlite::types::{FromSql, FromSqlError, ValueRef}; + pub mod config; pub use config::Config; @@ -191,6 +193,22 @@ impl ConsulCheckStatus { } } +impl FromSql for ConsulCheckStatus { + fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult { + match value { + ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() { + "passing" => Self::Passing, + "warning" => Self::Warning, + "critical" => Self::Critical, + _ => { + return Err(FromSqlError::InvalidType); + } + }), + _ => Err(FromSqlError::InvalidType), + } + } +} + impl fmt::Display for ConsulCheckStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.as_str().fmt(f) diff --git a/crates/corrosion/src/command/consul/sync.rs b/crates/corrosion/src/command/consul/sync.rs index 0c20d06f..2580fffc 100644 --- a/crates/corrosion/src/command/consul/sync.rs +++ b/crates/corrosion/src/command/consul/sync.rs @@ -84,6 +84,7 @@ pub async fn run>( consul_checks.insert(row.get::<_, String>(0)?, u64::from_be_bytes(row.get(1)?)); } } + update_hashes(&corrosion, node, &mut consul_services, &mut consul_checks).await?; let mut pull_interval = interval(CONSUL_PULL_INTERVAL); @@ -195,6 +196,115 @@ async fn setup( Ok(()) } +async fn update_hashes( + corrosion: &CorrosionClient, + nodename: &str, + service_hashes: &mut HashMap, + check_hashes: &mut HashMap, +) -> eyre::Result<()> { + let (services, checks) = { + let conn = corrosion.pool().get().await?; + + let services = conn + .prepare( + "SELECT id, name, tags, meta, port, address FROM consul_services WHERE node = ?", + )? + .query_map([nodename], |row| { + let tag_json: String = row.get(2)?; + let meta_json: String = row.get(3)?; + Ok(AgentService { + id: row.get(0)?, + name: row.get(1)?, + tags: serde_json::from_str(&tag_json).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 0, + rusqlite::types::Type::Text, + Box::new(e), + ) + })?, + meta: serde_json::from_str(&meta_json).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure( + 0, + rusqlite::types::Type::Text, + Box::new(e), + ) + })?, + port: row.get(4)?, + address: row.get(5)?, + }) + }) + .map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))? + .collect::, _>>()?; + + let checks = conn.prepare("SELECT id, name, status, output, service_id, service_name FROM consul_checks WHERE node = ?")? + .query_map([nodename], |row| { + Ok(AgentCheck{ + id: row.get(0)?, + name: row.get(1)?, + status: row.get(2)?, + output: row.get(3)?, + service_id: row.get(4)?, + service_name: row.get(5)?, + notes: None, + }) + }) + .map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))? + .collect::, _>>()?; + + (services, checks) + }; + + let mut statements = Vec::with_capacity(services.len() + checks.len()); + + let mut svc_insert = vec![]; + let mut checks_insert = vec![]; + + for svc in services { + if service_hashes.get(&svc.id).is_none() { + let hash = hash_service(&svc); + statements.push(Statement::WithParams( + "INSERT INTO __corro_consul_services ( id, hash ) VALUES (?, ?);".into(), + vec![svc.id.clone().into(), hash.to_be_bytes().to_vec().into()], + )); + svc_insert.push((svc.id, hash)); + } + } + + for check in checks { + if check_hashes.get(&check.id).is_none() { + let hash = hash_check(&check); + statements.push(Statement::WithParams( + "INSERT OR REPLACE INTO __corro_consul_checks ( id, hash ) VALUES (?, ?)".into(), + vec![check.id.clone().into(), hash.to_be_bytes().to_vec().into()], + )); + checks_insert.push((check.id, hash)); + } + } + + if !statements.is_empty() { + if let Some(e) = corrosion + .execute(&statements) + .await? + .results + .into_iter() + .find_map(|res| match res { + corro_api_types::ExecResult::Execute { .. } => None, + corro_api_types::ExecResult::Error { error } => Some(error), + }) + { + return Err(ExecuteError::Sqlite(e).into()); + } + } + + for (id, hash) in svc_insert { + service_hashes.insert(id, hash); + } + for (id, hash) in checks_insert { + check_hashes.insert(id, hash); + } + Ok(()) +} + #[derive(Debug, Serialize, Deserialize)] struct ConsulCheckNotesDirectives { hash_include: Vec, @@ -637,6 +747,53 @@ mod tests { ) .await?; + let mut svc_hashes = HashMap::new(); + let mut check_hashes = HashMap::new(); + + // insert some services in the database + let svc0 = AgentService { + id: "service-id0".into(), + name: "service-name0".into(), + tags: vec![], + meta: vec![("app_id".to_string(), "1233".to_string())] + .into_iter() + .collect(), + port: 1337, + address: "127.0.0.2".into(), + }; + + // let conn = ta1_client.pool().get().await?; + let svc0_clone = svc0.clone(); + ta1_client + .execute(&[Statement::WithParams( + "INSERT INTO consul_services ( node, id, name, tags, meta, port, address) + VALUES (?,?,?,?,?,?,?)" + .into(), + vec![ + "node-1".into(), + svc0_clone.id.into(), + svc0_clone.name.into(), + serde_json::to_string(&svc0_clone.tags).unwrap().into(), + serde_json::to_string(&svc0_clone.meta).unwrap().into(), + svc0_clone.port.into(), + svc0_clone.address.into(), + ], + )]) + .await?; + + update_hashes(&ta1_client, "node-1", &mut svc_hashes, &mut check_hashes).await?; + + assert!(!svc_hashes.is_empty()); + { + let conn = ta1_client.pool().get().await?; + let hash: Vec = conn.query_row( + "SELECT hash FROM __corro_consul_services WHERE id = 'service-id0'", + (), + |row| row.get(0), + )?; + assert_eq!(hash, hash_service(&svc0).to_be_bytes().to_vec()); + } + let mut services = HashMap::new(); let svc = AgentService { @@ -651,9 +808,7 @@ mod tests { }; services.insert("service-id".into(), svc.clone()); - - let mut svc_hashes = HashMap::new(); - let mut check_hashes = HashMap::new(); + services.insert("service-id0".into(), svc0.clone()); let (applied, check_applied) = execute("node-1", &ta1_client, update_services(services.clone(), &svc_hashes, false), &mut svc_hashes, Default::default(), &mut check_hashes).await?; @@ -698,10 +853,11 @@ mod tests { { let conn = ta2_client.pool().get().await?; - let app_id: i64 = - conn.query_row("SELECT app_id FROM consul_services LIMIT 1", (), |row| { - row.get(0) - })?; + let app_id: i64 = conn.query_row( + "SELECT app_id FROM consul_services WHERE id = 'service-id'", + (), + |row| row.get(0), + )?; assert_eq!(app_id, 123); } @@ -710,7 +866,7 @@ mod tests { assert!(check_applied.is_zero()); assert_eq!(applied.upserted, 0); - assert_eq!(applied.deleted, 1); + assert_eq!(applied.deleted, 2); assert_eq!(svc_hashes.get("service-id"), None);