From 8b59cae41c4f49ed2bb6a58b05a6a0c9a6ebd2a6 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 16 Sep 2024 08:19:32 -0700 Subject: [PATCH] Update Oximeter-related DataStore methods for incoming reconfigurator support (#6518) This PR adds two new `DataStore` methods: * `oximeter_delete` (soft deletes an oximeter instance, which also requires a small migration to add a `time_deleted` column) * `oximeter_reassign_all_producers` (reassigns all metric producers from one oximeter instance to random other instances) and updates existing methods to account for the possibility that an oximeter instance may have been deleted (e.g., don't return deleted instances when listing). There are no callers of these methods in this PR, but there will be in the upcoming reconfigurator-can-rebalance-oximeter PR. --- nexus/db-model/src/oximeter_info.rs | 14 +- nexus/db-model/src/schema.rs | 1 + nexus/db-model/src/schema_versions.rs | 3 +- nexus/db-queries/src/db/datastore/mod.rs | 1 + nexus/db-queries/src/db/datastore/oximeter.rs | 544 +++++++++++++++++- nexus/db-queries/src/db/queries/mod.rs | 1 + nexus/db-queries/src/db/queries/oximeter.rs | 112 ++++ .../output/oximeter_reassign_producers.sql | 20 + schema/crdb/dbinit.sql | 15 +- .../crdb/oximeter-add-time-expunged/up1.sql | 1 + .../crdb/oximeter-add-time-expunged/up2.sql | 4 + 11 files changed, 705 insertions(+), 11 deletions(-) create mode 100644 nexus/db-queries/src/db/queries/oximeter.rs create mode 100644 nexus/db-queries/tests/output/oximeter_reassign_producers.sql create mode 100644 schema/crdb/oximeter-add-time-expunged/up1.sql create mode 100644 schema/crdb/oximeter-add-time-expunged/up2.sql diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs index 39bde98ea8..5579425a63 100644 --- a/nexus/db-model/src/oximeter_info.rs +++ b/nexus/db-model/src/oximeter_info.rs @@ -9,7 +9,7 @@ use nexus_types::internal_api; use uuid::Uuid; /// A record representing a registered `oximeter` collector. -#[derive(Queryable, Insertable, Debug, Clone, Copy)] +#[derive(Queryable, Insertable, Debug, Clone, Copy, PartialEq, Eq)] #[diesel(table_name = oximeter)] pub struct OximeterInfo { /// The ID for this oximeter instance. @@ -18,6 +18,17 @@ pub struct OximeterInfo { pub time_created: DateTime, /// When this resource was last modified. pub time_modified: DateTime, + /// When this resource was expunged. + // + // We typically refer to _zones_ as expunged; this isn't quite the same + // thing since this is the record of a running Oximeter instance. Some time + // after an Oximeter zone has been expunged (usually not very long!), the + // blueprint_executor RPW will mark the Oximeter instance that was running + // in that zone as expunged, setting this field to a non-None value, which + // will cause it to no longer be chosen as a potential collector for + // producers (and will result in any producers it had been assigned being + // reassigned to some other collector). + pub time_expunged: Option>, /// The address on which this `oximeter` instance listens for requests. pub ip: ipnetwork::IpNetwork, /// The port on which this `oximeter` instance listens for requests. @@ -31,6 +42,7 @@ impl OximeterInfo { id: info.collector_id, time_created: now, time_modified: now, + time_expunged: None, ip: info.address.ip().into(), port: info.address.port().into(), } diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 071c7e9229..8f137a7bbf 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -794,6 +794,7 @@ table! { id -> Uuid, time_created -> Timestamptz, time_modified -> Timestamptz, + time_expunged -> Nullable, ip -> Inet, port -> Int4, } diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 77d419efbf..bb5fc294d4 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(97, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(98, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(98, "oximeter-add-time-expunged"), KnownVersion::new(97, "lookup-region-snapshot-by-region-id"), KnownVersion::new(96, "inv-dataset"), KnownVersion::new(95, "turn-boot-on-fault-into-auto-restart"), diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 5b1163dc8b..6eec9500dc 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -115,6 +115,7 @@ pub use dns::DnsVersionUpdateBuilder; pub use instance::{InstanceAndActiveVmm, InstanceGestalt}; pub use inventory::DataStoreInventoryTest; use nexus_db_model::AllSchemaVersions; +pub use oximeter::CollectorReassignment; pub use rack::RackInit; pub use rack::SledUnderlayAllocationResult; pub use region::RegionAllocationFor; diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index 1aa3435cb6..0c4b5077f2 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -15,18 +15,35 @@ use crate::db::model::OximeterInfo; use crate::db::model::ProducerEndpoint; use crate::db::pagination::paginated; use crate::db::pagination::Paginator; +use crate::db::queries; use async_bb8_diesel::AsyncRunQueryDsl; use chrono::DateTime; use chrono::Utc; use diesel::prelude::*; +use diesel::result::DatabaseErrorKind; +use diesel::result::Error as DieselError; +use diesel::sql_types; +use nexus_db_model::ProducerKindEnum; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::ResourceType; use uuid::Uuid; +/// Type returned when reassigning producers from an Oximeter collector. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CollectorReassignment { + /// Success: `n` producers were reassigned to other collector(s). + Complete(usize), + /// Reassignment could not complete because there are no other collectors + /// available. + NoCollectorsAvailable, +} + impl DataStore { /// Lookup an oximeter instance by its ID. + /// + /// Fails if the instance has been expunged. pub async fn oximeter_lookup( &self, opctx: &OpContext, @@ -34,6 +51,7 @@ impl DataStore { ) -> Result { use db::schema::oximeter::dsl; dsl::oximeter + .filter(dsl::time_expunged.is_null()) .find(*id) .first_async(&*self.pool_connection_authorized(opctx).await?) .await @@ -48,10 +66,19 @@ impl DataStore { ) -> Result<(), Error> { use db::schema::oximeter::dsl; - // If we get a conflict on the Oximeter ID, this means that collector instance was - // previously registered, and it's re-registering due to something like a service restart. - // In this case, we update the time modified and the service address, rather than - // propagating a constraint violation to the caller. + // If we get a conflict on the Oximeter ID, this means that collector + // instance was previously registered, and it's re-registering due to + // something like a service restart. In this case, we update the time + // modified and the service address, rather than propagating a + // constraint violation to the caller. + // + // TODO-completeness - We should return an error if `info.id()` maps to + // an existing row that has been expunged. We don't expect that to + // happen in practice (it would mean an expunged Oximeter zone has come + // back to life and reregistered itself). If it does happen, as written + // we'll update time_modified/ip/port but leave time_expunged set to + // whatever it was (which will leave the Oximeter in the "expunged" + // state). diesel::insert_into(dsl::oximeter) .values(*info) .on_conflict(dsl::id) @@ -75,7 +102,33 @@ impl DataStore { Ok(()) } + /// Mark an Oximeter instance as expunged + /// + /// This method is idempotent and has no effect if called with the ID for an + /// already-expunged Oximeter. + pub async fn oximeter_expunge( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result<(), Error> { + use db::schema::oximeter::dsl; + + let now = Utc::now(); + + diesel::update(dsl::oximeter) + .filter(dsl::time_expunged.is_null()) + .filter(dsl::id.eq(id)) + .set(dsl::time_expunged.eq(now)) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + Ok(()) + } + /// List the oximeter collector instances + /// + /// Omits expunged instances. pub async fn oximeter_list( &self, opctx: &OpContext, @@ -83,6 +136,7 @@ impl DataStore { ) -> ListResultVec { use db::schema::oximeter::dsl; paginated(dsl::oximeter, dsl::id, page_params) + .filter(dsl::time_expunged.is_null()) .load_async::( &*self.pool_connection_authorized(opctx).await?, ) @@ -90,17 +144,77 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// Reassign all metric producers currently assigned to Oximeter `id` + /// + /// The new Oximeter instance for each producer will be randomly selected + /// from all available Oximeters. On success, returns the number of metric + /// producers reassigned. Fails if there are no available Oximeter instances + /// (e.g., all Oximeter instances have been expunged). + pub async fn oximeter_reassign_all_producers( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result { + match queries::oximeter::reassign_producers_query(id) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + { + Ok(n) => Ok(CollectorReassignment::Complete(n)), + Err(DieselError::DatabaseError( + DatabaseErrorKind::NotNullViolation, + _, + )) => Ok(CollectorReassignment::NoCollectorsAvailable), + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), + } + } + /// Create a record for a new producer endpoint pub async fn producer_endpoint_create( &self, opctx: &OpContext, producer: &ProducerEndpoint, ) -> Result<(), Error> { + // Our caller has already chosen an Oximeter instance for this producer, + // but we don't want to allow it to use a nonexistent or expunged + // Oximeter. This query turns into a `SELECT all_the_fields_of_producer + // WHERE producer.oximeter_id is legal` in a diesel-compatible way. I'm + // not aware of a helper method to generate "all the fields of + // `producer`", so instead we have a big tuple of its fields that must + // stay in sync with the `table!` definition and field ordering for the + // `metric_producer` table. The compiler will catch any mistakes + // _except_ incorrect orderings where the types still line up (e.g., + // swapping two Uuid columns), which is not ideal but is hopefully good + // enough. + let producer_subquery = { + use db::schema::oximeter::dsl; + + dsl::oximeter + .select(( + producer.id().into_sql::(), + producer + .time_created() + .into_sql::(), + producer + .time_modified() + .into_sql::(), + producer.kind.into_sql::(), + producer.ip.into_sql::(), + producer.port.into_sql::(), + producer.interval.into_sql::(), + producer.oximeter_id.into_sql::(), + )) + .filter( + dsl::id + .eq(producer.oximeter_id) + .and(dsl::time_expunged.is_null()), + ) + }; + use db::schema::metric_producer::dsl; // TODO: see https://github.com/oxidecomputer/omicron/issues/323 - diesel::insert_into(dsl::metric_producer) - .values(producer.clone()) + let n = diesel::insert_into(dsl::metric_producer) + .values(producer_subquery) .on_conflict(dsl::id) .do_update() .set(( @@ -121,7 +235,22 @@ impl DataStore { ), ) })?; - Ok(()) + + // We expect `n` to basically always be 1 (1 row was inserted or + // updated). It can be 0 if `producer.oximeter_id` doesn't exist or has + // been expunged. It can never be 2 or greater because + // `producer_subquery` filters on finding an exact row for its Oximeter + // instance's ID. + match n { + 0 => Err(Error::not_found_by_id( + ResourceType::Oximeter, + &producer.oximeter_id, + )), + 1 => Ok(()), + _ => Err(Error::internal_error(&format!( + "multiple rows inserted ({n}) in `producer_endpoint_create`" + ))), + } } /// Delete a record for a producer endpoint, by its ID. @@ -226,6 +355,7 @@ mod tests { use db::datastore::pub_test_utils::datastore_test; use nexus_test_utils::db::test_setup_database; use nexus_types::internal_api::params; + use omicron_common::api::external::LookupType; use omicron_common::api::internal::nexus; use omicron_test_utils::dev; use std::time::Duration; @@ -272,6 +402,406 @@ mod tests { expired_batched } + #[tokio::test] + async fn test_oximeter_expunge() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_expunge"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let mut collector_ids = + (0..4).map(|_| Uuid::new_v4()).collect::>(); + + // Sort the IDs for easier comparisons later. + collector_ids.sort(); + + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Ensure all our collectors exist and aren't expunged. + let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len()); + for (info, &expected_id) in all_collectors.iter().zip(&collector_ids) { + assert_eq!(info.id, expected_id); + assert!(info.time_expunged.is_none()); + } + + // Delete the first two of them. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + datastore + .oximeter_expunge(&opctx, collector_ids[1]) + .await + .expect("expunged collector"); + + // Ensure those two were expunged. + let mut all_collectors = datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) + .await + .expect("listed collectors"); + all_collectors.sort_by_key(|info| info.id); + assert_eq!(all_collectors.len(), collector_ids.len() - 2); + for (info, &expected_id) in + all_collectors.iter().zip(&collector_ids[2..]) + { + assert_eq!(info.id, expected_id); + assert!(info.time_expunged.is_none()); + } + + // Deletion is idempotent. To test, we'll read the expunged rows + // directly, expunge them again, and confirm the row contents haven't + // changed. + let find_oximeter_ignoring_expunged = |id| { + let datastore = &datastore; + let opctx = &opctx; + async move { + let conn = datastore + .pool_connection_authorized(opctx) + .await + .expect("acquired connection"); + use db::schema::oximeter::dsl; + let info: OximeterInfo = dsl::oximeter + .find(id) + .first_async(&*conn) + .await + .expect("found Oximeter by ID"); + info + } + }; + let expunged0a = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1a = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert!(expunged0a.time_expunged.is_some()); + assert!(expunged1a.time_expunged.is_some()); + + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + datastore + .oximeter_expunge(&opctx, collector_ids[1]) + .await + .expect("expunged collector"); + + let expunged0b = + find_oximeter_ignoring_expunged(collector_ids[0]).await; + let expunged1b = + find_oximeter_ignoring_expunged(collector_ids[1]).await; + assert_eq!(expunged0a, expunged0b); + assert_eq!(expunged1a, expunged1b); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_producer_endpoint_create_rejects_expunged_oximeters() { + // Setup + let logctx = dev::test_setup_log( + "test_producer_endpoint_create_rejects_expunged_oximeters", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // We can insert metric producers for each collector. + for &collector_id in &collector_ids { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Delete the first collector. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged collector"); + + // Attempting to insert a producer assigned to the first collector + // should fail, now that it's expunged. + let err = { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_ids[0], + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect_err("producer creation fails") + }; + assert_eq!( + err, + Error::ObjectNotFound { + type_name: ResourceType::Oximeter, + lookup_type: LookupType::ById(collector_ids[0]) + } + ); + + // We can still insert metric producers for the other collectors... + for &collector_id in &collector_ids[1..] { + let mut producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + + // ... and we can update them. + producer.port = 100.into(); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassigns_randomly() { + // Setup + let logctx = dev::test_setup_log("test_oximeter_reassigns_randomly"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 250 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..250 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Delete one collector. + datastore + .oximeter_expunge(&opctx, collector_ids[0]) + .await + .expect("expunged Oximeter"); + + // Reassign producers that belonged to that collector. + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_ids[0]) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(250)); + + // Check the distribution of producers for each of the remaining + // collectors. We don't know the exact count, so we'll check that: + // + // * Each of the three remaining collectors gained at least one (the + // probability that any of the three collectors gained zero is low + // enough that most calculators give up and call it 0) + // * All 1000 producers are assigned to one of the three collectors + // + // to guard against "the reassignment query gave all 250 to exactly one + // of the remaining collectors", which is an easy failure mode for this + // kind of SQL query, where the query engine only evaluates the + // randomness once instead of once for each producer. + let mut producer_counts = [0; 4]; + for i in 0..4 { + producer_counts[i] = datastore + .producers_list_by_oximeter_id( + &opctx, + collector_ids[i], + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .len(); + } + assert_eq!(producer_counts[0], 0); // all reassigned + assert!(producer_counts[1] > 250); // gained at least one + assert!(producer_counts[2] > 250); // gained at least one + assert!(producer_counts[3] > 250); // gained at least one + assert_eq!(producer_counts[1..].iter().sum::(), 1000); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_oximeter_reassign_fails_if_no_collectors() { + // Setup + let logctx = dev::test_setup_log( + "test_oximeter_reassign_fails_if_no_collectors", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert a few Oximeter collectors. + let collector_ids = (0..4).map(|_| Uuid::new_v4()).collect::>(); + for &collector_id in &collector_ids { + let info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }); + datastore + .oximeter_create(&opctx, &info) + .await + .expect("inserted collector"); + } + + // Insert 10 metric producers assigned to each collector. + for &collector_id in &collector_ids { + for _ in 0..10 { + let producer = ProducerEndpoint::new( + &nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }, + collector_id, + ); + datastore + .producer_endpoint_create(&opctx, &producer) + .await + .expect("created producer"); + } + } + + // Delete all four collectors. + for &collector_id in &collector_ids { + datastore + .oximeter_expunge(&opctx, collector_id) + .await + .expect("expunged Oximeter"); + } + + // Try to reassign producers that belonged to each collector; this + // should fail, as all collectors have been expunged. + for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!( + num_reassigned, + CollectorReassignment::NoCollectorsAvailable + ); + } + + // Now insert a new collector. + let new_collector_id = Uuid::new_v4(); + datastore + .oximeter_create( + &opctx, + &OximeterInfo::new(¶ms::OximeterInfo { + collector_id: new_collector_id, + address: "[::1]:0".parse().unwrap(), // unused + }), + ) + .await + .expect("inserted collector"); + + // Reassigning the original four collectors should now all succeed. + for &collector_id in &collector_ids { + let num_reassigned = datastore + .oximeter_reassign_all_producers(&opctx, collector_id) + .await + .expect("reassigned producers"); + assert_eq!(num_reassigned, CollectorReassignment::Complete(10)); + } + + // All 40 producers should be assigned to our new collector. + let nproducers = datastore + .producers_list_by_oximeter_id( + &opctx, + new_collector_id, + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .len(); + assert_eq!(nproducers, 40); + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + #[tokio::test] async fn test_producers_list_expired() { // Setup diff --git a/nexus/db-queries/src/db/queries/mod.rs b/nexus/db-queries/src/db/queries/mod.rs index f88b8fab6d..02800e3a3c 100644 --- a/nexus/db-queries/src/db/queries/mod.rs +++ b/nexus/db-queries/src/db/queries/mod.rs @@ -11,6 +11,7 @@ pub mod ip_pool; #[macro_use] mod next_item; pub mod network_interface; +pub mod oximeter; pub mod region_allocation; pub mod virtual_provisioning_collection_update; pub mod volume; diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs new file mode 100644 index 0000000000..40f7a2b493 --- /dev/null +++ b/nexus/db-queries/src/db/queries/oximeter.rs @@ -0,0 +1,112 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Implementation of queries for Oximeter collectors and producers. + +use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; +use diesel::sql_types; +use uuid::Uuid; + +/// For a given Oximeter instance (which is presumably no longer running), +/// reassign any producers assigned to it to a different Oximeter. Each +/// assignment is randomly chosen from among the non-expunged Oximeter instances +/// recorded in the `oximeter` table. +pub fn reassign_producers_query(oximeter_id: Uuid) -> TypedSqlQuery<()> { + let builder = QueryBuilder::new(); + + // Find all non-expunged Oximeter instances. + let builder = builder.sql( + "\ + WITH available_oximeters AS ( \ + SELECT ARRAY( \ + SELECT id FROM oximeter WHERE time_expunged IS NULL + ) AS ids \ + ), ", + ); + + // Create a mapping of producer ID <-> new, random, non-expunged Oximeter ID + // for every producer assigned to `oximeter_id`. If the `ids` array from the + // previous expression is empty, every `new_id` column in this expression + // will be NULL. We'll catch that in the update below. + let builder = builder + .sql( + "\ + new_assignments AS ( \ + SELECT + metric_producer.id AS producer_id, + ids[1 + floor(random() * array_length(ids, 1)::float)::int] + AS new_id + FROM metric_producer + LEFT JOIN available_oximeters ON true + WHERE oximeter_id = ", + ) + .param() + .sql(")") + .bind::(oximeter_id); + + // Actually perform the update. If the `new_id` column from the previous + // step is `NULL` (because there aren't any non-expunged Oximeter + // instances), this will fail the `NOT NULL` constraint on the oximeter_id + // column. + let builder = builder + .sql( + "\ + UPDATE metric_producer SET oximeter_id = ( \ + SELECT new_id FROM new_assignments \ + WHERE new_assignments.producer_id = metric_producer.id \ + ) WHERE oximeter_id = ", + ) + .param() + .bind::(oximeter_id); + + builder.query() +} + +#[cfg(test)] +mod test { + use super::*; + use crate::db::explain::ExplainableAsync; + use crate::db::raw_query_builder::expectorate_query_contents; + use nexus_test_utils::db::test_setup_database; + use omicron_test_utils::dev; + use uuid::Uuid; + + // This test is a bit of a "change detector", but it's here to help with + // debugging too. If you change this query, it can be useful to see exactly + // how the output SQL has been altered. + #[tokio::test] + async fn expectorate_query() { + let oximeter_id = Uuid::nil(); + + let query = reassign_producers_query(oximeter_id); + + expectorate_query_contents( + &query, + "tests/output/oximeter_reassign_producers.sql", + ) + .await; + } + + // Explain the SQL query to ensure that it creates a valid SQL string. + #[tokio::test] + async fn explainable() { + let logctx = dev::test_setup_log("explainable"); + let log = logctx.log.new(o!()); + let mut db = test_setup_database(&log).await; + let cfg = crate::db::Config { url: db.pg_config().clone() }; + let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg); + let conn = pool.claim().await.unwrap(); + + let oximeter_id = Uuid::nil(); + + let query = reassign_producers_query(oximeter_id); + let _ = query + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/tests/output/oximeter_reassign_producers.sql b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql new file mode 100644 index 0000000000..4ef88a800a --- /dev/null +++ b/nexus/db-queries/tests/output/oximeter_reassign_producers.sql @@ -0,0 +1,20 @@ +WITH + available_oximeters + AS (SELECT ARRAY (SELECT id FROM oximeter WHERE time_expunged IS NULL) AS ids), + new_assignments + AS ( + SELECT + metric_producer.id AS producer_id, + ids[1 + floor(random() * array_length(ids, 1)::FLOAT8)::INT8] AS new_id + FROM + metric_producer LEFT JOIN available_oximeters ON true + WHERE + oximeter_id = $1 + ) +UPDATE + metric_producer +SET + oximeter_id + = (SELECT new_id FROM new_assignments WHERE new_assignments.producer_id = metric_producer.id) +WHERE + oximeter_id = $2 diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 9089c0032a..e26626818d 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -1369,9 +1369,20 @@ CREATE TABLE IF NOT EXISTS omicron.public.oximeter ( time_created TIMESTAMPTZ NOT NULL, time_modified TIMESTAMPTZ NOT NULL, ip INET NOT NULL, - port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL + port INT4 CHECK (port BETWEEN 0 AND 65535) NOT NULL, + time_expunged TIMESTAMPTZ ); +/* + * The query Nexus runs to choose an Oximeter instance for new metric producers + * involves listing the non-expunged instances sorted by ID, which would require + * a full table scan without this index. + */ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_expunged IS NULL; + /* * The kind of metric producer each record corresponds to. */ @@ -4288,7 +4299,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '97.0.0', NULL) + (TRUE, NOW(), NOW(), '98.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/oximeter-add-time-expunged/up1.sql b/schema/crdb/oximeter-add-time-expunged/up1.sql new file mode 100644 index 0000000000..f6915d5765 --- /dev/null +++ b/schema/crdb/oximeter-add-time-expunged/up1.sql @@ -0,0 +1 @@ +ALTER TABLE omicron.public.oximeter ADD COLUMN IF NOT EXISTS time_expunged TIMESTAMPTZ; diff --git a/schema/crdb/oximeter-add-time-expunged/up2.sql b/schema/crdb/oximeter-add-time-expunged/up2.sql new file mode 100644 index 0000000000..710fe99ffe --- /dev/null +++ b/schema/crdb/oximeter-add-time-expunged/up2.sql @@ -0,0 +1,4 @@ +CREATE UNIQUE INDEX IF NOT EXISTS list_non_expunged_oximeter ON omicron.public.oximeter ( + id +) WHERE + time_expunged IS NULL;