Skip to content

Commit

Permalink
chore: sort completed transactions (#6733)
Browse files Browse the repository at this point in the history
## Description

This pull request focuses on refactoring the transaction handling in the
wallet service by replacing `HashMap` with `Vec` for storing
transactions. This change impacts several parts of the codebase,
including transaction service responses, database storage, and
transaction handling logic. Additionaly, we sort txs by
`mined_timestamp` in desc order.

## Testing
Tested manually using `minotari_console_wallet`



![image](https://github.com/user-attachments/assets/eca20425-f8fe-430f-b97b-8673a7bdb9ef)
  • Loading branch information
mmrrnn authored Jan 17, 2025
1 parent ef81ccb commit e5193a8
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ impl wallet_server::Wallet for WalletGrpcServer {

let (mut sender, receiver) = mpsc::channel(transactions.len());
task::spawn(async move {
for (i, (_, txn)) in transactions.iter().enumerate() {
for (i, txn) in transactions.iter().enumerate() {
let response = GetCompletedTransactionsResponse {
transaction: Some(TransactionInfo {
tx_id: txn.tx_id.into(),
Expand Down Expand Up @@ -1088,15 +1088,17 @@ async fn handle_pending_outbound(
transaction_service: &mut TransactionServiceHandle,
sender: &mut Sender<Result<TransactionEventResponse, Status>>,
) {
match transaction_service.get_pending_outbound_transactions().await {
Ok(mut txs) => {
if let Some(tx) = txs.remove(&tx_id) {
use models::WalletTransaction::PendingOutbound;
match transaction_service.get_any_transaction(tx_id).await {
Ok(tx) => match tx {
Some(PendingOutbound(tx)) => {
let transaction_event =
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx)));
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx.clone())));
send_transaction_event(transaction_event, sender).await;
} else {
},
_ => {
error!(target: LOG_TARGET, "Not found in pending outbound set tx_id: {}", tx_id);
}
},
},
Err(e) => error!(target: LOG_TARGET, "Transaction service error: {}", e),
}
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Notifier {
self.handle.spawn(async move {
match transaction_service.get_pending_outbound_transactions().await {
Ok(txs) => {
if let Some(tx) = txs.get(&tx_id) {
if let Some(tx) = txs.iter().find(|tx| tx.tx_id == tx_id) {
let args = args_from_outbound(tx, event);
let result = Command::new(program).args(&args).output();
let message = WalletEventMessage::Outbound {
Expand Down
21 changes: 4 additions & 17 deletions applications/minotari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl AppStateInner {
.transaction_service
.get_pending_inbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);
Expand All @@ -730,11 +730,10 @@ impl AppStateInner {
.transaction_service
.get_pending_outbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);

pending_transactions.sort_by(|a: &CompletedTransaction, b: &CompletedTransaction| {
b.timestamp.partial_cmp(&a.timestamp).unwrap()
});
Expand All @@ -747,26 +746,14 @@ impl AppStateInner {
.collect::<Result<Vec<_>, _>>()?;

let mut completed_transactions: Vec<CompletedTransaction> = Vec::new();
completed_transactions.extend(
self.wallet
.transaction_service
.get_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
);
completed_transactions.extend(self.wallet.transaction_service.get_completed_transactions().await?);

completed_transactions.extend(
self.wallet
.transaction_service
.get_cancelled_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
.await?,
);

completed_transactions.sort_by(|a, b| {
b.timestamp
.partial_cmp(&a.timestamp)
Expand Down
20 changes: 9 additions & 11 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ pub enum TransactionServiceResponse {
template_registration: Box<CodeTemplateRegistration>,
},
TransactionCancelled,
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
BaseNodePublicKeySet,
UtxoImported(TxId),
Expand Down Expand Up @@ -913,7 +913,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingInboundTransactions)
Expand All @@ -926,7 +926,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingInboundTransactions)
Expand All @@ -939,7 +939,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingOutboundTransactions)
Expand All @@ -952,7 +952,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingOutboundTransactions)
Expand All @@ -963,9 +963,7 @@ impl TransactionServiceHandle {
}
}

pub async fn get_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
pub async fn get_completed_transactions(&mut self) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCompletedTransactions)
Expand All @@ -978,7 +976,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledCompletedTransactions)
Expand Down
18 changes: 9 additions & 9 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2750,42 +2750,42 @@ where
>,
) -> Result<(), TransactionServiceError> {
let outbound_txs = self.db.get_pending_outbound_transactions()?;
for (tx_id, tx) in outbound_txs {
for tx in outbound_txs {
let (sender_protocol, stage) = if tx.send_count > 0 {
(None, TransactionSendProtocolStage::WaitForReply)
} else {
(Some(tx.sender_protocol), TransactionSendProtocolStage::Queued)
};
let (not_yet_pending, queued) = (
!self.pending_transaction_reply_senders.contains_key(&tx_id),
!self.pending_transaction_reply_senders.contains_key(&tx.tx_id),
stage == TransactionSendProtocolStage::Queued,
);

if not_yet_pending {
debug!(
target: LOG_TARGET,
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx.tx_id
);
} else if queued {
debug!(
target: LOG_TARGET,
"Retry sending queued Pending Outbound Transaction TxId: {}", tx_id
"Retry sending queued Pending Outbound Transaction TxId: {}", tx.tx_id
);
let _sender = self.pending_transaction_reply_senders.remove(&tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx_id);
let _sender = self.pending_transaction_reply_senders.remove(&tx.tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx.tx_id);
} else {
// dont care
}

if not_yet_pending || queued {
let (tx_reply_sender, tx_reply_receiver) = mpsc::channel(100);
let (cancellation_sender, cancellation_receiver) = oneshot::channel();
self.pending_transaction_reply_senders.insert(tx_id, tx_reply_sender);
self.pending_transaction_reply_senders.insert(tx.tx_id, tx_reply_sender);
self.send_transaction_cancellation_senders
.insert(tx_id, cancellation_sender);
.insert(tx.tx_id, cancellation_sender);

let protocol = TransactionSendProtocol::new(
tx_id,
tx.tx_id,
self.resources.clone(),
tx_reply_receiver,
cancellation_receiver,
Expand Down
31 changes: 12 additions & 19 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::HashMap,
fmt,
fmt::{Display, Error, Formatter},
sync::Arc,
Expand Down Expand Up @@ -245,9 +244,9 @@ pub enum DbValue {
PendingOutboundTransaction(Box<OutboundTransaction>),
PendingInboundTransaction(Box<InboundTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
WalletTransaction(Box<WalletTransaction>),
}

Expand Down Expand Up @@ -508,22 +507,20 @@ where T: TransactionBackend + 'static
Ok(*t)
}

pub fn get_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
pub fn get_pending_inbound_transactions(&self) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(true)
}

fn get_pending_inbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingInboundTransactions
} else {
Expand All @@ -544,22 +541,20 @@ where T: TransactionBackend + 'static
Ok(t)
}

pub fn get_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
pub fn get_pending_outbound_transactions(&self) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(true)
}

fn get_pending_outbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingOutboundTransactions
} else {
Expand Down Expand Up @@ -588,13 +583,11 @@ where T: TransactionBackend + 'static
Ok(address)
}

pub fn get_completed_transactions(&self) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(false)
}

pub fn get_cancelled_completed_transactions(
&self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_cancelled_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(true)
}

Expand All @@ -620,7 +613,7 @@ where T: TransactionBackend + 'static
fn get_completed_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledCompletedTransactions
} else {
Expand Down
Loading

0 comments on commit e5193a8

Please sign in to comment.