Skip to content

Commit

Permalink
Disable CBRS Take 2 - disable at ingest layer (#954)
Browse files Browse the repository at this point in the history
* Deny CBRS heartbeats and data transfer requests after deadline

* fix ingest tests

* Added test to cover heartbeat ingest

* Add test for data_transfer wifi and cbrs

* Update to accept cbrs heartbeat and data transfer session after the disable date, but just drop them
  • Loading branch information
bbalser authored and macpie committed Feb 28, 2025
1 parent ee06538 commit c14791b
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 16 deletions.
38 changes: 31 additions & 7 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::Settings;
use anyhow::{bail, Error, Result};
use chrono::Utc;
use chrono::{DateTime, Utc};
use file_store::{
file_sink::FileSinkClient,
file_upload,
Expand All @@ -12,8 +12,8 @@ use helium_crypto::{Network, PublicKey, PublicKeyBinary};
use helium_proto::services::poc_mobile::{
self, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1,
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1,
HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1,
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct GrpcServer<AV> {
address: SocketAddr,
api_token: MetadataValue<Ascii>,
authorization_verifier: AV,
cbrs_disable_time: DateTime<Utc>,
}

impl<AV> ManagedTask for GrpcServer<AV>
Expand Down Expand Up @@ -109,6 +110,7 @@ where
address: SocketAddr,
api_token: MetadataValue<Ascii>,
authorization_verifier: AV,
cbrs_disable_time: DateTime<Utc>,
) -> Self {
GrpcServer {
heartbeat_report_sink,
Expand All @@ -128,6 +130,7 @@ where
address,
api_token,
authorization_verifier,
cbrs_disable_time,
}
}

Expand Down Expand Up @@ -218,7 +221,14 @@ where
&self,
request: Request<CellHeartbeatReqV1>,
) -> GrpcResult<CellHeartbeatRespV1> {
let timestamp: u64 = Utc::now().timestamp_millis() as u64;
let timestamp = Utc::now();

if timestamp >= self.cbrs_disable_time {
return Ok(Response::new(CellHeartbeatRespV1 {
id: timestamp.to_string(),
}));
}

let event = request.into_inner();

custom_tracing::record_b58("pub_key", &event.pub_key);
Expand All @@ -228,7 +238,7 @@ where
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| CellHeartbeatIngestReportV1 {
received_timestamp: timestamp,
received_timestamp: timestamp.timestamp_millis() as u64,
report: Some(event),
})?;

Expand Down Expand Up @@ -266,17 +276,23 @@ where
&self,
request: Request<DataTransferSessionReqV1>,
) -> GrpcResult<DataTransferSessionRespV1> {
let timestamp = Utc::now().timestamp_millis() as u64;
let timestamp = Utc::now();
let event = request.into_inner();

if is_data_transfer_for_cbrs(&event) && timestamp > self.cbrs_disable_time {
return Ok(Response::new(DataTransferSessionRespV1 {
id: timestamp.to_string(),
}));
}

custom_tracing::record_b58("pub_key", &event.pub_key);

let report = self
.verify_public_key(event.pub_key.as_ref())
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| DataTransferSessionIngestReportV1 {
received_timestamp: timestamp,
received_timestamp: timestamp.timestamp_millis() as u64,
report: Some(event),
})?;

Expand Down Expand Up @@ -556,6 +572,13 @@ where
}
}

fn is_data_transfer_for_cbrs(event: &DataTransferSessionReqV1) -> bool {
event
.data_transfer_usage
.as_ref()
.is_some_and(|u| u.radio_access_technology() == DataTransferRadioAccessTechnology::Eutran)
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
// Initialize uploader
let (file_upload, file_upload_server) =
Expand Down Expand Up @@ -723,6 +746,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
settings.listen_addr,
api_token,
AuthorizationClient::from_settings(config_client)?,
settings.cbrs_disable_time,
);

