Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Integration Test for disconnecting downstream connections on all roles #1434

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions roles/jd-client/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use roles_logic_sv2::{
template_distribution_sv2::SetNewPrevHash,
utils::{hash_lists_tuple, Mutex},
};
use std::{collections::HashMap, convert::TryInto, str::FromStr};
use std::{collections::HashMap, convert::TryInto};
use stratum_common::bitcoin::{util::psbt::serialize::Deserialize, Transaction};
use tokio::task::AbortHandle;
use tracing::{error, info};
Expand All @@ -24,10 +24,7 @@ use roles_logic_sv2::{
template_distribution_sv2::NewTemplate,
utils::Id,
};
use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use std::{net::SocketAddr, sync::Arc};

pub type Message = PoolMessages<'static>;
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
Expand Down Expand Up @@ -90,17 +87,12 @@ impl JobDeclarator {
.await
.expect("impossible to connect");

let proxy_address = SocketAddr::new(
IpAddr::from_str(&config.downstream_address).unwrap(),
config.downstream_port,
);

info!(
"JD proxy: setupconnection Proxy address: {:?}",
proxy_address
config.listen_address
);

SetupConnectionHandler::setup(&mut receiver, &mut sender, proxy_address)
SetupConnectionHandler::setup(&mut receiver, &mut sender, config.listen_address)
.await
.unwrap();

Expand Down
20 changes: 6 additions & 14 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ impl JobDeclaratorClient {
}
}

pub fn is_listening(&self) -> bool {
std::net::TcpStream::connect(self.config.listen_address).is_ok()
}

