Skip to content

Commit

Permalink
Location Validation Timestamp Cache (#938)
Browse files Browse the repository at this point in the history
* Wifi heartbeat will discard location validation timestamp > 24 hours

location_validation_timestamps are only valid within a 24 hour window of the time the heartbeat was received.

* Protect LocationCache

- rename cache methods to be more like cache methods.
- validation timestamps that are in the past will not be cached.
- validation timestamps older than 24 hours from the heartbeat will not be returned from the database.
- cached locations that are older than 24 hours from the current hearbeat requesting a value will not be returned even if they still have a valid TTL.

* (WIP) Fix integration tests for updated LocationCache

- location validation timestamps must be within the last 24 hours.

- LocationCache will only respect the 12 hour time limit when it's fresh.

* pin production version of postgres for tests

* something is always being cached

reflect this in the name

* Fix test inputs

The initial heartbeat has a validation timestamp that is just barely valid, so when the second heartbeat comes through over 24 hours past the validation timestamp, it will correctly not consider it.

* formatting

* correct comment

* make sure test timestamps and DB timestamps have the same granularity for comparing

truncate to nanoseconds, some postgres DB version only store seconds up
to 5 decimal places.

* printing for debug CI

* truncate to 6 decimal places

* remove debug printing
  • Loading branch information
michaeldjeffrey authored Feb 11, 2025
1 parent a45edef commit bcac912
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
cancel-in-progress: true
services:
postgres:
image: postgres
image: postgres:14.9-alpine
env:
POSTGRES_PASSWORD: postgres
options: >-
Expand Down Expand Up @@ -208,4 +208,4 @@ jobs:
PACKAGECLOUD_API_KEY: ${{ secrets.PACKAGECLOUD_API_KEY }}
run: |
chmod +x ./.github/scripts/make_debian.sh
./.github/scripts/make_debian.sh
./.github/scripts/make_debian.sh
274 changes: 235 additions & 39 deletions mobile_verifier/src/heartbeats/last_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use helium_crypto::PublicKeyBinary;
use retainer::Cache;
use sqlx::PgPool;

#[derive(sqlx::FromRow, Copy, Clone)]
use super::Heartbeat;

#[derive(Debug, sqlx::FromRow, Copy, Clone, PartialEq)]
pub struct LastLocation {
pub location_validation_timestamp: DateTime<Utc>,
pub latest_timestamp: DateTime<Utc>,
Expand All @@ -28,9 +30,31 @@ impl LastLocation {
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
pub fn duration_to_expiration(&self) -> Duration {
((self.latest_timestamp + Duration::hours(12)) - Utc::now()).max(Duration::zero())
pub fn from_heartbeat(
heartbeat: &Heartbeat,
location_validation_timestamp: DateTime<Utc>,
) -> Self {
Self::new(
location_validation_timestamp,
heartbeat.timestamp,
heartbeat.lat,
heartbeat.lon,
)
}

fn still_valid(&self, heartbeat_timestamp: DateTime<Utc>) -> bool {
let diff = heartbeat_timestamp - self.location_validation_timestamp;
diff <= Duration::hours(24)
}

fn cache_expiration_duration(&self) -> Option<std::time::Duration> {
// A validation_timestamp is valid for 24 hours past itself,
// but could still be in the past
let until = self.location_validation_timestamp + Duration::hours(24);
let diff = until - Utc::now();

// Converting to_ std() with a negative Duration casts to None
diff.to_std().ok()
}
}

Expand All @@ -56,9 +80,36 @@ impl LocationCache {
}
}

async fn fetch_from_db_and_set(
pub async fn set(&self, hotspot: &PublicKeyBinary, last_location: LastLocation) {
self.cache_last_location(hotspot, Some(last_location)).await;
}

pub async fn get(
&self,
hotspot: &PublicKeyBinary,
heartbeat_timestamp: DateTime<Utc>,
) -> anyhow::Result<Option<LastLocation>> {
let location = match self.locations.get(hotspot).await {
Some(last_location) => {
// The value may still be cached according to the system clock
// but not valid based on the time of the heartbeat in question.
let last = *last_location;
last.filter(|l| l.still_valid(heartbeat_timestamp))
}
None => {
let last = self.fetch_from_db(hotspot, heartbeat_timestamp).await?;
self.cache_last_location(hotspot, last).await;
last
}
};

Ok(location)
}

async fn fetch_from_db(
&self,
hotspot: &PublicKeyBinary,
heartbeat_timestamp: DateTime<Utc>,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
Expand All @@ -67,58 +118,203 @@ impl LocationCache {
WHERE location_validation_timestamp IS NOT NULL
AND latest_timestamp >= $1
AND hotspot_key = $2
AND $3 - location_validation_timestamp <= INTERVAL '24 hours'
ORDER BY latest_timestamp DESC
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.bind(Utc::now() - Duration::hours(24))
.bind(hotspot)
.bind(heartbeat_timestamp)
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;

Ok(last_location)
}

pub async fn fetch_last_location(
async fn cache_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
last_location: Option<LastLocation>,
) {
match location_with_expiration(last_location) {
Some((last, cache_duration)) => {
self.locations
.insert(hotspot.clone(), Some(last), cache_duration)
.await;
}
None => {
self.locations
.insert(hotspot.clone(), None, Duration::days(365).to_std().unwrap())
.await;
}
}
}

pub async fn set_last_location(
&self,
/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
}
}

fn location_with_expiration(
last_location: Option<LastLocation>,
) -> Option<(LastLocation, std::time::Duration)> {
let last = last_location?;
let cache_duration = last.cache_expiration_duration()?;
Some((last, cache_duration))
}

#[cfg(test)]
mod tests {
use super::*;

use chrono::{Duration, DurationRound, Utc};
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use uuid::Uuid;

// Make sure test timestamps and DB timestamps have the same granularity.
// 6 decimal places.
fn nanos_trunc(ts: DateTime<Utc>) -> DateTime<Utc> {
ts.duration_trunc(Duration::nanoseconds(1000)).unwrap()
}
fn hour_trunc(ts: DateTime<Utc>) -> DateTime<Utc> {
ts.duration_trunc(Duration::hours(1)).unwrap()
}

async fn insert_heartbeat(
pool: &PgPool,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
received_timestamp: DateTime<Utc>,
validation_timestamp: DateTime<Utc>,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
sqlx::query(
r#"
INSERT INTO wifi_heartbeats
(
hotspot_key, location_validation_timestamp, latest_timestamp,
truncated_timestamp, coverage_object,
-- hardcoded values
lat, lon, cell_type, distance_to_asserted, location_trust_score_multiplier
)
VALUES
(
$1, $2, $3, $4, $5,
-- harcoded values
0.0, 0.0, 'novagenericwifiindoor', 0, 1000
)
"#,
)
.bind(hotspot)
.bind(nanos_trunc(validation_timestamp))
.bind(nanos_trunc(received_timestamp))
.bind(hour_trunc(received_timestamp))
.bind(Uuid::new_v4())
.execute(pool)
.await?;

Ok(())
}

/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
fn test_last_location(
latest_timestamp: DateTime<Utc>,
location_validation_timestamp: DateTime<Utc>,
) -> LastLocation {
LastLocation {
location_validation_timestamp: nanos_trunc(location_validation_timestamp),
latest_timestamp: nanos_trunc(latest_timestamp),
lat: 0.0,
lon: 0.0,
}
}

#[sqlx::test]
async fn test_invalid_validation_timestamp(pool: PgPool) -> anyhow::Result<()> {
let now = Utc::now();

let hotspot_one = PublicKeyBinary::from(vec![1]);
let stale_timestamp = now - Duration::hours(24) - Duration::seconds(1);
insert_heartbeat(&pool, &hotspot_one, now, stale_timestamp).await?;

let hotspot_two = PublicKeyBinary::from(vec![2]);
let limit_timestamp = now - Duration::hours(24);
insert_heartbeat(&pool, &hotspot_two, now, limit_timestamp).await?;

let hotspot_three = PublicKeyBinary::from(vec![3]);
let good_timestamp = now - Duration::hours(12);
insert_heartbeat(&pool, &hotspot_three, now, good_timestamp).await?;

let cache = LocationCache::new(&pool);
assert_eq!(
None,
cache.get(&hotspot_one, now).await?,
"Invalid timestamp current"
);
assert_eq!(
Some(test_last_location(now, limit_timestamp)),
cache.get(&hotspot_two, now).await?,
"Limit timestamp current"
);
assert_eq!(
Some(test_last_location(now, good_timestamp)),
cache.get(&hotspot_three, now).await?,
"Good timestamp current"
);

// Moving an 1 day into the future should invalidate all timestamps
// regardless of what has already been cached.
let future = now + Duration::days(1);
assert_eq!(
None,
cache.get(&hotspot_one, future).await?,
"Invalid timestamp future"
);
assert_eq!(
None,
cache.get(&hotspot_two, future).await?,
"Limit timestamp future"
);
assert_eq!(
None,
cache.get(&hotspot_three, future).await?,
"Good timestamp future"
);

Ok(())
}

#[sqlx::test]
async fn will_not_cache_invalid_validation_timestamps(pool: PgPool) -> anyhow::Result<()> {
let cache = LocationCache::new(&pool);

let now = Utc::now();
let validation_timestamp = now - Duration::hours(25);

let hotspot = PublicKeyBinary::from(vec![1]);
let invalid_location = test_last_location(now, validation_timestamp);
cache.set(&hotspot, invalid_location).await;

assert_eq!(None, cache.get(&hotspot, now).await?);

Ok(())
}

#[sqlx::test]
async fn will_cache_valid_validation_timestamps(pool: PgPool) -> anyhow::Result<()> {
let cache = LocationCache::new(&pool);

let now = Utc::now();
let validation_timestamp = now - Duration::hours(12);

let hotspot = PublicKeyBinary::from(vec![1]);
let valid_location = test_last_location(now, validation_timestamp);
cache.set(&hotspot, valid_location).await;

assert_eq!(Some(valid_location), cache.get(&hotspot, now).await?);

Ok(())
}
}
Loading

0 comments on commit bcac912

Please sign in to comment.