Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add wallet state #6763

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions applications/minotari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import "network.proto";
service Wallet {
// This returns the current version
rpc GetVersion (GetVersionRequest) returns (GetVersionResponse);
// This returns the current state of the wallet
rpc GetState (GetStateRequest) returns (GetStateResponse);
// This checks if the wallet is healthy and running
rpc CheckConnectivity(GetConnectivityRequest) returns (CheckConnectivityResponse);
// Check for new updates
Expand Down Expand Up @@ -241,13 +243,21 @@ message GetCompletedTransactionsResponse {

message GetBalanceRequest {}

message GetStateRequest {}

message GetBalanceResponse {
uint64 available_balance = 1;
uint64 pending_incoming_balance = 2;
uint64 pending_outgoing_balance = 3;
uint64 timelocked_balance = 4;
}

message GetStateResponse {
uint64 scanned_height = 1;
GetBalanceResponse balance = 2;
NetworkStatusResponse network = 3;
}

message GetUnspentAmountsResponse {
repeated uint64 amount = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

mod get_balance_debounced;
mod wallet_debouncer;
mod wallet_grpc_server;

use minotari_app_grpc::tari_rpc::TransactionEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;

use minotari_wallet::utxo_scanner_service::handle::UtxoScannerEvent;
use log::{info, trace, warn};
use minotari_app_grpc::tari_rpc::GetBalanceResponse;
use minotari_wallet::{
Expand All @@ -35,6 +35,7 @@ use minotari_wallet::{
use tari_shutdown::ShutdownSignal;
use tokio::sync::Mutex;
use tonic::Status;
use minotari_wallet::utxo_scanner_service::handle::UtxoScannerHandle;

const LOG_TARGET: &str = "wallet::ui::grpc::get_balance_debounced";

Expand All @@ -44,22 +45,25 @@ const LOG_TARGET: &str = "wallet::ui::grpc::get_balance_debounced";
/// balance needs to be updated. When ever a client requests the balance, it will be fetched from the backend if the
/// flag is set and clear the flag, otherwise the cached balance will be returned.
#[derive(Clone)]
pub struct GetBalanceDebounced {
pub struct WalletDebouncer {
balance: Arc<Mutex<Balance>>,
scanned_height: Arc<Mutex<u64>>,
refresh_needed: Arc<Mutex<bool>>,
output_manager_service: OutputManagerHandle,
transaction_service: TransactionServiceHandle,
wallet_connectivity: WalletConnectivityHandle,
utxo_scanner_handle: UtxoScannerHandle,
shutdown_signal: ShutdownSignal,
event_monitor_started: Arc<Mutex<bool>>,
}

impl GetBalanceDebounced {
/// Create a new GetBalanceDebounced instance.
impl WalletDebouncer {
/// Create a new WalletDebouncer instance.
pub fn new(
output_manager_service: OutputManagerHandle,
transaction_service: TransactionServiceHandle,
wallet_connectivity: WalletConnectivityHandle,
utxo_scanner_handle: UtxoScannerHandle,
shutdown_signal: ShutdownSignal,
) -> Self {
Self {
Expand All @@ -70,9 +74,11 @@ impl GetBalanceDebounced {
time_locked_balance: None,
})),
refresh_needed: Arc::new(Mutex::new(true)),
scanned_height: Arc::new(Mutex::new(0)),
output_manager_service,
transaction_service,
wallet_connectivity,
utxo_scanner_handle,
shutdown_signal,
event_monitor_started: Arc::new(Mutex::new(false)),
}
Expand Down Expand Up @@ -138,11 +144,29 @@ impl GetBalanceDebounced {
}
}

async fn update_scanned_height(&self, scanned_height: u64) {
let mut lock = self.scanned_height.lock().await;
if *lock != scanned_height {
trace!(target: LOG_TARGET, "set_scanned_height '{}'", scanned_height);
*lock = scanned_height;
}
}

pub async fn get_scanned_height(&mut self) -> u64 {
if !self.is_event_monitor_started().await {
self.start_event_monitor().await;
}
*self.scanned_height.lock().await

}


async fn monitor_events(&self) {
let mut shutdown_signal = self.shutdown_signal.clone();
let mut transaction_service_events = self.transaction_service.get_event_stream();
let mut base_node_changed = self.wallet_connectivity.clone().get_current_base_node_watcher();
let mut output_manager_service_events = self.output_manager_service.get_event_stream();
let mut utxo_scanner_events = self.utxo_scanner_handle.clone().get_event_receiver();

loop {
tokio::select! {
Expand Down Expand Up @@ -187,6 +211,29 @@ impl GetBalanceDebounced {
},
}
},
result = utxo_scanner_events.recv() => {
match result {
Ok(event) => {
match event {
UtxoScannerEvent::Progress {
current_height,..
}=> {
self.update_scanned_height(current_height).await;
}
UtxoScannerEvent::Completed {
final_height,
..
}=> {
self.update_scanned_height(final_height).await;
},
_ => {}
}
},
Err(e) => {
warn!(target: LOG_TARGET, "Problem with utxo scanner: {}",e);
},
}
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use minotari_app_grpc::tari_rpc::{
GetAddressResponse,
GetBalanceRequest,
GetBalanceResponse,
GetStateRequest,
GetStateResponse,
GetCompletedTransactionsRequest,
GetCompletedTransactionsResponse,
GetConnectivityRequest,
Expand Down Expand Up @@ -122,7 +124,7 @@ use tokio::{
use tonic::{Request, Response, Status};

use crate::{
grpc::{convert_to_transaction_event, get_balance_debounced::GetBalanceDebounced, TransactionWrapper},
grpc::{convert_to_transaction_event, wallet_debouncer::WalletDebouncer, TransactionWrapper},
notifier::{CANCELLED, CONFIRMATION, MINED, QUEUED, RECEIVED, SENT},
};

Expand All @@ -146,23 +148,24 @@ async fn send_transaction_event(
pub struct WalletGrpcServer {
wallet: WalletSqlite,
rules: ConsensusManager,
get_balance_debounced: Arc<Mutex<GetBalanceDebounced>>,
debouncer: Arc<Mutex<WalletDebouncer>>,
}

impl WalletGrpcServer {
#[allow(dead_code)]
pub fn new(wallet: WalletSqlite) -> Result<Self, ConsensusBuilderError> {
let rules = ConsensusManager::builder(wallet.network.as_network()).build()?;
let get_balance = GetBalanceDebounced::new(
let debouncer = WalletDebouncer::new(
wallet.output_manager_service.clone(),
wallet.transaction_service.clone(),
wallet.wallet_connectivity.clone(),
wallet.utxo_scanner_service.clone(),
wallet.comms.shutdown_signal(),
);
Ok(Self {
wallet,
rules,
get_balance_debounced: Arc::new(Mutex::new(get_balance)),
debouncer: Arc::new(Mutex::new(debouncer)),
})
}

Expand Down Expand Up @@ -281,16 +284,57 @@ impl wallet_server::Wallet for WalletGrpcServer {
async fn get_balance(&self, _request: Request<GetBalanceRequest>) -> Result<Response<GetBalanceResponse>, Status> {
let start = std::time::Instant::now();
let balance = {
let mut get_balance = self.get_balance_debounced.lock().await;
let mut get_balance = self.debouncer.lock().await;
match get_balance.get_balance().await {
Ok(b) => b,
Err(e) => return Err(Status::not_found(format!("GetBalanceDebounced error! {}", e))),
Err(e) => return Err(Status::not_found(format!("WalletDebouncer error! {}", e))),
}
};
trace!(target: LOG_TARGET, "'get_balance' completed in {:.2?}", start.elapsed());
Ok(Response::new(balance))
}

async fn get_state(&self, _request: Request<GetStateRequest>) -> Result<Response<GetStateResponse>, Status> {
let start = std::time::Instant::now();
let (balance, scanned_height) = {
let mut debouncer = self.debouncer.lock().await;
let balance = match debouncer.get_balance().await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt ok?

Ok(b) => b,
Err(e) => return Err(Status::not_found(format!("WalletDebouncer error! {}", e))),
};
let scanned_height = debouncer.get_scanned_height().await;
(Some(balance), scanned_height)
};


let status = self
.comms()
.connectivity()
.get_connectivity_status()
.await
.map_err(|err| Status::internal(err.to_string()))?;
let mut base_node_service = self.wallet.base_node_service.clone();

let network = Some(tari_rpc::NetworkStatusResponse {
status: tari_rpc::ConnectivityStatus::from(status) as i32,
avg_latency_ms: base_node_service
.get_base_node_latency()
.await
.map_err(|err| Status::internal(err.to_string()))?
.map(|d| u32::try_from(d.as_millis()).unwrap_or(u32::MAX))
.unwrap_or_default(),
num_node_connections: u32::try_from(status.num_connected_nodes())
.map_err(|_| Status::internal("Count not convert u64 to usize".to_string()))?,
});

trace!(target: LOG_TARGET, "'get_state' completed in {:.2?}", start.elapsed());
Ok(Response::new(GetStateResponse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt ok?

scanned_height,
balance,
network,
}))
}

async fn get_unspent_amounts(
&self,
_: Request<tari_rpc::Empty>,
Expand Down
Loading