async fn initialize_jd_as_solo_miner(
proxy_config: ProxyConfig,
tx_status: async_channel::Sender<status::Status<'static>>,
Expand All @@ -206,15 +210,9 @@ impl JobDeclaratorClient {
// SubmitSolution and send it to the TemplateReceiver
let (send_solution, recv_solution) = bounded(10);

// Format `Downstream` connection address
let downstream_addr = SocketAddr::new(
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
proxy_config.downstream_port,
);

// Wait for downstream to connect
let downstream = downstream::listen_for_downstream_mining(
downstream_addr,
proxy_config.listen_address,
None,
send_solution,
proxy_config.withhold,
Expand Down Expand Up @@ -319,12 +317,6 @@ impl JobDeclaratorClient {
panic!()
}

// Format `Downstream` connection address
let downstream_addr = SocketAddr::new(
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
proxy_config.downstream_port,
);

// Initialize JD part
let mut parts = proxy_config.tp_address.split(':');
let ip_tp = parts.next().unwrap().to_string();
Expand Down Expand Up @@ -355,7 +347,7 @@ impl JobDeclaratorClient {

// Wait for downstream to connect
let downstream = match downstream::listen_for_downstream_mining(
downstream_addr,
proxy_config.listen_address,
Some(upstream),
send_solution,
proxy_config.withhold,
Expand Down
10 changes: 4 additions & 6 deletions roles/jd-client/src/lib/proxy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use roles_logic_sv2::{errors::Error, utils::CoinbaseOutput as CoinbaseOutput_};
use serde::Deserialize;
use std::time::Duration;
use std::{net::SocketAddr, time::Duration};
use stratum_common::bitcoin::TxOut;

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -36,8 +36,7 @@ impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {

#[derive(Debug, Deserialize, Clone)]
pub struct ProxyConfig {
pub downstream_address: String,
pub downstream_port: u16,
pub listen_address: SocketAddr,
pub max_supported_version: u16,
pub min_supported_version: u16,
pub min_extranonce2_size: u16,
Expand Down Expand Up @@ -118,7 +117,7 @@ impl ProtocolConfig {

impl ProxyConfig {
pub fn new(
listening_address: std::net::SocketAddr,
listen_address: std::net::SocketAddr,
protocol_config: ProtocolConfig,
withhold: bool,
pool_config: PoolConfig,
Expand All @@ -127,8 +126,7 @@ impl ProxyConfig {
timeout: Duration,
) -> Self {
Self {
downstream_address: listening_address.ip().to_string(),
downstream_port: listening_address.port(),
listen_address,
max_supported_version: protocol_config.max_supported_version,
min_supported_version: protocol_config.min_supported_version,
min_extranonce2_size: protocol_config.min_extranonce2_size,
Expand Down
9 changes: 7 additions & 2 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use roles_logic_sv2::{
use serde::Deserialize;
use std::{
convert::{TryFrom, TryInto},
net::SocketAddr,
time::Duration,
};
use stratum_common::bitcoin::{Script, TxOut};
Expand Down Expand Up @@ -200,6 +201,10 @@ impl JobDeclaratorServer {
}
}
}

pub fn is_listening(&self) -> bool {
std::net::TcpStream::connect(self.config.listen_jd_address).is_ok()
}
}

pub fn get_coinbase_output(config: &Configuration) -> Result<Vec<TxOut>, Error> {
Expand Down Expand Up @@ -251,7 +256,7 @@ impl CoinbaseOutput {
pub struct Configuration {
#[serde(default = "default_true")]
pub async_mining_allowed: bool,
pub listen_jd_address: String,
pub listen_jd_address: SocketAddr,
pub authority_public_key: Secp256k1PublicKey,
pub authority_secret_key: Secp256k1SecretKey,
pub cert_validity_sec: u64,
Expand Down Expand Up @@ -285,7 +290,7 @@ impl CoreRpc {

impl Configuration {
pub fn new(
listen_jd_address: String,
listen_jd_address: SocketAddr,
authority_public_key: Secp256k1PublicKey,
authority_secret_key: Secp256k1SecretKey,
cert_validity_sec: u64,
Expand Down
6 changes: 3 additions & 3 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {

#[derive(Debug, Deserialize, Clone)]
pub struct Configuration {
pub listen_address: String,
pub listen_address: SocketAddr,
pub tp_address: String,
pub tp_authority_public_key: Option<Secp256k1PublicKey>,
pub authority_public_key: Secp256k1PublicKey,
Expand Down Expand Up @@ -134,13 +134,13 @@ impl AuthorityConfig {
}

pub struct ConnectionConfig {
listen_address: String,
listen_address: SocketAddr,
cert_validity_sec: u64,
signature: String,
}

impl ConnectionConfig {
pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self {
pub fn new(listen_address: SocketAddr, cert_validity_sec: u64, signature: String) -> Self {
Self {
listen_address,
cert_validity_sec,
Expand Down
4 changes: 4 additions & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl PoolSv2 {
}
}
}

pub fn is_listening(&self) -> bool {
std::net::TcpStream::connect(self.config.listen_address).is_ok()
}
}

#[cfg(test)]
Expand Down
8 changes: 3 additions & 5 deletions roles/tests-integration/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolS
"127.0.0.1:8442".to_string()
};
let connection_config = pool_sv2::mining_pool::ConnectionConfig::new(
listening_address.to_string(),
listening_address,
cert_validity_sec,
pool_signature,
);
Expand Down Expand Up @@ -178,7 +178,7 @@ pub async fn start_jds(tp_address: SocketAddr) -> (JobDeclaratorServer, SocketAd
"tp_password".to_string(),
);
let config = Configuration::new(
listen_jd_address.to_string(),
listen_jd_address,
authority_public_key,
authority_secret_key,
cert_validity_sec,
Expand All @@ -203,7 +203,6 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, Socke
)
.expect("failed");
let listening_address = get_available_address();
let listening_port = listening_address.port();
let hashrate = measure_hashrate(1) as f32 / 100.0;
let min_individual_miner_hashrate = hashrate;
let shares_per_minute = 60.0;
Expand All @@ -229,8 +228,7 @@ pub async fn start_sv2_translator(upstream: SocketAddr) -> (TranslatorSv2, Socke
upstream_difficulty_config,
);
let downstream_conf = translator_sv2::proxy_config::DownstreamConfig::new(
listening_address.ip().to_string(),
listening_port,
listening_address,
downstream_difficulty_config,
);

Expand Down
14 changes: 14 additions & 0 deletions roles/tests-integration/tests/jd_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,17 @@ async fn jds_should_not_panic_if_jdc_shutsdown() {
let (_jdc, _jdc_addr) = start_jdc(pool_addr, tp_addr, sniffer_addr).await;
assert_common_message!(sniffer.next_message_from_downstream(), SetupConnection);
}

// This test makes sure that JDS will not stop listening after one
// downstream client disconnects
#[tokio::test]
async fn jds_survives_downstream_disconnect() {
todo!()
}

// This test makes sure that JDC will not stop listening after one
// downstream client disconnects
#[tokio::test]
async fn jdc_survives_downstream_disconnect() {
todo!()
}
27 changes: 27 additions & 0 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,30 @@ async fn header_timestamp_value_assertion_in_new_extended_mining_job() {
"The `minntime` field of the second NewExtendedMiningJob does not match the `header_timestamp`!"
);
}

// This test makes sure that Pool will not stop listening after one
// downstream client disconnects
#[tokio::test]
async fn pool_survives_downstream_disconnect() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (pool, pool_addr) = start_pool(Some(tp_addr)).await;

// emulate first downstream
let downstream_a = std::net::TcpStream::connect(pool_addr).unwrap();

// emulate second downstream
let _downstream_b = std::net::TcpStream::connect(pool_addr).unwrap();

// wait a bit to make sure the TCP sockets are processed
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// kill downstream_a
downstream_a.shutdown(std::net::Shutdown::Both).unwrap();
drop(downstream_a);

// wait a bit to make sure the TCP sockets are processed
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// pool still listening
assert!(pool.is_listening());
}
28 changes: 28 additions & 0 deletions roles/tests-integration/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,31 @@ async fn translation_proxy() {
)
.await;
}

// This test makes sure that tProxy will not stop listening after one
// downstream client disconnects
#[tokio::test]
async fn tproxy_survives_downstream_disconnect() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (_pool, pool_addr) = start_pool(Some(tp_addr)).await;
let (tproxy, tproxy_addr) = start_sv2_translator(pool_addr).await;

// emulate first downstream
let downstream_a = std::net::TcpStream::connect(tproxy_addr).unwrap();

// emulate second downstream
let _downstream_b = std::net::TcpStream::connect(tproxy_addr).unwrap();

// wait a bit to make sure the TCP sockets are processed
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// kill downstream_a
downstream_a.shutdown(std::net::Shutdown::Both).unwrap();
drop(downstream_a);

// wait a bit to make sure the TCP sockets are processed
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// tproxy still listening
assert!(tproxy.is_listening());
}
12 changes: 5 additions & 7 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,10 @@ impl TranslatorSv2 {
);
proxy::Bridge::start(b.clone());

// Format `Downstream` connection address
let downstream_addr = SocketAddr::new(
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
proxy_config.downstream_port,
);

let task_collector_downstream = task_collector_init_task.clone();
// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices)
downstream_sv1::Downstream::accept_connections(
downstream_addr,
proxy_config.listen_address,
tx_sv1_bridge,
tx_sv1_notify,
status::Sender::DownstreamListener(tx_status.clone()),
Expand All @@ -297,6 +291,10 @@ impl TranslatorSv2 {
pub fn shutdown(&self) {
self.shutdown.notify_one();
}

pub fn is_listening(&self) -> bool {
std::net::TcpStream::connect(self.config.listen_address).is_ok()
}
}

fn kill_tasks(task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>) {
Expand Down
15 changes: 6 additions & 9 deletions roles/translator/src/lib/proxy_config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use key_utils::Secp256k1PublicKey;
use serde::Deserialize;
use std::net::SocketAddr;

#[derive(Debug, Deserialize, Clone)]
pub struct ProxyConfig {
pub upstream_address: String,
pub upstream_port: u16,
pub upstream_authority_pubkey: Secp256k1PublicKey,
pub downstream_address: String,
pub downstream_port: u16,
pub listen_address: SocketAddr,
pub max_supported_version: u16,
pub min_supported_version: u16,
pub min_extranonce2_size: u16,
Expand Down Expand Up @@ -39,16 +39,14 @@ impl UpstreamConfig {
}

pub struct DownstreamConfig {
address: String,
port: u16,
listen_address: SocketAddr,
difficulty_config: DownstreamDifficultyConfig,
}

impl DownstreamConfig {
pub fn new(address: String, port: u16, difficulty_config: DownstreamDifficultyConfig) -> Self {
pub fn new(listen_address: SocketAddr, difficulty_config: DownstreamDifficultyConfig) -> Self {
Self {
address,
port,
listen_address,
difficulty_config,
}
}
Expand All @@ -66,8 +64,7 @@ impl ProxyConfig {
upstream_address: upstream.address,
upstream_port: upstream.port,
upstream_authority_pubkey: upstream.authority_pubkey,
downstream_address: downstream.address,
downstream_port: downstream.port,
listen_address: downstream.listen_address,
max_supported_version,
min_supported_version,
min_extranonce2_size,
Expand Down
Loading