tracing::info!(
Expand Down
9 changes: 9 additions & 0 deletions ingest/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use config::{Config, Environment, File};
use helium_crypto::Network;
use humantime_serde::re::humantime;
Expand Down Expand Up @@ -44,6 +45,14 @@ pub struct Settings {
// mobile config client settings
// optional to avoid having to define a client for IOT mode
pub config_client: Option<mobile_config::ClientSettings>,
#[serde(default = "default_cbrs_disable_time")]
pub cbrs_disable_time: DateTime<Utc>,
}

fn default_cbrs_disable_time() -> DateTime<Utc> {
"2025-03-01 00:00:00Z"
.parse::<DateTime<Utc>>()
.expect("invalid default date")
}

fn default_roll_time() -> Duration {
Expand Down
130 changes: 126 additions & 4 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use helium_crypto::{KeyTag, Keypair, Network, PublicKeyBinary, Sign};
use helium_proto::services::poc_mobile::{
HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1, DataTransferEvent,
DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, RadioUsageStatsResV1,
UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, UniqueConnectionsRespV1,
};
Expand All @@ -20,6 +22,7 @@ use mobile_config::client::authorization_client::AuthorizationVerifier;
use prost::Message;
use rand::rngs::OsRng;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::{net::TcpListener, sync::mpsc::Receiver, time::timeout};
use tonic::{
async_trait,
Expand Down Expand Up @@ -49,7 +52,9 @@ impl AuthorizationVerifier for MockAuthorizationClient {
Ok(true)
}
}
pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
pub async fn setup_mobile(
cbrs_disable_time: DateTime<Utc>,
) -> anyhow::Result<(TestClient, Trigger)> {
let key_pair = generate_keypair();

let socket_addr = {
Expand All @@ -65,10 +70,10 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {

let (trigger, listener) = triggered::trigger();

let (cbrs_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (cbrs_heartbeat_tx, cbrs_hearbeat_rx) = tokio::sync::mpsc::channel(10);
let (wifi_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (speedtest_tx, _rx) = tokio::sync::mpsc::channel(10);
let (data_transfer_tx, _rx) = tokio::sync::mpsc::channel(10);
let (data_transfer_tx, data_transfer_rx) = tokio::sync::mpsc::channel(10);
let (subscriber_location_tx, _rx) = tokio::sync::mpsc::channel(10);
let (radio_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
let (invalidated_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
Expand Down Expand Up @@ -100,6 +105,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
socket_addr,
api_token,
auth_client,
cbrs_disable_time,
);

grpc_server.run(listener).await
Expand All @@ -113,6 +119,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
hex_usage_stat_rx,
radio_usage_stat_rx,
unique_connections_rx,
cbrs_hearbeat_rx,
data_transfer_rx,
)
.await;

Expand All @@ -131,9 +139,12 @@ pub struct TestClient {
Receiver<file_store::file_sink::Message<RadioUsageStatsIngestReportV1>>,
unique_connections_file_sink_rx:
Receiver<file_store::file_sink::Message<UniqueConnectionsIngestReportV1>>,
cell_heartbeat_rx: Receiver<file_store::file_sink::Message<CellHeartbeatIngestReportV1>>,
data_transfer_rx: Receiver<file_store::file_sink::Message<DataTransferSessionIngestReportV1>>,
}

impl TestClient {
#[allow(clippy::too_many_arguments)]
pub async fn new(
socket_addr: SocketAddr,
key_pair: Keypair,
Expand All @@ -150,6 +161,10 @@ impl TestClient {
unique_connections_file_sink_rx: Receiver<
file_store::file_sink::Message<UniqueConnectionsIngestReportV1>,
>,
cell_heartbeat_rx: Receiver<file_store::file_sink::Message<CellHeartbeatIngestReportV1>>,
data_transfer_rx: Receiver<
file_store::file_sink::Message<DataTransferSessionIngestReportV1>,
>,
) -> TestClient {
let client = (|| PocMobileClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
Expand All @@ -164,6 +179,48 @@ impl TestClient {
hex_usage_stats_file_sink_rx,
radio_usage_stats_file_sink_rx,
unique_connections_file_sink_rx,
cell_heartbeat_rx,
data_transfer_rx,
}
}

pub async fn cell_heartbeat_recv(mut self) -> anyhow::Result<CellHeartbeatIngestReportV1> {
match timeout(Duration::from_secs(2), self.cell_heartbeat_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Data(_, data) => Ok(data),
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"),
},
Ok(None) => bail!("got none"),
Err(reason) => bail!("got error {reason}"),
}
}

pub fn is_cell_heartbeat_rx_empty(&mut self) -> anyhow::Result<bool> {
match self.cell_heartbeat_rx.try_recv() {
Ok(_) => Ok(false),
Err(TryRecvError::Empty) => Ok(true),
Err(err) => bail!(err),
}
}

pub async fn data_transfer_recv(mut self) -> anyhow::Result<DataTransferSessionIngestReportV1> {
match timeout(Duration::from_secs(2), self.data_transfer_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Data(_, data) => Ok(data),
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"),
},
Ok(None) => bail!("got none"),
Err(reason) => bail!("got error {reason}"),
}
}

pub fn is_data_transfer_rx_empty(&mut self) -> anyhow::Result<bool> {
match self.data_transfer_rx.try_recv() {
Ok(_) => Ok(false),
Err(TryRecvError::Empty) => Ok(true),
Err(err) => bail!(err),
}
}

Expand Down Expand Up @@ -368,6 +425,71 @@ impl TestClient {

Ok(res.into_inner())
}

pub async fn submit_cell_heartbeat(
&mut self,
keypair: &Keypair,
cbsd_id: &str,
) -> anyhow::Result<CellHeartbeatRespV1> {
let mut heartbeat = CellHeartbeatReqV1 {
pub_key: keypair.public_key().into(),
hotspot_type: "unknown".to_string(),
cell_id: 1,
timestamp: Utc::now().timestamp() as u64,
lat: 0.0,
lon: 0.0,
operation_mode: true,
cbsd_category: "unknown".to_string(),
cbsd_id: cbsd_id.to_owned(),
signature: vec![],
coverage_object: vec![1, 2, 3, 4],
};

heartbeat.signature = keypair.sign(&heartbeat.encode_to_vec()).expect("sign");

let mut request = Request::new(heartbeat);
let metadata = request.metadata_mut();

metadata.insert("authorization", self.authorization.clone());

let res = self.client.submit_cell_heartbeat(request).await?;

Ok(res.into_inner())
}

pub async fn submit_data_transfer(
&mut self,
keypair: &Keypair,
technology: DataTransferRadioAccessTechnology,
) -> anyhow::Result<DataTransferSessionRespV1> {
let mut data_transfer = DataTransferSessionReqV1 {
data_transfer_usage: Some(DataTransferEvent {
pub_key: keypair.public_key().into(),
upload_bytes: 0,
download_bytes: 0,
radio_access_technology: technology as i32,
event_id: "event-1".to_string(),
payer: vec![1, 2, 3, 4],
timestamp: Utc::now().timestamp() as u64,
signature: vec![],
}),
reward_cancelled: false,
pub_key: keypair.public_key().into(),
signature: vec![],
rewardable_bytes: 0,
};

data_transfer.signature = keypair.sign(&data_transfer.encode_to_vec())?;

let mut request = Request::new(data_transfer);
let metadata = request.metadata_mut();

metadata.insert("authorization", self.authorization.clone());

let res = self.client.submit_data_transfer_session(request).await?;

Ok(res.into_inner())
}
}

pub fn generate_keypair() -> Keypair {
Expand Down
Loading

0 comments on commit c14791b

Please sign in to comment.