diff --git a/Cargo.toml b/Cargo.toml index 6f2227b0..9f020574 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,9 @@ bitcoin = { version = "0.32", features = ["rand"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.2" -tokio = { version = "1.16.1", features = ["full"] } log = "^0.4" -futures = "0.3" dirs = "3.0.1" -tokio-socks = "0.5" +socks = "0.3.4" clap = { version = "3.2.22", features = ["derive"] } bitcoind = "0.36" libtor = { version = "47.13.0", optional = true, features = ["vendored-openssl"] } diff --git a/src/bin/directory-cli.rs b/src/bin/directory-cli.rs index 49dcc5f9..39c6ec63 100644 --- a/src/bin/directory-cli.rs +++ b/src/bin/directory-cli.rs @@ -1,13 +1,12 @@ +use std::{net::TcpStream, time::Duration}; + use clap::Parser; use coinswap::{ - maker::error::MakerError, - market::rpc::{read_resp_message, RpcMsgReq, RpcMsgResp}, - utill::{send_message, setup_logger}, + market::rpc::{RpcMsgReq, RpcMsgResp}, + utill::{read_message, send_message, setup_logger}, }; -use tokio::{io::BufReader, net::TcpStream}; - /// directory-cli is a command line app to send RPC messages to directory server. #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -23,37 +22,30 @@ enum Commands { ListAddresses, } -async fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> { - let mut stream = TcpStream::connect("127.0.0.1:4321").await?; - println!("{:?}", stream); +fn send_rpc_req(req: &RpcMsgReq) { + let mut stream = TcpStream::connect("127.0.0.1:4321").unwrap(); + stream + .set_read_timeout(Some(Duration::from_secs(20))) + .unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(20))) + .unwrap(); - let (read_half, mut write_half) = stream.split(); + send_message(&mut stream, &req).unwrap(); - if let Err(e) = send_message(&mut write_half, &req).await { - log::error!("Error Sending RPC message : {:?}", e); - }; + let resp_bytes = read_message(&mut stream).unwrap(); + let resp: RpcMsgResp = serde_cbor::from_slice(&resp_bytes).unwrap(); - if let Some(RpcMsgResp::ListAddressesResp(list)) = - read_resp_message(&mut BufReader::new(read_half)).await? - { - println!("Maker Addresses: {:?}", list); - } else { - log::error!("RPC response received: None"); - } - - Ok(()) + println!("{:?}", resp); } -#[tokio::main] -async fn main() -> Result<(), MakerError> { +fn main() { setup_logger(); let cli = App::parse(); match cli.command { Commands::ListAddresses => { - send_rpc_req(&RpcMsgReq::ListAddresses).await?; + send_rpc_req(&RpcMsgReq::ListAddresses); } } - - Ok(()) } diff --git a/src/bin/directory.rs b/src/bin/directory.rs deleted file mode 100644 index 09f35b26..00000000 --- a/src/bin/directory.rs +++ /dev/null @@ -1,57 +0,0 @@ -use clap::{Parser, Subcommand}; -use coinswap::{ - market::directory::{start_directory_server, DirectoryServer}, - utill::{get_dns_dir, setup_logger, ConnectionType}, -}; -use std::{path::PathBuf, sync::Arc}; - -/// The DNS Server. -/// -/// This app starts the DNS server to serve Maker addresses to the Taker clients. -#[derive(Parser)] -#[clap(version = option_env ! ("CARGO_PKG_VERSION").unwrap_or("unknown"), -author = option_env ! ("CARGO_PKG_AUTHORS").unwrap_or(""))] -struct Cli { - /// Top level subcommands - #[clap(subcommand)] - command: Commands, -} - -#[derive(Subcommand)] -enum Commands { - /// Starts the directory server - Start { - /// Optional DNS data directory. Default value : "~/.coinswap/directory" - #[clap(long, short = 'd')] - data_directory: Option, - /// Optional network type. - #[clap(long, short = 'n', default_value = "clearnet", possible_values = &["tor", "clearnet"])] - network: String, - }, -} - -fn main() { - setup_logger(); - - let cli = Cli::parse(); - - match cli.command { - Commands::Start { - data_directory, - network, - } => { - let network_type = match network.as_str() { - "tor" => ConnectionType::TOR, - _ => ConnectionType::CLEARNET, - }; - - let data_directory = data_directory.unwrap_or(get_dns_dir()); - let directory_server = - DirectoryServer::new(Some(data_directory.join("config.toml")), Some(network_type)) - .unwrap(); - let arc_directory_server = Arc::new(directory_server); - - start_directory_server(arc_directory_server); - } - } -} diff --git a/src/bin/maker-cli.rs b/src/bin/maker-cli.rs index 46167a6b..3b6d8989 100644 --- a/src/bin/maker-cli.rs +++ b/src/bin/maker-cli.rs @@ -1,12 +1,10 @@ +use std::{net::TcpStream, time::Duration}; + use clap::Parser; use coinswap::{ - maker::{ - error::MakerError, - rpc::{read_rpc_message, RpcMsgReq}, - }, - utill::{send_message, setup_logger}, + maker::{MakerError, RpcMsgReq, RpcMsgResp}, + utill::{read_message, send_message, setup_logger}, }; -use tokio::{io::BufReader, net::TcpStream}; /// maker-cli is a command line app to send RPC messages to maker server. #[derive(Parser, Debug)] @@ -41,63 +39,57 @@ enum Commands { NewAddress, } -#[tokio::main] -async fn main() -> Result<(), MakerError> { +fn main() -> Result<(), MakerError> { setup_logger(); let cli = App::parse(); match cli.command { Commands::Ping => { - send_rpc_req(&RpcMsgReq::Ping).await?; + send_rpc_req(&RpcMsgReq::Ping)?; } Commands::ContractUtxo => { - send_rpc_req(&RpcMsgReq::ContractUtxo).await?; + send_rpc_req(&RpcMsgReq::ContractUtxo)?; } Commands::ContractBalance => { - send_rpc_req(&RpcMsgReq::ContractBalance).await?; + send_rpc_req(&RpcMsgReq::ContractBalance)?; } Commands::FidelityBalance => { - send_rpc_req(&RpcMsgReq::FidelityBalance).await?; + send_rpc_req(&RpcMsgReq::FidelityBalance)?; } Commands::FidelityUtxo => { - send_rpc_req(&RpcMsgReq::FidelityUtxo).await?; + send_rpc_req(&RpcMsgReq::FidelityUtxo)?; } Commands::SeedBalance => { - send_rpc_req(&RpcMsgReq::SeedBalance).await?; + send_rpc_req(&RpcMsgReq::SeedBalance)?; } Commands::SeedUtxo => { - send_rpc_req(&RpcMsgReq::SeedUtxo).await?; + send_rpc_req(&RpcMsgReq::SeedUtxo)?; } Commands::SwapBalance => { - send_rpc_req(&RpcMsgReq::SwapBalance).await?; + send_rpc_req(&RpcMsgReq::SwapBalance)?; } Commands::SwapUtxo => { - send_rpc_req(&RpcMsgReq::SwapUtxo).await?; + send_rpc_req(&RpcMsgReq::SwapUtxo)?; } Commands::NewAddress => { - send_rpc_req(&RpcMsgReq::NewAddress).await?; + send_rpc_req(&RpcMsgReq::NewAddress)?; } } Ok(()) } -async fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> { - let mut stream = TcpStream::connect("127.0.0.1:8080").await?; - - let (read_half, mut write_half) = stream.split(); +fn send_rpc_req(req: &RpcMsgReq) -> Result<(), MakerError> { + let mut stream = TcpStream::connect("127.0.0.1:8080")?; + stream.set_read_timeout(Some(Duration::from_secs(20)))?; + stream.set_write_timeout(Some(Duration::from_secs(20)))?; - if let Err(e) = send_message(&mut write_half, &req).await { - log::error!("Error Sending RPC message : {:?}", e); - }; + send_message(&mut stream, &req)?; - let mut read_half = BufReader::new(read_half); + let response_bytes = read_message(&mut stream)?; + let response: RpcMsgResp = serde_cbor::from_slice(&response_bytes)?; - if let Some(rpc_resp) = read_rpc_message(&mut read_half).await? { - println!("{:?}", rpc_resp); - } else { - log::error!("No RPC response received"); - } + println!("{:?}", response); Ok(()) } diff --git a/src/bin/taker.rs b/src/bin/taker.rs index aed93032..d5064744 100644 --- a/src/bin/taker.rs +++ b/src/bin/taker.rs @@ -46,7 +46,7 @@ struct Cli { pub wallet_name: String, /// Sets the maker count to initiate coinswap with. #[clap(name = "maker_count", default_value = "2")] - pub maker_count: u16, + pub maker_count: usize, /// Sets the send amount. #[clap(name = "send_amount", default_value = "500000")] pub send_amount: u64, @@ -178,14 +178,10 @@ fn main() { ) .unwrap(); let config = taker2.config.clone(); - let _ = futures::executor::block_on(taker.sync_offerbook( - read_bitcoin_network_string(&args.network).unwrap(), - &config, - args.maker_count, - )); + taker.sync_offerbook(&config, args.maker_count).unwrap(); } Commands::DoCoinswap => { - let _ = taker.do_coinswap(swap_params); + taker.do_coinswap(swap_params).unwrap(); } } } diff --git a/src/error.rs b/src/error.rs index b676eed9..2ef0649d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,7 @@ pub enum NetError { IO(std::io::Error), ReachedEOF, ConnectionTimedOut, + InvalidNetworkAddress, Cbor(serde_cbor::Error), } diff --git a/src/maker/api.rs b/src/maker/api.rs index 48947c06..399d771b 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -222,11 +222,6 @@ impl Maker { /// Triggers a shutdown event for the Maker. pub fn shutdown(&self) -> Result<(), MakerError> { - log::info!("Shutdown wallet sync initiated."); - self.wallet.write()?.sync()?; - log::info!("Shutdown wallet syncing completed."); - self.wallet.read()?.save_to_disk()?; - log::info!("Wallet file saved to disk."); let mut flag = self.shutdown.write()?; *flag = true; Ok(()) diff --git a/src/maker/error.rs b/src/maker/error.rs index e0e5038a..f8106ea3 100644 --- a/src/maker/error.rs +++ b/src/maker/error.rs @@ -4,7 +4,13 @@ use std::sync::{MutexGuard, PoisonError, RwLockReadGuard, RwLockWriteGuard}; use bitcoin::secp256k1; -use crate::{protocol::error::ContractError, wallet::WalletError}; +use crate::{ + error::{NetError, ProtocolError}, + protocol::error::ContractError, + wallet::WalletError, +}; + +use super::MakerBehavior; /// Enum to handle Maker related errors. #[derive(Debug)] @@ -16,7 +22,10 @@ pub enum MakerError { Secp(secp256k1::Error), ContractError(ContractError), Wallet(WalletError), + Net(NetError), Deserialize(serde_cbor::Error), + SpecialBehaviour(MakerBehavior), + Protocol(ProtocolError), } impl From for MakerError { @@ -66,3 +75,21 @@ impl From for MakerError { Self::Wallet(value) } } + +impl From for MakerError { + fn from(value: MakerBehavior) -> Self { + Self::SpecialBehaviour(value) + } +} + +impl From for MakerError { + fn from(value: NetError) -> Self { + Self::Net(value) + } +} + +impl From for MakerError { + fn from(value: ProtocolError) -> Self { + Self::Protocol(value) + } +} diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index 89227773..e245c33c 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -17,9 +17,10 @@ use bitcoin::{ use bitcoind::bitcoincore_rpc::RpcApi; use crate::{ + error::ProtocolError, maker::api::recover_from_swap, protocol::{ - messages::{MultisigPrivkey, PrivKeyHandover}, + messages::{MakerHello, MultisigPrivkey, PrivKeyHandover}, Hash160, }, wallet::WalletSwapCoin, @@ -48,7 +49,7 @@ use crate::{ /// The Global Handle Message function. Takes in a [`Arc`] and handle messages /// according to a [ConnectionState]. -pub async fn handle_message( +pub fn handle_message( maker: &Arc, connection_state: &mut ConnectionState, message: TakerToMakerMessage, @@ -56,9 +57,23 @@ pub async fn handle_message( ) -> Result, MakerError> { let outgoing_message = match connection_state.allowed_message { ExpectedMessage::TakerHello => { - if let TakerToMakerMessage::TakerHello(_) = message { + if let TakerToMakerMessage::TakerHello(m) = message { + if m.protocol_version_min != 1 && m.protocol_version_max != 1 { + return Err(ProtocolError::WrongMessage { + expected: "Only protocol version 1 is allowed".to_string(), + received: format!( + "min/max version = {}/{}", + m.protocol_version_min, m.protocol_version_max + ), + } + .into()); + } connection_state.allowed_message = ExpectedMessage::NewlyConnectedTaker; - None + let reply = MakerToTakerMessage::MakerHello(MakerHello { + protocol_version_min: 1, + protocol_version_max: 1, + }); + Some(reply) } else { return Err(MakerError::UnexpectedMessage { expected: "TakerHello".to_string(), @@ -146,14 +161,14 @@ pub async fn handle_message( TakerToMakerMessage::RespContractSigsForRecvrAndSender(message) => { // Nothing to send. Maker now creates and broadcasts his funding Txs connection_state.allowed_message = ExpectedMessage::ReqContractSigsForRecvr; - maker - .handle_contract_sigs_for_recvr_and_sender(connection_state, message, ip) - .await?; + maker.handle_contract_sigs_for_recvr_and_sender( + connection_state, + message, + ip, + )?; if let MakerBehavior::BroadcastContractAfterSetup = maker.behavior { unexpected_recovery(maker.clone())?; - return Err(MakerError::General( - "Special Maker Behavior BroadcastContractAfterSetup", - )); + return Err(maker.behavior.into()); } else { None } @@ -207,9 +222,7 @@ impl Maker { message: ReqContractSigsForSender, ) -> Result { if let MakerBehavior::CloseAtReqContractSigsForSender = self.behavior { - return Err(MakerError::General( - "Special Behavior: CloseAtReqContractSigsForSender", - )); + return Err(self.behavior.into()); } // Verify and sign the contract transaction, check function definition for all the checks. @@ -251,9 +264,7 @@ impl Maker { ip: IpAddr, ) -> Result { if let MakerBehavior::CloseAtProofOfFunding = self.behavior { - return Err(MakerError::General( - "Special Behavior: CloseAtProofOfFunding", - )); + return Err(self.behavior.into()); } // Basic verification of ProofOfFunding Message. @@ -458,16 +469,14 @@ impl Maker { } /// Handles [ContractSigsForRecvrAndSender] message and updates the wallet state - pub async fn handle_contract_sigs_for_recvr_and_sender( + pub fn handle_contract_sigs_for_recvr_and_sender( &self, connection_state: &mut ConnectionState, message: ContractSigsForRecvrAndSender, ip: IpAddr, ) -> Result<(), MakerError> { if let MakerBehavior::CloseAtContractSigsForRecvrAndSender = self.behavior { - return Err(MakerError::General( - "Special Behavior: CloseAtContractSigsForRecvrAndSender", - )); + return Err(self.behavior.into()); } if message.receivers_sigs.len() != connection_state.incoming_swapcoins.len() { @@ -542,9 +551,7 @@ impl Maker { message: ReqContractSigsForRecvr, ) -> Result { if let MakerBehavior::CloseAtContractSigsForRecvr = self.behavior { - return Err(MakerError::General( - "Speacial Behavior: CloseAtContractSigsForRecvr", - )); + return Err(self.behavior.into()); } let sigs = message @@ -571,7 +578,7 @@ impl Maker { message: HashPreimage, ) -> Result { if let MakerBehavior::CloseAtHashPreimage = self.behavior { - return Err(MakerError::General("Special Behavior: CloseAtHashPreimage")); + return Err(self.behavior.into()); } let hashvalue = Hash160::hash(&message.preimage); diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 260bfcc8..7ff02a50 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -1,455 +1,23 @@ -//! Defines a Coinswap Maker Server. +//! The Coinswap Maker. //! -//! Handles connections, communication with takers, various aspects of the -//! Maker's behavior, includes functionalities such as checking for new connections, -//! handling messages from takers, refreshing offer caches, and interacting with the Bitcoin node. +//! A Maker server that acts as a swap service provider. +//! It can be run in an unix/mac system with local access to Bitcoin Core RPC. +//! +//! Maker server responds to RPC requests via `maker-cli` app, which is used as an +//! operating tool for the server. +//! +//! Default Ports: +//! 6102: Client connection for swaps. +//! 6103: RPC Connection for operations. -pub mod api; -pub mod config; -pub mod error; +mod api; +mod config; +mod error; mod handlers; -pub mod rpc; - -use std::{ - fs, - net::Ipv4Addr, - path::{Path, PathBuf}, - sync::Arc, - thread, - time::{Duration, Instant}, -}; - -use bitcoin::{absolute::LockTime, Amount}; -use bitcoind::bitcoincore_rpc::RpcApi; - -use tokio::{ - io::{AsyncReadExt, BufReader}, - net::{tcp::ReadHalf, TcpListener, TcpStream}, - select, - sync::mpsc, - time::sleep, -}; +mod rpc; +mod server; pub use api::{Maker, MakerBehavior}; - -use std::io::Read; -use tokio::io::AsyncWriteExt; -use tokio_socks::tcp::Socks5Stream; - -use crate::{ - maker::{ - api::{check_for_broadcasted_contracts, check_for_idle_states, ConnectionState}, - handlers::handle_message, - rpc::start_rpc_server_thread, - }, - protocol::messages::{MakerHello, MakerToTakerMessage, TakerToMakerMessage}, - utill::{monitor_log_for_completion, send_message, ConnectionType}, - wallet::WalletError, -}; - -use crate::maker::error::MakerError; - -/// Initializes and starts the Maker server, handling connections and various -/// aspects of the Maker's behavior. -#[tokio::main] -pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { - let maker_port = maker.config.port; - - let mut handle = None; - - let mut maker_address = format!("127.0.0.1:{}", maker_port); - - match maker.config.connection_type { - ConnectionType::CLEARNET => { - let mut directory_address = maker.config.directory_server_clearnet_address.clone(); - if cfg!(feature = "integration-test") { - directory_address = format!("127.0.0.1:{}", 8080); - } - loop { - match TcpStream::connect(directory_address.clone()).await { - Ok(mut stream) => { - let request_line = format!("POST {}\n", maker_address); - if let Err(e) = stream.write_all(request_line.as_bytes()).await { - // Error sending the payload, log and retry after waiting - log::warn!( - "[{}] Failed to send maker address to directory, reattempting: {}", - maker_port, - e - ); - thread::sleep(Duration::from_secs( - maker.config.heart_beat_interval_secs, - )); - continue; - } - // Payload sent successfully, exit the loop - log::info!( - "[{}] Successfully sent maker address to directory", - maker_port - ); - break; - } - Err(e) => { - // Connection error, log and retry after waiting - log::warn!( - "[{}] TCP connection error with directory, reattempting: {}", - maker_port, - e - ); - thread::sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)); - continue; - } - } - } - } - ConnectionType::TOR => { - if cfg!(feature = "tor") { - let maker_socks_port = maker.config.socks_port; - - let tor_log_dir = format!("/tmp/tor-rust-maker{}/log", maker_port); - - if Path::new(tor_log_dir.as_str()).exists() { - match fs::remove_file(Path::new(tor_log_dir.as_str())) { - Ok(_) => log::info!( - "[{}] Previous Maker log file deleted successfully", - maker_port - ), - Err(_) => log::error!("[{}] Error deleting Maker log file", maker_port), - } - } - - handle = Some(crate::tor::spawn_tor( - maker_socks_port, - maker_port, - format!("/tmp/tor-rust-maker{}", maker_port), - )); - thread::sleep(Duration::from_secs(10)); - - if let Err(e) = monitor_log_for_completion(&PathBuf::from(tor_log_dir), "100%") { - log::error!("[{}] Error monitoring log file: {}", maker_port, e); - } - - log::info!("Maker tor is instantiated"); - - let maker_hs_path_str = - format!("/tmp/tor-rust-maker{}/hs-dir/hostname", maker.config.port); - let maker_hs_path = PathBuf::from(maker_hs_path_str); - let mut maker_file = fs::File::open(&maker_hs_path).unwrap(); - let mut maker_onion_addr: String = String::new(); - maker_file.read_to_string(&mut maker_onion_addr).unwrap(); - maker_onion_addr.pop(); - maker_address = format!("{}:{}", maker_onion_addr, maker.config.port); - - let mut directory_onion_address = - maker.config.directory_server_onion_address.clone(); - - if cfg!(feature = "integration-test") { - let directory_hs_path_str = - "/tmp/tor-rust-directory/hs-dir/hostname".to_string(); - let directory_hs_path = PathBuf::from(directory_hs_path_str); - let mut directory_file = fs::File::open(directory_hs_path).unwrap(); - let mut directory_onion_addr: String = String::new(); - directory_file - .read_to_string(&mut directory_onion_addr) - .unwrap(); - directory_onion_addr.pop(); - directory_onion_address = format!("{}:{}", directory_onion_addr, 8080); - } - - let address = directory_onion_address.as_str(); - - log::info!( - "[{}] Directory onion address : {}", - maker_port, - directory_onion_address - ); - - loop { - match Socks5Stream::connect( - format!("127.0.0.1:{}", maker_socks_port).as_str(), - address, - ) - .await - { - Ok(socks_stream) => { - let mut stream = socks_stream.into_inner(); - let request_line = format!("POST {}\n", maker_address); - if let Err(e) = stream.write_all(request_line.as_bytes()).await { - log::warn!( - "[{}] Failed to send maker address to directory, reattempting: {}", - maker_port, - e - ); - thread::sleep(Duration::from_secs( - maker.config.heart_beat_interval_secs, - )); - continue; - } - log::info!( - "[{}] Sucessfuly sent maker address to directory", - maker_port - ); - break; - } - Err(e) => { - log::warn!( - "[{}] Socks connection error with directory, reattempting: {}", - maker_port, - e - ); - thread::sleep(Duration::from_secs( - maker.config.heart_beat_interval_secs, - )); - continue; - } - } - } - } - } - } - - maker.wallet.write()?.refresh_offer_maxsize_cache()?; - - let network = maker.get_wallet().read()?.store.network; - log::info!("Network: {:?}", network); - - let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, maker.config.port)).await?; - log::info!("Listening On Port {}", maker.config.port); - - let (server_loop_comms_tx, mut server_loop_comms_rx) = mpsc::channel::(100); - let mut accepting_clients = true; - let mut last_rpc_ping = Instant::now(); - // let mut last_directory_servers_refresh = Instant::now(); - - let maker_clone_1 = maker.clone(); - std::thread::spawn(move || { - log::info!( - "[{}] Spawning Connection status check thread", - maker_clone_1.config.port - ); - check_for_idle_states(maker_clone_1).unwrap(); - }); - - let maker_clone_2 = maker.clone(); - std::thread::spawn(move || { - log::info!( - "[{}] Spawning contract-watcher thread", - maker_clone_2.config.port - ); - check_for_broadcasted_contracts(maker_clone_2).unwrap(); - }); - - // Get the highest value fidelity bond from the wallet. - { - let mut wallet = maker.wallet.write()?; - if let Some(i) = wallet.get_highest_fidelity_index()? { - let highest_proof = wallet.generate_fidelity_proof(i, maker.config.port.to_string())?; - let mut proof = maker.highest_fidelity_proof.write()?; - *proof = Some(highest_proof); - } else { - // No bond in the wallet. Lets attempt to create one. - let amount = Amount::from_sat(maker.config.fidelity_value); - let current_height = wallet.rpc.get_block_count().map_err(WalletError::Rpc)? as u32; - - // Set 100 blocks locktime for test - let locktime = if cfg!(feature = "integration-test") { - LockTime::from_height(current_height + 100).unwrap() - } else { - LockTime::from_height(maker.config.fidelity_timelock + current_height).unwrap() - }; - - match wallet.create_fidelity(amount, locktime) { - // Hard error if we cant create fidelity. As without this Maker can't send a valid - // Offer to taker. - Err(e) => { - log::error!( - "[{}] Fidelity Bond Creation failed: {:?}. Shutting Down Maker server", - maker.config.port, - e - ); - return Err(e.into()); - } - Ok(i) => { - log::info!("[{}] Successfully created fidelity bond", maker.config.port); - - let address_string = maker_address.clone(); - let highest_proof = wallet.generate_fidelity_proof(i, address_string)?; - let mut proof = maker.highest_fidelity_proof.write()?; - *proof = Some(highest_proof); - } - } - } - log::info!("[{}] Syncing and saving wallet data", maker.config.port); - wallet.sync()?; - wallet.save_to_disk()?; - log::info!("[{}] Sync and save successful", maker.config.port); - } - - // Spawn the RPC Thread here. - let rpc_maker = maker.clone(); - let _ = start_rpc_server_thread(rpc_maker).await; - - maker.setup_complete()?; - - log::info!("[{}] Maker setup is ready", maker.config.port); - - // Loop to keep checking for new connections - let result = loop { - if *maker.shutdown.read()? { - log::warn!("[{}] Maker is shutting down", maker.config.port); - break Ok(()); - } - let (mut socket, addr) = select! { - - new_client = listener.accept() => new_client?, - client_err = server_loop_comms_rx.recv() => { - //unwrap the option here because we'll never close the mscp so it will always work - match client_err.as_ref().unwrap() { - MakerError::Wallet(WalletError::Rpc(e)) => { - //doublecheck the rpc connection here because sometimes the rpc error - //will be unrelated to the connection itmaker e.g. "insufficent funds" - let rpc_connection_success = maker.wallet.read()?.rpc.get_best_block_hash().is_ok(); - if !rpc_connection_success { - log::warn!("lost connection with bitcoin node, temporarily shutting \ - down server until connection reestablished, error={:?}", e); - accepting_clients = false; - } - continue; - }, - _ => { - log::error!("[{}] Maker Handling Error : {:?}", maker.config.port, client_err.unwrap()); - // Either in special Maker behavior, or something went worng. - // Quitely shutdown. - // TODO: Handle this behavior separately for prod/test. - maker.shutdown()?; - // We continue, as the shutdown flag will be caught in the next iteration of the loop. - // In the case below. - // Shutting down tor here - continue; - } - } - }, - _ = sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)) => { - let mut rpc_ping_success = true; - - let rpc_ping_interval = Duration::from_secs(maker.config.rpc_ping_interval_secs); - if Instant::now().saturating_duration_since(last_rpc_ping) > rpc_ping_interval { - last_rpc_ping = Instant::now(); - rpc_ping_success = maker.wallet.write()?.refresh_offer_maxsize_cache().is_ok(); - } - accepting_clients = rpc_ping_success; - if !accepting_clients { - log::warn!("not accepting clients, rpc_ping_success={}", rpc_ping_success); - } - continue; - }, - }; - - if !accepting_clients { - log::warn!("Rejecting Connection From {:?}", addr); - continue; - } - - log::info!( - "[{}] <=== Accepted Connection on port={}", - maker.config.port, - addr.port() - ); - let server_loop_comms_tx = server_loop_comms_tx.clone(); - let maker_clone = maker.clone(); - - // Spawn a thread to handle one taker connection. - tokio::spawn(async move { - log::info!("[{}] Spawning Handler Thread", maker_clone.config.port); - let (socket_reader, mut socket_writer) = socket.split(); - let mut reader = BufReader::new(socket_reader); - - let mut connection_state = ConnectionState::default(); - - if let Err(e) = send_message( - &mut socket_writer, - &MakerToTakerMessage::MakerHello(MakerHello { - protocol_version_min: 0, - protocol_version_max: 0, - }), - ) - .await - { - log::error!("IO error sending first message: {:?}", e); - return; - } - log::info!("[{}] ===> MakerHello", maker_clone.config.port); - - loop { - let message = select! { - read_result = read_taker_message(&mut reader) => { - match read_result { - Ok(None) => { - log::info!("[{}] Connection closed by peer", maker_clone.config.port); - break; - }, - Ok(Some(msg)) => msg, - Err(e) => { - log::error!("error reading from socket: {:?}", e); - break; - } - } - }, - _ = sleep(Duration::from_secs(maker_clone.config.idle_connection_timeout)) => { - log::info!("[{}] Idle connection closed", addr.port()); - break; - }, - }; - - log::info!("[{}] <=== {} ", maker_clone.config.port, message); - - let reply: Result, MakerError> = - handle_message(&maker_clone, &mut connection_state, message, addr.ip()).await; - - match reply { - Ok(reply) => { - if let Some(message) = reply { - log::info!("[{}] ===> {} ", maker_clone.config.port, message); - if let Err(e) = send_message(&mut socket_writer, &message).await { - log::error!("Closing due to IO error in sending message: {:?}", e); - continue; - } - } - // if reply is None then don't send anything to client - } - Err(err) => { - server_loop_comms_tx.send(err).await.unwrap(); - break; - } - } - } - }); - }; - - if maker.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") { - crate::tor::kill_tor_handles(handle.unwrap()); - } - - result -} - -/// Reads a Taker Message. -async fn read_taker_message( - reader: &mut BufReader>, -) -> Result, MakerError> { - let read_result = reader.read_u32().await; - // If its EOF, return None - if read_result - .as_ref() - .is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof) - { - return Ok(None); - } - let length = read_result?; - if length == 0 { - return Ok(None); - } - let mut buffer = vec![0; length as usize]; - reader.read_exact(&mut buffer).await?; - let message: TakerToMakerMessage = serde_cbor::from_slice(&buffer)?; - Ok(Some(message)) -} +pub use error::MakerError; +pub use rpc::{RpcMsgReq, RpcMsgResp}; +pub use server::start_maker_server; diff --git a/src/maker/rpc/mod.rs b/src/maker/rpc/mod.rs index d6a8a7dc..be888195 100644 --- a/src/maker/rpc/mod.rs +++ b/src/maker/rpc/mod.rs @@ -1,5 +1,5 @@ mod messages; mod server; -pub use messages::RpcMsgReq; -pub use server::{read_rpc_message, start_rpc_server_thread}; +pub use messages::{RpcMsgReq, RpcMsgResp}; +pub use server::start_rpc_server; diff --git a/src/maker/rpc/server.rs b/src/maker/rpc/server.rs index d0dc8fb4..a6cf6f87 100644 --- a/src/maker/rpc/server.rs +++ b/src/maker/rpc/server.rs @@ -1,158 +1,155 @@ -use std::sync::Arc; - -use tokio::{ - io::{AsyncReadExt, BufReader}, - net::{tcp::ReadHalf, TcpListener, TcpStream}, +use std::{ + io::ErrorKind, + net::{TcpListener, TcpStream}, + sync::Arc, + thread::sleep, + time::Duration, }; use crate::{ maker::{error::MakerError, rpc::messages::RpcMsgResp, Maker}, - utill::send_message, + utill::{read_message, send_message}, }; use super::messages::RpcMsgReq; -/// Reads a RPC Message. -pub async fn read_rpc_message( - reader: &mut BufReader>, -) -> Result, MakerError> { - let read_result = reader.read_u32().await; - // If its EOF, return None - if read_result - .as_ref() - .is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof) - { - return Ok(None); - } - let length = read_result?; - if length == 0 { - return Ok(None); - } - let mut buffer = vec![0; length as usize]; - reader.read_exact(&mut buffer).await?; - let message: RpcMsgReq = serde_cbor::from_slice(&buffer)?; - Ok(Some(message)) -} - -async fn handle_request(maker: &Arc, mut socket: TcpStream) -> Result<(), MakerError> { - let (socket_reader, mut socket_writer) = socket.split(); - let mut reader = BufReader::new(socket_reader); +fn handle_request(maker: &Arc, socket: &mut TcpStream) -> Result<(), MakerError> { + let msg_bytes = read_message(socket)?; + let rpc_request: RpcMsgReq = serde_cbor::from_slice(&msg_bytes)?; - if let Some(rpc_request) = read_rpc_message(&mut reader).await? { - match rpc_request { - RpcMsgReq::Ping => { - log::info!("RPC request received: {:?}", rpc_request); - if let Err(e) = send_message(&mut socket_writer, &RpcMsgResp::Pong).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::ContractUtxo => { - let utxos = maker - .get_wallet() - .read()? - .list_live_contract_spend_info(None)? - .iter() - .map(|(l, _)| l.clone()) - .collect::>(); - let resp = RpcMsgResp::ContractUtxoResp { utxos }; - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::FidelityUtxo => { - let utxos = maker - .get_wallet() - .read()? - .list_fidelity_spend_info(None)? - .iter() - .map(|(l, _)| l.clone()) - .collect::>(); - let resp = RpcMsgResp::FidelityUtxoResp { utxos }; - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::SeedUtxo => { - let utxos = maker - .get_wallet() - .read()? - .list_descriptor_utxo_spend_info(None)? - .iter() - .map(|(l, _)| l.clone()) - .collect::>(); - let resp = RpcMsgResp::SeedUtxoResp { utxos }; - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::SwapUtxo => { - let utxos = maker - .get_wallet() - .read()? - .list_swap_coin_utxo_spend_info(None)? - .iter() - .map(|(l, _)| l.clone()) - .collect::>(); - let resp = RpcMsgResp::SwapUtxoResp { utxos }; - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::ContractBalance => { - let balance = maker.get_wallet().read()?.balance_live_contract(None)?; - let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::FidelityBalance => { - let balance = maker.get_wallet().read()?.balance_fidelity_bonds(None)?; - let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::SeedBalance => { - let balance = maker.get_wallet().read()?.balance_descriptor_utxo(None)?; - let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::SwapBalance => { - let balance = maker.get_wallet().read()?.balance_swap_coins(None)?; - let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } - RpcMsgReq::NewAddress => { - let new_address = maker.get_wallet().write()?.get_next_external_address()?; - let resp = RpcMsgResp::NewAddressResp(new_address.to_string()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } + match rpc_request { + RpcMsgReq::Ping => { + log::info!("RPC request received: {:?}", rpc_request); + if let Err(e) = send_message(socket, &RpcMsgResp::Pong) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::ContractUtxo => { + let utxos = maker + .get_wallet() + .read()? + .list_live_contract_spend_info(None)? + .iter() + .map(|(l, _)| l.clone()) + .collect::>(); + let resp = RpcMsgResp::ContractUtxoResp { utxos }; + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::FidelityUtxo => { + let utxos = maker + .get_wallet() + .read()? + .list_fidelity_spend_info(None)? + .iter() + .map(|(l, _)| l.clone()) + .collect::>(); + let resp = RpcMsgResp::FidelityUtxoResp { utxos }; + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::SeedUtxo => { + let utxos = maker + .get_wallet() + .read()? + .list_descriptor_utxo_spend_info(None)? + .iter() + .map(|(l, _)| l.clone()) + .collect::>(); + let resp = RpcMsgResp::SeedUtxoResp { utxos }; + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::SwapUtxo => { + let utxos = maker + .get_wallet() + .read()? + .list_swap_coin_utxo_spend_info(None)? + .iter() + .map(|(l, _)| l.clone()) + .collect::>(); + let resp = RpcMsgResp::SwapUtxoResp { utxos }; + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::ContractBalance => { + let balance = maker.get_wallet().read()?.balance_live_contract(None)?; + let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::FidelityBalance => { + let balance = maker.get_wallet().read()?.balance_fidelity_bonds(None)?; + let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::SeedBalance => { + let balance = maker.get_wallet().read()?.balance_descriptor_utxo(None)?; + let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::SwapBalance => { + let balance = maker.get_wallet().read()?.balance_swap_coins(None)?; + let resp = RpcMsgResp::ContractBalanceResp(balance.to_sat()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; + } + RpcMsgReq::NewAddress => { + let new_address = maker.get_wallet().write()?.get_next_external_address()?; + let resp = RpcMsgResp::NewAddressResp(new_address.to_string()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; } } Ok(()) } -pub async fn start_rpc_server_thread(maker: Arc) { +pub fn start_rpc_server(maker: Arc) -> Result<(), MakerError> { let rpc_port = maker.config.rpc_port; - let rpc_socket = format!("127.0.0.1:{}", rpc_port); - let listener = TcpListener::bind(&rpc_socket).await.unwrap(); + let rpc_address = format!("127.0.0.1:{}", rpc_port); + let listener = Arc::new(TcpListener::bind(&rpc_address)?); log::info!( "[{}] RPC socket binding successful at {}", maker.config.port, - rpc_socket + rpc_address ); - tokio::spawn(async move { - loop { - let (socket, addrs) = listener.accept().await.unwrap(); - log::info!("Got RPC request from: {}", addrs); - handle_request(&maker, socket).await.unwrap(); + + listener.set_nonblocking(true)?; + + while !*maker.shutdown.read()? { + match listener.accept() { + Ok((mut stream, addr)) => { + log::info!("Got RPC request from: {}", addr); + stream.set_read_timeout(Some(Duration::from_secs(20)))?; + stream.set_write_timeout(Some(Duration::from_secs(20)))?; + handle_request(&maker, &mut stream)?; + } + + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + sleep(Duration::from_secs(3)); + continue; + } else { + log::error!("Error accepting RPC connection: {:?}", e); + return Err(e.into()); + } + } } - }); + + sleep(Duration::from_secs(3)); + } + + Ok(()) } diff --git a/src/maker/server.rs b/src/maker/server.rs new file mode 100644 index 00000000..c8f6d307 --- /dev/null +++ b/src/maker/server.rs @@ -0,0 +1,559 @@ +//! The Coinswap Maker Server. +//! +//! This module includes all server side code for the coinswap maker. +//! The server maintains the thread pool for P2P Connection, Watchtower, Bitcoin Backend and RPC Client Request. +//! The server listens at two port 6102 for P2P, and 6103 for RPC Client request. + +use std::{ + fs, + io::{ErrorKind, Read, Write}, + net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream}, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + thread::{self, sleep}, + time::Duration, +}; + +use bitcoin::{absolute::LockTime, Amount}; +use bitcoind::bitcoincore_rpc::RpcApi; + +use socks::Socks5Stream; + +pub use super::Maker; + +use crate::{ + error::NetError, + maker::{ + api::{check_for_broadcasted_contracts, check_for_idle_states, ConnectionState}, + handlers::handle_message, + rpc::start_rpc_server, + }, + protocol::messages::TakerToMakerMessage, + utill::{monitor_log_for_completion, read_message, send_message, ConnectionType}, + wallet::{FidelityError, WalletError}, +}; + +use crate::maker::error::MakerError; + +/// Fetches the Maker and DNS address, and sends maker address to the DNS server. +/// Depending upon ConnectionType and test/prod environment, different maker address and DNS addresses are returned. +/// Return the Maker address and an optional tor thread handle. +/// +/// Tor thread is spawned only if ConnectionType=TOR and --feature=tor is enabled. +/// Errors if ConncetionType=TOR but, the tor feature is not enabled. +fn network_bootstrap( + maker: Arc, +) -> Result<(String, Option>), MakerError> { + let maker_port = maker.config.port; + let mut tor_handle = None; + let (maker_address, dns_address) = match maker.config.connection_type { + ConnectionType::CLEARNET => { + let maker_address = format!("127.0.0.1:{}", maker_port); + let dns_address = if cfg!(feature = "integration-test") { + format!("127.0.0.1:{}", 8080) + } else { + maker.config.directory_server_clearnet_address.clone() + }; + log::info!("[{}] Maker server address : {}", maker_port, maker_address); + + log::info!( + "[{}] Directory server address : {}", + maker_port, + dns_address + ); + + (maker_address, dns_address) + } + ConnectionType::TOR => { + if !cfg!(feature = "tor") { + return Err(MakerError::General( + "Tor setup failure. Please compile with Tor feature enabled.", + )); + } else { + let maker_socks_port = maker.config.socks_port; + + let tor_log_dir = format!("/tmp/tor-rust-maker{}/log", maker_port); + + if Path::new(tor_log_dir.as_str()).exists() { + match fs::remove_file(Path::new(tor_log_dir.as_str())) { + Ok(_) => log::info!( + "[{}] Previous Maker log file deleted successfully", + maker_port + ), + Err(_) => log::error!("[{}] Error deleting Maker log file", maker_port), + } + } + + tor_handle = Some(crate::tor::spawn_tor( + maker_socks_port, + maker_port, + format!("/tmp/tor-rust-maker{}", maker_port), + )); + thread::sleep(Duration::from_secs(10)); + + if let Err(e) = monitor_log_for_completion(&PathBuf::from(tor_log_dir), "100%") { + log::error!("[{}] Error monitoring log file: {}", maker_port, e); + } + + log::info!("[{}] Maker tor is instantiated", maker_port); + + let maker_hs_path_str = + format!("/tmp/tor-rust-maker{}/hs-dir/hostname", maker.config.port); + let maker_hs_path = PathBuf::from(maker_hs_path_str); + let mut maker_file = fs::File::open(&maker_hs_path).unwrap(); + let mut maker_onion_addr: String = String::new(); + maker_file.read_to_string(&mut maker_onion_addr).unwrap(); + maker_onion_addr.pop(); + let maker_address = format!("{}:{}", maker_onion_addr, maker.config.port); + + let directory_onion_address = if cfg!(feature = "integration-test") { + let directory_hs_path_str = + "/tmp/tor-rust-directory/hs-dir/hostname".to_string(); + let directory_hs_path = PathBuf::from(directory_hs_path_str); + let mut directory_file = fs::File::open(directory_hs_path).unwrap(); + let mut directory_onion_addr: String = String::new(); + directory_file + .read_to_string(&mut directory_onion_addr) + .unwrap(); + directory_onion_addr.pop(); + format!("{}:{}", directory_onion_addr, 8080) + } else { + maker.config.directory_server_onion_address.clone() + }; + + log::info!("[{}] Maker server address : {}", maker_port, maker_address); + + log::info!( + "[{}] Directory server address : {}", + maker_port, + directory_onion_address + ); + + (maker_address, directory_onion_address) + } + } + }; + + // Keep trying until send is successful. + loop { + let mut stream = match maker.config.connection_type { + ConnectionType::CLEARNET => match TcpStream::connect(&dns_address) { + Ok(s) => s, + Err(e) => { + log::warn!( + "[{}] TCP connection error with directory, reattempting: {}", + maker_port, + e + ); + thread::sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)); + continue; + } + }, + ConnectionType::TOR => { + match Socks5Stream::connect( + format!("127.0.0.1:{}", maker.config.socks_port), + dns_address.as_str(), + ) { + Ok(s) => s.into_inner(), + Err(e) => { + log::warn!( + "[{}] TCP connection error with directory, reattempting: {}", + maker_port, + e + ); + thread::sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)); + continue; + } + } + } + }; + + let request_line = format!("POST {}\n", maker_address); + if let Err(e) = stream + .write_all(request_line.as_bytes()) + .and_then(|_| stream.flush()) + { + // Error sending the payload, log and retry after waiting + log::warn!( + "[{}] Failed to send maker address to directory, reattempting: {}", + maker_port, + e + ); + thread::sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)); + continue; + } + // Payload sent successfully, exit the loop + log::info!( + "[{}] Successfully sent maker address to directory", + maker_port + ); + break; + } + + Ok((maker_address, tor_handle)) +} + +/// Checks if the wallet already has fidelity bonds. if not, create the first fidelity bond. +fn setup_fidelity_bond(maker: &Arc, maker_address: &str) -> Result<(), MakerError> { + let highest_index = maker.get_wallet().read()?.get_highest_fidelity_index()?; + if let Some(i) = highest_index { + let highest_proof = maker + .get_wallet() + .read()? + .generate_fidelity_proof(i, maker_address)?; + let mut proof = maker.highest_fidelity_proof.write()?; + *proof = Some(highest_proof); + } else { + // No bond in the wallet. Lets attempt to create one. + let amount = Amount::from_sat(maker.config.fidelity_value); + let current_height = maker + .get_wallet() + .read()? + .rpc + .get_block_count() + .map_err(WalletError::Rpc)? as u32; + + // Set 100 blocks locktime for test + let locktime = if cfg!(feature = "integration-test") { + LockTime::from_height(current_height + 100).unwrap() + } else { + LockTime::from_height(maker.config.fidelity_timelock + current_height).unwrap() + }; + while !*maker.shutdown.read()? { + let fidelity_result = maker + .get_wallet() + .write()? + .create_fidelity(amount, locktime); + match fidelity_result { + // Wait for sufficient fund to create fidelity bond. + // Hard error if fidelity still can't be created. + Err(e) => { + if let WalletError::Fidelity(FidelityError::InsufficientFund { + available, + required, + }) = e + { + log::warn!("Insufficient fund to create fidelity bond."); + let amount = required - available; + let (_, addr, _) = maker + .get_wallet() + .read()? + .get_next_fidelity_address(locktime)?; + log::info!("Send {} sats to {}", amount, addr); + if cfg!(feature = "integration-test") { + sleep(Duration::from_secs(3)); + } else { + sleep(Duration::from_secs(300)); // Wait for 5 mins in production + } + continue; + } else { + log::error!( + "[{}] Fidelity Bond Creation failed: {:?}. Shutting Down Maker server", + maker.config.port, + e + ); + return Err(e.into()); + } + } + Ok(i) => { + log::info!("[{}] Successfully created fidelity bond", maker.config.port); + let highest_proof = maker + .get_wallet() + .read()? + .generate_fidelity_proof(i, maker_address)?; + let mut proof = maker.highest_fidelity_proof.write()?; + *proof = Some(highest_proof); + log::info!("[{}] Syncing and saving wallet data", maker.config.port); + maker.get_wallet().write()?.sync()?; + maker.get_wallet().read()?.save_to_disk()?; + log::info!("[{}] Sync and save successful", maker.config.port); + break; + } + } + } + } + Ok(()) +} + +/// Keep checking if the Bitcoin Core RPC connection is live. Sets the global `accepting_client` flag as per RPC connection status. +/// +/// This will not block. Once Core RPC connection is live, accepting_client will set as `true` again. +fn check_connection_with_core( + maker: Arc, + accepting_clients: Arc>, +) -> Result<(), MakerError> { + let mut rpc_ping_success = false; + while !*maker.shutdown.read()? { + // If connection is disrupted keep trying at heart_beat_interval (3 sec). + // If connection is live, keep tring at rpc_ping_interval (60 sec). + match rpc_ping_success { + true => { + sleep(Duration::from_secs(maker.config.rpc_ping_interval_secs)); + } + false => { + sleep(Duration::from_secs(maker.config.heart_beat_interval_secs)); + } + } + if let Err(e) = maker.wallet.read()?.rpc.get_blockchain_info() { + log::info!( + "[{}] RPC Connection failed. Reattempting {}", + maker.config.port, + e + ); + rpc_ping_success = false; + } else { + rpc_ping_success = true; + } + let mut mutex = accepting_clients.lock()?; + *mutex = rpc_ping_success; + } + + Ok(()) +} + +/// Handle a single client connection. +fn handle_client( + maker: Arc, + stream: &mut TcpStream, + client_addr: SocketAddr, +) -> Result<(), MakerError> { + stream.set_read_timeout(Some(Duration::from_secs( + maker.config.idle_connection_timeout, + )))?; + + stream.set_write_timeout(Some(Duration::from_secs( + maker.config.idle_connection_timeout, + )))?; + + let mut connection_state = ConnectionState::default(); + + while !*maker.shutdown.read()? { + let mut taker_msg_bytes = Vec::new(); + match read_message(stream) { + Ok(b) => taker_msg_bytes = b, + Err(e) => { + if let NetError::IO(e) = e { + if e.kind() == ErrorKind::UnexpectedEof { + continue; + } + } + } + } + + if taker_msg_bytes.len() == 0 { + continue; + } + + let taker_msg: TakerToMakerMessage = serde_cbor::from_slice(&taker_msg_bytes)?; + log::info!("[{}] <=== {}", maker.config.port, taker_msg); + + let reply = handle_message(&maker, &mut connection_state, taker_msg, client_addr.ip()); + + match reply { + Ok(reply) => { + if let Some(message) = reply { + log::info!("[{}] ===> {} ", maker.config.port, message); + if let Err(e) = send_message(stream, &message) { + log::error!("Closing due to IO error in sending message: {:?}", e); + continue; + } + } else { + continue; + } + } + Err(err) => { + match &err { + // Shutdown server if special behavior is set + MakerError::SpecialBehaviour(sp) => { + log::error!("[{}] Maker Special Behavior : {:?}", maker.config.port, sp); + maker.shutdown()?; + } + e => { + log::error!( + "[{}] Internal message handling error occurred: {:?}", + maker.config.port, + e + ); + } + } + return Err(err); + } + } + } + + Ok(()) +} + +// The main Maker Server process. +pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { + // Initialize network connections. + let (maker_address, tor_thread) = network_bootstrap(maker.clone())?; + let port = maker.config.port; + + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, maker.config.port))?; + log::info!( + "[{}] Listening for client conns at: {}", + maker.config.port, + listener.local_addr()? + ); + listener.set_nonblocking(true)?; // Needed to not block a thread waiting for incoming connection. + log::info!( + "[{}] Maker Server Address: {}", + maker.config.port, + maker_address + ); + + // Setup the wallet with fidelity bond. + maker.get_wallet().write()?.sync()?; + let network = maker.get_wallet().read()?.store.network; + let balance = maker.get_wallet().read()?.balance()?; + log::info!("[{}] Currency Network: {:?}", port, network); + log::info!("[{}] Total Wallet Balance: {:?}", port, balance); + + setup_fidelity_bond(&maker, &maker_address)?; + maker.wallet.write()?.refresh_offer_maxsize_cache()?; + + // Global server Mutex, to switch on/off p2p network. + let accepting_clients = Arc::new(Mutex::new(false)); + + // Spawn Server threads. + // All thread handles are stored in the thread_pool, which are all joined at server shutdown. + let mut thread_pool = Vec::new(); + + // 1. Bitcoin Core Connection checker thread. + // Ensures that Bitcoin Core connection is live. + // If not, it will block p2p connections until Core works again. + let maker_clone = maker.clone(); + let acc_client_clone = accepting_clients.clone(); + let conn_check_thread: thread::JoinHandle> = thread::Builder::new() + .name("Bitcoin Core Connection Checker Thread".to_string()) + .spawn(move || { + log::info!("[{}] Spawning Bitcoin Core connection checker thread", port); + check_connection_with_core(maker_clone, acc_client_clone) + })?; + thread_pool.push(conn_check_thread); + + // 2. Idle Client connection checker thread. + // This threads check idelness of peer in live swaps. + // And takes recovery measure if the peer seems to have disappeared in middlle of a swap. + let maker_clone = maker.clone(); + let idle_conn_check_thread = thread::Builder::new() + .name("Idle Client Checker Thread".to_string()) + .spawn(move || { + log::info!( + "[{}] Spawning Client connection status checker thread", + port + ); + check_for_idle_states(maker_clone.clone()) + })?; + thread_pool.push(idle_conn_check_thread); + + // 3. Watchtower thread. + // This thread checks for broadcasted contract transactions, which usually means violation of the protocol. + // When contract transaction detected in mempool it will attempt recovery. + // This can get triggered even when contracts of adjacent hops are published. Implying the whole swap route is disrupted. + let maker_clone = maker.clone(); + let contract_watcher_thread = thread::Builder::new() + .name("Contract Watcher Thread".to_string()) + .spawn(move || { + log::info!("[{}] Spawning contract-watcher thread", port); + check_for_broadcasted_contracts(maker_clone.clone()) + })?; + thread_pool.push(contract_watcher_thread); + + // 4: The RPC server thread. + // User for responding back to `maker-cli` apps. + let maker_clone = maker.clone(); + let rpc_thread = thread::Builder::new() + .name("RPC Thread".to_string()) + .spawn(move || { + log::info!("[{}] Spawning RPC server", port); + start_rpc_server(maker_clone) + })?; + + thread_pool.push(rpc_thread); + + maker.setup_complete()?; + + log::info!("[{}] Maker setup is ready", maker.config.port); + + // The P2P Client connection loop. + // Each client connection will spawn a new handler thread, which is added back in the global thread_pool. + // This loop beats at `maker.config.heart_beat_interval_secs` + while !*maker.shutdown.read()? { + let maker = maker.clone(); // This clone is needed to avoid moving the Arc in each iterations. + let heart_beat_interval = maker.config.heart_beat_interval_secs; + + // Block client connections if accepting_client=false + if !*accepting_clients.lock()? { + log::warn!( + "[{}] Bitcoin Core RPC Connection broken. Not accepting clients temporarily", + maker.config.port + ); + sleep(Duration::from_secs(heart_beat_interval)); + continue; + } + + match listener.accept() { + Ok((mut stream, client_addr)) => { + log::info!("[{}] Spawning Client Handler thread", maker.config.port); + + let client_handler_thread = thread::Builder::new() + .name("Client Handler Thread".to_string()) + .spawn(move || { + if let Err(e) = handle_client(maker, &mut stream, client_addr) { + log::error!("[{}] Error Handling client request {:?}", port, e); + Err(e) + } else { + Ok(()) + } + })?; + thread_pool.push(client_handler_thread); + } + + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + // Do nothing + } else { + log::error!( + "[{}] Error accepting incoming connection: {:?}", + maker.config.port, + e + ); + return Err(MakerError::IO(e)); + } + } + }; + + sleep(Duration::from_secs(heart_beat_interval)); + } + + log::info!("[{}] Maker is shutting down.", port); + + // Shuting down. Join all the threads. + for thread in thread_pool { + log::info!( + "[{}] Closing Thread: {}", + port, + thread.thread().name().expect("Thread name expected") + ); + let join_result = thread.join(); + if let Ok(r) = join_result { + log::info!("[{}] Thread closing result: {:?}", port, r) + } else if let Err(e) = join_result { + log::info!("[{}] error in internal thread: {:?}", port, e); + } + } + + if maker.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") { + crate::tor::kill_tor_handles(tor_thread.unwrap()); + } + + log::info!("Shutdown wallet sync initiated."); + maker.get_wallet().write()?.sync()?; + log::info!("Shutdown wallet syncing completed."); + maker.get_wallet().read()?.save_to_disk()?; + log::info!("Wallet file saved to disk."); + + Ok(()) +} diff --git a/src/market/directory.rs b/src/market/directory.rs index 81d82d9d..95d41d66 100644 --- a/src/market/directory.rs +++ b/src/market/directory.rs @@ -5,11 +5,12 @@ use std::{ collections::HashSet, - fs, io, - net::Ipv4Addr, + fs::{self, OpenOptions}, + io::{self, BufRead, BufReader, Write}, + net::{Ipv4Addr, TcpListener, TcpStream}, path::Path, sync::{Arc, RwLock}, - thread, + thread::{self, sleep}, time::Duration, }; @@ -21,12 +22,6 @@ use crate::utill::{ write_default_config, ConnectionType, }; -use tokio::{ - fs::OpenOptions, - io::{AsyncBufReadExt, AsyncWriteExt}, - net::TcpListener, -}; - /// Represents errors that can occur during directory server operations. #[derive(Debug)] pub enum DirectoryServerError { @@ -143,13 +138,12 @@ fn write_default_directory_config(config_path: &PathBuf) -> std::io::Result<()> write_default_config(config_path, config_string) } -#[tokio::main] -pub async fn start_directory_server(directory: Arc) { +pub fn start_directory_server(directory: Arc) { let address_file = directory.data_dir.join("addresses.dat"); let addresses = Arc::new(RwLock::new(HashSet::new())); - let mut handle = None; + let mut tor_handle = None; match directory.connection_type { ConnectionType::CLEARNET => {} @@ -165,7 +159,7 @@ pub async fn start_directory_server(directory: Arc) { let socks_port = directory.socks_port; let tor_port = directory.port; - handle = Some(crate::tor::spawn_tor( + tor_handle = Some(crate::tor::spawn_tor( socks_port, tor_port, "/tmp/tor-rust-directory".to_string(), @@ -191,56 +185,74 @@ pub async fn start_directory_server(directory: Arc) { } let directory_server_arc = directory.clone(); - let _ = start_rpc_server_thread(directory_server_arc, addresses.clone()).await; - - let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, directory.port)) - .await - .unwrap(); - - loop { - tokio::select! { - accept_result = listener.accept() => { - match accept_result { - Ok((stream, _)) => handle_client(stream, addresses.clone()).await, - Err(e) => log::error!("Error accepting connection: {}", e), - } - } - _ = tokio::time::sleep(Duration::from_secs(3)) => { - if *directory.shutdown.read().unwrap() { - log::info!("Shutdown signal received. Stopping directory server."); - if directory.connection_type == ConnectionType::TOR && cfg!(feature = "tor"){ - crate::tor::kill_tor_handles(handle.unwrap()); - log::info!("Directory server and Tor instance terminated successfully"); - } - break; - } else { - let file_content = addresses.read().unwrap().iter().map(|addr| { - format!("{}\n", addr) - }).collect::>().join(""); - let mut file = OpenOptions::new() - .write(true) - .create(true) - .append(true) - .open(address_file.to_str().unwrap()) - .await + let addres_arc = addresses.clone(); + let rpc_thread = thread::spawn(|| { + start_rpc_server_thread(directory_server_arc, addres_arc); + }); + + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, directory.port)).unwrap(); + + while !*directory.shutdown.read().unwrap() { + match listener.accept() { + Ok((mut stream, addrs)) => { + log::debug!("Incoming connection from : {}", addrs); + let address_arc = addresses.clone(); + stream + .set_read_timeout(Some(Duration::from_secs(20))) .unwrap(); - file.write_all(file_content.as_bytes()).await.unwrap(); - } + stream + .set_write_timeout(Some(Duration::from_secs(20))) + .unwrap(); + handle_client(&mut stream, address_arc); + } + + // If no connection received, check for shutdown or save addresses to disk + Err(e) => { + log::error!("Error accepting incoming connection: {:?}", e); } } + + sleep(Duration::from_secs(3)); } + + log::info!("Shutdown signal received. Stopping directory server."); + rpc_thread.join().unwrap(); + if let Some(handle) = tor_handle { + crate::tor::kill_tor_handles(handle); + log::info!("Directory server and Tor instance terminated successfully"); + } + + // Write the addresses to file + let file_content = addresses + .read() + .unwrap() + .iter() + .map(|addr| format!("{}\n", addr)) + .collect::>() + .join(""); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) // Override the address file + .open(address_file.to_str().unwrap()) + .unwrap(); + file.write_all(file_content.as_bytes()).unwrap(); + file.flush().unwrap(); } -async fn handle_client(mut stream: tokio::net::TcpStream, addresses: Arc>>) { - let mut reader = tokio::io::BufReader::new(&mut stream); +// The stream should have read and write timeout set. +// TODO: Use serde encoded data instead of string. +fn handle_client(stream: &mut TcpStream, addresses: Arc>>) { + let reader_stream = stream.try_clone().unwrap(); + let mut reader = BufReader::new(reader_stream); let mut request_line = String::new(); - reader.read_line(&mut request_line).await.unwrap(); - log::info!("addresses, {:?}", addresses); + + reader.read_line(&mut request_line).unwrap(); if request_line.starts_with("POST") { - let onion_address: String = request_line.replace("POST ", "").trim().to_string(); - addresses.write().unwrap().insert(onion_address.clone()); - log::info!("Got new maker address: {}", onion_address); + let addr: String = request_line.replace("POST ", "").trim().to_string(); + addresses.write().unwrap().insert(addr.clone()); + log::info!("Got new maker address: {}", addr); } else if request_line.starts_with("GET") { log::info!("Taker pinged the directory server"); let response = addresses @@ -248,7 +260,8 @@ async fn handle_client(mut stream: tokio::net::TcpStream, addresses: Arc>, -) -> Result, MakerError> { - let read_result = reader.read_u32().await; - // If its EOF, return None - log::info!("read_result: {:?}", read_result); - if read_result - .as_ref() - .is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof) - { - return Ok(None); - } - let length = read_result?; - log::info!("length: {:?}", length); - if length == 0 { - return Ok(None); - } - let mut buffer = vec![0; length as usize]; - reader.read_exact(&mut buffer).await?; - log::info!("buffer: {:?}", buffer); - let message: RpcMsgReq = serde_cbor::from_slice(&buffer)?; - log::info!("message: {:?}", message); - Ok(Some(message)) -} - -pub async fn read_resp_message( - reader: &mut BufReader>, -) -> Result, MakerError> { - let read_result = reader.read_u32().await; - // If its EOF, return None - if read_result - .as_ref() - .is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof) - { - return Ok(None); - } - let length = read_result?; - if length == 0 { - return Ok(None); - } - let mut buffer = vec![0; length as usize]; - reader.read_exact(&mut buffer).await?; - let message: RpcMsgResp = serde_cbor::from_slice(&buffer)?; - Ok(Some(message)) -} - -async fn handle_request( - mut socker: TcpStream, - address: Arc>>, -) -> Result<(), MakerError> { - let (socket_reader, mut socket_writer) = socker.split(); - let mut reader = BufReader::new(socket_reader); +fn handle_request(socket: &mut TcpStream, address: Arc>>) { + let req_bytes = read_message(socket).unwrap(); + let rpc_request: RpcMsgReq = serde_cbor::from_slice(&req_bytes).unwrap(); - if let Some(rpc_request) = read_rpc_message(&mut reader).await? { - match rpc_request { - RpcMsgReq::ListAddresses => { - log::info!("RPC request received: {:?}", rpc_request); - let resp = RpcMsgResp::ListAddressesResp(address.read().unwrap().clone()); - if let Err(e) = send_message(&mut socket_writer, &resp).await { - log::info!("Error sending RPC response {:?}", e); - }; - } + match rpc_request { + RpcMsgReq::ListAddresses => { + log::info!("RPC request received: {:?}", rpc_request); + let resp = RpcMsgResp::ListAddressesResp(address.read().unwrap().clone()); + if let Err(e) = send_message(socket, &resp) { + log::info!("Error sending RPC response {:?}", e); + }; } } - - Ok(()) } -pub async fn start_rpc_server_thread( +pub fn start_rpc_server_thread( directory: Arc, address: Arc>>, ) { let rpc_port = directory.rpc_port; let rpc_socket = format!("127.0.0.1:{}", rpc_port); - let listener = TcpListener::bind(&rpc_socket).await.unwrap(); + let listener = Arc::new(TcpListener::bind(&rpc_socket).unwrap()); log::info!( "[{}] RPC socket binding successful at {}", directory.rpc_port, rpc_socket ); - tokio::spawn(async move { - loop { - let (socket, addrs) = listener.accept().await.unwrap(); - log::info!("Got RPC request from: {}", addrs); - handle_request(socket, address.clone()).await.unwrap(); + listener.set_nonblocking(true).unwrap(); + + while !*directory.shutdown.read().unwrap() { + match listener.accept() { + Ok((mut stream, addr)) => { + log::info!("Got RPC request from: {}", addr); + stream + .set_read_timeout(Some(Duration::from_secs(20))) + .unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(20))) + .unwrap(); + handle_request(&mut stream, address.clone()); + } + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + sleep(Duration::from_secs(3)); + continue; + } else { + log::error!("Error accepting RPC connection: {:?}", e); + break; + } + } } - }); + sleep(Duration::from_secs(3)); + } } diff --git a/src/protocol/messages.rs b/src/protocol/messages.rs index 3c220db9..76f79c0e 100644 --- a/src/protocol/messages.rs +++ b/src/protocol/messages.rs @@ -170,14 +170,14 @@ pub struct HashPreimage { } /// Multisig Privatekeys used in the last step of coinswap to perform privatekey handover. -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct MultisigPrivkey { pub multisig_redeemscript: ScriptBuf, pub key: SecretKey, } /// Message to perform the final Privatekey Handover. This is the last message of the Coinswap Protocol. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct PrivKeyHandover { pub multisig_privkeys: Vec, } @@ -221,7 +221,7 @@ impl Display for TakerToMakerMessage { } /// Represents the initial handshake message sent from Maker to Taker. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct MakerHello { pub protocol_version_min: u32, pub protocol_version_max: u32, @@ -250,13 +250,13 @@ pub struct Offer { } /// Contract Tx signatures provided by a Sender of a Coinswap. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct ContractSigsForSender { pub sigs: Vec, } /// Contract Tx and extra metadata from a Sender of a Coinswap -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct SenderContractTxInfo { pub contract_tx: Transaction, pub timelock_pubkey: PublicKey, @@ -266,7 +266,7 @@ pub struct SenderContractTxInfo { /// This message is sent by a Maker to a Taker, which is a request to the Taker for gathering signatures for the Maker as both Sender and Receiver of Coinswaps. /// This message is sent by a Maker after a [`ProofOfFunding`] has been received. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct ContractSigsAsRecvrAndSender { /// Contract Tx by which this maker is receiving Coinswap. pub receivers_contract_txs: Vec, @@ -275,13 +275,13 @@ pub struct ContractSigsAsRecvrAndSender { } /// Contract Tx signatures a Maker sends as a Receiver of CoinSwap. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct ContractSigsForRecvr { pub sigs: Vec, } /// All messages sent from Maker to Taker. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum MakerToTakerMessage { /// Protocol Handshake. MakerHello(MakerHello), diff --git a/src/taker/api.rs b/src/taker/api.rs index c095c256..e6fc98ff 100644 --- a/src/taker/api.rs +++ b/src/taker/api.rs @@ -11,15 +11,16 @@ use std::{ collections::{HashMap, HashSet}, fs, - io::Read, + io::{self, Read}, + net::TcpStream, path::{Path, PathBuf}, - thread, + thread::{self, sleep}, time::{Duration, Instant}, }; use bip39::Mnemonic; use bitcoind::bitcoincore_rpc::RpcApi; -use tokio::{net::TcpStream, select, time::sleep}; +use socks::Socks5Stream; use bitcoin::{ consensus::encode::deserialize, @@ -28,9 +29,8 @@ use bitcoin::{ rand::{rngs::OsRng, RngCore}, SecretKey, }, - Amount, BlockHash, Network, OutPoint, PublicKey, ScriptBuf, Transaction, Txid, + Amount, BlockHash, OutPoint, PublicKey, ScriptBuf, Transaction, Txid, }; -use tokio_socks::tcp::Socks5Stream; use super::{ error::TakerError, @@ -63,7 +63,7 @@ pub struct SwapParams { /// Total Amount to Swap. pub send_amount: Amount, /// How many hops. - pub maker_count: u16, + pub maker_count: usize, /// How many splits pub tx_count: u32, // TODO: Following two should be moved to TakerConfig as global configuration. @@ -241,8 +241,7 @@ impl Taker { &mut self.wallet } - #[tokio::main] - pub async fn do_coinswap(&mut self, swap_params: SwapParams) -> Result<(), TakerError> { + pub fn do_coinswap(&mut self, swap_params: SwapParams) -> Result<(), TakerError> { let tor_log_dir = "/tmp/tor-rust-taker/log".to_string(); let taker_port = self.config.port; @@ -279,7 +278,7 @@ impl Taker { } } } - self.send_coinswap(swap_params).await?; + self.send_coinswap(swap_params)?; if self.config.connection_type == ConnectionType::TOR && cfg!(feature = "tor") { crate::tor::kill_tor_handles(handle.unwrap()); @@ -296,12 +295,10 @@ impl Taker { /// by executing the contract txs. If that fails too for any reason, user should manually call the [Taker::recover_from_swap]. /// /// If that fails too. Open an issue at [our github](https://github.com/citadel-tech/coinswap/issues) - pub async fn send_coinswap(&mut self, swap_params: SwapParams) -> Result<(), TakerError> { + pub fn send_coinswap(&mut self, swap_params: SwapParams) -> Result<(), TakerError> { log::info!("Syncing Offerbook"); - let network = self.wallet.store.network; let config = self.config.clone(); - self.sync_offerbook(network, &config, swap_params.maker_count) - .await?; + self.sync_offerbook(&config, swap_params.maker_count)?; // Generate new random preimage and initiate the first hop. let mut preimage = [0u8; 32]; @@ -311,7 +308,7 @@ impl Taker { self.ongoing_swap_state.swap_params = swap_params; // Try first hop. Abort if error happens. - if let Err(e) = self.init_first_hop().await { + if let Err(e) = self.init_first_hop() { log::error!("Could not initiate first hop: {:?}", e); self.recover_from_swap()?; return Err(e); @@ -330,43 +327,41 @@ impl Taker { // Refund lock time decreases by `refund_locktime_step` for each hop. let maker_refund_locktime = self.config.refund_locktime + self.config.refund_locktime_step - * (self.ongoing_swap_state.swap_params.maker_count - maker_index - 1); + * (self.ongoing_swap_state.swap_params.maker_count - maker_index - 1) as u16; let funding_tx_infos = self.funding_info_for_next_maker(); // Attempt to initiate the next hop of the swap. If anything goes wrong, abort immediately. // If succeeded, collect the funding_outpoints and multisig_reedemscripts of the next hop. // If error then aborts from current swap. Ban the Peer. - let (funding_outpoints, multisig_reedemscripts) = match self - .send_sigs_init_next_hop(maker_refund_locktime, &funding_tx_infos) - .await - { - Ok((next_peer_info, contract_sigs)) => { - self.ongoing_swap_state.peer_infos.push(next_peer_info); - let multisig_reedemscripts = contract_sigs - .senders_contract_txs_info - .iter() - .map(|senders_contract_tx_info| { - senders_contract_tx_info.multisig_redeemscript.clone() - }) - .collect::>(); - let funding_outpoints = contract_sigs - .senders_contract_txs_info - .iter() - .map(|senders_contract_tx_info| { - senders_contract_tx_info.contract_tx.input[0].previous_output - }) - .collect::>(); + let (funding_outpoints, multisig_reedemscripts) = + match self.send_sigs_init_next_hop(maker_refund_locktime, &funding_tx_infos) { + Ok((next_peer_info, contract_sigs)) => { + self.ongoing_swap_state.peer_infos.push(next_peer_info); + let multisig_reedemscripts = contract_sigs + .senders_contract_txs_info + .iter() + .map(|senders_contract_tx_info| { + senders_contract_tx_info.multisig_redeemscript.clone() + }) + .collect::>(); + let funding_outpoints = contract_sigs + .senders_contract_txs_info + .iter() + .map(|senders_contract_tx_info| { + senders_contract_tx_info.contract_tx.input[0].previous_output + }) + .collect::>(); - (funding_outpoints, multisig_reedemscripts) - } - Err(e) => { - log::error!("Could not initiate next hop. Error : {:?}", e); - log::warn!("Starting recovery from existing swap"); - self.recover_from_swap()?; - return Ok(()); - } - }; + (funding_outpoints, multisig_reedemscripts) + } + Err(e) => { + log::error!("Could not initiate next hop. Error : {:?}", e); + log::warn!("Starting recovery from existing swap"); + self.recover_from_swap()?; + return Ok(()); + } + }; // Watch for both expected and unexpected transactions. // This errors in two cases. @@ -374,14 +369,13 @@ impl Taker { // For all cases, abort from swap immediately. // For the timeout case also ban the Peer. let txids_to_watch = funding_outpoints.iter().map(|op| op.txid).collect(); - match self.watch_for_txs(&txids_to_watch).await { + match self.watch_for_txs(&txids_to_watch) { Ok(r) => self.ongoing_swap_state.funding_txs.push(r), Err(e) => { log::error!("Error: {:?}", e); log::warn!("Starting recovery from existing swap"); if let TakerError::FundingTxWaitTimeOut = e { - let bad_maker = - &self.ongoing_swap_state.peer_infos[maker_index as usize].peer; + let bad_maker = &self.ongoing_swap_state.peer_infos[maker_index].peer; self.offerbook.add_bad_maker(bad_maker); } self.recover_from_swap()?; @@ -395,7 +389,7 @@ impl Taker { self.create_incoming_swapcoins(multisig_reedemscripts, funding_outpoints)?; log::debug!("Incoming Swapcoins: {:?}", incoming_swapcoins); self.ongoing_swap_state.incoming_swapcoins = incoming_swapcoins; - match self.request_sigs_for_incoming_swap().await { + match self.request_sigs_for_incoming_swap() { Ok(_) => (), Err(e) => { log::error!("Incoming SwapCoin Generation failed : {:?}", e); @@ -418,7 +412,7 @@ impl Taker { return Ok(()); } - match self.settle_all_swaps().await { + match self.settle_all_swaps() { Ok(_) => (), Err(e) => { log::error!("Swap Settlement Failed : {:?}", e); @@ -441,21 +435,22 @@ impl Taker { /// Initiate the first coinswap hop. Makers are selected from the [OfferBook], and round will /// fail if no suitable makers are found. /// Creates and stores the [OutgoingSwapCoin] into [OngoingSwapState], and also saves it into the [Wallet] file. - async fn init_first_hop(&mut self) -> Result<(), TakerError> { + fn init_first_hop(&mut self) -> Result<(), TakerError> { log::info!("Initializing First Hop."); // Set the Taker Position state self.ongoing_swap_state.taker_position = TakerPosition::FirstPeer; // Locktime to be used for this swap. let swap_locktime = self.config.refund_locktime - + self.config.refund_locktime_step * self.ongoing_swap_state.swap_params.maker_count; + + self.config.refund_locktime_step + * self.ongoing_swap_state.swap_params.maker_count as u16; // Loop until we find a live maker who responded to our signature request. let (maker, funding_txs) = loop { // Fail early if not enough good makers in the list to satisfy swap requirements. let untried_maker_count = self.offerbook.get_all_untried().len(); - if untried_maker_count < (self.ongoing_swap_state.swap_params.maker_count as usize) { + if untried_maker_count < (self.ongoing_swap_state.swap_params.maker_count) { log::error!("Not enough makers to satisfy swap requirements."); return Err(TakerError::NotEnoughMakersInOfferBook); } @@ -481,16 +476,13 @@ impl Taker { .collect(); // Request for Sender's Signatures - let contract_sigs = match self - .req_sigs_for_sender( - &maker.address, - &outgoing_swapcoins, - &multisig_nonces, - &hashlock_nonces, - swap_locktime, - ) - .await - { + let contract_sigs = match self.req_sigs_for_sender( + &maker.address, + &outgoing_swapcoins, + &multisig_nonces, + &hashlock_nonces, + swap_locktime, + ) { Ok(contract_sigs) => contract_sigs, Err(e) => { // Bad maker, mark it, and try next one. @@ -555,7 +547,7 @@ impl Taker { // TakerError::ContractsBroadcasted and TakerError::FundingTxWaitTimeOut. // For all cases, abort from swap immediately. // For the contract-broadcasted case also ban the Peer. - match self.watch_for_txs(&funding_txids).await { + match self.watch_for_txs(&funding_txids) { Ok(stuffs) => { self.ongoing_swap_state.funding_txs.push(stuffs); self.offerbook.add_good_maker(&maker); @@ -575,7 +567,7 @@ impl Taker { /// Return a list of confirmed funding txs with their corresponding merkle proofs. /// Errors if any watching contract txs have been broadcasted during the time too. /// The error contanis the list of broadcasted contract [Txid]s. - async fn watch_for_txs( + fn watch_for_txs( &self, funding_txids: &Vec, ) -> Result<(Vec, Vec), TakerError> { @@ -678,7 +670,7 @@ impl Taker { .collect::, _>>()?; return Ok((txes, merkleproofs)); } - sleep(Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)); } } @@ -780,12 +772,11 @@ impl Taker { /// Send signatures to a maker, and initiate the next hop of the swap by finding a new maker. /// If no suitable makers are found in [OfferBook], next swap will not initiate and the swap round will fail. - async fn send_sigs_init_next_hop( + fn send_sigs_init_next_hop( &mut self, maker_refund_locktime: u16, funding_tx_infos: &[FundingTxInfo], ) -> Result<(NextPeerInfo, ContractSigsAsRecvrAndSender), TakerError> { - let reconnect_timeout_sec = self.config.reconnect_attempt_timeout_sec; // Configurable reconnection attempts for testing let reconnect_attempts = if cfg!(feature = "integration-test") { 10 @@ -801,67 +792,66 @@ impl Taker { }; let mut ii = 0; + + let maker_oa = self + .ongoing_swap_state + .peer_infos + .last() + .expect("at least one active maker expected") + .peer + .clone(); + loop { ii += 1; - select! { - ret = self.send_sigs_init_next_hop_once( - maker_refund_locktime, - funding_tx_infos - ) => { - match ret { - Ok(return_value) => return Ok(return_value), - Err(e) => { - let maker = &self.ongoing_swap_state.peer_infos.last().expect("at least one active maker expected").peer; - log::error!( - "Failed to exchange signatures with maker {}, \ - reattempting... error={:?}", - &maker.address, - e - ); - // If its a protocol error and not just connection error, scream hard. - if let TakerError::Protocol(msg) = e { - return Err(TakerError::Protocol(msg)) - } + match self.send_sigs_init_next_hop_once(maker_refund_locktime, funding_tx_infos) { + Ok(ret) => return Ok(ret), + Err(e) => { + // Re attempt upto reconnect_attempts tries, if timedout error + if let TakerError::Net(NetError::IO(error)) = e { + if error.kind() == io::ErrorKind::WouldBlock + || error.kind() == io::ErrorKind::TimedOut + { if ii <= reconnect_attempts { - sleep(Duration::from_secs( - if ii <= self.config.short_long_sleep_delay_transition { - sleep_delay - } else { - self.config.reconnect_long_sleep_delay - }, - )) - .await; + log::warn!( + "Timeout for settling coinswap with maker {}, reattempting...", + maker_oa.address + ); continue; } else { - // Attempt count exceeded. Ban this maker. - log::warn!("Connection attempt count exceeded with Maker:{}, Banning Maker.", maker.address); - self.offerbook.add_bad_maker(maker); - return Err(e); + log::warn!("Timeout Reattempt exceeded. Adding malicious Maker"); + self.offerbook.add_bad_maker(&maker_oa); + return Err(NetError::ConnectionTimedOut.into()); } } - } - }, - _ = sleep(Duration::from_secs(reconnect_timeout_sec)) => { - log::warn!( - "Timeout for exchange signatures with maker {}, reattempting...", - &self.ongoing_swap_state.peer_infos.last().expect("at least one active maker expected").peer.address - ); - if ii <= reconnect_attempts { - continue; } else { - // Timeout exceeded. Ban this maker. - let maker = &self.ongoing_swap_state.peer_infos.last().expect("atleast one maker expected at this stage").peer; - log::warn!("Connection timeout exceeded with Maker:{}, Banning Maker.", maker.address); - self.offerbook.add_bad_maker(maker); - return Err(NetError::ConnectionTimedOut.into()); + // Re attempt with transitory delay for all other errors + log::warn!( + "Failed to connect to maker {} to send signatures and init next hop, \ + reattempting... error={:?}", + &maker_oa.address, + e + ); + if ii <= reconnect_attempts { + sleep(Duration::from_secs( + if ii <= self.config.short_long_sleep_delay_transition { + sleep_delay + } else { + self.config.reconnect_long_sleep_delay + }, + )); + continue; + } else { + self.offerbook.add_bad_maker(&maker_oa); + return Err(e); + } } - }, + } } } } /// [Internal] Single attempt to send signatures and initiate next hop. - async fn send_sigs_init_next_hop_once( + fn send_sigs_init_next_hop_once( &mut self, maker_refund_locktime: u16, funding_tx_infos: &[FundingTxInfo], @@ -878,16 +868,20 @@ impl Taker { log::info!("Connecting to {}", this_maker.address); let address = this_maker.address.to_string(); let mut socket = match self.config.connection_type { - ConnectionType::CLEARNET => TcpStream::connect(address).await?, + ConnectionType::CLEARNET => TcpStream::connect(address)?, ConnectionType::TOR => Socks5Stream::connect( format!("127.0.0.1:{}", self.config.socks_port).as_str(), - address, - ) - .await? + address.as_str(), + )? .into_inner(), }; - // let mut socket = TcpStream::connect(this_maker.address.get_tcpstream_address()).await?; - let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?; + + let reconnect_timeout = Duration::from_secs(self.config.reconnect_attempt_timeout_sec); + + socket.set_read_timeout(Some(reconnect_timeout))?; + socket.set_write_timeout(Some(reconnect_timeout))?; + + handshake_maker(&mut socket)?; let mut next_maker = this_maker.clone(); let ( next_peer_multisig_pubkeys, @@ -970,13 +964,11 @@ impl Taker { }; let (contract_sigs_as_recvr_sender, next_swap_contract_redeemscripts) = send_proof_of_funding_and_init_next_hop( - &mut socket_reader, - &mut socket_writer, + &mut socket, this_maker_info, next_maker_info, self.get_preimage_hash(), - ) - .await?; + )?; log::info!( "<=== Recieved ContractSigsAsRecvrAndSender from {}", this_maker.address @@ -1013,16 +1005,13 @@ impl Taker { &next_peer_multisig_pubkeys, &next_swap_contract_redeemscripts, )?; - let sigs = match self - .req_sigs_for_sender( - &next_maker.address, - &watchonly_swapcoins, - &next_peer_multisig_keys_or_nonces, - &next_peer_hashlock_keys_or_nonces, - maker_refund_locktime, - ) - .await - { + let sigs = match self.req_sigs_for_sender( + &next_maker.address, + &watchonly_swapcoins, + &next_peer_multisig_keys_or_nonces, + &next_peer_hashlock_keys_or_nonces, + maker_refund_locktime, + ) { Ok(r) => { self.offerbook.add_good_maker(&next_maker); r @@ -1081,14 +1070,11 @@ impl Taker { [self.ongoing_swap_state.watchonly_swapcoins.len() - 2] }; - match self - .req_sigs_for_recvr( - previous_maker_addr, - previous_maker_watchonly_swapcoins, - &contract_sigs_as_recvr_sender.receivers_contract_txs, - ) - .await - { + match self.req_sigs_for_recvr( + previous_maker_addr, + previous_maker_watchonly_swapcoins, + &contract_sigs_as_recvr_sender.receivers_contract_txs, + ) { Ok(s) => s.sigs, Err(e) => { log::error!("Could not get Receiver's signatures : {:?}", e); @@ -1103,15 +1089,15 @@ impl Taker { this_maker.address ); send_message( - &mut socket_writer, + &mut socket, &TakerToMakerMessage::RespContractSigsForRecvrAndSender( ContractSigsForRecvrAndSender { receivers_sigs, senders_sigs, }, ), - ) - .await?; + )?; + let next_swap_info = NextPeerInfo { peer: next_maker.clone(), multisig_pubkeys: next_peer_multisig_pubkeys, @@ -1288,7 +1274,7 @@ impl Taker { } /// Request signatures for the [IncomingSwapCoin] from the last maker of the swap round. - async fn request_sigs_for_incoming_swap(&mut self) -> Result<(), TakerError> { + fn request_sigs_for_incoming_swap(&mut self) -> Result<(), TakerError> { // Intermediate hops completed. Perform the last receiving hop. let last_maker = self .ongoing_swap_state @@ -1303,19 +1289,16 @@ impl Taker { "===> Sending ReqContractSigsForRecvr to {}", last_maker.address ); - let receiver_contract_sig = match self - .req_sigs_for_recvr( - &last_maker.address, - &self.ongoing_swap_state.incoming_swapcoins, - &self - .ongoing_swap_state - .incoming_swapcoins - .iter() - .map(|swapcoin| swapcoin.contract_tx.clone()) - .collect::>(), - ) - .await - { + let receiver_contract_sig = match self.req_sigs_for_recvr( + &last_maker.address, + &self.ongoing_swap_state.incoming_swapcoins, + &self + .ongoing_swap_state + .incoming_swapcoins + .iter() + .map(|swapcoin| swapcoin.contract_tx.clone()) + .collect::>(), + ) { Ok(s) => s, Err(e) => { log::warn!("Banning Maker : {}", last_maker.address); @@ -1342,7 +1325,7 @@ impl Taker { /// Request signatures for sender side of the swap. /// Keep trying until `first_connect_attempts` limit, with time delay of `first_connect_sleep_delay_sec`. - async fn req_sigs_for_sender( + fn req_sigs_for_sender( &self, maker_address: &MakerAddress, outgoing_swapcoins: &[S], @@ -1350,6 +1333,7 @@ impl Taker { maker_hashlock_nonces: &[SecretKey], locktime: u16, ) -> Result { + let reconnect_time_out = Duration::from_secs(self.config.first_connect_attempt_timeout_sec); // Configurable reconnection attempts for testing let first_connect_attempts = if cfg!(feature = "integration-test") { 10 @@ -1365,46 +1349,57 @@ impl Taker { }; let mut ii = 0; + + let maker_addr_str = maker_address.to_string(); + log::info!("Connecting to {}", maker_addr_str); + let mut socket = match self.config.connection_type { + ConnectionType::CLEARNET => TcpStream::connect(maker_addr_str.clone())?, + ConnectionType::TOR => Socks5Stream::connect( + format!("127.0.0.1:{}", self.config.socks_port).as_str(), + &*maker_addr_str, + )? + .into_inner(), + }; + + socket.set_read_timeout(Some(reconnect_time_out))?; + socket.set_write_timeout(Some(reconnect_time_out))?; + loop { ii += 1; - select! { - ret = req_sigs_for_sender_once( - self.config.connection_type, - maker_address, - outgoing_swapcoins, - maker_multisig_nonces, - maker_hashlock_nonces, - locktime, - ) => { - match ret { - Ok(sigs) => return Ok(sigs), - Err(e) => { - log::warn!( - "Failed to request senders contract tx sigs from maker {}, \ - reattempting... error={:?}", - maker_address, - e - ); - if ii <= first_connect_attempts { - sleep(Duration::from_secs(sleep_delay)).await; - continue; - } else { - return Err(e); - } - } - } - }, - _ = sleep(Duration::from_secs(self.config.first_connect_attempt_timeout_sec)) => { + + match req_sigs_for_sender_once( + &mut socket, + outgoing_swapcoins, + maker_multisig_nonces, + maker_hashlock_nonces, + locktime, + ) { + Ok(ret) => return Ok(ret), + Err(e) => { log::warn!( - "Timeout for request senders contract tx sig from maker {}, reattempting...", - maker_address + "Failed to connect to maker {} to request signatures for receiver, \ + reattempting... error={:?}", + &maker_addr_str, + e ); - if ii <= self.config.first_connect_attempts { + if ii <= first_connect_attempts { + sleep(Duration::from_secs( + if ii <= self.config.short_long_sleep_delay_transition { + sleep_delay + } else { + self.config.reconnect_long_sleep_delay + }, + )); continue; } else { - return Err(NetError::ConnectionTimedOut.into()); + log::warn!( + "Failed to connect to maker {} to request signatures for receiver, \ + reattempt limit exceeded", + &maker_addr_str, + ); + return Err(e); } - }, + } } } } @@ -1413,12 +1408,14 @@ impl Taker { /// Keep trying until `reconnect_attempts` limit, with a time delay. /// The time delay transitions from `reconnect_short_slepp_delay` to `reconnect_locg_sleep_delay`, /// after `short_long_sleep_delay_transition` time. - async fn req_sigs_for_recvr( + fn req_sigs_for_recvr( &self, maker_address: &MakerAddress, incoming_swapcoins: &[S], receivers_contract_txes: &[Transaction], ) -> Result { + let reconnect_time_out = Duration::from_secs(self.config.reconnect_attempt_timeout_sec); + // Configurable reconnection attempts for testing let reconnect_attempts = if cfg!(feature = "integration-test") { 10 @@ -1434,51 +1431,51 @@ impl Taker { }; let mut ii = 0; + + let maker_addr_str = maker_address.to_string(); + log::info!("Connecting to {}", maker_addr_str); + let mut socket = match self.config.connection_type { + ConnectionType::CLEARNET => TcpStream::connect(maker_addr_str.clone())?, + ConnectionType::TOR => Socks5Stream::connect( + format!("127.0.0.1:{}", self.config.socks_port).as_str(), + &*maker_addr_str, + )? + .into_inner(), + }; + + socket.set_read_timeout(Some(reconnect_time_out))?; + socket.set_write_timeout(Some(reconnect_time_out))?; + loop { ii += 1; - select! { - ret = req_sigs_for_recvr_once( - self.config.connection_type, - maker_address, - incoming_swapcoins, - receivers_contract_txes, - ) => { - match ret { - Ok(sigs) => return Ok(sigs), - Err(e) => { - log::warn!( - "Failed to request receivers contract tx sigs from maker {}, \ - reattempting... error={:?}", - maker_address, - e - ); - if ii <= reconnect_attempts { - sleep(Duration::from_secs( - if ii <= self.config.short_long_sleep_delay_transition { - sleep_delay - } else { - self.config.reconnect_long_sleep_delay - }, - )) - .await; - continue; - } else { - return Err(e); - } - } - } - }, - _ = sleep(Duration::from_secs(self.config.reconnect_attempt_timeout_sec)) => { + match req_sigs_for_recvr_once(&mut socket, incoming_swapcoins, receivers_contract_txes) + { + Ok(ret) => return Ok(ret), + Err(e) => { log::warn!( - "Timeout for request receivers contract tx sig from maker {}, reattempting...", - maker_address + "Failed to connect to maker {} to request signatures for receiver, \ + reattempting... error={:?}", + &maker_addr_str, + e ); - if ii <= self.config.reconnect_attempts { + if ii <= reconnect_attempts { + sleep(Duration::from_secs( + if ii <= self.config.short_long_sleep_delay_transition { + sleep_delay + } else { + self.config.reconnect_long_sleep_delay + }, + )); continue; } else { - return Err(NetError::ConnectionTimedOut.into()); + log::warn!( + "Failed to connect to maker {} to request signatures for receiver, \ + reattempt limit exceeded", + &maker_addr_str, + ); + return Err(e); } - }, + } } } } @@ -1486,7 +1483,7 @@ impl Taker { /// Settle all the ongoing swaps. This routine sends the hash preimage to all the makers. /// Pass around the Maker's multisig privatekeys. Saves all the data in wallet file. This marks /// the ends of swap round. - async fn settle_all_swaps(&mut self) -> Result<(), TakerError> { + fn settle_all_swaps(&mut self) -> Result<(), TakerError> { let mut outgoing_privkeys: Option> = None; // Because the last peer info is the Taker, we take upto (0..n-1), where n = peer_info.len() @@ -1499,7 +1496,7 @@ impl Taker { for (index, maker_address) in maker_addresses.iter().enumerate() { if index == 0 { self.ongoing_swap_state.taker_position = TakerPosition::FirstPeer; - } else if index == ((self.ongoing_swap_state.swap_params.maker_count - 1) as usize) { + } else if index == (self.ongoing_swap_state.swap_params.maker_count - 1) { self.ongoing_swap_state.taker_position = TakerPosition::LastPeer; } else { self.ongoing_swap_state.taker_position = TakerPosition::WatchOnly; @@ -1538,69 +1535,74 @@ impl Taker { .collect::>() }; - let reconnect_time_out = self.config.reconnect_attempt_timeout_sec; + let reconnect_time_out = Duration::from_secs(self.config.reconnect_attempt_timeout_sec); let mut ii = 0; - loop { - // Configurable reconnection attempts for testing - let reconnect_attempts = if cfg!(feature = "integration-test") { - 10 - } else { - self.config.reconnect_attempts - }; - // Custom sleep delay for testing. - let sleep_delay = if cfg!(feature = "integration-test") { - 1 - } else { - self.config.reconnect_short_sleep_delay - }; + let maker_addr_str = maker_address.address.to_string(); + log::info!("Connecting to {}", maker_addr_str); + let mut socket = match self.config.connection_type { + ConnectionType::CLEARNET => TcpStream::connect(maker_addr_str.clone())?, + ConnectionType::TOR => Socks5Stream::connect( + format!("127.0.0.1:{}", self.config.socks_port).as_str(), + &*maker_addr_str, + )? + .into_inner(), + }; + + socket.set_read_timeout(Some(reconnect_time_out))?; + socket.set_write_timeout(Some(reconnect_time_out))?; + + // Configurable reconnection attempts for testing + let reconnect_attempts = if cfg!(feature = "integration-test") { + 10 + } else { + self.config.reconnect_attempts + }; + + // Custom sleep delay for testing. + let sleep_delay = if cfg!(feature = "integration-test") { + 1 + } else { + self.config.reconnect_short_sleep_delay + }; + loop { ii += 1; - select! { - ret = self.settle_one_coinswap( - &maker_address.address, - index, - &mut outgoing_privkeys, - &senders_multisig_redeemscripts, - &receivers_multisig_redeemscripts, - ) => { - if let Err(e) = ret { - log::warn!( - "Failed to connect to maker {} to settle coinswap, \ - reattempting... error={:?}", - &maker_address.address, - e - ); - if ii <= reconnect_attempts { - sleep(Duration::from_secs( - if ii <= self.config.short_long_sleep_delay_transition { - sleep_delay - } else { - self.config.reconnect_long_sleep_delay - }, - )) - .await; - continue; - } else { - self.offerbook.add_bad_maker(maker_address); - return Err(e); - } - } - break; - }, - _ = sleep(Duration::from_secs(reconnect_time_out)) => { + match self.settle_one_coinswap( + &mut socket, + index, + &mut outgoing_privkeys, + &senders_multisig_redeemscripts, + &receivers_multisig_redeemscripts, + ) { + Ok(()) => break, + Err(e) => { log::warn!( - "Timeout for settling coinswap with maker {}, reattempting...", - maker_address.address + "Failed to connect to maker {} to settle coinswap, \ + reattempting... error={:?}", + &maker_address.address, + e ); - if ii <= self.config.reconnect_attempts { + if ii <= reconnect_attempts { + sleep(Duration::from_secs( + if ii <= self.config.short_long_sleep_delay_transition { + sleep_delay + } else { + self.config.reconnect_long_sleep_delay + }, + )); continue; } else { + log::warn!( + "Failed to connect to maker {} to settle coinswap, \ + reattempt limit exceeded", + &maker_address.address, + ); self.offerbook.add_bad_maker(maker_address); - return Err(NetError::ConnectionTimedOut.into()); + return Err(e); } - }, + } } } } @@ -1608,37 +1610,27 @@ impl Taker { } /// [Internal] Setlle one swap. This is recursively called for all the makers. - async fn settle_one_coinswap<'a>( + fn settle_one_coinswap( &mut self, - maker_address: &MakerAddress, + socket: &mut TcpStream, index: usize, outgoing_privkeys: &mut Option>, senders_multisig_redeemscripts: &[ScriptBuf], receivers_multisig_redeemscripts: &[ScriptBuf], ) -> Result<(), TakerError> { - log::info!("Connecting to {}", maker_address); - let address = maker_address.to_string(); - let mut socket = match self.config.connection_type { - ConnectionType::CLEARNET => TcpStream::connect(address).await?, - ConnectionType::TOR => Socks5Stream::connect( - format!("127.0.0.1:{}", self.config.socks_port).as_str(), - address, - ) - .await? - .into_inner(), - }; - let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?; + handshake_maker(socket)?; - log::info!("===> Sending HashPreimage to {}", maker_address); + log::info!("===> Sending HashPreimage to {}", socket.peer_addr()?); let maker_private_key_handover = send_hash_preimage_and_get_private_keys( - &mut socket_reader, - &mut socket_writer, + socket, senders_multisig_redeemscripts, receivers_multisig_redeemscripts, &self.ongoing_swap_state.active_preimage, - ) - .await?; - log::info!("<=== Received PrivateKeyHandover from {}", maker_address); + )?; + log::info!( + "<=== Received PrivateKeyHandover from {}", + socket.peer_addr()? + ); let privkeys_reply = if self.ongoing_swap_state.taker_position == TakerPosition::FirstPeer { self.ongoing_swap_state @@ -1671,14 +1663,13 @@ impl Taker { *outgoing_privkeys = Some(maker_private_key_handover.multisig_privkeys); ret })?; - log::info!("===> Sending PrivateKeyHandover to {}", maker_address); + log::info!("===> Sending PrivateKeyHandover to {}", socket.peer_addr()?); send_message( - &mut socket_writer, + socket, &TakerToMakerMessage::RespPrivKeyHandover(PrivKeyHandover { multisig_privkeys: privkeys_reply, }), - ) - .await?; + )?; Ok(()) } @@ -1916,11 +1907,10 @@ impl Taker { } /// Synchronizes the offer book with addresses obtained from directory servers and local configurations. - pub async fn sync_offerbook( + pub fn sync_offerbook( &mut self, - network: Network, config: &TakerConfig, - maker_count: u16, + maker_count: usize, ) -> Result<(), TakerError> { let directory_address = match self.config.connection_type { ConnectionType::CLEARNET => { @@ -1949,15 +1939,19 @@ impl Taker { } }; + let socks_port = if self.config.connection_type == ConnectionType::TOR { + Some(self.config.socks_port) + } else { + None + }; + let addresses_from_dns = fetch_addresses_from_dns( - None, + socks_port, directory_address, - network, maker_count, config.connection_type, - ) - .await?; - let offers = fetch_offer_from_makers(addresses_from_dns, config).await; + )?; + let offers = fetch_offer_from_makers(addresses_from_dns, config); let new_offers = offers .into_iter() @@ -1972,7 +1966,7 @@ impl Taker { log::debug!("{:?}", offer); if let Err(e) = self .wallet - .verify_fidelity_proof(&offer.offer.fidelity, offer.address.to_string()) + .verify_fidelity_proof(&offer.offer.fidelity, &offer.address.to_string()) { log::warn!( "Fidelity Proof Verification failed with error: {:?}. Rejecting Offer from Maker : {}", diff --git a/src/taker/config.rs b/src/taker/config.rs index 644bd75e..b435200b 100644 --- a/src/taker/config.rs +++ b/src/taker/config.rs @@ -9,6 +9,7 @@ use crate::utill::{get_taker_dir, parse_field, parse_toml, write_default_config, /// Taker configuration with refund, connection, and sleep settings. #[derive(Debug, Clone, PartialEq)] pub struct TakerConfig { + // TODO: Move all of these to global constants. pub refund_locktime: u16, pub refund_locktime_step: u16, @@ -22,6 +23,7 @@ pub struct TakerConfig { pub short_long_sleep_delay_transition: u32, pub reconnect_attempt_timeout_sec: u64, + // TODO: Only these should be user facing configs. pub port: u16, pub socks_port: u16, pub directory_server_onion_address: String, diff --git a/src/taker/error.rs b/src/taker/error.rs index 1d0ff5b8..78368d32 100644 --- a/src/taker/error.rs +++ b/src/taker/error.rs @@ -20,7 +20,6 @@ pub enum TakerError { Wallet(WalletError), Directory(DirectoryServerError), Net(NetError), - Socks(tokio_socks::Error), Protocol(ProtocolError), SendAmountNotSet, FundingTxWaitTimeOut, @@ -63,12 +62,6 @@ impl From for TakerError { } } -impl From for TakerError { - fn from(value: tokio_socks::Error) -> Self { - Self::Socks(value) - } -} - impl From for TakerError { fn from(value: ProtocolError) -> Self { Self::Protocol(value) diff --git a/src/taker/offers.rs b/src/taker/offers.rs index bbe5a1c2..f623cf44 100644 --- a/src/taker/offers.rs +++ b/src/taker/offers.rs @@ -5,23 +5,24 @@ //! The module handles the syncing of the offer book with addresses obtained from directory servers and local configurations. //! It uses asynchronous channels for concurrent processing of maker offers. -use std::{fmt, thread, time::Duration}; - -use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, +use std::{ + fmt, + io::{Read, Write}, net::TcpStream, sync::mpsc, + thread::{self, Builder}, }; -use bitcoin::Network; - -use crate::{protocol::messages::Offer, utill::ConnectionType}; +use serde::{Deserialize, Serialize}; +use socks::Socks5Stream; -use crate::market::directory::DirectoryServerError; +use crate::{ + error::NetError, + protocol::messages::Offer, + utill::{ConnectionType, GLOBAL_PAUSE, NET_TIMEOUT}, +}; -use super::{config::TakerConfig, routines::download_maker_offer}; -use tokio_socks::tcp::Socks5Stream; +use super::{config::TakerConfig, error::TakerError, routines::download_maker_offer}; /// Represents an offer along with the corresponding maker address. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] @@ -37,19 +38,20 @@ struct OnionAddress { port: String, onion_addr: String, } + /// Enum representing maker addresses. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MakerAddress(OnionAddress); impl MakerAddress { - pub fn new(address: String) -> Option { + pub fn new(address: &str) -> Result { if let Some((onion_addr, port)) = address.split_once(':') { - Some(Self(OnionAddress { + Ok(Self(OnionAddress { port: port.to_string(), onion_addr: onion_addr.to_string(), })) } else { - None + Err(NetError::InvalidNetworkAddress.into()) } } } @@ -60,6 +62,16 @@ impl fmt::Display for MakerAddress { } } +impl From<&mut TcpStream> for MakerAddress { + fn from(value: &mut TcpStream) -> Self { + let socket_addr = value.peer_addr().unwrap(); + MakerAddress(OnionAddress { + port: socket_addr.port().to_string(), + onion_addr: socket_addr.ip().to_string(), + }) + } +} + /// An ephemeral Offerbook tracking good and bad makers. Currently, Offerbook is initiated /// at start of every swap. So good and bad maker list will ot be persisted. // TODO: Persist the offerbook in disk. @@ -116,92 +128,105 @@ impl OfferBook { } /// Synchronizes the offer book with specific maker addresses. -pub async fn fetch_offer_from_makers( +pub fn fetch_offer_from_makers( maker_addresses: Vec, config: &TakerConfig, ) -> Vec { - let (offers_writer_m, mut offers_reader) = mpsc::channel::>(100); - //unbounded_channel makes more sense here, but results in a compile - //error i cant figure out + let (offers_writer, offers_reader) = mpsc::channel::>(); + // Thread pool for all connections to fetch maker offers. + let mut thread_pool = Vec::new(); let maker_addresses_len = maker_addresses.len(); for addr in maker_addresses { - let offers_writer = offers_writer_m.clone(); + let offers_writer = offers_writer.clone(); let taker_config: TakerConfig = config.clone(); - tokio::spawn(async move { - let offer = download_maker_offer(addr, taker_config).await; - offers_writer.send(offer).await.unwrap(); - }); + let thread = Builder::new() + .name(format!("maker_offer_fecth_thread_{}", addr)) + .spawn(move || { + let offer = download_maker_offer(addr, taker_config); + offers_writer.send(offer).unwrap(); + }) + .unwrap(); + + thread_pool.push(thread); } let mut result = Vec::::new(); for _ in 0..maker_addresses_len { - if let Some(offer_addr) = offers_reader.recv().await.unwrap() { + // TODO: Remove all unwraps and return TakerError. + if let Some(offer_addr) = offers_reader.recv().unwrap() { result.push(offer_addr); } } + + for thread in thread_pool { + log::debug!( + "Joining thread : {}", + thread.thread().name().expect("thread names expected") + ); + thread.join().unwrap(); + } result } /// Retrieves advertised maker addresses from directory servers based on the specified network. -pub async fn fetch_addresses_from_dns( +pub fn fetch_addresses_from_dns( socks_port: Option, directory_server_address: String, - _network: Network, - number_of_makers: u16, + number_of_makers: usize, connection_type: ConnectionType, -) -> Result, DirectoryServerError> { +) -> Result, TakerError> { + // TODO: Make the communication in serde_encoded bytes. + loop { - let result: Result, DirectoryServerError> = (async { - let mut stream = match connection_type { - ConnectionType::CLEARNET => TcpStream::connect(directory_server_address.as_str()) - .await - .unwrap(), - ConnectionType::TOR => Socks5Stream::connect( - format!("127.0.0.1:{}", socks_port.unwrap_or(19050)).as_str(), - directory_server_address.as_str(), - ) - .await - .map_err(|_e| { - DirectoryServerError::Other( - "Issue with fetching maker address from directory server", - ) - })? - .into_inner(), - }; - - let request_line = "GET\n"; - stream - .write_all(request_line.as_bytes()) - .await - .map_err(|_e| DirectoryServerError::Other("Error sending the request"))?; - - let mut response = String::new(); - stream - .read_to_string(&mut response) - .await - .map_err(|_e| DirectoryServerError::Other("Error receiving the response"))?; - - let addresses: Vec = response - .lines() - .map(|addr| MakerAddress::new(addr.to_string()).expect("Malformed maker address")) - .collect(); - - log::info!("Maker addresses received from DNS: {:?}", addresses); - - Ok(addresses) - }) - .await; + let mut stream = match connection_type { + ConnectionType::CLEARNET => TcpStream::connect(directory_server_address.as_str())?, + ConnectionType::TOR => { + let socket_addrs = format!("127.0.0.1:{}", socks_port.expect("Tor port expected")); + Socks5Stream::connect(socket_addrs, directory_server_address.as_str())?.into_inner() + } + }; + + stream.set_read_timeout(Some(NET_TIMEOUT))?; + stream.set_write_timeout(Some(NET_TIMEOUT))?; + stream.flush()?; + + // TODO: Handle timeout cases like the Taker/Maker comms, with attempt count and variable delays. + if let Err(e) = stream + .write_all("GET\n".as_bytes()) + .and_then(|_| stream.flush()) + { + log::error!("Error sending GET request to DNS {}.\nRe-attempting...", e); + thread::sleep(GLOBAL_PAUSE); + continue; + } + + let mut response = String::new(); + + if let Err(e) = stream.read_to_string(&mut response) { + log::error!("Error reading DNS response: {}. \nRe-attempting...", e); + thread::sleep(GLOBAL_PAUSE); + continue; + } - match result { + match response + .lines() + .map(MakerAddress::new) + .collect::, _>>() + { Ok(addresses) => { - if addresses.len() < (number_of_makers as usize) { - thread::sleep(Duration::from_secs(10)); - continue; + if addresses.len() < number_of_makers { + log::info!( + "Didn't receive enough addresses. Need: {}, Got : {}, Attempting again...", + number_of_makers, + addresses.len() + ); + thread::sleep(GLOBAL_PAUSE); + } else { + return Ok(addresses); } - return Ok(addresses); } Err(e) => { - log::error!("An error occurred: {:?}", e); - thread::sleep(Duration::from_secs(10)); + log::error!("Error decoding DNS response: {:?}. Re-attempting...", e); + thread::sleep(GLOBAL_PAUSE); continue; } } diff --git a/src/taker/routines.rs b/src/taker/routines.rs index a6a8321c..d5aef0e8 100644 --- a/src/taker/routines.rs +++ b/src/taker/routines.rs @@ -7,7 +7,8 @@ //! for communication between taker and maker. use serde::{Deserialize, Serialize}; -use std::time::Duration; +use socks::Socks5Stream; +use std::{io::ErrorKind, net::TcpStream, thread::sleep, time::Duration}; use crate::{ error::ProtocolError, @@ -25,19 +26,9 @@ use crate::{ }, Hash160, }, - utill::{read_maker_message, send_message, ConnectionType}, + utill::{read_message, send_message, ConnectionType}, }; use bitcoin::{secp256k1::SecretKey, Amount, PublicKey, ScriptBuf, Transaction}; -use tokio::{ - io::BufReader, - net::{ - tcp::{ReadHalf, WriteHalf}, - TcpStream, - }, - select, - time::sleep, -}; -use tokio_socks::tcp::Socks5Stream; use super::{ config::TakerConfig, @@ -62,56 +53,59 @@ pub struct ContractsInfo { pub wallet_label: String, } -/// Performs a handshake with a Maker and returns and Reader and Writer halves. -pub async fn handshake_maker( - socket: &mut TcpStream, -) -> Result<(BufReader, WriteHalf), TakerError> { - let (reader, mut socket_writer) = socket.split(); - let mut socket_reader = BufReader::new(reader); +/// Make a handshake with a maker. +/// Ensures that the Maker is alive and responding. +/// +// In future, handshake can be used to find protocol compatibility across multiple versions. +pub fn handshake_maker(socket: &mut TcpStream) -> Result<(), TakerError> { send_message( - &mut socket_writer, + socket, &TakerToMakerMessage::TakerHello(TakerHello { - protocol_version_min: 0, - protocol_version_max: 0, + protocol_version_min: 1, + protocol_version_max: 1, }), - ) - .await?; - let _makerhello = match read_maker_message(&mut socket_reader).await { - Ok(MakerToTakerMessage::MakerHello(m)) => m, - Ok(any) => { - return Err((ProtocolError::WrongMessage { - expected: "MakerHello".to_string(), - received: format!("{}", any), - }) - .into()); - } - Err(e) => { - return Err(e.into()); + )?; + let msg_bytes = read_message(socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; + + // Check that protocol version is always 1. + match msg { + MakerToTakerMessage::MakerHello(m) => { + if m.protocol_version_max == 1 && m.protocol_version_min == 1 { + Ok(()) + } else { + Err(ProtocolError::WrongMessage { + expected: "Only protocol version 1 is allowed".to_string(), + received: format!( + "min/max version = {}/{}", + m.protocol_version_min, m.protocol_version_max + ), + } + .into()) + } } - }; - Ok((socket_reader, socket_writer)) + any => Err((ProtocolError::WrongMessage { + expected: "MakerHello".to_string(), + received: format!("{}", any), + }) + .into()), + } } /// Request signatures for sender side of the hop. Attempt once. -pub(crate) async fn req_sigs_for_sender_once( - connection_type: ConnectionType, - maker_address: &MakerAddress, +pub(crate) fn req_sigs_for_sender_once( + socket: &mut TcpStream, outgoing_swapcoins: &[S], maker_multisig_nonces: &[SecretKey], maker_hashlock_nonces: &[SecretKey], locktime: u16, ) -> Result { - log::info!("Connecting to {}", maker_address); - let address = maker_address.to_string(); - - let mut socket = match connection_type { - ConnectionType::CLEARNET => TcpStream::connect(address).await?, - ConnectionType::TOR => Socks5Stream::connect("127.0.0.1:19050", address) - .await? - .into_inner(), - }; - let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?; - log::info!("===> Sending ReqContractSigsForSender to {}", maker_address); + log::info!("Connecting to {}", socket.peer_addr()?); + handshake_maker(socket)?; + log::info!( + "===> Sending ReqContractSigsForSender to {}", + socket.peer_addr()? + ); // TODO: Take this construction out of function body. let txs_info = maker_multisig_nonces @@ -133,16 +127,18 @@ pub(crate) async fn req_sigs_for_sender_once( .collect::>(); send_message( - &mut socket_writer, + socket, &TakerToMakerMessage::ReqContractSigsForSender(ReqContractSigsForSender { txs_info, hashvalue: outgoing_swapcoins[0].get_hashvalue(), locktime, }), - ) - .await?; - let contract_sigs_for_sender = match read_maker_message(&mut socket_reader).await { - Ok(MakerToTakerMessage::RespContractSigsForSender(m)) => { + )?; + + let msg_bytes = read_message(socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; + let contract_sigs_for_sender = match msg { + MakerToTakerMessage::RespContractSigsForSender(m) => { if m.sigs.len() != outgoing_swapcoins.len() { return Err((ProtocolError::WrongNumOfSigs { expected: outgoing_swapcoins.len(), @@ -153,16 +149,13 @@ pub(crate) async fn req_sigs_for_sender_once( m } } - Ok(any) => { + any => { return Err((ProtocolError::WrongMessage { expected: "RespContractSigsForSender".to_string(), received: format!("{}", any), }) .into()); } - Err(e) => { - return Err(e.into()); - } }; for (sig, outgoing_swapcoin) in contract_sigs_for_sender @@ -172,31 +165,25 @@ pub(crate) async fn req_sigs_for_sender_once( { outgoing_swapcoin.verify_contract_tx_sender_sig(sig)?; } - log::info!("<=== Received ContractSigsForSender from {}", maker_address); + log::info!( + "<=== Received ContractSigsForSender from {}", + socket.peer_addr()? + ); Ok(contract_sigs_for_sender) } /// Request signatures for receiver side of the hop. Attempt once. -pub(crate) async fn req_sigs_for_recvr_once( - connection_type: ConnectionType, - maker_address: &MakerAddress, +pub(crate) fn req_sigs_for_recvr_once( + socket: &mut TcpStream, incoming_swapcoins: &[S], receivers_contract_txes: &[Transaction], ) -> Result { - log::info!("Connecting to {}", maker_address); - let address = maker_address.to_string(); - let mut socket = match connection_type { - ConnectionType::CLEARNET => TcpStream::connect(address).await?, - ConnectionType::TOR => Socks5Stream::connect("127.0.0.1:19050", address) - .await? - .into_inner(), - }; - - let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?; + log::info!("Connecting to {}", socket.peer_addr()?); + handshake_maker(socket)?; // TODO: Take the message construction out of function body. send_message( - &mut socket_writer, + socket, &TakerToMakerMessage::ReqContractSigsForRecvr(ReqContractSigsForRecvr { txs: incoming_swapcoins .iter() @@ -207,10 +194,12 @@ pub(crate) async fn req_sigs_for_recvr_once( }) .collect::>(), }), - ) - .await?; - let contract_sigs_for_recvr = match read_maker_message(&mut socket_reader).await { - Ok(MakerToTakerMessage::RespContractSigsForRecvr(m)) => { + )?; + + let msg_bytes = read_message(socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; + let contract_sigs_for_recvr = match msg { + MakerToTakerMessage::RespContractSigsForRecvr(m) => { if m.sigs.len() != incoming_swapcoins.len() { return Err((ProtocolError::WrongNumOfSigs { expected: incoming_swapcoins.len(), @@ -221,16 +210,13 @@ pub(crate) async fn req_sigs_for_recvr_once( m } } - Ok(any) => { + any => { return Err((ProtocolError::WrongMessage { expected: "ContractSigsForRecvr".to_string(), received: format!("{}", any), }) .into()); } - Err(e) => { - return Err(e.into()); - } }; for (sig, swapcoin) in contract_sigs_for_recvr @@ -241,7 +227,10 @@ pub(crate) async fn req_sigs_for_recvr_once( swapcoin.verify_contract_tx_receiver_sig(sig)?; } - log::info!("<=== Received ContractSigsForRecvr from {}", maker_address); + log::info!( + "<=== Received ContractSigsForRecvr from {}", + socket.peer_addr()? + ); Ok(contract_sigs_for_recvr) } @@ -264,35 +253,39 @@ pub struct NextPeerInfoArgs { } /// [Internal] Send a Proof funding to the maker and init next hop. -pub(crate) async fn send_proof_of_funding_and_init_next_hop( - socket_reader: &mut BufReader>, - socket_writer: &mut WriteHalf<'_>, +pub(crate) fn send_proof_of_funding_and_init_next_hop( + socket: &mut TcpStream, tmi: ThisMakerInfo, npi: NextPeerInfoArgs, hashvalue: Hash160, ) -> Result<(ContractSigsAsRecvrAndSender, Vec), TakerError> { - send_message( - socket_writer, - &TakerToMakerMessage::RespProofOfFunding(ProofOfFunding { - confirmed_funding_txes: tmi.funding_tx_infos.clone(), - next_coinswap_info: npi - .next_peer_multisig_pubkeys - .iter() - .zip(npi.next_peer_hashlock_pubkeys.iter()) - .map( - |(&next_coinswap_multisig_pubkey, &next_hashlock_pubkey)| NextHopInfo { - next_multisig_pubkey: next_coinswap_multisig_pubkey, - next_hashlock_pubkey, - }, - ) - .collect::>(), - next_locktime: npi.next_maker_refund_locktime, - next_fee_rate: npi.next_maker_fee_rate.to_sat(), - }), - ) - .await?; - let contract_sigs_as_recvr_and_sender = match read_maker_message(socket_reader).await { - Ok(MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m)) => { + // Send POF + let next_coinswap_info = npi + .next_peer_multisig_pubkeys + .iter() + .zip(npi.next_peer_hashlock_pubkeys.iter()) + .map( + |(&next_coinswap_multisig_pubkey, &next_hashlock_pubkey)| NextHopInfo { + next_multisig_pubkey: next_coinswap_multisig_pubkey, + next_hashlock_pubkey, + }, + ) + .collect::>(); + + let pof_msg = TakerToMakerMessage::RespProofOfFunding(ProofOfFunding { + confirmed_funding_txes: tmi.funding_tx_infos.clone(), + next_coinswap_info, + next_locktime: npi.next_maker_refund_locktime, + next_fee_rate: npi.next_maker_fee_rate.to_sat(), + }); + + send_message(socket, &pof_msg)?; + + // Recv ContractSigsAsRecvrAndSender. + let msg_bytes = read_message(socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; + let contract_sigs_as_recvr_and_sender = match msg { + MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m) => { if m.receivers_contract_txs.len() != tmi.funding_tx_infos.len() { return Err((ProtocolError::WrongNumOfContractTxs { expected: tmi.funding_tx_infos.len(), @@ -309,16 +302,13 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop( m } } - Ok(any) => { + any => { return Err((ProtocolError::WrongMessage { expected: "ContractSigsAsRecvrAndSender".to_string(), received: format!("{}", any), }) .into()); } - Err(e) => { - return Err(e.into()); - } }; let funding_tx_values = tmi @@ -413,24 +403,24 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop( } /// Send hash preimage via the writer and read the response. -pub(crate) async fn send_hash_preimage_and_get_private_keys( - socket_reader: &mut BufReader>, - socket_writer: &mut WriteHalf<'_>, +pub(crate) fn send_hash_preimage_and_get_private_keys( + socket: &mut TcpStream, senders_multisig_redeemscripts: &[ScriptBuf], receivers_multisig_redeemscripts: &[ScriptBuf], preimage: &Preimage, ) -> Result { - send_message( - socket_writer, - &TakerToMakerMessage::RespHashPreimage(HashPreimage { - senders_multisig_redeemscripts: senders_multisig_redeemscripts.to_vec(), - receivers_multisig_redeemscripts: receivers_multisig_redeemscripts.to_vec(), - preimage: *preimage, - }), - ) - .await?; - let privkey_handover = match read_maker_message(socket_reader).await { - Ok(MakerToTakerMessage::RespPrivKeyHandover(m)) => { + let hash_preimage_msg = TakerToMakerMessage::RespHashPreimage(HashPreimage { + senders_multisig_redeemscripts: senders_multisig_redeemscripts.to_vec(), + receivers_multisig_redeemscripts: receivers_multisig_redeemscripts.to_vec(), + preimage: *preimage, + }); + + send_message(socket, &hash_preimage_msg)?; + + let msg_bytes = read_message(socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; + let privkey_handover = match msg { + MakerToTakerMessage::RespPrivKeyHandover(m) => { if m.multisig_privkeys.len() != receivers_multisig_redeemscripts.len() { return Err((ProtocolError::WrongNumOfPrivkeys { expected: receivers_multisig_redeemscripts.len(), @@ -441,42 +431,45 @@ pub(crate) async fn send_hash_preimage_and_get_private_keys( m } } - Ok(any) => { + any => { return Err((ProtocolError::WrongMessage { expected: "PrivkeyHandover".to_string(), received: format!("{}", any), }) .into()); } - Err(e) => { - return Err(e.into()); - } }; Ok(privkey_handover) } -async fn download_maker_offer_attempt_once( +fn download_maker_offer_attempt_once( addr: &MakerAddress, - connection_type: ConnectionType, + config: &TakerConfig, ) -> Result { let address = addr.to_string(); - - let mut socket = match connection_type { - ConnectionType::CLEARNET => TcpStream::connect(address).await?, - ConnectionType::TOR => Socks5Stream::connect("127.0.0.1:19050", address) - .await? - .into_inner(), + let mut socket = match config.connection_type { + ConnectionType::CLEARNET => TcpStream::connect(address)?, + ConnectionType::TOR => Socks5Stream::connect( + format!("127.0.0.1:{}", config.socks_port).as_str(), + address.as_ref(), + )? + .into_inner(), }; - let (mut socket_reader, mut socket_writer) = handshake_maker(&mut socket).await?; - send_message( - &mut socket_writer, - &TakerToMakerMessage::ReqGiveOffer(GiveOffer), - ) - .await?; + socket.set_read_timeout(Some(Duration::from_secs( + config.first_connect_attempt_timeout_sec, + )))?; + socket.set_write_timeout(Some(Duration::from_secs( + config.first_connect_attempt_timeout_sec, + )))?; - let msg = read_maker_message(&mut socket_reader).await?; + handshake_maker(&mut socket)?; + + send_message(&mut socket, &TakerToMakerMessage::ReqGiveOffer(GiveOffer))?; + + let msg_bytes = read_message(&mut socket)?; + let msg: MakerToTakerMessage = serde_cbor::from_slice(&msg_bytes)?; let offer = match msg { MakerToTakerMessage::RespOffer(offer) => offer, msg => { @@ -490,44 +483,48 @@ async fn download_maker_offer_attempt_once( Ok(*offer) } -pub async fn download_maker_offer( - address: MakerAddress, - config: TakerConfig, -) -> Option { +pub fn download_maker_offer(address: MakerAddress, config: TakerConfig) -> Option { let mut ii = 0; + loop { ii += 1; - select! { - ret = download_maker_offer_attempt_once(&address, config.connection_type) => { - match ret { - Ok(offer) => return Some(OfferAndAddress { offer, address }), - Err(e) => { + match download_maker_offer_attempt_once(&address, &config) { + Ok(offer) => return Some(OfferAndAddress { offer, address }), + Err(TakerError::IO(e)) => { + if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { + if ii <= config.first_connect_attempts { log::warn!( - "Failed to request offer from maker {}, \ - reattempting... error={:?}", - address, - e + "Timeout for request offer from maker {}, reattempting...", + address ); - if ii <= config.first_connect_attempts { - sleep(Duration::from_secs(config.first_connect_sleep_delay_sec)).await; - continue; - } else { - return None; - } + continue; + } else { + log::error!( + "Timeout attempt exceeded for request offer from maker {}, ", + address + ); + return None; } } - }, - _ = sleep(Duration::from_secs(config.first_connect_attempt_timeout_sec)) => { - log::warn!( - "Timeout for request offer from maker {}, reattempting...", - address - ); + } + + Err(e) => { if ii <= config.first_connect_attempts { + log::warn!( + "Failed to request offer from maker {}, reattempting... error={:?}", + address, + e + ); + sleep(Duration::from_secs(config.first_connect_sleep_delay_sec)); continue; } else { + log::error!( + "Connection attempt exceeded for request offer from maker {}", + address + ); return None; } - }, + } } } } diff --git a/src/utill.rs b/src/utill.rs index 99753548..b202f568 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -3,6 +3,7 @@ use std::{ env, io::{ErrorKind, Read}, + net::TcpStream, path::{Path, PathBuf}, str::FromStr, sync::Once, @@ -30,20 +31,12 @@ use std::{ time::Duration, }; -use serde_json::Value; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufReader}, - net::tcp::{ReadHalf, WriteHalf}, -}; - use crate::{ error::NetError, - protocol::{ - contract::derive_maker_pubkey_and_nonce, - messages::{MakerToTakerMessage, MultisigPrivkey}, - }, + protocol::{contract::derive_maker_pubkey_and_nonce, messages::MultisigPrivkey}, wallet::{SwapCoin, WalletError}, }; +use serde_json::Value; const INPUT_CHARSET: &str = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ "; @@ -53,6 +46,15 @@ const MASK_LOW_35_BITS: u64 = 0x7ffffffff; const SHIFT_FOR_C0: u64 = 35; const CHECKSUM_FINAL_XOR_VALUE: u64 = 1; +/// Global timeout for all network connections. +pub const NET_TIMEOUT: Duration = Duration::from_secs(60); + +/// Used as delays on reattempting some network communications. +pub const GLOBAL_PAUSE: Duration = Duration::from_secs(10); + +/// Global heartbeat interval for internal server threads. +pub const HEART_BEAT_INTERVAL: Duration = Duration::from_secs(3); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ConnectionType { TOR, @@ -159,26 +161,34 @@ pub fn setup_logger() { }); } -/// Can send both Taker and Maker messages. -pub async fn send_message( - socket_writer: &mut WriteHalf<'_>, +/// Send a length-appended Protocol or RPC Message through a stream. +/// The first byte sent is the length of the actual message. +pub fn send_message( + socket_writer: &mut TcpStream, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let message_cbor = serde_cbor::ser::to_vec(message).map_err(NetError::Cbor)?; - socket_writer.write_u32(message_cbor.len() as u32).await?; - socket_writer.write_all(&message_cbor).await?; + let msg_bytes = serde_cbor::ser::to_vec(message).map_err(NetError::Cbor)?; + let msg_len = (msg_bytes.len() as u32).to_be_bytes(); + let mut to_send = Vec::with_capacity(msg_bytes.len() + msg_len.len()); + to_send.extend(msg_len); + to_send.extend(msg_bytes); + socket_writer.write_all(&to_send)?; + socket_writer.flush()?; Ok(()) } -/// Read a Maker Message. -pub async fn read_maker_message( - reader: &mut BufReader>, -) -> Result { - let length = reader.read_u32().await?; +/// Reads a response byte_array from a given stream. +/// Response can be any length-appended data, where the first byte is the length of the actual message. +pub fn read_message(reader: &mut TcpStream) -> Result, NetError> { + // length of incoming data + let mut len_buff = [0u8; 4]; + reader.read_exact(&mut len_buff)?; // This can give UnexpectedEOF error if theres no data to read + let length = u32::from_be_bytes(len_buff); + + // the actual data let mut buffer = vec![0; length as usize]; - reader.read_exact(&mut buffer).await?; - let message: MakerToTakerMessage = serde_cbor::from_slice(&buffer)?; - Ok(message) + reader.read_exact(&mut buffer)?; + Ok(buffer) } /// Apply the maker's privatekey to swapcoins, and check it's the correct privkey for corresponding pubkey. @@ -482,6 +492,8 @@ pub fn read_connection_network_string(network: &str) -> Result sha256d::Hash { + pub fn generate_cert_hash(&self, onion_addr: &str) -> sha256d::Hash { let cert_msg_str = format!( "fidelity-bond-cert|{}|{}|{}|{}|{}|{}", self.outpoint, self.pubkey, self.cert_expiry, self.lock_time, self.amount, onion_addr @@ -508,7 +508,7 @@ impl Wallet { pub fn generate_fidelity_proof( &self, index: u32, - maker_addr: String, + maker_addr: &str, ) -> Result { // Generate a fidelity bond proof from the fidelity data. let (bond, _, is_spent) = self @@ -542,7 +542,7 @@ impl Wallet { pub fn verify_fidelity_proof( &self, proof: &FidelityProof, - onion_addr: String, + onion_addr: &str, ) -> Result<(), WalletError> { if self.is_fidelity_expired(&proof.bond)? { return Err(FidelityError::CertExpired.into()); diff --git a/tests/abort1.rs b/tests/abort1.rs index 18582db3..43e3314c 100644 --- a/tests/abort1.rs +++ b/tests/abort1.rs @@ -19,8 +19,8 @@ use test_framework::*; /// /// The Taker after coming live again will see unfinished coinswaps in his wallet. He can reclaim his funds via /// broadcasting his contract transactions and claiming via timelock. -#[tokio::test] -async fn test_stop_taker_after_setup() { +#[test] +fn test_stop_taker_after_setup() { // ---- Setup ---- // 2 Makers with Normal behavior. @@ -36,8 +36,7 @@ async fn test_stop_taker_after_setup() { makers_config_map.into(), Some(TakerBehavior::DropConnectionAfterFullSetup), ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Taker Cheats on Everybody."); diff --git a/tests/abort2_case1.rs b/tests/abort2_case1.rs index 8130f03c..0fd12d8f 100644 --- a/tests/abort2_case1.rs +++ b/tests/abort2_case1.rs @@ -19,8 +19,8 @@ use std::{thread, time::Duration}; /// not swap this maker again. /// /// CASE 1: Maker Drops Before Sending Sender's Signature, and Taker carries on with a new Maker. -#[tokio::test] -async fn test_abort_case_2_move_on_with_other_makers() { +#[test] +fn test_abort_case_2_move_on_with_other_makers() { // ---- Setup ---- // 6102 is naughty. But theres enough good ones. @@ -37,8 +37,7 @@ async fn test_abort_case_2_move_on_with_other_makers() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!( "Running Test: Maker 6102 closes before sending sender's sigs. Taker moves on with other Makers." diff --git a/tests/abort2_case2.rs b/tests/abort2_case2.rs index 57481bd4..ebedab01 100644 --- a/tests/abort2_case2.rs +++ b/tests/abort2_case2.rs @@ -19,8 +19,8 @@ use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration}; /// not swap this maker again. /// /// CASE 2: Maker Drops Before Sending Sender's Signature, and Taker cannot find a new Maker, recovers from Swap. -#[tokio::test] -async fn test_abort_case_2_recover_if_no_makers_found() { +#[test] +fn test_abort_case_2_recover_if_no_makers_found() { // ---- Setup ---- // 6102 is naughty. And theres not enough makers. @@ -43,8 +43,7 @@ async fn test_abort_case_2_recover_if_no_makers_found() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); // Fund the Taker and Makers with 3 utxos of 0.05 btc each. for _ in 0..3 { diff --git a/tests/abort2_case3.rs b/tests/abort2_case3.rs index 617391bf..13e15185 100644 --- a/tests/abort2_case3.rs +++ b/tests/abort2_case3.rs @@ -19,8 +19,8 @@ use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration}; /// not swap this maker again. /// /// CASE 3: Maker Drops After Sending Sender's Signature. Taker and other Maker recovers. -#[tokio::test] -async fn maker_drops_after_sending_senders_sigs() { +#[test] +fn maker_drops_after_sending_senders_sigs() { // ---- Setup ---- // 6102 is naughty. And theres not enough makers. @@ -36,8 +36,7 @@ async fn maker_drops_after_sending_senders_sigs() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!( "Running Test: Maker 6102 Closes after sending sender's signature. This is really bad. Recovery is the only option." diff --git a/tests/abort3_case1.rs b/tests/abort3_case1.rs index a637e491..9c8f0774 100644 --- a/tests/abort3_case1.rs +++ b/tests/abort3_case1.rs @@ -18,8 +18,8 @@ use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration}; /// Maker closes connection after receiving a `ContractSigsForRecvrAndSender` and doesn't broadcasts it's funding txs. /// Taker wait until a timeout (10ses for test, 5mins for prod) and starts recovery after that. // This is problematic. Needs more detailed thought. -#[tokio::test] -async fn abort3_case1_close_at_contract_sigs_for_recvr_and_sender() { +#[test] +fn abort3_case1_close_at_contract_sigs_for_recvr_and_sender() { // ---- Setup ---- // 6102 is naughty. And theres not enough makers. @@ -38,8 +38,7 @@ async fn abort3_case1_close_at_contract_sigs_for_recvr_and_sender() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Maker closes connection after receiving a ContractSigsForRecvrAndSender"); diff --git a/tests/abort3_case2.rs b/tests/abort3_case2.rs index 775d16d2..646b0b31 100644 --- a/tests/abort3_case2.rs +++ b/tests/abort3_case2.rs @@ -18,8 +18,8 @@ use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration}; /// Maker closes connection after sending a `ContractSigsForRecvr`. Funding txs are already broadcasted. /// The Maker will loose contract txs fees in that case, so it's not a malice. /// Taker waits for the response until timeout. Aborts if the Maker doesn't show up. -#[tokio::test] -async fn abort3_case2_close_at_contract_sigs_for_recvr() { +#[test] +fn abort3_case2_close_at_contract_sigs_for_recvr() { // ---- Setup ---- // 6102 is naughty. And theres not enough makers. @@ -35,8 +35,7 @@ async fn abort3_case2_close_at_contract_sigs_for_recvr() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Maker closes connection after sending a ContractSigsForRecvr"); diff --git a/tests/abort3_case3.rs b/tests/abort3_case3.rs index 6b3dad7d..68a3c535 100644 --- a/tests/abort3_case3.rs +++ b/tests/abort3_case3.rs @@ -18,8 +18,8 @@ use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration}; /// Maker closes connection at hash preimage handling. Funding txs are already broadcasted. /// The Maker will loose contract txs fees in that case, so it's not a malice. /// Taker waits for the response until timeout. Aborts if the Maker doesn't show up. -#[tokio::test] -async fn abort3_case2_close_at_contract_sigs_for_recvr() { +#[test] +fn abort3_case3_close_at_hash_preimage_handover() { // ---- Setup ---- // 6102 is naughty. And theres not enough makers. @@ -35,8 +35,7 @@ async fn abort3_case2_close_at_contract_sigs_for_recvr() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Maker closes conneciton at hash preimage handling"); diff --git a/tests/fidelity.rs b/tests/fidelity.rs index b2d0acea..6f965d3d 100644 --- a/tests/fidelity.rs +++ b/tests/fidelity.rs @@ -1,9 +1,8 @@ #![cfg(feature = "integration-test")] use bitcoin::{absolute::LockTime, Amount}; use coinswap::{ - maker::{error::MakerError, start_maker_server, MakerBehavior}, + maker::{start_maker_server, MakerBehavior}, utill::ConnectionType, - wallet::{FidelityError, WalletError}, }; mod test_framework; @@ -24,8 +23,8 @@ use std::{thread, time::Duration}; /// /// Maker server will error if not enough balance is present to create fidelity bond. /// A custom fidelity bond can be create using the `create_fidelity()` API. -#[tokio::test] -async fn test_fidelity() { +#[test] +fn test_fidelity() { // ---- Setup ---- let makers_config_map = [((6102, None), MakerBehavior::Normal)]; @@ -35,8 +34,7 @@ async fn test_fidelity() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); let maker = makers.first().unwrap(); @@ -56,17 +54,12 @@ async fn test_fidelity() { let maker_clone = maker.clone(); let maker_thread = thread::spawn(move || start_maker_server(maker_clone)); - thread::sleep(Duration::from_secs(5)); + thread::sleep(Duration::from_secs(20)); maker.shutdown().unwrap(); - let expected_error = maker_thread.join().unwrap(); - - matches!( - expected_error.err().unwrap(), - MakerError::Wallet(WalletError::Fidelity(FidelityError::InsufficientFund { - available: 4000000, - required: 5000000, - })) - ); + let _ = maker_thread.join().unwrap(); + + // TODO: Assert that fund request for fidelity is printed in the log. + *maker.shutdown.write().unwrap() = false; // Give Maker more funds and check fidelity bond is created at the restart of server. test_framework.send_to_address(&maker_addrs, Amount::from_btc(0.04).unwrap()); @@ -75,7 +68,7 @@ async fn test_fidelity() { let maker_clone = maker.clone(); let maker_thread = thread::spawn(move || start_maker_server(maker_clone)); - thread::sleep(Duration::from_secs(5)); + thread::sleep(Duration::from_secs(20)); maker.shutdown().unwrap(); let success = maker_thread.join().unwrap(); diff --git a/tests/malice1.rs b/tests/malice1.rs index 5998bb2d..44f82dab 100644 --- a/tests/malice1.rs +++ b/tests/malice1.rs @@ -16,8 +16,8 @@ use std::{assert_eq, collections::BTreeSet, thread, time::Duration}; /// /// The Makers identify the situation and gets their money back via contract txs. This is /// a potential DOS on Makers. But Taker would loose money too for doing this. -#[tokio::test] -async fn malice1_taker_broadcast_contract_prematurely() { +#[test] +fn malice1_taker_broadcast_contract_prematurely() { // ---- Setup ---- let makers_config_map = [ @@ -32,8 +32,7 @@ async fn malice1_taker_broadcast_contract_prematurely() { makers_config_map.into(), Some(TakerBehavior::BroadcastContractAfterFullSetup), ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Taker broadcasts contract transaction prematurely"); diff --git a/tests/malice2.rs b/tests/malice2.rs index d2302034..e5a19050 100644 --- a/tests/malice2.rs +++ b/tests/malice2.rs @@ -18,8 +18,8 @@ use std::{collections::BTreeSet, thread, time::Duration}; /// /// This case is hard to "blame". As the contract transactions is available to both the Makers, its not identifiable /// which Maker is the culprit. Taker does not ban in this case. -#[tokio::test] -async fn malice2_maker_broadcast_contract_prematurely() { +#[test] +fn malice2_maker_broadcast_contract_prematurely() { // ---- Setup ---- let makers_config_map = [ @@ -34,8 +34,7 @@ async fn malice2_maker_broadcast_contract_prematurely() { makers_config_map.into(), Some(TakerBehavior::Normal), ConnectionType::CLEARNET, - ) - .await; + ); // Fund the Taker and Makers with 3 utxos of 0.05 btc each. for _ in 0..3 { diff --git a/tests/standard_swap.rs b/tests/standard_swap.rs index 9df6e97b..9a6052c4 100644 --- a/tests/standard_swap.rs +++ b/tests/standard_swap.rs @@ -17,14 +17,14 @@ use std::{assert_eq, thread, time::Duration}; /// This test demonstrates a standard coinswap round between a Taker and 2 Makers. Nothing goes wrong /// and the coinswap completes successfully. -#[tokio::test] -async fn test_standard_coinswap() { +#[test] +fn test_standard_coinswap() { // ---- Setup ---- // 2 Makers with Normal behavior. let makers_config_map = [ - ((6102, None), MakerBehavior::Normal), - ((16102, None), MakerBehavior::Normal), + ((6102, Some(19051)), MakerBehavior::Normal), + ((16102, Some(19052)), MakerBehavior::Normal), ]; // Initiate test framework, Makers and a Taker with default behavior. @@ -33,8 +33,7 @@ async fn test_standard_coinswap() { makers_config_map.into(), None, ConnectionType::CLEARNET, - ) - .await; + ); warn!("Running Test: Standard Coinswap Procedure"); diff --git a/tests/test_framework/mod.rs b/tests/test_framework/mod.rs index 7d951ed5..5e015954 100644 --- a/tests/test_framework/mod.rs +++ b/tests/test_framework/mod.rs @@ -66,7 +66,8 @@ impl TestFramework { /// Returns ([TestFramework], [Taker], [`Vec`]). /// Maker's config will follow the pattern given the input HashMap. /// If no bitcoind conf is provide a default value will be used. - pub async fn init( + #[allow(clippy::type_complexity)] + pub fn init( bitcoind_conf: Option>, makers_config_map: HashMap<(u16, Option), MakerBehavior>, taker_behavior: Option,