Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into map/location-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed Jan 19, 2024
2 parents 9154bc2 + f6f1ed1 commit 8f8d7eb
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 211 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl FileSinkClient {
Err(Error::channel())
}
Err(SendTimeoutError::Timeout(_)) => {
tracing::error!("file_sink write failed due to send timeout");
tracing::error!("file_sink write failed for {:?} due to send timeout", self.metric);
Err(Error::SendTimeout)
}
},
Expand All @@ -222,7 +222,10 @@ impl FileSinkClient {
.send(Message::Commit(on_commit_tx))
.await
.map_err(|e| {
tracing::error!("file_sink failed to commit with {e:?}");
tracing::error!(
"file_sink failed to commit for {:?} with {e:?}",
self.metric
);
Error::channel()
})
.map(|_| on_commit_rx)
Expand All @@ -234,7 +237,10 @@ impl FileSinkClient {
.send(Message::Rollback(on_rollback_tx))
.await
.map_err(|e| {
tracing::error!("file_sink failed to rollback with {e:?}");
tracing::error!(
"file_sink failed to rollback for {:?} with {e:?}",
self.metric
);
Error::channel()
})
.map(|_| on_rollback_rx)
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rand = {workspace = true}
async-trait = {workspace = true}
retainer = {workspace = true}
uuid = {workspace = true}
task-manager = {path = "../task_manager"}

[dev-dependencies]
backon = "0"
135 changes: 41 additions & 94 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry,
Settings,
};
use anyhow::{Error, Result};
use anyhow::Result;
use chrono::Duration;
use file_store::{
coverage::CoverageObjectIngestReport, file_info_poller::LookbackBehavior, file_sink,
Expand All @@ -14,12 +14,11 @@ use file_store::{
speedtest::CellSpeedtestIngestReport, wifi_heartbeat::WifiHeartbeatIngestReport, FileStore,
FileType,
};
use futures_util::TryFutureExt;
use mobile_config::client::{
entity_client::EntityClient, AuthorizationClient, CarrierServiceClient, GatewayClient,
};
use price::PriceTracker;
use tokio::signal;
use task_manager::TaskManager;

#[derive(Debug, clap::Args)]
pub struct Cmd {}
Expand All @@ -28,23 +27,13 @@ impl Cmd {
pub async fn run(self, settings: &Settings) -> Result<()> {
poc_metrics::start_metrics(&settings.metrics)?;

let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
tokio::spawn(async move {
tokio::select! {
_ = sigterm.recv() => shutdown_trigger.trigger(),
_ = signal::ctrl_c() => shutdown_trigger.trigger(),
}
});

let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;
sqlx::migrate!().run(&pool).await?;

telemetry::initialize(&pool).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?;
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings.output).await?;

let store_base_path = std::path::Path::new(&settings.cache);

Expand All @@ -58,8 +47,7 @@ impl Cmd {
let carrier_client = CarrierServiceClient::from_settings(&settings.config_client)?;

// price tracker
let (price_tracker, tracker_process) =
PriceTracker::start(&settings.price_tracker, shutdown_listener.clone()).await?;
let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?;

// CBRS Heartbeats
let (cbrs_heartbeats, cbrs_heartbeats_server) =
Expand All @@ -70,9 +58,6 @@ impl Cmd {
.prefix(FileType::CbrsHeartbeatIngestReport.to_string())
.queue_size(1)
.create()?;
let cbrs_heartbeats_join_handle = cbrs_heartbeats_server
.start(shutdown_listener.clone())
.await?;

// Wifi Heartbeats
let (wifi_heartbeats, wifi_heartbeats_server) =
Expand All @@ -82,16 +67,13 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::WifiHeartbeatIngestReport.to_string())
.create()?;
let wifi_heartbeats_join_handle = wifi_heartbeats_server
.start(shutdown_listener.clone())
.await?;

let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new(
FileType::ValidatedHeartbeat,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_heartbeat"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
Expand All @@ -103,7 +85,7 @@ impl Cmd {
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_seniority_update"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
Expand Down Expand Up @@ -139,14 +121,13 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CellSpeedtestIngestReport.to_string())
.create()?;
let speedtests_join_handle = speedtests_server.start(shutdown_listener.clone()).await?;

let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new(
FileType::SpeedtestAvg,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
Expand All @@ -157,7 +138,7 @@ impl Cmd {
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
Expand All @@ -179,16 +160,13 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CoverageObjectIngestReport.to_string())
.create()?;
let coverage_objs_join_handle = coverage_objs_server
.start(shutdown_listener.clone())
.await?;

let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
FileType::CoverageObject,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_coverage_object"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
Expand All @@ -208,7 +186,7 @@ impl Cmd {
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.create()
.await?;
Expand All @@ -218,7 +196,7 @@ impl Cmd {
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.create()
.await?;
Expand All @@ -241,17 +219,14 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::SubscriberLocationIngestReport.to_string())
.create()?;
let subscriber_location_ingest_join_handle = subscriber_location_ingest_server
.start(shutdown_listener.clone())
.await?;

let (verified_subscriber_location, verified_subscriber_location_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedSubscriberLocationIngestReport,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"),
)
.deposits(Some(file_upload_tx.clone()))
.file_upload(Some(file_upload.clone()))
.auto_commit(false)
.create()
.await?;
Expand All @@ -272,62 +247,34 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::ValidDataTransferSession.to_string())
.create()?;
let data_session_ingest_join_handle = data_session_ingest_server
.start(shutdown_listener.clone())
.await?;

let data_session_ingestor = DataSessionIngestor::new(pool.clone());

tokio::try_join!(
valid_heartbeats_server
.run(shutdown_listener.clone())
.map_err(Error::from),
speedtests_avg_server
.run(shutdown_listener.clone())
.map_err(Error::from),
speedtests_validity_server
.run(shutdown_listener.clone())
.map_err(Error::from),
valid_coverage_objs_server
.run(shutdown_listener.clone())
.map_err(Error::from),
seniority_updates_server
.run(shutdown_listener.clone())
.map_err(Error::from),
mobile_rewards_server
.run(shutdown_listener.clone())
.map_err(Error::from),
file_upload
.run(shutdown_listener.clone())
.map_err(Error::from),
reward_manifests_server
.run(shutdown_listener.clone())
.map_err(Error::from),
verified_subscriber_location_server
.run(shutdown_listener.clone())
.map_err(Error::from),
subscriber_location_ingestor
.run(&shutdown_listener)
.map_err(Error::from),
data_session_ingestor
.run(data_session_ingest, shutdown_listener.clone())
.map_err(Error::from),
tracker_process.map_err(Error::from),
cbrs_heartbeats_join_handle.map_err(Error::from),
wifi_heartbeats_join_handle.map_err(Error::from),
speedtests_join_handle.map_err(Error::from),
coverage_objs_join_handle.map_err(Error::from),
cbrs_heartbeat_daemon.run(shutdown_listener.clone()),
wifi_heartbeat_daemon.run(shutdown_listener.clone()),
speedtest_daemon.run(shutdown_listener.clone()),
coverage_daemon.run(shutdown_listener.clone()),
rewarder.run(shutdown_listener.clone()),
subscriber_location_ingest_join_handle.map_err(anyhow::Error::from),
data_session_ingest_join_handle.map_err(anyhow::Error::from),
)?;

tracing::info!("Shutting down verifier server");

Ok(())
let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

TaskManager::builder()
.add_task(file_upload_server)
.add_task(cbrs_heartbeats_server)
.add_task(wifi_heartbeats_server)
.add_task(valid_heartbeats_server)
.add_task(speedtests_avg_server)
.add_task(speedtests_validity_server)
.add_task(valid_coverage_objs_server)
.add_task(seniority_updates_server)
.add_task(mobile_rewards_server)
.add_task(reward_manifests_server)
.add_task(verified_subscriber_location_server)
.add_task(subscriber_location_ingestor)
.add_task(data_session_ingest_server)
.add_task(price_daemon)
.add_task(cbrs_heartbeat_daemon)
.add_task(wifi_heartbeat_daemon)
.add_task(speedtests_server)
.add_task(coverage_objs_server)
.add_task(speedtest_daemon)
.add_task(coverage_daemon)
.add_task(rewarder)
.add_task(subscriber_location_ingest_server)
.add_task(data_session_ingestor)
.start()
.await
}
}
46 changes: 28 additions & 18 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use retainer::{entry::CacheReadGuard, Cache};
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use sqlx::{FromRow, PgPool, Pool, Postgres, QueryBuilder, Transaction, Type};
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;
use uuid::Uuid;

Expand Down Expand Up @@ -80,27 +81,22 @@ impl CoverageDaemon {
}

pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tokio::spawn(async move {
loop {
#[rustfmt::skip]
tokio::select! {
_ = shutdown.clone() => {
tracing::info!("CoverageDaemon shutting down");
break;
}
Some(file) = self.coverage_objs.recv() => {
let start = Instant::now();
self.process_file(file).await?;
metrics::histogram!("coverage_object_processing_time", start.elapsed());
}
loop {
#[rustfmt::skip]
tokio::select! {
_ = shutdown.clone() => {
tracing::info!("CoverageDaemon shutting down");
break;
}
Some(file) = self.coverage_objs.recv() => {
let start = Instant::now();
self.process_file(file).await?;
metrics::histogram!("coverage_object_processing_time", start.elapsed());
}
}
}

Ok(())
})
.map_err(anyhow::Error::from)
.and_then(|result| async move { result })
.await
Ok(())
}

async fn process_file(
Expand Down Expand Up @@ -131,6 +127,20 @@ impl CoverageDaemon {
}
}

impl ManagedTask for CoverageDaemon {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

pub struct CoverageObject {
pub coverage_object: file_store::coverage::CoverageObject,
pub validity: CoverageObjectValidity,
Expand Down
Loading

0 comments on commit 8f8d7eb

Please sign in to comment.