diff --git a/Cargo.lock b/Cargo.lock index 7958d1363..c052ef265 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4196,6 +4196,7 @@ dependencies = [ "serde_json", "sha2 0.10.6", "sqlx", + "task-manager", "thiserror", "tokio", "tokio-stream", diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 773f53425..e02b72a4c 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -209,7 +209,7 @@ impl FileSinkClient { Err(Error::channel()) } Err(SendTimeoutError::Timeout(_)) => { - tracing::error!("file_sink write failed due to send timeout"); + tracing::error!("file_sink write failed for {:?} due to send timeout", self.metric); Err(Error::SendTimeout) } }, @@ -222,7 +222,10 @@ impl FileSinkClient { .send(Message::Commit(on_commit_tx)) .await .map_err(|e| { - tracing::error!("file_sink failed to commit with {e:?}"); + tracing::error!( + "file_sink failed to commit for {:?} with {e:?}", + self.metric + ); Error::channel() }) .map(|_| on_commit_rx) @@ -234,7 +237,10 @@ impl FileSinkClient { .send(Message::Rollback(on_rollback_tx)) .await .map_err(|e| { - tracing::error!("file_sink failed to rollback with {e:?}"); + tracing::error!( + "file_sink failed to rollback for {:?} with {e:?}", + self.metric + ); Error::channel() }) .map(|_| on_rollback_rx) diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 30020ba2c..ef6668c4a 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -47,6 +47,7 @@ rand = {workspace = true} async-trait = {workspace = true} retainer = {workspace = true} uuid = {workspace = true} +task-manager = {path = "../task_manager"} [dev-dependencies] backon = "0" diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 16cd55b4e..8dbea02ed 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -5,7 +5,7 @@ use crate::{ speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry, Settings, }; -use anyhow::{Error, Result}; +use anyhow::Result; use chrono::Duration; use file_store::{ coverage::CoverageObjectIngestReport, file_info_poller::LookbackBehavior, file_sink, @@ -14,12 +14,11 @@ use file_store::{ speedtest::CellSpeedtestIngestReport, wifi_heartbeat::WifiHeartbeatIngestReport, FileStore, FileType, }; -use futures_util::TryFutureExt; use mobile_config::client::{ entity_client::EntityClient, AuthorizationClient, CarrierServiceClient, GatewayClient, }; use price::PriceTracker; -use tokio::signal; +use task_manager::TaskManager; #[derive(Debug, clap::Args)] pub struct Cmd {} @@ -28,23 +27,13 @@ impl Cmd { pub async fn run(self, settings: &Settings) -> Result<()> { poc_metrics::start_metrics(&settings.metrics)?; - let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?; - tokio::spawn(async move { - tokio::select! { - _ = sigterm.recv() => shutdown_trigger.trigger(), - _ = signal::ctrl_c() => shutdown_trigger.trigger(), - } - }); - let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; sqlx::migrate!().run(&pool).await?; telemetry::initialize(&pool).await?; - let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); - let file_upload = - file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?; + let (file_upload, file_upload_server) = + file_upload::FileUpload::from_settings_tm(&settings.output).await?; let store_base_path = std::path::Path::new(&settings.cache); @@ -58,8 +47,7 @@ impl Cmd { let carrier_client = CarrierServiceClient::from_settings(&settings.config_client)?; // price tracker - let (price_tracker, tracker_process) = - PriceTracker::start(&settings.price_tracker, shutdown_listener.clone()).await?; + let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; // CBRS Heartbeats let (cbrs_heartbeats, cbrs_heartbeats_server) = @@ -70,9 +58,6 @@ impl Cmd { .prefix(FileType::CbrsHeartbeatIngestReport.to_string()) .queue_size(1) .create()?; - let cbrs_heartbeats_join_handle = cbrs_heartbeats_server - .start(shutdown_listener.clone()) - .await?; // Wifi Heartbeats let (wifi_heartbeats, wifi_heartbeats_server) = @@ -82,16 +67,13 @@ impl Cmd { .lookback(LookbackBehavior::StartAfter(settings.start_after())) .prefix(FileType::WifiHeartbeatIngestReport.to_string()) .create()?; - let wifi_heartbeats_join_handle = wifi_heartbeats_server - .start(shutdown_listener.clone()) - .await?; let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new( FileType::ValidatedHeartbeat, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_heartbeat"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .roll_time(Duration::minutes(15)) .create() @@ -103,7 +85,7 @@ impl Cmd { store_base_path, concat!(env!("CARGO_PKG_NAME"), "_seniority_update"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .roll_time(Duration::minutes(15)) .create() @@ -139,14 +121,13 @@ impl Cmd { .lookback(LookbackBehavior::StartAfter(settings.start_after())) .prefix(FileType::CellSpeedtestIngestReport.to_string()) .create()?; - let speedtests_join_handle = speedtests_server.start(shutdown_listener.clone()).await?; let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new( FileType::SpeedtestAvg, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .roll_time(Duration::minutes(15)) .create() @@ -157,7 +138,7 @@ impl Cmd { store_base_path, concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .roll_time(Duration::minutes(15)) .create() @@ -179,16 +160,13 @@ impl Cmd { .lookback(LookbackBehavior::StartAfter(settings.start_after())) .prefix(FileType::CoverageObjectIngestReport.to_string()) .create()?; - let coverage_objs_join_handle = coverage_objs_server - .start(shutdown_listener.clone()) - .await?; let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( FileType::CoverageObject, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .roll_time(Duration::minutes(15)) .create() @@ -208,7 +186,7 @@ impl Cmd { store_base_path, concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .create() .await?; @@ -218,7 +196,7 @@ impl Cmd { store_base_path, concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .create() .await?; @@ -241,9 +219,6 @@ impl Cmd { .lookback(LookbackBehavior::StartAfter(settings.start_after())) .prefix(FileType::SubscriberLocationIngestReport.to_string()) .create()?; - let subscriber_location_ingest_join_handle = subscriber_location_ingest_server - .start(shutdown_listener.clone()) - .await?; let (verified_subscriber_location, verified_subscriber_location_server) = file_sink::FileSinkBuilder::new( @@ -251,7 +226,7 @@ impl Cmd { store_base_path, concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .create() .await?; @@ -272,62 +247,34 @@ impl Cmd { .lookback(LookbackBehavior::StartAfter(settings.start_after())) .prefix(FileType::ValidDataTransferSession.to_string()) .create()?; - let data_session_ingest_join_handle = data_session_ingest_server - .start(shutdown_listener.clone()) - .await?; - - let data_session_ingestor = DataSessionIngestor::new(pool.clone()); - - tokio::try_join!( - valid_heartbeats_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - speedtests_avg_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - speedtests_validity_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - valid_coverage_objs_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - seniority_updates_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - mobile_rewards_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - file_upload - .run(shutdown_listener.clone()) - .map_err(Error::from), - reward_manifests_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - verified_subscriber_location_server - .run(shutdown_listener.clone()) - .map_err(Error::from), - subscriber_location_ingestor - .run(&shutdown_listener) - .map_err(Error::from), - data_session_ingestor - .run(data_session_ingest, shutdown_listener.clone()) - .map_err(Error::from), - tracker_process.map_err(Error::from), - cbrs_heartbeats_join_handle.map_err(Error::from), - wifi_heartbeats_join_handle.map_err(Error::from), - speedtests_join_handle.map_err(Error::from), - coverage_objs_join_handle.map_err(Error::from), - cbrs_heartbeat_daemon.run(shutdown_listener.clone()), - wifi_heartbeat_daemon.run(shutdown_listener.clone()), - speedtest_daemon.run(shutdown_listener.clone()), - coverage_daemon.run(shutdown_listener.clone()), - rewarder.run(shutdown_listener.clone()), - subscriber_location_ingest_join_handle.map_err(anyhow::Error::from), - data_session_ingest_join_handle.map_err(anyhow::Error::from), - )?; - - tracing::info!("Shutting down verifier server"); - Ok(()) + let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest); + + TaskManager::builder() + .add_task(file_upload_server) + .add_task(cbrs_heartbeats_server) + .add_task(wifi_heartbeats_server) + .add_task(valid_heartbeats_server) + .add_task(speedtests_avg_server) + .add_task(speedtests_validity_server) + .add_task(valid_coverage_objs_server) + .add_task(seniority_updates_server) + .add_task(mobile_rewards_server) + .add_task(reward_manifests_server) + .add_task(verified_subscriber_location_server) + .add_task(subscriber_location_ingestor) + .add_task(data_session_ingest_server) + .add_task(price_daemon) + .add_task(cbrs_heartbeat_daemon) + .add_task(wifi_heartbeat_daemon) + .add_task(speedtests_server) + .add_task(coverage_objs_server) + .add_task(speedtest_daemon) + .add_task(coverage_daemon) + .add_task(rewarder) + .add_task(subscriber_location_ingest_server) + .add_task(data_session_ingestor) + .start() + .await } } diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 9e61afe23..d6ed6c2e3 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -28,6 +28,7 @@ use retainer::{entry::CacheReadGuard, Cache}; use rust_decimal::Decimal; use rust_decimal_macros::dec; use sqlx::{FromRow, PgPool, Pool, Postgres, QueryBuilder, Transaction, Type}; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; use uuid::Uuid; @@ -80,27 +81,22 @@ impl CoverageDaemon { } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tokio::spawn(async move { - loop { - #[rustfmt::skip] - tokio::select! { - _ = shutdown.clone() => { - tracing::info!("CoverageDaemon shutting down"); - break; - } - Some(file) = self.coverage_objs.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!("coverage_object_processing_time", start.elapsed()); - } + loop { + #[rustfmt::skip] + tokio::select! { + _ = shutdown.clone() => { + tracing::info!("CoverageDaemon shutting down"); + break; + } + Some(file) = self.coverage_objs.recv() => { + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("coverage_object_processing_time", start.elapsed()); } } + } - Ok(()) - }) - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }) - .await + Ok(()) } async fn process_file( @@ -131,6 +127,20 @@ impl CoverageDaemon { } } +impl ManagedTask for CoverageDaemon { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + pub struct CoverageObject { pub coverage_object: file_store::coverage::CoverageObject, pub validity: CoverageObjectValidity, diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 3f4a24679..0c2f3f552 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -9,9 +9,11 @@ use helium_proto::ServiceProvider; use rust_decimal::Decimal; use sqlx::{PgPool, Postgres, Row, Transaction}; use std::{collections::HashMap, ops::Range, time::Instant}; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; pub struct DataSessionIngestor { + pub receiver: Receiver>, pub pool: PgPool, } @@ -30,15 +32,14 @@ pub struct ServiceProviderDataSession { pub type HotspotMap = HashMap; impl DataSessionIngestor { - pub fn new(pool: sqlx::Pool) -> Self { - Self { pool } + pub fn new( + pool: sqlx::Pool, + receiver: Receiver>, + ) -> Self { + Self { pool, receiver } } - pub async fn run( - self, - mut receiver: Receiver>, - shutdown: triggered::Listener, - ) -> anyhow::Result<()> { + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting DataSessionIngestor"); tokio::spawn(async move { loop { @@ -49,7 +50,7 @@ impl DataSessionIngestor { tracing::info!("DataSessionIngestor shutting down"); break; } - Some(file) = receiver.recv() => { + Some(file) = self.receiver.recv() => { let start = Instant::now(); self.process_file(file).await?; metrics::histogram!( @@ -94,6 +95,20 @@ impl DataSessionIngestor { } } +impl ManagedTask for DataSessionIngestor { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + #[derive(sqlx::FromRow)] pub struct HotspotDataSession { pub pub_key: PublicKeyBinary, diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 2807bfc14..f7f753f32 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -15,6 +15,7 @@ use std::{ sync::Arc, time::{self, Instant}, }; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; pub struct HeartbeatDaemon { @@ -56,46 +57,41 @@ where } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tokio::spawn(async move { - tracing::info!("Starting CBRS HeartbeatDaemon"); - let heartbeat_cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + tracing::info!("Starting CBRS HeartbeatDaemon"); + let heartbeat_cache = Arc::new(Cache::<(String, DateTime), ()>::new()); - let heartbeat_cache_clone = heartbeat_cache.clone(); - tokio::spawn(async move { - heartbeat_cache_clone - .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) - .await - }); + let heartbeat_cache_clone = heartbeat_cache.clone(); + tokio::spawn(async move { + heartbeat_cache_clone + .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) + .await + }); - let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let coverage_object_cache = CoverageObjectCache::new(&self.pool); + let coverage_claim_time_cache = CoverageClaimTimeCache::new(); + let coverage_object_cache = CoverageObjectCache::new(&self.pool); - loop { - #[rustfmt::skip] - tokio::select! { - biased; - _ = shutdown.clone() => { - tracing::info!("CBRS HeartbeatDaemon shutting down"); - break; - } - Some(file) = self.heartbeats.recv() => { - let start = Instant::now(); - self.process_file( - file, - &heartbeat_cache, - &coverage_claim_time_cache, - &coverage_object_cache, - ).await?; - metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed()); - } + loop { + #[rustfmt::skip] + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("CBRS HeartbeatDaemon shutting down"); + break; + } + Some(file) = self.heartbeats.recv() => { + let start = Instant::now(); + self.process_file( + file, + &heartbeat_cache, + &coverage_claim_time_cache, + &coverage_object_cache, + ).await?; + metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed()); } } + } - Ok(()) - }) - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }) - .await + Ok(()) } async fn process_file( @@ -136,3 +132,20 @@ where Ok(()) } } + +impl ManagedTask for HeartbeatDaemon +where + GIR: GatewayResolver, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 761654618..11cf176cc 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -14,6 +14,7 @@ use std::{ sync::Arc, time::{self, Instant}, }; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; pub struct HeartbeatDaemon { @@ -55,46 +56,41 @@ where } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tokio::spawn(async move { - tracing::info!("Starting Wifi HeartbeatDaemon"); - let heartbeat_cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + tracing::info!("Starting Wifi HeartbeatDaemon"); + let heartbeat_cache = Arc::new(Cache::<(String, DateTime), ()>::new()); - let heartbeat_cache_clone = heartbeat_cache.clone(); - tokio::spawn(async move { - heartbeat_cache_clone - .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) - .await - }); + let heartbeat_cache_clone = heartbeat_cache.clone(); + tokio::spawn(async move { + heartbeat_cache_clone + .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) + .await + }); - let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let coverage_object_cache = CoverageObjectCache::new(&self.pool); + let coverage_claim_time_cache = CoverageClaimTimeCache::new(); + let coverage_object_cache = CoverageObjectCache::new(&self.pool); - loop { - #[rustfmt::skip] - tokio::select! { - biased; - _ = shutdown.clone() => { - tracing::info!("Wifi HeartbeatDaemon shutting down"); - break; - } - Some(file) = self.heartbeats.recv() => { - let start = Instant::now(); - self.process_file( - file, - &heartbeat_cache, - &coverage_claim_time_cache, - &coverage_object_cache, - ).await?; - metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed()); - } + loop { + #[rustfmt::skip] + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("Wifi HeartbeatDaemon shutting down"); + break; + } + Some(file) = self.heartbeats.recv() => { + let start = Instant::now(); + self.process_file( + file, + &heartbeat_cache, + &coverage_claim_time_cache, + &coverage_object_cache, + ).await?; + metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed()); } } + } - Ok(()) - }) - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }) - .await + Ok(()) } async fn process_file( @@ -135,3 +131,20 @@ where Ok(()) } } + +impl ManagedTask for HeartbeatDaemon +where + GIR: GatewayResolver, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 283b74b22..b3e980d29 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -11,6 +11,7 @@ use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; +use futures_util::TryFutureExt; use helium_proto::services::{ poc_mobile as proto, poc_mobile::mobile_reward_share::Reward as ProtoReward, poc_mobile::UnallocatedReward, poc_mobile::UnallocatedRewardType, @@ -23,6 +24,7 @@ use rust_decimal::{prelude::*, Decimal}; use rust_decimal_macros::dec; use sqlx::{PgExecutor, Pool, Postgres}; use std::ops::Range; +use task_manager::ManagedTask; use tokio::time::sleep; const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; @@ -232,6 +234,23 @@ where } } +impl ManagedTask for Rewarder +where + A: CarrierServiceVerifier + Send + Sync + 'static, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + pub async fn reward_poc_and_dc( pool: &Pool, mobile_rewards: &FileSinkClient, diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index c336f2635..94af0acc1 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -17,6 +17,7 @@ use helium_proto::services::poc_mobile::{ use mobile_config::client::gateway_client::GatewayInfoResolver; use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction}; use std::{collections::HashMap, time::Instant}; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6; @@ -72,28 +73,23 @@ where } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tokio::spawn(async move { - loop { - #[rustfmt::skip] - tokio::select! { - biased; - _ = shutdown.clone() => { - tracing::info!("SpeedtestDaemon shutting down"); - break; - } - Some(file) = self.speedtests.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!("speedtest_processing_time", start.elapsed()); - } + loop { + #[rustfmt::skip] + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("SpeedtestDaemon shutting down"); + break; + } + Some(file) = self.speedtests.recv() => { + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("speedtest_processing_time", start.elapsed()); } } + } - Ok(()) - }) - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }) - .await + Ok(()) } async fn process_file( @@ -163,6 +159,23 @@ where } } +impl ManagedTask for SpeedtestDaemon +where + GIR: GatewayInfoResolver, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + pub async fn save_speedtest( speedtest: &CellSpeedtest, exec: &mut Transaction<'_, Postgres>, diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 0a8d8822a..722af3424 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -8,6 +8,7 @@ use file_store::{ }, }; use futures::{StreamExt, TryStreamExt}; +use futures_util::TryFutureExt; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::NetworkKeyRole; use helium_proto::services::poc_mobile::{ @@ -18,6 +19,7 @@ use mobile_config::client::{ }; use sqlx::{PgPool, Postgres, Transaction}; use std::{ops::Range, time::Instant}; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; const SUBSCRIBER_REWARD_PERIOD_IN_DAYS: i64 = 1; @@ -53,7 +55,7 @@ where } } - pub async fn run(mut self, shutdown: &triggered::Listener) -> anyhow::Result<()> { + async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { loop { #[rustfmt::skip] tokio::select! { @@ -156,6 +158,24 @@ where } } +impl ManagedTask for SubscriberLocationIngestor +where + AV: AuthorizationVerifier + Send + Sync + 'static, + EV: EntityVerifier + Send + Sync + 'static, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + pub async fn save( loc_ingest_report: &SubscriberLocationIngestReport, db: &mut Transaction<'_, Postgres>,