diff --git a/roles/jd-client/src/lib/job_declarator/mod.rs b/roles/jd-client/src/lib/job_declarator/mod.rs index 1c019c85de..d70b2949be 100644 --- a/roles/jd-client/src/lib/job_declarator/mod.rs +++ b/roles/jd-client/src/lib/job_declarator/mod.rs @@ -11,7 +11,7 @@ use roles_logic_sv2::{ template_distribution_sv2::SetNewPrevHash, utils::{hash_lists_tuple, Mutex}, }; -use std::{collections::HashMap, convert::TryInto, str::FromStr}; +use std::{collections::HashMap, convert::TryInto}; use stratum_common::bitcoin::{util::psbt::serialize::Deserialize, Transaction}; use tokio::task::AbortHandle; use tracing::{error, info}; @@ -24,10 +24,7 @@ use roles_logic_sv2::{ template_distribution_sv2::NewTemplate, utils::Id, }; -use std::{ - net::{IpAddr, SocketAddr}, - sync::Arc, -}; +use std::{net::SocketAddr, sync::Arc}; pub type Message = PoolMessages<'static>; pub type SendTo = SendTo_, ()>; @@ -90,17 +87,12 @@ impl JobDeclarator { .await .expect("impossible to connect"); - let proxy_address = SocketAddr::new( - IpAddr::from_str(&config.downstream_address).unwrap(), - config.downstream_port, - ); - info!( "JD proxy: setupconnection Proxy address: {:?}", - proxy_address + config.listen_address ); - SetupConnectionHandler::setup(&mut receiver, &mut sender, proxy_address) + SetupConnectionHandler::setup(&mut receiver, &mut sender, config.listen_address) .await .unwrap(); diff --git a/roles/jd-client/src/lib/mod.rs b/roles/jd-client/src/lib/mod.rs index 6c68bd8e48..4b77400f78 100644 --- a/roles/jd-client/src/lib/mod.rs +++ b/roles/jd-client/src/lib/mod.rs @@ -194,6 +194,10 @@ impl JobDeclaratorClient { } } + pub fn is_listening(&self) -> bool { + std::net::TcpStream::connect(self.config.listen_address).is_ok() + } + async fn initialize_jd_as_solo_miner( proxy_config: ProxyConfig, tx_status: async_channel::Sender>, @@ -206,15 +210,9 @@ impl JobDeclaratorClient { // SubmitSolution and send it to the TemplateReceiver let (send_solution, recv_solution) = bounded(10); - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - // Wait for downstream to connect let downstream = downstream::listen_for_downstream_mining( - downstream_addr, + proxy_config.listen_address, None, send_solution, proxy_config.withhold, @@ -319,12 +317,6 @@ impl JobDeclaratorClient { panic!() } - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - // Initialize JD part let mut parts = proxy_config.tp_address.split(':'); let ip_tp = parts.next().unwrap().to_string(); @@ -355,7 +347,7 @@ impl JobDeclaratorClient { // Wait for downstream to connect let downstream = match downstream::listen_for_downstream_mining( - downstream_addr, + proxy_config.listen_address, Some(upstream), send_solution, proxy_config.withhold, diff --git a/roles/jd-client/src/lib/proxy_config.rs b/roles/jd-client/src/lib/proxy_config.rs index f1df8fc452..710a23072a 100644 --- a/roles/jd-client/src/lib/proxy_config.rs +++ b/roles/jd-client/src/lib/proxy_config.rs @@ -2,7 +2,7 @@ use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; use roles_logic_sv2::{errors::Error, utils::CoinbaseOutput as CoinbaseOutput_}; use serde::Deserialize; -use std::time::Duration; +use std::{net::SocketAddr, time::Duration}; use stratum_common::bitcoin::TxOut; #[derive(Debug, Deserialize, Clone)] @@ -36,8 +36,7 @@ impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ { #[derive(Debug, Deserialize, Clone)] pub struct ProxyConfig { - pub downstream_address: String, - pub downstream_port: u16, + pub listen_address: SocketAddr, pub max_supported_version: u16, pub min_supported_version: u16, pub min_extranonce2_size: u16, @@ -118,7 +117,7 @@ impl ProtocolConfig { impl ProxyConfig { pub fn new( - listening_address: std::net::SocketAddr, + listen_address: std::net::SocketAddr, protocol_config: ProtocolConfig, withhold: bool, pool_config: PoolConfig, @@ -127,8 +126,7 @@ impl ProxyConfig { timeout: Duration, ) -> Self { Self { - downstream_address: listening_address.ip().to_string(), - downstream_port: listening_address.port(), + listen_address, max_supported_version: protocol_config.max_supported_version, min_supported_version: protocol_config.min_supported_version, min_extranonce2_size: protocol_config.min_extranonce2_size, diff --git a/roles/jd-server/src/lib/mod.rs b/roles/jd-server/src/lib/mod.rs index 4c4f44b60c..c3d75a40c7 100644 --- a/roles/jd-server/src/lib/mod.rs +++ b/roles/jd-server/src/lib/mod.rs @@ -20,6 +20,7 @@ use roles_logic_sv2::{ use serde::Deserialize; use std::{ convert::{TryFrom, TryInto}, + net::SocketAddr, time::Duration, }; use stratum_common::bitcoin::{Script, TxOut}; @@ -200,6 +201,10 @@ impl JobDeclaratorServer { } } } + + pub fn is_listening(&self) -> bool { + std::net::TcpStream::connect(self.config.listen_jd_address).is_ok() + } } pub fn get_coinbase_output(config: &Configuration) -> Result, Error> { @@ -251,7 +256,7 @@ impl CoinbaseOutput { pub struct Configuration { #[serde(default = "default_true")] pub async_mining_allowed: bool, - pub listen_jd_address: String, + pub listen_jd_address: SocketAddr, pub authority_public_key: Secp256k1PublicKey, pub authority_secret_key: Secp256k1SecretKey, pub cert_validity_sec: u64, @@ -285,7 +290,7 @@ impl CoreRpc { impl Configuration { pub fn new( - listen_jd_address: String, + listen_jd_address: SocketAddr, authority_public_key: Secp256k1PublicKey, authority_secret_key: Secp256k1SecretKey, cert_validity_sec: u64, diff --git a/roles/pool/src/lib/mining_pool/mod.rs b/roles/pool/src/lib/mining_pool/mod.rs index 2b179c7885..1ada20e210 100644 --- a/roles/pool/src/lib/mining_pool/mod.rs +++ b/roles/pool/src/lib/mining_pool/mod.rs @@ -93,7 +93,7 @@ impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ { #[derive(Debug, Deserialize, Clone)] pub struct Configuration { - pub listen_address: String, + pub listen_address: SocketAddr, pub tp_address: String, pub tp_authority_public_key: Option, pub authority_public_key: Secp256k1PublicKey, @@ -134,13 +134,13 @@ impl AuthorityConfig { } pub struct ConnectionConfig { - listen_address: String, + listen_address: SocketAddr, cert_validity_sec: u64, signature: String, } impl ConnectionConfig { - pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self { + pub fn new(listen_address: SocketAddr, cert_validity_sec: u64, signature: String) -> Self { Self { listen_address, cert_validity_sec, diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index 4fee0be15c..d49c171304 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -100,6 +100,10 @@ impl PoolSv2 { } } } + + pub fn is_listening(&self) -> bool { + std::net::TcpStream::connect(self.config.listen_address).is_ok() + } } #[cfg(test)] diff --git a/roles/tests-integration/lib/mod.rs b/roles/tests-integration/lib/mod.rs index c6cb7354c6..c72c00efbb 100644 --- a/roles/tests-integration/lib/mod.rs +++ b/roles/tests-integration/lib/mod.rs @@ -62,7 +62,7 @@ pub async fn start_pool(template_provider_address: Option) -> (PoolS "127.0.0.1:8442".to_string() }; let connection_config = pool_sv2::mining_pool::ConnectionConfig::new( - listening_address.to_string(), + listening_address, cert_validity_sec, pool_signature, ); @@ -178,7 +178,7 @@ pub async fn start_jds(tp_address: SocketAddr) -> (JobDeclaratorServer, SocketAd "tp_password".to_string(), ); let config = Configuration::new( - listen_jd_address.to_string(), + listen_jd_address, authority_public_key, authority_secret_key, cert_validity_sec, @@ -203,7 +203,6 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, Socke ) .expect("failed"); let listening_address = get_available_address(); - let listening_port = listening_address.port(); let hashrate = measure_hashrate(1) as f32 / 100.0; let min_individual_miner_hashrate = hashrate; let shares_per_minute = 60.0; @@ -229,8 +228,7 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, Socke upstream_difficulty_config, ); let downstream_conf = translator_sv2::proxy_config::DownstreamConfig::new( - listening_address.ip().to_string(), - listening_port, + listening_address, downstream_difficulty_config, ); diff --git a/roles/tests-integration/tests/jd_integration.rs b/roles/tests-integration/tests/jd_integration.rs index 58b98d0989..b6ce6b0c20 100644 --- a/roles/tests-integration/tests/jd_integration.rs +++ b/roles/tests-integration/tests/jd_integration.rs @@ -35,3 +35,17 @@ async fn jds_should_not_panic_if_jdc_shutsdown() { let (_jdc, _jdc_addr) = start_jdc(pool_addr, tp_addr, sniffer_addr).await; assert_common_message!(sniffer.next_message_from_downstream(), SetupConnection); } + +// This test makes sure that JDS will not stop listening after one +// downstream client disconnects +#[tokio::test] +async fn jds_survives_downstream_disconnect() { + todo!() +} + +// This test makes sure that JDC will not stop listening after one +// downstream client disconnects +#[tokio::test] +async fn jdc_survives_downstream_disconnect() { + todo!() +} diff --git a/roles/tests-integration/tests/pool_integration.rs b/roles/tests-integration/tests/pool_integration.rs index 94c832a0dd..36ec8b6253 100644 --- a/roles/tests-integration/tests/pool_integration.rs +++ b/roles/tests-integration/tests/pool_integration.rs @@ -122,3 +122,30 @@ async fn header_timestamp_value_assertion_in_new_extended_mining_job() { "The `minntime` field of the second NewExtendedMiningJob does not match the `header_timestamp`!" ); } + +// This test makes sure that Pool will not stop listening after one +// downstream client disconnects +#[tokio::test] +async fn pool_survives_downstream_disconnect() { + let (_tp, tp_addr) = start_template_provider(None).await; + let (pool, pool_addr) = start_pool(Some(tp_addr)).await; + + // emulate first downstream + let downstream_a = std::net::TcpStream::connect(pool_addr).unwrap(); + + // emulate second downstream + let _downstream_b = std::net::TcpStream::connect(pool_addr).unwrap(); + + // wait a bit to make sure the TCP sockets are processed + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // kill downstream_a + downstream_a.shutdown(std::net::Shutdown::Both).unwrap(); + drop(downstream_a); + + // wait a bit to make sure the TCP sockets are processed + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // pool still listening + assert!(pool.is_listening()); +} diff --git a/roles/tests-integration/tests/translator_integration.rs b/roles/tests-integration/tests/translator_integration.rs index 6aabfb4cab..3ef8b9067e 100644 --- a/roles/tests-integration/tests/translator_integration.rs +++ b/roles/tests-integration/tests/translator_integration.rs @@ -44,3 +44,31 @@ async fn translation_proxy() { ) .await; } + +// This test makes sure that tProxy will not stop listening after one +// downstream client disconnects +#[tokio::test] +async fn tproxy_survives_downstream_disconnect() { + let (_tp, tp_addr) = start_template_provider(None).await; + let (_pool, pool_addr) = start_pool(Some(tp_addr)).await; + let (tproxy, tproxy_addr) = start_sv2_translator(pool_addr).await; + + // emulate first downstream + let downstream_a = std::net::TcpStream::connect(tproxy_addr).unwrap(); + + // emulate second downstream + let _downstream_b = std::net::TcpStream::connect(tproxy_addr).unwrap(); + + // wait a bit to make sure the TCP sockets are processed + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // kill downstream_a + downstream_a.shutdown(std::net::Shutdown::Both).unwrap(); + drop(downstream_a); + + // wait a bit to make sure the TCP sockets are processed + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // tproxy still listening + assert!(tproxy.is_listening()); +} diff --git a/roles/translator/src/lib/mod.rs b/roles/translator/src/lib/mod.rs index 5618d98728..b7b8676dc3 100644 --- a/roles/translator/src/lib/mod.rs +++ b/roles/translator/src/lib/mod.rs @@ -266,16 +266,10 @@ impl TranslatorSv2 { ); proxy::Bridge::start(b.clone()); - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - let task_collector_downstream = task_collector_init_task.clone(); // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) downstream_sv1::Downstream::accept_connections( - downstream_addr, + proxy_config.listen_address, tx_sv1_bridge, tx_sv1_notify, status::Sender::DownstreamListener(tx_status.clone()), @@ -297,6 +291,10 @@ impl TranslatorSv2 { pub fn shutdown(&self) { self.shutdown.notify_one(); } + + pub fn is_listening(&self) -> bool { + std::net::TcpStream::connect(self.config.listen_address).is_ok() + } } fn kill_tasks(task_collector: Arc>>) { diff --git a/roles/translator/src/lib/proxy_config.rs b/roles/translator/src/lib/proxy_config.rs index ec3711c5d6..c1db74745d 100644 --- a/roles/translator/src/lib/proxy_config.rs +++ b/roles/translator/src/lib/proxy_config.rs @@ -1,13 +1,13 @@ use key_utils::Secp256k1PublicKey; use serde::Deserialize; +use std::net::SocketAddr; #[derive(Debug, Deserialize, Clone)] pub struct ProxyConfig { pub upstream_address: String, pub upstream_port: u16, pub upstream_authority_pubkey: Secp256k1PublicKey, - pub downstream_address: String, - pub downstream_port: u16, + pub listen_address: SocketAddr, pub max_supported_version: u16, pub min_supported_version: u16, pub min_extranonce2_size: u16, @@ -39,16 +39,14 @@ impl UpstreamConfig { } pub struct DownstreamConfig { - address: String, - port: u16, + listen_address: SocketAddr, difficulty_config: DownstreamDifficultyConfig, } impl DownstreamConfig { - pub fn new(address: String, port: u16, difficulty_config: DownstreamDifficultyConfig) -> Self { + pub fn new(listen_address: SocketAddr, difficulty_config: DownstreamDifficultyConfig) -> Self { Self { - address, - port, + listen_address, difficulty_config, } } @@ -66,8 +64,7 @@ impl ProxyConfig { upstream_address: upstream.address, upstream_port: upstream.port, upstream_authority_pubkey: upstream.authority_pubkey, - downstream_address: downstream.address, - downstream_port: downstream.port, + listen_address: downstream.listen_address, max_supported_version, min_supported_version, min_extranonce2_size,