Skip to content

Commit

Permalink
Bye bye tokio (#224)
Browse files Browse the repository at this point in the history
* remove tokio and rewrite send/recv

* maker refactoring

 - Move shutdown sync and save logic to main thread.
 - Scope read and write lock on wallet to avoid dead lock situations.

* taker refactoring

Simplify the connection retry logic.

* DNS refactoring

* APP refactoring

* test refactoring

Modify the fidelity test to handle new insufficient fund behavior. Instead
of hard error. Added todo to handle assertion of log asking for funds.

* Miscellaneous
  • Loading branch information
mojoX911 authored Aug 28, 2024
1 parent eaf8fae commit 73e17a8
Show file tree
Hide file tree
Showing 36 changed files with 1,627 additions and 1,563 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ bitcoin = { version = "0.32", features = ["rand"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"
tokio = { version = "1.16.1", features = ["full"] }
log = "^0.4"
futures = "0.3"
dirs = "3.0.1"
tokio-socks = "0.5"
socks = "0.3.4"
clap = { version = "3.2.22", features = ["derive"] }
bitcoind = "0.36"
libtor = { version = "47.13.0", optional = true, features = ["vendored-openssl"] }
Expand Down
44 changes: 18 additions & 26 deletions src/bin/directory-cli.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{net::TcpStream, time::Duration};

use clap::Parser;

use coinswap::{
maker::error::MakerError,
market::rpc::{read_resp_message, RpcMsgReq, RpcMsgResp},
utill::{send_message, setup_logger},
market::rpc::{RpcMsgReq, RpcMsgResp},
utill::{read_message, send_message, setup_logger},
};

use tokio::{io::BufReader, net::TcpStream};

/// directory-cli is a command line app to send RPC messages to directory server.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand All @@ -23,37 +22,30 @@ enum Commands {
ListAddresses,
}

async fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> {
let mut stream = TcpStream::connect("127.0.0.1:4321").await?;
println!("{:?}", stream);
fn send_rpc_req(req: &RpcMsgReq) {
let mut stream = TcpStream::connect("127.0.0.1:4321").unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(20)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(20)))
.unwrap();

let (read_half, mut write_half) = stream.split();
send_message(&mut stream, &req).unwrap();

if let Err(e) = send_message(&mut write_half, &req).await {
log::error!("Error Sending RPC message : {:?}", e);
};
let resp_bytes = read_message(&mut stream).unwrap();
let resp: RpcMsgResp = serde_cbor::from_slice(&resp_bytes).unwrap();

if let Some(RpcMsgResp::ListAddressesResp(list)) =
read_resp_message(&mut BufReader::new(read_half)).await?
{
println!("Maker Addresses: {:?}", list);
} else {
log::error!("RPC response received: None");
}

Ok(())
println!("{:?}", resp);
}

#[tokio::main]
async fn main() -> Result<(), MakerError> {
fn main() {
setup_logger();
let cli = App::parse();

match cli.command {
Commands::ListAddresses => {
send_rpc_req(&RpcMsgReq::ListAddresses).await?;
send_rpc_req(&RpcMsgReq::ListAddresses);
}
}

Ok(())
}
57 changes: 0 additions & 57 deletions src/bin/directory.rs

This file was deleted.

54 changes: 23 additions & 31 deletions src/bin/maker-cli.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{net::TcpStream, time::Duration};

use clap::Parser;
use coinswap::{
maker::{
error::MakerError,
rpc::{read_rpc_message, RpcMsgReq},
},
utill::{send_message, setup_logger},
maker::{MakerError, RpcMsgReq, RpcMsgResp},
utill::{read_message, send_message, setup_logger},
};
use tokio::{io::BufReader, net::TcpStream};

/// maker-cli is a command line app to send RPC messages to maker server.
#[derive(Parser, Debug)]
Expand Down Expand Up @@ -41,63 +39,57 @@ enum Commands {
NewAddress,
}

#[tokio::main]
async fn main() -> Result<(), MakerError> {
fn main() -> Result<(), MakerError> {
setup_logger();
let cli = App::parse();

match cli.command {
Commands::Ping => {
send_rpc_req(&RpcMsgReq::Ping).await?;
send_rpc_req(&RpcMsgReq::Ping)?;
}
Commands::ContractUtxo => {
send_rpc_req(&RpcMsgReq::ContractUtxo).await?;
send_rpc_req(&RpcMsgReq::ContractUtxo)?;
}
Commands::ContractBalance => {
send_rpc_req(&RpcMsgReq::ContractBalance).await?;
send_rpc_req(&RpcMsgReq::ContractBalance)?;
}
Commands::FidelityBalance => {
send_rpc_req(&RpcMsgReq::FidelityBalance).await?;
send_rpc_req(&RpcMsgReq::FidelityBalance)?;
}
Commands::FidelityUtxo => {
send_rpc_req(&RpcMsgReq::FidelityUtxo).await?;
send_rpc_req(&RpcMsgReq::FidelityUtxo)?;
}
Commands::SeedBalance => {
send_rpc_req(&RpcMsgReq::SeedBalance).await?;
send_rpc_req(&RpcMsgReq::SeedBalance)?;
}
Commands::SeedUtxo => {
send_rpc_req(&RpcMsgReq::SeedUtxo).await?;
send_rpc_req(&RpcMsgReq::SeedUtxo)?;
}
Commands::SwapBalance => {
send_rpc_req(&RpcMsgReq::SwapBalance).await?;
send_rpc_req(&RpcMsgReq::SwapBalance)?;
}
Commands::SwapUtxo => {
send_rpc_req(&RpcMsgReq::SwapUtxo).await?;
send_rpc_req(&RpcMsgReq::SwapUtxo)?;
}
Commands::NewAddress => {
send_rpc_req(&RpcMsgReq::NewAddress).await?;
send_rpc_req(&RpcMsgReq::NewAddress)?;
}
}

Ok(())
}

async fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

let (read_half, mut write_half) = stream.split();
fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> {
let mut stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_read_timeout(Some(Duration::from_secs(20)))?;
stream.set_write_timeout(Some(Duration::from_secs(20)))?;

if let Err(e) = send_message(&mut write_half, &req).await {
log::error!("Error Sending RPC message : {:?}", e);
};
send_message(&mut stream, &req)?;

let mut read_half = BufReader::new(read_half);
let response_bytes = read_message(&mut stream)?;
let response: RpcMsgResp = serde_cbor::from_slice(&response_bytes)?;

if let Some(rpc_resp) = read_rpc_message(&mut read_half).await? {
println!("{:?}", rpc_resp);
} else {
log::error!("No RPC response received");
}
println!("{:?}", response);

Ok(())
}
10 changes: 3 additions & 7 deletions src/bin/taker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct Cli {
pub wallet_name: String,
/// Sets the maker count to initiate coinswap with.
#[clap(name = "maker_count", default_value = "2")]
pub maker_count: u16,
pub maker_count: usize,
/// Sets the send amount.
#[clap(name = "send_amount", default_value = "500000")]
pub send_amount: u64,
Expand Down Expand Up @@ -178,14 +178,10 @@ fn main() {
)
.unwrap();
let config = taker2.config.clone();
let _ = futures::executor::block_on(taker.sync_offerbook(
read_bitcoin_network_string(&args.network).unwrap(),
&config,
args.maker_count,
));
taker.sync_offerbook(&config, args.maker_count).unwrap();
}
Commands::DoCoinswap => {
let _ = taker.do_coinswap(swap_params);
taker.do_coinswap(swap_params).unwrap();
}
}
}
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum NetError {
IO(std::io::Error),
ReachedEOF,
ConnectionTimedOut,
InvalidNetworkAddress,
Cbor(serde_cbor::Error),
}

Expand Down
5 changes: 0 additions & 5 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,6 @@ impl Maker {

/// Triggers a shutdown event for the Maker.
pub fn shutdown(&self) -> Result<(), MakerError> {
log::info!("Shutdown wallet sync initiated.");
self.wallet.write()?.sync()?;
log::info!("Shutdown wallet syncing completed.");
self.wallet.read()?.save_to_disk()?;
log::info!("Wallet file saved to disk.");
let mut flag = self.shutdown.write()?;
*flag = true;
Ok(())
Expand Down
29 changes: 28 additions & 1 deletion src/maker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ use std::sync::{MutexGuard, PoisonError, RwLockReadGuard, RwLockWriteGuard};

use bitcoin::secp256k1;

use crate::{protocol::error::ContractError, wallet::WalletError};
use crate::{
error::{NetError, ProtocolError},
protocol::error::ContractError,
wallet::WalletError,
};

use super::MakerBehavior;

/// Enum to handle Maker related errors.
#[derive(Debug)]
Expand All @@ -16,7 +22,10 @@ pub enum MakerError {
Secp(secp256k1::Error),
ContractError(ContractError),
Wallet(WalletError),
Net(NetError),
Deserialize(serde_cbor::Error),
SpecialBehaviour(MakerBehavior),
Protocol(ProtocolError),
}

impl From<std::io::Error> for MakerError {
Expand Down Expand Up @@ -66,3 +75,21 @@ impl From<WalletError> for MakerError {
Self::Wallet(value)
}
}

impl From<MakerBehavior> for MakerError {
fn from(value: MakerBehavior) -> Self {
Self::SpecialBehaviour(value)
}
}

impl From<NetError> for MakerError {
fn from(value: NetError) -> Self {
Self::Net(value)
}
}

impl From<ProtocolError> for MakerError {
fn from(value: ProtocolError) -> Self {
Self::Protocol(value)
}
}
Loading

0 comments on commit 73e17a8

Please sign in to comment.