diff --git a/astroplant-mqtt-ingest/src/database.rs b/astroplant-mqtt-ingest/src/database.rs index 199550b..d1e4f02 100644 --- a/astroplant-mqtt-ingest/src/database.rs +++ b/astroplant-mqtt-ingest/src/database.rs @@ -27,6 +27,7 @@ pub(crate) struct Db { get_config_and_kit: Statement, insert_raw_measurement: Statement, insert_aggregate_measurement: Statement, + upsert_kit_last_seen: Statement, } const GET_CONFIG_AND_KIT: &str = " @@ -46,6 +47,13 @@ const INSERT_AGGREGATE_MEASUREMENT: &str = " VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "; +const UPSERT_KIT_LAST_SEEN: &str = " + INSERT INTO kit_last_seen (kit_id, datetime_last_seen) + VALUES ($1, NOW()) + ON CONFLICT (kit_id) DO UPDATE + SET datetime_last_seen = EXCLUDED.datetime_last_seen +"; + impl Db { pub(crate) async fn new(client: Client) -> anyhow::Result { let get_config_and_kit = client @@ -83,12 +91,17 @@ impl Db { ) .await?; + let upsert_kit_last_seen = client + .prepare_typed(UPSERT_KIT_LAST_SEEN, &[Type::INT4]) + .await?; + let db = Self { config_cache: RefCell::new(HashMap::new()), client, get_config_and_kit, insert_raw_measurement, insert_aggregate_measurement, + upsert_kit_last_seen, }; Ok(db) } @@ -171,6 +184,10 @@ impl Db { ) .await?; + self.client + .query(&self.upsert_kit_last_seen, &[&config.kit_id]) + .await?; + tracing::trace!( "Inserted raw measurement {} of kit {} and peripheral {}", raw.id, @@ -211,6 +228,10 @@ impl Db { ) .await?; + self.client + .query(&self.upsert_kit_last_seen, &[&config.kit_id]) + .await?; + tracing::trace!( "Inserted aggregate measurement {} of kit {} and peripheral {}", raw.id,