Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mobile Packet Verifier Transaction Recovery #958

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iot_packet_verifier/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::Mutex;

/// Caches balances fetched from the solana chain and debits made by the
/// packet verifier.
#[derive(Clone)]
pub struct BalanceCache<S> {
payer_accounts: BalanceStore,
solana: S,
Expand Down Expand Up @@ -55,6 +56,10 @@ impl<S> BalanceCache<S> {
pub fn balances(&self) -> BalanceStore {
self.payer_accounts.clone()
}

pub async fn get_payer_balance(&self, payer: &PublicKeyBinary) -> Option<PayerAccount> {
self.payer_accounts.lock().await.get(payer).cloned()
}
}

#[async_trait::async_trait]
Expand Down
14 changes: 11 additions & 3 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ where

loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
Expand All @@ -94,9 +94,17 @@ where
}

pub async fn burn(&mut self) -> Result<(), BurnError> {
// There should only be a single pending txn at a time
let pending_txns = self.pending_tables.fetch_all_pending_txns().await?;
if !pending_txns.is_empty() {
tracing::info!(pending_txns = pending_txns.len(), "skipping burn");
return Ok(());
}

// Fetch the next payer and amount that should be burn. If no such burn
// exists, perform no action.
let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else {
tracing::info!("no pending burns");
return Ok(());
};

Expand Down
231 changes: 15 additions & 216 deletions iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::balances::BalanceStore;

/// To avoid excessive burn transaction (which cost us money), we institute a minimum
/// amount of Data Credits accounted for before we burn from a payer:
const BURN_THRESHOLD: i64 = 10_000;
pub const BURN_THRESHOLD: i64 = 10_000;

#[async_trait]
pub trait AddPendingBurn {
Expand Down Expand Up @@ -39,6 +39,17 @@ pub trait PendingTables {
payer: &PublicKeyBinary,
amount: u64,
signature: &Signature,
) -> Result<(), sqlx::Error> {
self.do_add_pending_transaction(payer, amount, signature, Utc::now())
.await
}

async fn do_add_pending_transaction(
&self,
payer: &PublicKeyBinary,
amount: u64,
signature: &Signature,
time_of_submission: DateTime<Utc>,
) -> Result<(), sqlx::Error>;

async fn begin<'a>(&'a self) -> Result<Self::Transaction<'a>, sqlx::Error>;
Expand Down Expand Up @@ -142,11 +153,12 @@ impl PendingTables for PgPool {
.await
}

async fn add_pending_transaction(
async fn do_add_pending_transaction(
&self,
payer: &PublicKeyBinary,
amount: u64,
signature: &Signature,
time_of_submission: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
Expand All @@ -157,7 +169,7 @@ impl PendingTables for PgPool {
.bind(signature.to_string())
.bind(payer)
.bind(amount as i64)
.bind(Utc::now())
.bind(time_of_submission)
.execute(self)
.await?;
Ok(())
Expand Down Expand Up @@ -279,216 +291,3 @@ impl AddPendingBurn for Arc<Mutex<HashMap<PublicKeyBinary, u64>>> {
Ok(())
}
}

#[derive(Clone)]
pub struct MockPendingTxn {
payer: PublicKeyBinary,
amount: u64,
time_of_submission: DateTime<Utc>,
}

#[derive(Default, Clone)]
pub struct MockPendingTables {
pub pending_txns: Arc<Mutex<HashMap<Signature, MockPendingTxn>>>,
pub pending_burns: Arc<Mutex<HashMap<PublicKeyBinary, u64>>>,
}

#[async_trait]
impl PendingTables for MockPendingTables {
type Transaction<'a> = &'a MockPendingTables;

async fn fetch_next_burn(&self) -> Result<Option<Burn>, sqlx::Error> {
Ok(self
.pending_burns
.lock()
.await
.iter()
.max_by_key(|(_, amount)| **amount)
.map(|(payer, &amount)| Burn {
payer: payer.clone(),
amount,
}))
}

async fn fetch_all_pending_burns(&self) -> Result<Vec<Burn>, sqlx::Error> {
Ok(self
.pending_burns
.lock()
.await
.clone()
.into_iter()
.map(|(payer, amount)| Burn { payer, amount })
.collect())
}

async fn fetch_all_pending_txns(&self) -> Result<Vec<PendingTxn>, sqlx::Error> {
Ok(self
.pending_txns
.lock()
.await
.clone()
.into_iter()
.map(|(signature, mock)| PendingTxn {
signature,
payer: mock.payer,
amount: mock.amount,
time_of_submission: mock.time_of_submission,
})
.collect())
}

async fn add_pending_transaction(
&self,
payer: &PublicKeyBinary,
amount: u64,
signature: &Signature,
) -> Result<(), sqlx::Error> {
self.pending_txns.lock().await.insert(
*signature,
MockPendingTxn {
payer: payer.clone(),
amount,
time_of_submission: Utc::now(),
},
);
Ok(())
}

async fn begin<'a>(&'a self) -> Result<Self::Transaction<'a>, sqlx::Error> {
Ok(self)
}
}

#[async_trait]
impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables {
async fn remove_pending_transaction(
&mut self,
signature: &Signature,
) -> Result<(), sqlx::Error> {
self.pending_txns.lock().await.remove(signature);
Ok(())
}

async fn subtract_burned_amount(
&mut self,
payer: &PublicKeyBinary,
amount: u64,
) -> Result<(), sqlx::Error> {
let mut map = self.pending_burns.lock().await;
let balance = map.get_mut(payer).unwrap();
*balance -= amount;
Ok(())
}

async fn commit(self) -> Result<(), sqlx::Error> {
Ok(())
}
}

#[cfg(test)]
mod test {

use crate::balances::PayerAccount;

use super::*;
use std::collections::HashSet;

#[derive(Clone)]
struct MockConfirmed(HashSet<Signature>);

#[async_trait]
impl SolanaNetwork for MockConfirmed {
type Transaction = Signature;

#[allow(clippy::diverging_sub_expression)]
async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result<u64, SolanaRpcError> {
unreachable!()
}

#[allow(clippy::diverging_sub_expression)]
async fn make_burn_transaction(
&self,
_payer: &PublicKeyBinary,
_amount: u64,
) -> Result<Self::Transaction, SolanaRpcError> {
unreachable!()
}

#[allow(clippy::diverging_sub_expression)]
async fn submit_transaction(
&self,
_transaction: &Self::Transaction,
) -> Result<(), SolanaRpcError> {
unreachable!()
}

async fn confirm_transaction(&self, txn: &Signature) -> Result<bool, SolanaRpcError> {
Ok(self.0.contains(txn))
}
}

#[tokio::test]
async fn test_confirm_pending_txns() {
let confirmed = Signature::new_unique();
let unconfirmed = Signature::new_unique();
let payer: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"
.parse()
.unwrap();
let mut pending_txns = HashMap::new();
const CONFIRMED_BURN_AMOUNT: u64 = 7;
const UNCONFIRMED_BURN_AMOUNT: u64 = 11;
pending_txns.insert(
confirmed,
MockPendingTxn {
payer: payer.clone(),
amount: CONFIRMED_BURN_AMOUNT,
time_of_submission: Utc::now() - Duration::minutes(1),
},
);
pending_txns.insert(
unconfirmed,
MockPendingTxn {
payer: payer.clone(),
amount: UNCONFIRMED_BURN_AMOUNT,
time_of_submission: Utc::now() - Duration::minutes(1),
},
);
let mut balances = HashMap::new();
balances.insert(
payer.clone(),
PayerAccount {
balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
},
);
let mut pending_burns = HashMap::new();
pending_burns.insert(
payer.clone(),
CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
);
let pending_txns = Arc::new(Mutex::new(pending_txns));
let pending_burns = Arc::new(Mutex::new(pending_burns));
let pending_tables = MockPendingTables {
pending_txns,
pending_burns,
};
let mut confirmed_txns = HashSet::new();
confirmed_txns.insert(confirmed);
let confirmed = MockConfirmed(confirmed_txns);
// Confirm and resolve transactions:
confirm_pending_txns(&pending_tables, &confirmed, &Arc::new(Mutex::new(balances)))
.await
.unwrap();
// The amount left in the pending burns table should only be the unconfirmed
// burn amount:
assert_eq!(
*pending_tables
.pending_burns
.lock()
.await
.get(&payer)
.unwrap(),
UNCONFIRMED_BURN_AMOUNT,
);
}
}
15 changes: 0 additions & 15 deletions iot_packet_verifier/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,6 @@ pub trait Debiter {
) -> Result<Option<u64>, SolanaRpcError>;
}

#[async_trait]
impl Debiter for Arc<Mutex<HashMap<PublicKeyBinary, u64>>> {
async fn debit_if_sufficient(
&self,
payer: &PublicKeyBinary,
amount: u64,
_trigger_balance_check_threshold: u64,
) -> Result<Option<u64>, SolanaRpcError> {
let map = self.lock().await;
let balance = map.get(payer).unwrap();
// Don't debit the amount if we're mocking. That is a job for the burner.
Ok((*balance >= amount).then(|| balance.saturating_sub(amount)))
}
}

// TODO: Move these to a separate module

pub struct Org {
Expand Down
Loading