Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: sort completed transactions #6733

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ impl wallet_server::Wallet for WalletGrpcServer {

let (mut sender, receiver) = mpsc::channel(transactions.len());
task::spawn(async move {
for (i, (_, txn)) in transactions.iter().enumerate() {
for (i, txn) in transactions.iter().enumerate() {
let response = GetCompletedTransactionsResponse {
transaction: Some(TransactionInfo {
tx_id: txn.tx_id.into(),
Expand Down Expand Up @@ -1088,15 +1088,17 @@ async fn handle_pending_outbound(
transaction_service: &mut TransactionServiceHandle,
sender: &mut Sender<Result<TransactionEventResponse, Status>>,
) {
match transaction_service.get_pending_outbound_transactions().await {
Ok(mut txs) => {
if let Some(tx) = txs.remove(&tx_id) {
mmrrnn marked this conversation as resolved.
Show resolved Hide resolved
use models::WalletTransaction::PendingOutbound;
match transaction_service.get_any_transaction(tx_id).await {
Ok(tx) => match tx {
Some(PendingOutbound(tx)) => {
let transaction_event =
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx)));
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx.clone())));
send_transaction_event(transaction_event, sender).await;
} else {
},
_ => {
error!(target: LOG_TARGET, "Not found in pending outbound set tx_id: {}", tx_id);
}
},
},
Err(e) => error!(target: LOG_TARGET, "Transaction service error: {}", e),
}
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Notifier {
self.handle.spawn(async move {
match transaction_service.get_pending_outbound_transactions().await {
Ok(txs) => {
if let Some(tx) = txs.get(&tx_id) {
if let Some(tx) = txs.iter().find(|tx| tx.tx_id == tx_id) {
let args = args_from_outbound(tx, event);
let result = Command::new(program).args(&args).output();
let message = WalletEventMessage::Outbound {
Expand Down
21 changes: 4 additions & 17 deletions applications/minotari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl AppStateInner {
.transaction_service
.get_pending_inbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);
Expand All @@ -730,11 +730,10 @@ impl AppStateInner {
.transaction_service
.get_pending_outbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);

pending_transactions.sort_by(|a: &CompletedTransaction, b: &CompletedTransaction| {
b.timestamp.partial_cmp(&a.timestamp).unwrap()
});
Expand All @@ -747,26 +746,14 @@ impl AppStateInner {
.collect::<Result<Vec<_>, _>>()?;

let mut completed_transactions: Vec<CompletedTransaction> = Vec::new();
completed_transactions.extend(
self.wallet
.transaction_service
.get_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
);
completed_transactions.extend(self.wallet.transaction_service.get_completed_transactions().await?);

completed_transactions.extend(
self.wallet
.transaction_service
.get_cancelled_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
.await?,
);

completed_transactions.sort_by(|a, b| {
b.timestamp
.partial_cmp(&a.timestamp)
Expand Down
20 changes: 9 additions & 11 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ pub enum TransactionServiceResponse {
template_registration: Box<CodeTemplateRegistration>,
},
TransactionCancelled,
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
BaseNodePublicKeySet,
UtxoImported(TxId),
Expand Down Expand Up @@ -913,7 +913,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingInboundTransactions)
Expand All @@ -926,7 +926,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingInboundTransactions)
Expand All @@ -939,7 +939,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingOutboundTransactions)
Expand All @@ -952,7 +952,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingOutboundTransactions)
Expand All @@ -963,9 +963,7 @@ impl TransactionServiceHandle {
}
}

pub async fn get_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
pub async fn get_completed_transactions(&mut self) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCompletedTransactions)
Expand All @@ -978,7 +976,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledCompletedTransactions)
Expand Down
18 changes: 9 additions & 9 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2750,42 +2750,42 @@ where
>,
) -> Result<(), TransactionServiceError> {
let outbound_txs = self.db.get_pending_outbound_transactions()?;
for (tx_id, tx) in outbound_txs {
for tx in outbound_txs {
let (sender_protocol, stage) = if tx.send_count > 0 {
(None, TransactionSendProtocolStage::WaitForReply)
} else {
(Some(tx.sender_protocol), TransactionSendProtocolStage::Queued)
};
let (not_yet_pending, queued) = (
!self.pending_transaction_reply_senders.contains_key(&tx_id),
!self.pending_transaction_reply_senders.contains_key(&tx.tx_id),
stage == TransactionSendProtocolStage::Queued,
);

if not_yet_pending {
debug!(
target: LOG_TARGET,
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx.tx_id
);
} else if queued {
debug!(
target: LOG_TARGET,
"Retry sending queued Pending Outbound Transaction TxId: {}", tx_id
"Retry sending queued Pending Outbound Transaction TxId: {}", tx.tx_id
);
let _sender = self.pending_transaction_reply_senders.remove(&tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx_id);
let _sender = self.pending_transaction_reply_senders.remove(&tx.tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx.tx_id);
} else {
// dont care
}

if not_yet_pending || queued {
let (tx_reply_sender, tx_reply_receiver) = mpsc::channel(100);
let (cancellation_sender, cancellation_receiver) = oneshot::channel();
self.pending_transaction_reply_senders.insert(tx_id, tx_reply_sender);
self.pending_transaction_reply_senders.insert(tx.tx_id, tx_reply_sender);
self.send_transaction_cancellation_senders
.insert(tx_id, cancellation_sender);
.insert(tx.tx_id, cancellation_sender);

let protocol = TransactionSendProtocol::new(
tx_id,
tx.tx_id,
self.resources.clone(),
tx_reply_receiver,
cancellation_receiver,
Expand Down
31 changes: 12 additions & 19 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::HashMap,
fmt,
fmt::{Display, Error, Formatter},
sync::Arc,
Expand Down Expand Up @@ -245,9 +244,9 @@ pub enum DbValue {
PendingOutboundTransaction(Box<OutboundTransaction>),
PendingInboundTransaction(Box<InboundTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
WalletTransaction(Box<WalletTransaction>),
}

Expand Down Expand Up @@ -508,22 +507,20 @@ where T: TransactionBackend + 'static
Ok(*t)
}

pub fn get_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
pub fn get_pending_inbound_transactions(&self) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(true)
}

fn get_pending_inbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingInboundTransactions
} else {
Expand All @@ -544,22 +541,20 @@ where T: TransactionBackend + 'static
Ok(t)
}

pub fn get_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
pub fn get_pending_outbound_transactions(&self) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(true)
}

fn get_pending_outbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingOutboundTransactions
} else {
Expand Down Expand Up @@ -588,13 +583,11 @@ where T: TransactionBackend + 'static
Ok(address)
}

pub fn get_completed_transactions(&self) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(false)
}

pub fn get_cancelled_completed_transactions(
&self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_cancelled_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(true)
}

Expand All @@ -620,7 +613,7 @@ where T: TransactionBackend + 'static
fn get_completed_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledCompletedTransactions
} else {
Expand Down
Loading
Loading