From e5193a8f6ff2e9ea867bb1fcdab3c84ef05009e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Papie=C5=BC?= Date: Fri, 17 Jan 2025 08:02:12 +0100 Subject: [PATCH] chore: sort completed transactions (#6733) ## Description This pull request focuses on refactoring the transaction handling in the wallet service by replacing `HashMap` with `Vec` for storing transactions. This change impacts several parts of the codebase, including transaction service responses, database storage, and transaction handling logic. Additionaly, we sort txs by `mined_timestamp` in desc order. ## Testing Tested manually using `minotari_console_wallet` ![image](https://github.com/user-attachments/assets/eca20425-f8fe-430f-b97b-8673a7bdb9ef) --- .../src/grpc/wallet_grpc_server.rs | 16 +- .../src/notifier/mod.rs | 2 +- .../src/ui/state/app_state.rs | 21 +-- .../wallet/src/transaction_service/handle.rs | 20 ++- .../wallet/src/transaction_service/service.rs | 18 +-- .../transaction_service/storage/database.rs | 31 ++-- .../transaction_service/storage/sqlite_db.rs | 49 ++---- .../transaction_service_tests/service.rs | 151 ++++++++++++++---- .../transaction_service_tests/storage.rs | 37 +++-- .../transaction_protocols.rs | 131 +++++++++++---- base_layer/wallet_ffi/src/lib.rs | 49 +++--- 11 files changed, 333 insertions(+), 192 deletions(-) diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index 5e1490d508..718dd6addd 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -771,7 +771,7 @@ impl wallet_server::Wallet for WalletGrpcServer { let (mut sender, receiver) = mpsc::channel(transactions.len()); task::spawn(async move { - for (i, (_, txn)) in transactions.iter().enumerate() { + for (i, txn) in transactions.iter().enumerate() { let response = GetCompletedTransactionsResponse { transaction: Some(TransactionInfo { tx_id: txn.tx_id.into(), @@ -1088,15 +1088,17 @@ async fn handle_pending_outbound( transaction_service: &mut TransactionServiceHandle, sender: &mut Sender>, ) { - match transaction_service.get_pending_outbound_transactions().await { - Ok(mut txs) => { - if let Some(tx) = txs.remove(&tx_id) { + use models::WalletTransaction::PendingOutbound; + match transaction_service.get_any_transaction(tx_id).await { + Ok(tx) => match tx { + Some(PendingOutbound(tx)) => { let transaction_event = - convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx))); + convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx.clone()))); send_transaction_event(transaction_event, sender).await; - } else { + }, + _ => { error!(target: LOG_TARGET, "Not found in pending outbound set tx_id: {}", tx_id); - } + }, }, Err(e) => error!(target: LOG_TARGET, "Transaction service error: {}", e), } diff --git a/applications/minotari_console_wallet/src/notifier/mod.rs b/applications/minotari_console_wallet/src/notifier/mod.rs index 8508920c74..93b0d7f188 100644 --- a/applications/minotari_console_wallet/src/notifier/mod.rs +++ b/applications/minotari_console_wallet/src/notifier/mod.rs @@ -196,7 +196,7 @@ impl Notifier { self.handle.spawn(async move { match transaction_service.get_pending_outbound_transactions().await { Ok(txs) => { - if let Some(tx) = txs.get(&tx_id) { + if let Some(tx) = txs.iter().find(|tx| tx.tx_id == tx_id) { let args = args_from_outbound(tx, event); let result = Command::new(program).args(&args).output(); let message = WalletEventMessage::Outbound { diff --git a/applications/minotari_console_wallet/src/ui/state/app_state.rs b/applications/minotari_console_wallet/src/ui/state/app_state.rs index 973c2a0a80..1373e29087 100644 --- a/applications/minotari_console_wallet/src/ui/state/app_state.rs +++ b/applications/minotari_console_wallet/src/ui/state/app_state.rs @@ -721,7 +721,7 @@ impl AppStateInner { .transaction_service .get_pending_inbound_transactions() .await? - .values() + .iter() .map(|t| CompletedTransaction::from(t.clone())) .collect::>(), ); @@ -730,11 +730,10 @@ impl AppStateInner { .transaction_service .get_pending_outbound_transactions() .await? - .values() + .iter() .map(|t| CompletedTransaction::from(t.clone())) .collect::>(), ); - pending_transactions.sort_by(|a: &CompletedTransaction, b: &CompletedTransaction| { b.timestamp.partial_cmp(&a.timestamp).unwrap() }); @@ -747,26 +746,14 @@ impl AppStateInner { .collect::, _>>()?; let mut completed_transactions: Vec = Vec::new(); - completed_transactions.extend( - self.wallet - .transaction_service - .get_completed_transactions() - .await? - .values() - .cloned() - .collect::>(), - ); + completed_transactions.extend(self.wallet.transaction_service.get_completed_transactions().await?); completed_transactions.extend( self.wallet .transaction_service .get_cancelled_completed_transactions() - .await? - .values() - .cloned() - .collect::>(), + .await?, ); - completed_transactions.sort_by(|a, b| { b.timestamp .partial_cmp(&a.timestamp) diff --git a/base_layer/wallet/src/transaction_service/handle.rs b/base_layer/wallet/src/transaction_service/handle.rs index d138ca875a..853d69e768 100644 --- a/base_layer/wallet/src/transaction_service/handle.rs +++ b/base_layer/wallet/src/transaction_service/handle.rs @@ -417,9 +417,9 @@ pub enum TransactionServiceResponse { template_registration: Box, }, TransactionCancelled, - PendingInboundTransactions(HashMap), - PendingOutboundTransactions(HashMap), - CompletedTransactions(HashMap), + PendingInboundTransactions(Vec), + PendingOutboundTransactions(Vec), + CompletedTransactions(Vec), CompletedTransaction(Box), BaseNodePublicKeySet, UtxoImported(TxId), @@ -913,7 +913,7 @@ impl TransactionServiceHandle { pub async fn get_pending_inbound_transactions( &mut self, - ) -> Result, TransactionServiceError> { + ) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetPendingInboundTransactions) @@ -926,7 +926,7 @@ impl TransactionServiceHandle { pub async fn get_cancelled_pending_inbound_transactions( &mut self, - ) -> Result, TransactionServiceError> { + ) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetCancelledPendingInboundTransactions) @@ -939,7 +939,7 @@ impl TransactionServiceHandle { pub async fn get_pending_outbound_transactions( &mut self, - ) -> Result, TransactionServiceError> { + ) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetPendingOutboundTransactions) @@ -952,7 +952,7 @@ impl TransactionServiceHandle { pub async fn get_cancelled_pending_outbound_transactions( &mut self, - ) -> Result, TransactionServiceError> { + ) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetCancelledPendingOutboundTransactions) @@ -963,9 +963,7 @@ impl TransactionServiceHandle { } } - pub async fn get_completed_transactions( - &mut self, - ) -> Result, TransactionServiceError> { + pub async fn get_completed_transactions(&mut self) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetCompletedTransactions) @@ -978,7 +976,7 @@ impl TransactionServiceHandle { pub async fn get_cancelled_completed_transactions( &mut self, - ) -> Result, TransactionServiceError> { + ) -> Result, TransactionServiceError> { match self .handle .call(TransactionServiceRequest::GetCancelledCompletedTransactions) diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index c6108fb797..645030b354 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -2750,29 +2750,29 @@ where >, ) -> Result<(), TransactionServiceError> { let outbound_txs = self.db.get_pending_outbound_transactions()?; - for (tx_id, tx) in outbound_txs { + for tx in outbound_txs { let (sender_protocol, stage) = if tx.send_count > 0 { (None, TransactionSendProtocolStage::WaitForReply) } else { (Some(tx.sender_protocol), TransactionSendProtocolStage::Queued) }; let (not_yet_pending, queued) = ( - !self.pending_transaction_reply_senders.contains_key(&tx_id), + !self.pending_transaction_reply_senders.contains_key(&tx.tx_id), stage == TransactionSendProtocolStage::Queued, ); if not_yet_pending { debug!( target: LOG_TARGET, - "Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id + "Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx.tx_id ); } else if queued { debug!( target: LOG_TARGET, - "Retry sending queued Pending Outbound Transaction TxId: {}", tx_id + "Retry sending queued Pending Outbound Transaction TxId: {}", tx.tx_id ); - let _sender = self.pending_transaction_reply_senders.remove(&tx_id); - let _sender = self.send_transaction_cancellation_senders.remove(&tx_id); + let _sender = self.pending_transaction_reply_senders.remove(&tx.tx_id); + let _sender = self.send_transaction_cancellation_senders.remove(&tx.tx_id); } else { // dont care } @@ -2780,12 +2780,12 @@ where if not_yet_pending || queued { let (tx_reply_sender, tx_reply_receiver) = mpsc::channel(100); let (cancellation_sender, cancellation_receiver) = oneshot::channel(); - self.pending_transaction_reply_senders.insert(tx_id, tx_reply_sender); + self.pending_transaction_reply_senders.insert(tx.tx_id, tx_reply_sender); self.send_transaction_cancellation_senders - .insert(tx_id, cancellation_sender); + .insert(tx.tx_id, cancellation_sender); let protocol = TransactionSendProtocol::new( - tx_id, + tx.tx_id, self.resources.clone(), tx_reply_receiver, cancellation_receiver, diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index e4c95c2e53..143e8936aa 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -21,7 +21,6 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ - collections::HashMap, fmt, fmt::{Display, Error, Formatter}, sync::Arc, @@ -245,9 +244,9 @@ pub enum DbValue { PendingOutboundTransaction(Box), PendingInboundTransaction(Box), CompletedTransaction(Box), - PendingOutboundTransactions(HashMap), - PendingInboundTransactions(HashMap), - CompletedTransactions(HashMap), + PendingOutboundTransactions(Vec), + PendingInboundTransactions(Vec), + CompletedTransactions(Vec), WalletTransaction(Box), } @@ -508,22 +507,20 @@ where T: TransactionBackend + 'static Ok(*t) } - pub fn get_pending_inbound_transactions( - &self, - ) -> Result, TransactionStorageError> { + pub fn get_pending_inbound_transactions(&self) -> Result, TransactionStorageError> { self.get_pending_inbound_transactions_by_cancelled(false) } pub fn get_cancelled_pending_inbound_transactions( &self, - ) -> Result, TransactionStorageError> { + ) -> Result, TransactionStorageError> { self.get_pending_inbound_transactions_by_cancelled(true) } fn get_pending_inbound_transactions_by_cancelled( &self, cancelled: bool, - ) -> Result, TransactionStorageError> { + ) -> Result, TransactionStorageError> { let key = if cancelled { DbKey::CancelledPendingInboundTransactions } else { @@ -544,22 +541,20 @@ where T: TransactionBackend + 'static Ok(t) } - pub fn get_pending_outbound_transactions( - &self, - ) -> Result, TransactionStorageError> { + pub fn get_pending_outbound_transactions(&self) -> Result, TransactionStorageError> { self.get_pending_outbound_transactions_by_cancelled(false) } pub fn get_cancelled_pending_outbound_transactions( &self, - ) -> Result, TransactionStorageError> { + ) -> Result, TransactionStorageError> { self.get_pending_outbound_transactions_by_cancelled(true) } fn get_pending_outbound_transactions_by_cancelled( &self, cancelled: bool, - ) -> Result, TransactionStorageError> { + ) -> Result, TransactionStorageError> { let key = if cancelled { DbKey::CancelledPendingOutboundTransactions } else { @@ -588,13 +583,11 @@ where T: TransactionBackend + 'static Ok(address) } - pub fn get_completed_transactions(&self) -> Result, TransactionStorageError> { + pub fn get_completed_transactions(&self) -> Result, TransactionStorageError> { self.get_completed_transactions_by_cancelled(false) } - pub fn get_cancelled_completed_transactions( - &self, - ) -> Result, TransactionStorageError> { + pub fn get_cancelled_completed_transactions(&self) -> Result, TransactionStorageError> { self.get_completed_transactions_by_cancelled(true) } @@ -620,7 +613,7 @@ where T: TransactionBackend + 'static fn get_completed_transactions_by_cancelled( &self, cancelled: bool, - ) -> Result, TransactionStorageError> { + ) -> Result, TransactionStorageError> { let key = if cancelled { DbKey::CancelledCompletedTransactions } else { diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 038b339ac3..ce1744473a 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -21,7 +21,6 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ - collections::HashMap, convert::{TryFrom, TryInto}, sync::{Arc, RwLock}, }; @@ -266,67 +265,49 @@ impl TransactionBackend for TransactionServiceSqliteDatabase { None }, DbKey::PendingOutboundTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for o in OutboundTransactionSql::index_by_cancelled(&mut conn, false)? { - result.insert( - (o.tx_id as u64).into(), - OutboundTransaction::try_from(o.clone(), &cipher)?, - ); + result.push(OutboundTransaction::try_from(o.clone(), &cipher)?); } Some(DbValue::PendingOutboundTransactions(result)) }, DbKey::PendingInboundTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for i in InboundTransactionSql::index_by_cancelled(&mut conn, false)? { - result.insert( - (i.tx_id as u64).into(), - InboundTransaction::try_from((i).clone(), &cipher)?, - ); + result.push(InboundTransaction::try_from((i).clone(), &cipher)?); } Some(DbValue::PendingInboundTransactions(result)) }, DbKey::CompletedTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for c in CompletedTransactionSql::index_by_cancelled(&mut conn, false)? { - result.insert( - (c.tx_id as u64).into(), - CompletedTransaction::try_from((c).clone(), &cipher)?, - ); + result.push(CompletedTransaction::try_from((c).clone(), &cipher)?); } Some(DbValue::CompletedTransactions(result)) }, DbKey::CancelledPendingOutboundTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for o in OutboundTransactionSql::index_by_cancelled(&mut conn, true)? { - result.insert( - (o.tx_id as u64).into(), - OutboundTransaction::try_from((o).clone(), &cipher)?, - ); + result.push(OutboundTransaction::try_from((o).clone(), &cipher)?); } Some(DbValue::PendingOutboundTransactions(result)) }, DbKey::CancelledPendingInboundTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for i in InboundTransactionSql::index_by_cancelled(&mut conn, true)? { - result.insert( - (i.tx_id as u64).into(), - InboundTransaction::try_from(i.clone(), &cipher)?, - ); + result.push(InboundTransaction::try_from(i.clone(), &cipher)?); } Some(DbValue::PendingInboundTransactions(result)) }, DbKey::CancelledCompletedTransactions => { - let mut result = HashMap::new(); + let mut result = Vec::new(); for c in CompletedTransactionSql::index_by_cancelled(&mut conn, true)? { - result.insert( - (c.tx_id as u64).into(), - CompletedTransaction::try_from((c).clone(), &cipher)?, - ); + result.push(CompletedTransaction::try_from((c).clone(), &cipher)?); } Some(DbValue::CompletedTransactions(result)) @@ -1192,6 +1173,7 @@ impl InboundTransactionSql { ) -> Result, TransactionStorageError> { Ok(inbound_transactions::table .filter(inbound_transactions::cancelled.eq(i32::from(cancelled))) + .order_by(inbound_transactions::timestamp.desc()) .load::(conn)?) } @@ -1456,6 +1438,7 @@ impl OutboundTransactionSql { ) -> Result, TransactionStorageError> { Ok(outbound_transactions::table .filter(outbound_transactions::cancelled.eq(i32::from(cancelled))) + .order_by(outbound_transactions::timestamp.desc()) .load::(conn)?) } @@ -1726,7 +1709,9 @@ impl CompletedTransactionSql { query.filter(completed_transactions::cancelled.is_null()) }; - Ok(query.load::(conn)?) + Ok(query + .order_by(completed_transactions::mined_timestamp.desc()) + .load::(conn)?) } pub fn index_by_status_and_cancelled( diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 4fd0ae0149..12cda11661 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -30,7 +30,7 @@ use std::{ use blake2::Blake2b; use chacha20poly1305::{Key, KeyInit, XChaCha20Poly1305}; -use chrono::{Duration as ChronoDuration, Utc}; +use chrono::{DateTime, Duration as ChronoDuration, Utc}; use digest::consts::U32; use futures::{ channel::{mpsc, mpsc::Sender}, @@ -803,9 +803,10 @@ async fn large_interactive_transaction() { // We want to ensure that we can get the pending outbound transaction from the database, // and excercise the sender_protocol let pending_outbound = alice_ts.get_pending_outbound_transactions().await.unwrap(); - pending_outbound.get(id).unwrap().sender_protocol.get_amount_to_recipient().unwrap(); + let po_tx = pending_outbound.iter().find(|tx| tx.tx_id == *id).unwrap(); + po_tx.sender_protocol.get_amount_to_recipient().unwrap(); assert_eq!( - pending_outbound.get(id).unwrap().sender_protocol.get_amount_to_recipient().unwrap(), + po_tx.sender_protocol.get_amount_to_recipient().unwrap(), transaction_value ); }, @@ -825,8 +826,9 @@ async fn large_interactive_transaction() { // We want to ensure that we can get the pending inbound transaction from the database, // and excercise the receiver_protocol let pending_inbound = bob_ts.get_pending_inbound_transactions().await.unwrap(); - assert!(pending_inbound.get(id).unwrap().receiver_protocol.get_signed_data().is_ok()); - assert_eq!(pending_inbound.get(id).unwrap().amount, transaction_value); + let pi_tx = pending_inbound.iter().find(|tx| tx.tx_id == *id).unwrap(); + assert!(pi_tx.receiver_protocol.get_signed_data().is_ok()); + assert_eq!(pi_tx.amount, transaction_value); }, TransactionEvent::ReceivedFinalizedTransaction(id) => { tx_id = *id; @@ -3246,7 +3248,8 @@ async fn test_transaction_cancellation() { .get_pending_outbound_transactions() .await .unwrap() - .remove(&tx_id) + .iter() + .find(|tx| tx.tx_id == tx_id) { None => (), Some(_) => break, @@ -3298,13 +3301,13 @@ async fn test_transaction_cancellation() { let alice_cancel_message = try_decode_transaction_cancelled_message(call.1.to_vec()).unwrap(); assert_eq!(alice_cancel_message.tx_id, tx_id.as_u64(), "SAF"); - assert!(alice_ts_interface + assert!(!alice_ts_interface .transaction_service_handle .get_pending_outbound_transactions() .await .unwrap() - .remove(&tx_id) - .is_none()); + .iter() + .any(|tx| tx.tx_id == tx_id)); let key_manager = create_memory_db_key_manager().unwrap(); let input = create_wallet_output_with_data( @@ -3373,13 +3376,13 @@ async fn test_transaction_cancellation() { } } - alice_ts_interface + assert!(alice_ts_interface .transaction_service_handle .get_pending_inbound_transactions() .await .unwrap() - .remove(&tx_id2) - .expect("Pending Transaction 2 should be in list"); + .iter() + .any(|tx| tx.tx_id == tx_id2)); alice_ts_interface .transaction_service_handle @@ -3387,13 +3390,13 @@ async fn test_transaction_cancellation() { .await .unwrap(); - assert!(alice_ts_interface + assert!(!alice_ts_interface .transaction_service_handle .get_pending_inbound_transactions() .await .unwrap() - .remove(&tx_id2) - .is_none()); + .iter() + .any(|tx| tx.tx_id == tx_id2)); // Lets cancel the last one using a Comms stack message let input = create_wallet_output_with_data( @@ -3460,13 +3463,13 @@ async fn test_transaction_cancellation() { } } - alice_ts_interface + assert!(alice_ts_interface .transaction_service_handle .get_pending_inbound_transactions() .await .unwrap() - .remove(&tx_id3) - .expect("Pending Transaction 3 should be in list"); + .iter() + .any(|tx| tx.tx_id == tx_id3)); let proto_message = proto::TransactionCancelledMessage { tx_id: tx_id3.as_u64() }; // Sent from the wrong source address so should not cancel @@ -3481,13 +3484,13 @@ async fn test_transaction_cancellation() { sleep(Duration::from_secs(5)).await; - alice_ts_interface + assert!(alice_ts_interface .transaction_service_handle .get_pending_inbound_transactions() .await .unwrap() - .remove(&tx_id3) - .expect("Pending Transaction 3 should be in list"); + .iter() + .any(|tx| tx.tx_id == tx_id3)); let proto_message = proto::TransactionCancelledMessage { tx_id: tx_id3.as_u64() }; alice_ts_interface @@ -3514,13 +3517,13 @@ async fn test_transaction_cancellation() { } assert!(cancelled, "Should received cancelled event"); - assert!(alice_ts_interface + assert!(!alice_ts_interface .transaction_service_handle .get_pending_inbound_transactions() .await .unwrap() - .remove(&tx_id3) - .is_none()); + .iter() + .any(|tx| tx.tx_id == tx_id3)); } #[tokio::test] async fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { @@ -5531,12 +5534,14 @@ async fn transaction_service_tx_broadcast() { } assert!(tx1_received); - let alice_completed_tx1 = alice_ts_interface + let alice_completed_txs = alice_ts_interface .transaction_service_handle .get_completed_transactions() .await - .unwrap() - .remove(&tx_id1) + .unwrap(); + let alice_completed_tx1 = alice_completed_txs + .iter() + .find(|tx| tx.tx_id == tx_id1) .expect("Transaction must be in collection"); let tx1_fee = alice_completed_tx1.fee; @@ -5637,12 +5642,14 @@ async fn transaction_service_tx_broadcast() { mined_timestamp: None, }); - let alice_completed_tx2 = alice_ts_interface + let alice_completed_txs = alice_ts_interface .transaction_service_handle .get_completed_transactions() .await - .unwrap() - .remove(&tx_id2) + .unwrap(); + let alice_completed_tx2 = alice_completed_txs + .iter() + .find(|tx| tx.tx_id == tx_id2) .expect("Transaction must be in collection"); assert!( @@ -6202,3 +6209,87 @@ async fn test_get_fee_per_gram_per_block_basic() { assert_eq!(estimates.stats, stats.into_iter().map(Into::into).collect::>()); assert_eq!(estimates.stats.len(), 1) } + +#[tokio::test] +async fn test_completed_transactions_ordering() { + let factories = CryptoFactories::default(); + let connection = make_wallet_database_memory_connection(); + + let mut alice_ts_interface = setup_transaction_service_no_comms(factories.clone(), connection, None).await; + let tx_backend = alice_ts_interface.ts_db; + + let kernel = KernelBuilder::new() + .with_excess(&factories.commitment.zero()) + .with_signature(Signature::default()) + .build() + .unwrap(); + let tx = Transaction::new( + vec![], + vec![], + vec![kernel], + PrivateKey::random(&mut OsRng), + PrivateKey::random(&mut OsRng), + ); + let source_address = TariAddress::new_dual_address_with_default_features( + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + Network::LocalNet, + ); + let destination_address = TariAddress::new_dual_address_with_default_features( + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + Network::LocalNet, + ); + + for i in 1u32..5u32 { + let random_timestamp = i64::from(OsRng.next_u32()); + let completed_tx = CompletedTransaction { + tx_id: u64::from(i).into(), + source_address: source_address.clone(), + destination_address: destination_address.clone(), + amount: MicroMinotari::from(1000), + fee: MicroMinotari::from(100), + transaction: tx.clone(), + status: TransactionStatus::Completed, + timestamp: DateTime::::from_timestamp(random_timestamp, 0).unwrap(), + cancelled: None, + direction: TransactionDirection::Outbound, + send_count: 0, + last_send_timestamp: None, + transaction_signature: tx.first_kernel_excess_sig().unwrap_or(&Signature::default()).clone(), + confirmations: None, + mined_height: None, + mined_in_block: None, + mined_timestamp: DateTime::::from_timestamp(random_timestamp + 100i64, 0), + payment_id: PaymentId::open_from_str("Yo!"), + }; + + tx_backend + .write(WriteOperation::Insert(DbKeyValuePair::CompletedTransaction( + u64::from(i).into(), + Box::new(completed_tx), + ))) + .unwrap(); + } + + let alice_completed_transactions = alice_ts_interface + .transaction_service_handle + .get_completed_transactions() + .await + .unwrap(); + + let mut mined_timestamps: Vec<_> = alice_completed_transactions + .iter() + .map(|tx| tx.mined_timestamp.unwrap_or_default()) + .collect(); + mined_timestamps.sort_by(|a, b| b.cmp(a)); + + assert_eq!(alice_completed_transactions.len(), 4); + assert_eq!( + alice_completed_transactions + .iter() + .map(|tx| tx.mined_timestamp.unwrap_or_default()) + .collect::>(), + mined_timestamps + ); +} diff --git a/base_layer/wallet/tests/transaction_service_tests/storage.rs b/base_layer/wallet/tests/transaction_service_tests/storage.rs index bed5f10ba4..e0e39ce1e4 100644 --- a/base_layer/wallet/tests/transaction_service_tests/storage.rs +++ b/base_layer/wallet/tests/transaction_service_tests/storage.rs @@ -163,8 +163,7 @@ pub async fn test_db_backend(backend: T) { assert_eq!(&retrieved_outbound_tx, i); assert_eq!(retrieved_outbound_tx.send_count, 0); assert!(retrieved_outbound_tx.last_send_timestamp.is_none()); - - assert_eq!(&retrieved_outbound_txs.get(&i.tx_id).unwrap(), &i); + assert!(retrieved_outbound_txs.iter().any(|tx| tx == i)); } db.increment_send_count(outbound_txs[0].tx_id).unwrap(); @@ -270,7 +269,7 @@ pub async fn test_db_backend(backend: T) { let retrieved_inbound_txs = db.get_pending_inbound_transactions().unwrap(); assert_eq!(inbound_txs.len(), messages.len()); for i in inbound_txs.iter().take(messages.len()) { - let retrieved_tx = retrieved_inbound_txs.get(&i.tx_id).unwrap(); + let retrieved_tx = retrieved_inbound_txs.iter().find(|tx| tx.tx_id == i.tx_id).unwrap(); assert_eq!(&retrieved_tx, &i); assert_eq!(retrieved_tx.send_count, 0); assert!(retrieved_tx.last_send_timestamp.is_none()); @@ -361,14 +360,20 @@ pub async fn test_db_backend(backend: T) { for i in 0..messages.len() { assert_eq!( - retrieved_completed_txs.get(&inbound_txs[i].tx_id).unwrap(), + retrieved_completed_txs + .iter() + .find(|tx| tx.tx_id == inbound_txs[i].tx_id) + .unwrap(), &CompletedTransaction { tx_id: inbound_txs[i].tx_id, ..completed_txs[i].clone() } ); assert_eq!( - retrieved_completed_txs.get(&outbound_txs[i].tx_id).unwrap(), + retrieved_completed_txs + .iter() + .find(|tx| tx.tx_id == outbound_txs[i].tx_id) + .unwrap(), &completed_txs[i] ); } @@ -408,23 +413,23 @@ pub async fn test_db_backend(backend: T) { panic!("Should have found completed tx"); } - let completed_txs_map = db.get_completed_transactions().unwrap(); - let num_completed_txs = completed_txs_map.len(); + let completed_txs = db.get_completed_transactions().unwrap(); + let num_completed_txs = completed_txs.len(); assert_eq!(db.get_cancelled_completed_transactions().unwrap().len(), 0); - let cancelled_tx_id = completed_txs_map[&1u64.into()].tx_id; + let cancelled_tx_id = completed_txs[1].tx_id; assert!(db.get_cancelled_completed_transaction(cancelled_tx_id).is_err()); db.reject_completed_transaction(cancelled_tx_id, TxCancellationReason::Unknown) .unwrap(); - let completed_txs_map = db.get_completed_transactions().unwrap(); - assert_eq!(completed_txs_map.len(), num_completed_txs - 1); + let completed_txs = db.get_completed_transactions().unwrap(); + assert_eq!(completed_txs.len(), num_completed_txs - 1); db.get_cancelled_completed_transaction(cancelled_tx_id) .expect("Should find cancelled transaction"); - let mut cancelled_txs = db.get_cancelled_completed_transactions().unwrap(); + let cancelled_txs = db.get_cancelled_completed_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); - assert!(cancelled_txs.remove(&cancelled_tx_id).is_some()); + assert!(cancelled_txs.iter().any(|c_tx| c_tx.tx_id == cancelled_tx_id)); let any_cancelled_completed_tx = db.get_any_transaction(cancelled_tx_id).unwrap().unwrap(); if let WalletTransaction::Completed(tx) = any_cancelled_completed_tx { @@ -481,9 +486,9 @@ pub async fn test_db_backend(backend: T) { panic!("Should have found cancelled inbound tx"); } - let mut cancelled_txs = db.get_cancelled_pending_inbound_transactions().unwrap(); + let cancelled_txs = db.get_cancelled_pending_inbound_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); - assert!(cancelled_txs.remove(&999u64.into()).is_some()); + assert!(cancelled_txs.iter().any(|c_tx| c_tx.tx_id == TxId::from(999u64))); let address = TariAddress::new_dual_address_with_default_features( PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), @@ -530,9 +535,9 @@ pub async fn test_db_backend(backend: T) { assert_eq!(db.get_pending_outbound_transactions().unwrap().len(), 0); - let mut cancelled_txs = db.get_cancelled_pending_outbound_transactions().unwrap(); + let cancelled_txs = db.get_cancelled_pending_outbound_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); - assert!(cancelled_txs.remove(&998u64.into()).is_some()); + assert!(cancelled_txs.iter().any(|c_tx| c_tx.tx_id == TxId::from(998u64))); let any_cancelled_outbound_tx = db.get_any_transaction(998u64.into()).unwrap().unwrap(); if let WalletTransaction::PendingOutbound(tx) = any_cancelled_outbound_tx { diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index b94d648e81..184d3aa608 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -844,11 +844,19 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( - completed_txs.get(&1u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(1u64)) + .unwrap() + .status, TransactionStatus::Broadcast ); assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .status, TransactionStatus::MinedUnconfirmed ); @@ -871,11 +879,19 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( - completed_txs.get(&1u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(1u64)) + .unwrap() + .status, TransactionStatus::Broadcast ); assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .status, TransactionStatus::Completed ); @@ -916,10 +932,22 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .status, TransactionStatus::MinedConfirmed ); - assert_eq!(completed_txs.get(&2u64.into()).unwrap().confirmations.unwrap(), 4); + assert_eq!( + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .confirmations + .unwrap(), + 4 + ); } /// Test that revalidation clears the correct db fields and calls for validation of is said transactions @@ -997,10 +1025,22 @@ async fn tx_revalidation() { let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .status, TransactionStatus::MinedConfirmed ); - assert_eq!(completed_txs.get(&2u64.into()).unwrap().confirmations.unwrap(), 4); + assert_eq!( + completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .confirmations + .unwrap(), + 4 + ); let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -1028,12 +1068,13 @@ async fn tx_revalidation() { .mark_all_non_coinbases_transactions_as_unvalidated() .unwrap(); let completed_txs = resources.db.get_completed_transactions().unwrap(); - assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, - TransactionStatus::MinedConfirmed - ); - assert_eq!(completed_txs.get(&2u64.into()).unwrap().mined_height, None); - assert_eq!(completed_txs.get(&2u64.into()).unwrap().mined_in_block, None); + let completed_tx_2 = completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap(); + assert_eq!(completed_tx_2.status, TransactionStatus::MinedConfirmed); + assert_eq!(completed_tx_2.mined_height, None); + assert_eq!(completed_tx_2.mined_in_block, None); let protocol = TransactionValidationProtocol::new( 5.into(), @@ -1049,11 +1090,12 @@ async fn tx_revalidation() { let completed_txs = resources.db.get_completed_transactions().unwrap(); // data should now be updated and changed - assert_eq!( - completed_txs.get(&2u64.into()).unwrap().status, - TransactionStatus::MinedConfirmed - ); - assert_eq!(completed_txs.get(&2u64.into()).unwrap().confirmations.unwrap(), 8); + let completed_tx_2 = completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap(); + assert_eq!(completed_tx_2.status, TransactionStatus::MinedConfirmed); + assert_eq!(completed_tx_2.confirmations.unwrap(), 8); } /// Test that validation detects transactions becoming mined unconfirmed and then confirmed with some going back to @@ -1219,7 +1261,7 @@ async fn tx_validation_protocol_reorg() { let completed_txs = resources.db.get_completed_transactions().unwrap(); let mut unconfirmed_count = 0; let mut confirmed_count = 0; - for tx in completed_txs.values() { + for tx in completed_txs { if tx.status == TransactionStatus::MinedUnconfirmed { unconfirmed_count += 1; } @@ -1335,23 +1377,58 @@ async fn tx_validation_protocol_reorg() { let completed_txs = resources.db.get_completed_transactions().unwrap(); // Tx 1 - assert!(completed_txs.get(&1u64.into()).unwrap().mined_in_block.is_some()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(1u64)) + .unwrap() + .mined_in_block + .is_some()); // Tx 2 - assert!(completed_txs.get(&2u64.into()).unwrap().mined_in_block.is_some()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(2u64)) + .unwrap() + .mined_in_block + .is_some()); // Tx 3 - assert!(completed_txs.get(&3u64.into()).unwrap().mined_in_block.is_some()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(3u64)) + .unwrap() + .mined_in_block + .is_some()); // Tx 4 (reorged out) - assert!(completed_txs.get(&4u64.into()).unwrap().mined_in_block.is_none()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(4u64)) + .unwrap() + .mined_in_block + .is_none()); // Tx 5 - assert!(completed_txs.get(&5u64.into()).unwrap().mined_in_block.is_some()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(5u64)) + .unwrap() + .mined_in_block + .is_some()); // Tx 6 (reorged out) - assert!(completed_txs.get(&6u64.into()).unwrap().mined_in_block.is_none()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(6u64)) + .unwrap() + .mined_in_block + .is_none()); // Tx 7 (reorged out) - assert!(completed_txs.get(&7u64.into()).unwrap().mined_in_block.is_none()); + assert!(completed_txs + .iter() + .find(|c_tx| c_tx.tx_id == TxId::from(7u64)) + .unwrap() + .mined_in_block + .is_none()); } diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 95c3dc1843..9c717b0519 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -7642,7 +7642,7 @@ pub unsafe extern "C" fn wallet_get_completed_transactions( // definitions and storage of a MimbleWimble CompletedTransaction we will remove CompletedTransactions with // the Completed and Broadcast states from the list returned by this FFI function for tx in completed_transactions - .values() + .iter() .filter(|ct| ct.status != TransactionStatus::Completed) .filter(|ct| ct.status != TransactionStatus::Broadcast) .filter(|ct| ct.status != TransactionStatus::Imported) @@ -7695,7 +7695,7 @@ pub unsafe extern "C" fn wallet_get_pending_inbound_transactions( match pending_transactions { Ok(pending_transactions) => { - for tx in pending_transactions.values() { + for tx in &pending_transactions { pending.push(tx.clone()); } @@ -7708,7 +7708,7 @@ pub unsafe extern "C" fn wallet_get_pending_inbound_transactions( // definitions and storage of a MimbleWimble CompletedTransaction we will add those transaction to the // list here in the FFI interface for ct in completed_txs - .values() + .iter() .filter(|ct| { ct.status == TransactionStatus::Completed || ct.status == TransactionStatus::Broadcast || @@ -7765,7 +7765,7 @@ pub unsafe extern "C" fn wallet_get_pending_outbound_transactions( .block_on((*wallet).wallet.transaction_service.get_pending_outbound_transactions()); match pending_transactions { Ok(pending_transactions) => { - for tx in pending_transactions.values() { + for tx in &pending_transactions { pending.push(tx.clone()); } if let Ok(completed_txs) = (*wallet) @@ -7777,7 +7777,7 @@ pub unsafe extern "C" fn wallet_get_pending_outbound_transactions( // definitions and storage of a MimbleWimble CompletedTransaction we will add those transaction to the // list here in the FFI interface for ct in completed_txs - .values() + .iter() .filter(|ct| ct.status == TransactionStatus::Completed || ct.status == TransactionStatus::Broadcast) .filter(|ct| ct.direction == TransactionDirection::Outbound) { @@ -7864,7 +7864,7 @@ pub unsafe extern "C" fn wallet_get_cancelled_transactions( }; let mut completed = Vec::new(); - for tx in completed_transactions.values() { + for tx in &completed_transactions { completed.push(tx.clone()); } let runtime = match Runtime::new() { @@ -7883,12 +7883,12 @@ pub unsafe extern "C" fn wallet_get_cancelled_transactions( return ptr::null_mut(); }, }; - for tx in inbound_transactions.values() { + for tx in &inbound_transactions { let mut inbound_tx = CompletedTransaction::from(tx.clone()); inbound_tx.destination_address = wallet_address.clone(); completed.push(inbound_tx); } - for tx in outbound_transactions.values() { + for tx in &outbound_transactions { let mut outbound_tx = CompletedTransaction::from(tx.clone()); outbound_tx.source_address = wallet_address.clone(); completed.push(outbound_tx); @@ -7932,7 +7932,10 @@ pub unsafe extern "C" fn wallet_get_completed_transaction_by_id( match completed_transactions { Ok(completed_transactions) => { - if let Some(tx) = completed_transactions.get(&TxId::from(transaction_id)) { + if let Some(tx) = completed_transactions + .iter() + .find(|tx| tx.tx_id == TxId::from(transaction_id)) + { if tx.status != TransactionStatus::Completed && tx.status != TransactionStatus::Broadcast { let completed = tx.clone(); return Box::into_raw(Box::new(completed)); @@ -7990,7 +7993,7 @@ pub unsafe extern "C" fn wallet_get_pending_inbound_transaction_by_id( match completed_transactions { Ok(completed_transactions) => { - if let Some(tx) = completed_transactions.get(&transaction_id) { + if let Some(tx) = completed_transactions.iter().find(|tx| tx.tx_id == transaction_id) { if (tx.status == TransactionStatus::Broadcast || tx.status == TransactionStatus::Completed) && tx.direction == TransactionDirection::Inbound { @@ -8008,7 +8011,7 @@ pub unsafe extern "C" fn wallet_get_pending_inbound_transaction_by_id( match pending_transactions { Ok(pending_transactions) => { - if let Some(tx) = pending_transactions.get(&transaction_id) { + if let Some(tx) = pending_transactions.iter().find(|tx| tx.tx_id == transaction_id) { let pending = tx.clone(); return Box::into_raw(Box::new(pending)); } @@ -8064,7 +8067,7 @@ pub unsafe extern "C" fn wallet_get_pending_outbound_transaction_by_id( match completed_transactions { Ok(completed_transactions) => { - if let Some(tx) = completed_transactions.get(&transaction_id) { + if let Some(tx) = completed_transactions.iter().find(|tx| tx.tx_id == transaction_id) { if (tx.status == TransactionStatus::Broadcast || tx.status == TransactionStatus::Completed) && tx.direction == TransactionDirection::Outbound { @@ -8082,7 +8085,7 @@ pub unsafe extern "C" fn wallet_get_pending_outbound_transaction_by_id( match pending_transactions { Ok(pending_transactions) => { - if let Some(tx) = pending_transactions.get(&transaction_id) { + if let Some(tx) = pending_transactions.iter().find(|tx| tx.tx_id == transaction_id) { let pending = tx.clone(); return Box::into_raw(Box::new(pending)); } @@ -8131,7 +8134,7 @@ pub unsafe extern "C" fn wallet_get_cancelled_transaction_by_id( let mut transaction = None; - let mut completed_transactions = match (*wallet).runtime.block_on( + let completed_transactions = match (*wallet).runtime.block_on( (*wallet) .wallet .transaction_service @@ -8145,10 +8148,10 @@ pub unsafe extern "C" fn wallet_get_cancelled_transaction_by_id( }, }; - if let Some(tx) = completed_transactions.remove(&transaction_id) { - transaction = Some(tx); + if let Some(tx) = completed_transactions.iter().find(|tx| tx.tx_id == transaction_id) { + transaction = Some(tx.clone()); } else { - let mut outbound_transactions = match (*wallet).runtime.block_on( + let outbound_transactions = match (*wallet).runtime.block_on( (*wallet) .wallet .transaction_service @@ -8177,12 +8180,12 @@ pub unsafe extern "C" fn wallet_get_cancelled_transaction_by_id( return ptr::null_mut(); }, }; - if let Some(tx) = outbound_transactions.remove(&transaction_id) { - let mut outbound_tx = CompletedTransaction::from(tx); + if let Some(tx) = outbound_transactions.iter().find(|tx| tx.tx_id == transaction_id) { + let mut outbound_tx = CompletedTransaction::from(tx.clone()); outbound_tx.source_address = address; transaction = Some(outbound_tx); } else { - let mut inbound_transactions = match (*wallet).runtime.block_on( + let inbound_transactions = match (*wallet).runtime.block_on( (*wallet) .wallet .transaction_service @@ -8195,8 +8198,8 @@ pub unsafe extern "C" fn wallet_get_cancelled_transaction_by_id( return ptr::null_mut(); }, }; - if let Some(tx) = inbound_transactions.remove(&transaction_id) { - let mut inbound_tx = CompletedTransaction::from(tx); + if let Some(tx) = inbound_transactions.iter().find(|tx| tx.tx_id == transaction_id) { + let mut inbound_tx = CompletedTransaction::from(tx.clone()); inbound_tx.destination_address = address; transaction = Some(inbound_tx); } @@ -8205,7 +8208,7 @@ pub unsafe extern "C" fn wallet_get_cancelled_transaction_by_id( match transaction { Some(tx) => { - return Box::into_raw(Box::new(tx)); + return Box::into_raw(Box::new(tx.clone())); }, None => { error = LibWalletError::from(WalletError::TransactionServiceError(