From c14791b930a3d096d7fcc0feccfa72428de349e0 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 24 Feb 2025 16:57:54 -0500 Subject: [PATCH] Disable CBRS Take 2 - disable at ingest layer (#954) * Deny CBRS heartbeats and data transfer requests after deadline * fix ingest tests * Added test to cover heartbeat ingest * Add test for data_transfer wifi and cbrs * Update to accept cbrs heartbeat and data transfer session after the disable date, but just drop them --- ingest/src/server_mobile.rs | 38 ++++++++-- ingest/src/settings.rs | 9 +++ ingest/tests/common/mod.rs | 130 ++++++++++++++++++++++++++++++++-- ingest/tests/mobile_ingest.rs | 119 +++++++++++++++++++++++++++++-- 4 files changed, 280 insertions(+), 16 deletions(-) diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index f2434c150..1f2c8928e 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -1,6 +1,6 @@ use crate::Settings; use anyhow::{bail, Error, Result}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use file_store::{ file_sink::FileSinkClient, file_upload, @@ -12,8 +12,8 @@ use helium_crypto::{Network, PublicKey, PublicKeyBinary}; use helium_proto::services::poc_mobile::{ self, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1, CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, - DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, - HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1, + DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, + DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, RadioThresholdReportRespV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, @@ -60,6 +60,7 @@ pub struct GrpcServer { address: SocketAddr, api_token: MetadataValue, authorization_verifier: AV, + cbrs_disable_time: DateTime, } impl ManagedTask for GrpcServer @@ -109,6 +110,7 @@ where address: SocketAddr, api_token: MetadataValue, authorization_verifier: AV, + cbrs_disable_time: DateTime, ) -> Self { GrpcServer { heartbeat_report_sink, @@ -128,6 +130,7 @@ where address, api_token, authorization_verifier, + cbrs_disable_time, } } @@ -218,7 +221,14 @@ where &self, request: Request, ) -> GrpcResult { - let timestamp: u64 = Utc::now().timestamp_millis() as u64; + let timestamp = Utc::now(); + + if timestamp >= self.cbrs_disable_time { + return Ok(Response::new(CellHeartbeatRespV1 { + id: timestamp.to_string(), + })); + } + let event = request.into_inner(); custom_tracing::record_b58("pub_key", &event.pub_key); @@ -228,7 +238,7 @@ where .and_then(|public_key| self.verify_network(public_key)) .and_then(|public_key| self.verify_signature(public_key, event)) .map(|(_, event)| CellHeartbeatIngestReportV1 { - received_timestamp: timestamp, + received_timestamp: timestamp.timestamp_millis() as u64, report: Some(event), })?; @@ -266,9 +276,15 @@ where &self, request: Request, ) -> GrpcResult { - let timestamp = Utc::now().timestamp_millis() as u64; + let timestamp = Utc::now(); let event = request.into_inner(); + if is_data_transfer_for_cbrs(&event) && timestamp > self.cbrs_disable_time { + return Ok(Response::new(DataTransferSessionRespV1 { + id: timestamp.to_string(), + })); + } + custom_tracing::record_b58("pub_key", &event.pub_key); let report = self @@ -276,7 +292,7 @@ where .and_then(|public_key| self.verify_network(public_key)) .and_then(|public_key| self.verify_signature(public_key, event)) .map(|(_, event)| DataTransferSessionIngestReportV1 { - received_timestamp: timestamp, + received_timestamp: timestamp.timestamp_millis() as u64, report: Some(event), })?; @@ -556,6 +572,13 @@ where } } +fn is_data_transfer_for_cbrs(event: &DataTransferSessionReqV1) -> bool { + event + .data_transfer_usage + .as_ref() + .is_some_and(|u| u.radio_access_technology() == DataTransferRadioAccessTechnology::Eutran) +} + pub async fn grpc_server(settings: &Settings) -> Result<()> { // Initialize uploader let (file_upload, file_upload_server) = @@ -723,6 +746,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { settings.listen_addr, api_token, AuthorizationClient::from_settings(config_client)?, + settings.cbrs_disable_time, ); tracing::info!( diff --git a/ingest/src/settings.rs b/ingest/src/settings.rs index c135080ca..f6ead6e54 100644 --- a/ingest/src/settings.rs +++ b/ingest/src/settings.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use config::{Config, Environment, File}; use helium_crypto::Network; use humantime_serde::re::humantime; @@ -44,6 +45,14 @@ pub struct Settings { // mobile config client settings // optional to avoid having to define a client for IOT mode pub config_client: Option, + #[serde(default = "default_cbrs_disable_time")] + pub cbrs_disable_time: DateTime, +} + +fn default_cbrs_disable_time() -> DateTime { + "2025-03-01 00:00:00Z" + .parse::>() + .expect("invalid default date") } fn default_roll_time() -> Duration { diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index bb28c4065..4f0452280 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -4,7 +4,9 @@ use chrono::{DateTime, Utc}; use file_store::file_sink::FileSinkClient; use helium_crypto::{KeyTag, Keypair, Network, PublicKeyBinary, Sign}; use helium_proto::services::poc_mobile::{ - HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1, + CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellHeartbeatRespV1, DataTransferEvent, + DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, + DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, RadioUsageStatsResV1, UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, UniqueConnectionsRespV1, }; @@ -20,6 +22,7 @@ use mobile_config::client::authorization_client::AuthorizationVerifier; use prost::Message; use rand::rngs::OsRng; use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::sync::mpsc::error::TryRecvError; use tokio::{net::TcpListener, sync::mpsc::Receiver, time::timeout}; use tonic::{ async_trait, @@ -49,7 +52,9 @@ impl AuthorizationVerifier for MockAuthorizationClient { Ok(true) } } -pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { +pub async fn setup_mobile( + cbrs_disable_time: DateTime, +) -> anyhow::Result<(TestClient, Trigger)> { let key_pair = generate_keypair(); let socket_addr = { @@ -65,10 +70,10 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (trigger, listener) = triggered::trigger(); - let (cbrs_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10); + let (cbrs_heartbeat_tx, cbrs_hearbeat_rx) = tokio::sync::mpsc::channel(10); let (wifi_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10); let (speedtest_tx, _rx) = tokio::sync::mpsc::channel(10); - let (data_transfer_tx, _rx) = tokio::sync::mpsc::channel(10); + let (data_transfer_tx, data_transfer_rx) = tokio::sync::mpsc::channel(10); let (subscriber_location_tx, _rx) = tokio::sync::mpsc::channel(10); let (radio_threshold_tx, _rx) = tokio::sync::mpsc::channel(10); let (invalidated_threshold_tx, _rx) = tokio::sync::mpsc::channel(10); @@ -100,6 +105,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { socket_addr, api_token, auth_client, + cbrs_disable_time, ); grpc_server.run(listener).await @@ -113,6 +119,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { hex_usage_stat_rx, radio_usage_stat_rx, unique_connections_rx, + cbrs_hearbeat_rx, + data_transfer_rx, ) .await; @@ -131,9 +139,12 @@ pub struct TestClient { Receiver>, unique_connections_file_sink_rx: Receiver>, + cell_heartbeat_rx: Receiver>, + data_transfer_rx: Receiver>, } impl TestClient { + #[allow(clippy::too_many_arguments)] pub async fn new( socket_addr: SocketAddr, key_pair: Keypair, @@ -150,6 +161,10 @@ impl TestClient { unique_connections_file_sink_rx: Receiver< file_store::file_sink::Message, >, + cell_heartbeat_rx: Receiver>, + data_transfer_rx: Receiver< + file_store::file_sink::Message, + >, ) -> TestClient { let client = (|| PocMobileClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) @@ -164,6 +179,48 @@ impl TestClient { hex_usage_stats_file_sink_rx, radio_usage_stats_file_sink_rx, unique_connections_file_sink_rx, + cell_heartbeat_rx, + data_transfer_rx, + } + } + + pub async fn cell_heartbeat_recv(mut self) -> anyhow::Result { + match timeout(Duration::from_secs(2), self.cell_heartbeat_rx.recv()).await { + Ok(Some(msg)) => match msg { + file_store::file_sink::Message::Data(_, data) => Ok(data), + file_store::file_sink::Message::Commit(_) => bail!("got Commit"), + file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"), + }, + Ok(None) => bail!("got none"), + Err(reason) => bail!("got error {reason}"), + } + } + + pub fn is_cell_heartbeat_rx_empty(&mut self) -> anyhow::Result { + match self.cell_heartbeat_rx.try_recv() { + Ok(_) => Ok(false), + Err(TryRecvError::Empty) => Ok(true), + Err(err) => bail!(err), + } + } + + pub async fn data_transfer_recv(mut self) -> anyhow::Result { + match timeout(Duration::from_secs(2), self.data_transfer_rx.recv()).await { + Ok(Some(msg)) => match msg { + file_store::file_sink::Message::Data(_, data) => Ok(data), + file_store::file_sink::Message::Commit(_) => bail!("got Commit"), + file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"), + }, + Ok(None) => bail!("got none"), + Err(reason) => bail!("got error {reason}"), + } + } + + pub fn is_data_transfer_rx_empty(&mut self) -> anyhow::Result { + match self.data_transfer_rx.try_recv() { + Ok(_) => Ok(false), + Err(TryRecvError::Empty) => Ok(true), + Err(err) => bail!(err), } } @@ -368,6 +425,71 @@ impl TestClient { Ok(res.into_inner()) } + + pub async fn submit_cell_heartbeat( + &mut self, + keypair: &Keypair, + cbsd_id: &str, + ) -> anyhow::Result { + let mut heartbeat = CellHeartbeatReqV1 { + pub_key: keypair.public_key().into(), + hotspot_type: "unknown".to_string(), + cell_id: 1, + timestamp: Utc::now().timestamp() as u64, + lat: 0.0, + lon: 0.0, + operation_mode: true, + cbsd_category: "unknown".to_string(), + cbsd_id: cbsd_id.to_owned(), + signature: vec![], + coverage_object: vec![1, 2, 3, 4], + }; + + heartbeat.signature = keypair.sign(&heartbeat.encode_to_vec()).expect("sign"); + + let mut request = Request::new(heartbeat); + let metadata = request.metadata_mut(); + + metadata.insert("authorization", self.authorization.clone()); + + let res = self.client.submit_cell_heartbeat(request).await?; + + Ok(res.into_inner()) + } + + pub async fn submit_data_transfer( + &mut self, + keypair: &Keypair, + technology: DataTransferRadioAccessTechnology, + ) -> anyhow::Result { + let mut data_transfer = DataTransferSessionReqV1 { + data_transfer_usage: Some(DataTransferEvent { + pub_key: keypair.public_key().into(), + upload_bytes: 0, + download_bytes: 0, + radio_access_technology: technology as i32, + event_id: "event-1".to_string(), + payer: vec![1, 2, 3, 4], + timestamp: Utc::now().timestamp() as u64, + signature: vec![], + }), + reward_cancelled: false, + pub_key: keypair.public_key().into(), + signature: vec![], + rewardable_bytes: 0, + }; + + data_transfer.signature = keypair.sign(&data_transfer.encode_to_vec())?; + + let mut request = Request::new(data_transfer); + let metadata = request.metadata_mut(); + + metadata.insert("authorization", self.authorization.clone()); + + let res = self.client.submit_data_transfer_session(request).await?; + + Ok(res.into_inner()) + } } pub fn generate_keypair() -> Keypair { diff --git a/ingest/tests/mobile_ingest.rs b/ingest/tests/mobile_ingest.rs index aa9fcb478..0cce6a910 100644 --- a/ingest/tests/mobile_ingest.rs +++ b/ingest/tests/mobile_ingest.rs @@ -1,5 +1,7 @@ -use chrono::Utc; +use chrono::{DateTime, Duration, Utc}; +use common::generate_keypair; use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology; use std::str::FromStr; mod common; @@ -8,7 +10,8 @@ const PUBKEY1: &str = "113HRxtzxFbFUjDEJJpyeMRZRtdAW38LAUnB5mshRwi6jt7uFbt"; #[tokio::test] async fn submit_unique_connections() -> anyhow::Result<()> { - let (mut client, trigger) = common::setup_mobile().await?; + let (mut client, trigger) = + common::setup_mobile("2025-03-01 00:00:00Z".parse::>()?).await?; let pubkey = PublicKeyBinary::from_str(PUBKEY1)?; let timestamp = Utc::now(); @@ -37,7 +40,8 @@ async fn submit_unique_connections() -> anyhow::Result<()> { #[tokio::test] async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { - let (mut client, trigger) = common::setup_mobile().await?; + let (mut client, trigger) = + common::setup_mobile("2025-03-01 00:00:00Z".parse::>()?).await?; let subscriber_id = vec![0]; let total_reward_points = 100; @@ -71,7 +75,8 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { #[tokio::test] async fn submit_hex_usage_report() -> anyhow::Result<()> { - let (mut client, trigger) = common::setup_mobile().await?; + let (mut client, trigger) = + common::setup_mobile("2025-03-01 00:00:00Z".parse::>()?).await?; const HEX: u64 = 360; const SERVICE_PROVIDER_USER_COUNT: u64 = 10; @@ -126,7 +131,8 @@ async fn submit_hex_usage_report() -> anyhow::Result<()> { #[tokio::test] async fn submit_radio_usage_report() -> anyhow::Result<()> { - let (mut client, trigger) = common::setup_mobile().await?; + let (mut client, trigger) = + common::setup_mobile("2025-03-01 00:00:00Z".parse::>()?).await?; let hotspot_pubkey = PublicKeyBinary::from_str(PUBKEY1)?; let cbsd_id = "cbsd_id".to_string(); @@ -181,3 +187,106 @@ async fn submit_radio_usage_report() -> anyhow::Result<()> { trigger.trigger(); Ok(()) } + +#[tokio::test] +async fn cell_heartbeat_before() -> anyhow::Result<()> { + let cbrs_disable_time = Utc::now() + Duration::hours(1); + let (mut client, trigger) = common::setup_mobile(cbrs_disable_time).await?; + + let keypair = generate_keypair(); + + client.submit_cell_heartbeat(&keypair, "cbsd-1").await?; + + let ingest_report = client.cell_heartbeat_recv().await?; + + assert!(ingest_report + .report + .is_some_and(|r| r.pub_key == keypair.public_key().to_vec() && r.cbsd_id == "cbsd-1")); + + trigger.trigger(); + Ok(()) +} + +#[tokio::test] +async fn cell_heartbeat_after() -> anyhow::Result<()> { + let cbrs_disable_time = Utc::now() - Duration::hours(1); + let (mut client, trigger) = common::setup_mobile(cbrs_disable_time).await?; + + let keypair = generate_keypair(); + + client.submit_cell_heartbeat(&keypair, "cbsd-1").await?; + + assert!(client.is_cell_heartbeat_rx_empty()?); + + trigger.trigger(); + Ok(()) +} + +#[tokio::test] +async fn wifi_data_transfer() -> anyhow::Result<()> { + let cbrs_disable_time = Utc::now() - Duration::hours(1); + let (mut client, trigger) = common::setup_mobile(cbrs_disable_time).await?; + + let keypair = generate_keypair(); + + client + .submit_data_transfer(&keypair, DataTransferRadioAccessTechnology::Wlan) + .await?; + + let ingest_report = client.data_transfer_recv().await?; + + let ingest_pubkey = ingest_report + .report + .unwrap() + .data_transfer_usage + .unwrap() + .pub_key; + + assert_eq!(ingest_pubkey, keypair.public_key().to_vec()); + + trigger.trigger(); + Ok(()) +} + +#[tokio::test] +async fn cbrs_data_transfer_before() -> anyhow::Result<()> { + let cbrs_disable_time = Utc::now() + Duration::hours(1); + let (mut client, trigger) = common::setup_mobile(cbrs_disable_time).await?; + + let keypair = generate_keypair(); + + client + .submit_data_transfer(&keypair, DataTransferRadioAccessTechnology::Eutran) + .await?; + + let ingest_report = client.data_transfer_recv().await?; + + let ingest_pubkey = ingest_report + .report + .unwrap() + .data_transfer_usage + .unwrap() + .pub_key; + + assert_eq!(ingest_pubkey, keypair.public_key().to_vec()); + + trigger.trigger(); + Ok(()) +} + +#[tokio::test] +async fn cbrs_data_transfer_after() -> anyhow::Result<()> { + let cbrs_disable_time = Utc::now() - Duration::hours(1); + let (mut client, trigger) = common::setup_mobile(cbrs_disable_time).await?; + + let keypair = generate_keypair(); + + client + .submit_data_transfer(&keypair, DataTransferRadioAccessTechnology::Eutran) + .await?; + + assert!(client.is_data_transfer_rx_empty()?); + + trigger.trigger(); + Ok(()) +}