From 7e3fafa925e642a187e92723062daed040400149 Mon Sep 17 00:00:00 2001 From: Carlo Mazzaferro Date: Wed, 16 Oct 2024 13:38:59 +0200 Subject: [PATCH] config for batch timeout --- .../src/bin/tcp_ssl_upgrade_client.rs | 25 +++++++++++++------ iris-mpc-upgrade/src/config.rs | 5 +++- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs b/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs index ce7fba526..f0082ee70 100644 --- a/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs +++ b/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs @@ -66,6 +66,12 @@ async fn main() -> eyre::Result<()> { install_tracing(); let args = UpgradeClientConfig::parse(); + let batch_timeout = if let Some(batch_timeout) = args.batch_timeout_secs { + batch_timeout + } else { + BATCH_TIMEOUT_SECONDS + }; + if args.party_id > 1 { panic!("Party id must be 0, 1"); } @@ -186,6 +192,7 @@ async fn main() -> eyre::Result<()> { ); send_batch_and_wait_for_ack( args.party_id, + batch_timeout, &mut server1, &mut server2, &mut server3, @@ -202,6 +209,7 @@ async fn main() -> eyre::Result<()> { tracing::info!("Sending final batch of size {}", batch.len()); send_batch_and_wait_for_ack( args.party_id, + batch_timeout, &mut server1, &mut server2, &mut server3, @@ -211,17 +219,18 @@ async fn main() -> eyre::Result<()> { batch.clear(); } tracing::info!("Final batch sent, waiting for acks"); - wait_for_ack(&mut server1).await?; + wait_for_ack(&mut server1, batch_timeout).await?; tracing::info!("Server 1 ack received"); - wait_for_ack(&mut server2).await?; + wait_for_ack(&mut server2, batch_timeout).await?; tracing::info!("Server 2 ack received"); - wait_for_ack(&mut server3).await?; + wait_for_ack(&mut server3, batch_timeout).await?; tracing::info!("Server 3 ack received"); Ok(()) } async fn send_batch_and_wait_for_ack( party_id: u8, + batch_timeout: u64, server1: &mut TlsStream, server2: &mut TlsStream, server3: &mut TlsStream, @@ -312,14 +321,14 @@ async fn send_batch_and_wait_for_ack( } // Handle acknowledgment from all servers - wait_for_ack(server1).await?; - wait_for_ack(server2).await?; - wait_for_ack(server3).await?; + wait_for_ack(server1, batch_timeout).await?; + wait_for_ack(server2, batch_timeout).await?; + wait_for_ack(server3, batch_timeout).await?; Ok(()) } -async fn wait_for_ack(server: &mut TlsStream) -> eyre::Result<()> { - match timeout(Duration::from_secs(BATCH_TIMEOUT_SECONDS), server.read_u8()).await { +async fn wait_for_ack(server: &mut TlsStream, batch_timout: u64) -> eyre::Result<()> { + match timeout(Duration::from_secs(batch_timout), server.read_u8()).await { Ok(Ok(BATCH_SUCCESSFUL_ACK)) => { // Ack received successfully tracing::info!("ACK received for batch"); diff --git a/iris-mpc-upgrade/src/config.rs b/iris-mpc-upgrade/src/config.rs index cafaf8d71..92034a08d 100644 --- a/iris-mpc-upgrade/src/config.rs +++ b/iris-mpc-upgrade/src/config.rs @@ -6,7 +6,7 @@ use std::{ str::FromStr, }; -pub const BATCH_TIMEOUT_SECONDS: u64 = 30; +pub const BATCH_TIMEOUT_SECONDS: u64 = 60; pub const BATCH_SUCCESSFUL_ACK: u8 = 1; pub const FINAL_BATCH_SUCCESSFUL_ACK: u8 = 42; @@ -98,6 +98,9 @@ pub struct UpgradeClientConfig { #[clap(long)] pub masks_db_url: String, + + #[clap(long)] + pub batch_timeout_secs: Option, } impl fmt::Debug for UpgradeClientConfig {