diff --git a/deploy/prod/common-values-iris-mpc.yaml b/deploy/prod/common-values-iris-mpc.yaml
index 8c8ebe458..efdadf928 100644
--- a/deploy/prod/common-values-iris-mpc.yaml
+++ b/deploy/prod/common-values-iris-mpc.yaml
@@ -1,4 +1,4 @@
-image: "ghcr.io/worldcoin/iris-mpc:v0.8.23"
+image: "ghcr.io/worldcoin/iris-mpc:v0.8.25"
 
 environment: prod
 replicaCount: 1
@@ -28,6 +28,12 @@ readinessProbe:
     path: /health
     port: health
 
+startupProbe:
+  enabled: true
+  httpGet:
+    path: /health
+    port: health
+
 resources:
   limits:
     cpu: 192
@@ -63,3 +69,13 @@ tolerations:
 keelPolling:
   # -- Specifies whether keel should poll for container updates
   enabled: true
+
+libsDir:
+  enabled: true
+  path: "/libs"
+  size: 2Gi
+  files:
+    - path: "/usr/local/cuda-12.2/targets/x86_64-linux/lib/libcublasLt.so.12.2.5.6"
+      file: "libcublasLt.so.12.2.5.6"
+    - path: "/usr/local/cuda-12.2/targets/x86_64-linux/lib/libcublas.so.12.2.5.6"
+      file: "libcublas.so.12.2.5.6"
diff --git a/deploy/prod/common-values-upgrade-server-left.yaml b/deploy/prod/common-values-upgrade-server-left.yaml
index 30b8b4c55..1af61ff57 100644
--- a/deploy/prod/common-values-upgrade-server-left.yaml
+++ b/deploy/prod/common-values-upgrade-server-left.yaml
@@ -1,4 +1,4 @@
-image: "ghcr.io/worldcoin/iris-mpc:v0.8.23"
+image: "ghcr.io/worldcoin/iris-mpc:v0.8.25"
 
 environment: prod
 replicaCount: 1
@@ -17,6 +17,12 @@ ports:
     name: health
     protocol: TCP
 
+startupProbe:
+  enabled: true
+  httpGet:
+    path: /health
+    port: health
+
 livenessProbe:
   httpGet:
     path: /health
@@ -42,6 +48,7 @@ imagePullSecrets:
 
 nodeSelector:
   kubernetes.io/arch: amd64
+  beta.kubernetes.io/instance-type: t3.2xlarge
 
 hostNetwork: true
diff --git a/deploy/prod/common-values-upgrade-server-right.yaml b/deploy/prod/common-values-upgrade-server-right.yaml
index 5a8ee39f7..c4dc12abc 100644
--- a/deploy/prod/common-values-upgrade-server-right.yaml
+++ b/deploy/prod/common-values-upgrade-server-right.yaml
@@ -1,4 +1,4 @@
-image: "ghcr.io/worldcoin/iris-mpc:v0.8.23"
+image: "ghcr.io/worldcoin/iris-mpc:v0.8.25"
 
 environment: prod
 replicaCount: 1
@@ -17,6 +17,12 @@ ports:
     name: health
     protocol: TCP
 
+startupProbe:
+  enabled: true
+  httpGet:
+    path: /health
+    port: health
+
 livenessProbe:
   httpGet:
     path: /health
@@ -42,6 +48,7 @@ imagePullSecrets:
 
 nodeSelector:
   kubernetes.io/arch: amd64
+  beta.kubernetes.io/instance-type: t3.2xlarge
 
 hostNetwork: true
diff --git a/deploy/prod/smpcv2-0-prod/values-iris-mpc.yaml b/deploy/prod/smpcv2-0-prod/values-iris-mpc.yaml
index 1c5a6d4fb..ce9dab082 100644
--- a/deploy/prod/smpcv2-0-prod/values-iris-mpc.yaml
+++ b/deploy/prod/smpcv2-0-prod/values-iris-mpc.yaml
@@ -5,9 +5,6 @@ env:
   - name: NCCL_SOCKET_IFNAME
     value: "eth0"
 
-  - name: NCCL_DEBUG
-    value: "trace"
-
   - name: NCCL_COMM_ID
     value: "iris-mpc-node.1.smpcv2.worldcoin.org:4000"
 
@@ -80,6 +77,12 @@ env:
   - name: SMPC__MAX_BATCH_SIZE
     value: "64"
 
+  - name: SMPC__FAKE_DB_SIZE
+    value: "6000000"
+
+  - name: SMPC__DISABLE_PERSISTENCE
+    value: "true"
+
 initContainer:
   enabled: true
   image: "amazon/aws-cli:2.17.62"
@@ -97,7 +100,7 @@ initContainer:
 
       # Set up environment variables
       HOSTED_ZONE_ID=$(aws route53 list-hosted-zones-by-name --dns-name "$PARTY_ID".smpcv2.worldcoin.org --query "HostedZones[].Id" --output text)
-      
+
       # Generate the JSON content in memory
       BATCH_JSON=$(cat < u64 {
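
Note on the deployment changes above: the new startupProbe (assuming the chart maps it onto a Kubernetes startup probe) holds off liveness and readiness checks until /health first succeeds, which protects pods that spend a long time loading the iris database at boot; libsDir appears to stage the CUDA 12.2 cuBLAS shared objects into a /libs volume; and SMPC__FAKE_DB_SIZE=6000000 together with SMPC__DISABLE_PERSISTENCE=true runs the node against a synthetic in-memory database without writing results back. A minimal sketch of how the SMPC__-prefixed variables map onto typed config values (an illustration only, not the crate's actual config loader):

    use std::env;

    /// Parse an optional numeric env var, falling back to a default.
    fn env_u64(key: &str, default: u64) -> u64 {
        env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
    }

    /// Treat the literal string "true" as true, anything else as false.
    fn env_bool(key: &str) -> bool {
        env::var(key).map(|v| v == "true").unwrap_or(false)
    }

    fn main() {
        let fake_db_size = env_u64("SMPC__FAKE_DB_SIZE", 0);
        let disable_persistence = env_bool("SMPC__DISABLE_PERSISTENCE");
        println!("fake_db_size={fake_db_size} disable_persistence={disable_persistence}");
    }
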
diff --git a/iris-mpc-gpu/src/server/actor.rs b/iris-mpc-gpu/src/server/actor.rs
index 278289b09..3aff28506 100644
--- a/iris-mpc-gpu/src/server/actor.rs
+++ b/iris-mpc-gpu/src/server/actor.rs
@@ -65,7 +65,7 @@ impl ServerActorHandle {
     }
 }
 
-const DB_CHUNK_SIZE: usize = 512;
+const DB_CHUNK_SIZE: usize = 1 << 15;
 const KDF_SALT: &str = "111a1a93518f670e9bb0c2c68888e2beb9406d4c4ed571dc77b801e676ae3091"; // Random 32 byte salt
 
 pub struct ServerActor {
@@ -100,6 +100,7 @@ pub struct ServerActor {
     max_batch_size: usize,
     max_db_size: usize,
     return_partial_results: bool,
+    disable_persistence: bool,
 }
 
 const NON_MATCH_ID: u32 = u32::MAX;
@@ -113,6 +114,7 @@ impl ServerActor {
         max_db_size: usize,
         max_batch_size: usize,
         return_partial_results: bool,
+        disable_persistence: bool,
     ) -> eyre::Result<(Self, ServerActorHandle)> {
         let device_manager = Arc::new(DeviceManager::init());
         Self::new_with_device_manager(
@@ -123,6 +125,7 @@ impl ServerActor {
             max_db_size,
             max_batch_size,
             return_partial_results,
+            disable_persistence,
         )
     }
     #[allow(clippy::too_many_arguments)]
@@ -134,6 +137,7 @@ impl ServerActor {
         max_db_size: usize,
         max_batch_size: usize,
         return_partial_results: bool,
+        disable_persistence: bool,
     ) -> eyre::Result<(Self, ServerActorHandle)> {
         let ids = device_manager.get_ids_from_magic(0);
         let comms = device_manager.instantiate_network_from_ids(party_id, &ids)?;
@@ -146,6 +150,7 @@ impl ServerActor {
             max_db_size,
             max_batch_size,
             return_partial_results,
+            disable_persistence,
         )
     }
 
@@ -159,6 +164,7 @@ impl ServerActor {
         max_db_size: usize,
         max_batch_size: usize,
         return_partial_results: bool,
+        disable_persistence: bool,
     ) -> eyre::Result<(Self, ServerActorHandle)> {
         let (tx, rx) = mpsc::channel(job_queue_size);
         let actor = Self::init(
@@ -170,6 +176,7 @@ impl ServerActor {
             max_db_size,
             max_batch_size,
             return_partial_results,
+            disable_persistence,
         )?;
         Ok((actor, ServerActorHandle { job_queue: tx }))
     }
@@ -184,6 +191,7 @@ impl ServerActor {
         max_db_size: usize,
         max_batch_size: usize,
         return_partial_results: bool,
+        disable_persistence: bool,
     ) -> eyre::Result<Self> {
         assert!(max_batch_size != 0);
         let mut kdf_nonce = 0;
@@ -344,6 +352,7 @@ impl ServerActor {
             max_batch_size,
             max_db_size,
             return_partial_results,
+            disable_persistence,
         })
     }
 
@@ -832,40 +841,42 @@ impl ServerActor {
             eyre::bail!("DB size exceeded");
         }
 
-        record_stream_time!(
-            &self.device_manager,
-            &self.streams[0],
-            events,
-            "db_write",
-            {
-                for i in 0..self.device_manager.device_count() {
-                    self.device_manager.device(i).bind_to_thread().unwrap();
-                    for insertion_idx in insertion_list[i].clone() {
-                        write_db_at_index(
-                            &self.left_code_db_slices,
-                            &self.left_mask_db_slices,
-                            &self.right_code_db_slices,
-                            &self.right_mask_db_slices,
-                            &compact_device_queries_left,
-                            &compact_device_sums_left,
-                            &compact_device_queries_right,
-                            &compact_device_sums_right,
-                            insertion_idx,
-                            self.current_db_sizes[i],
+        if !self.disable_persistence {
+            record_stream_time!(
+                &self.device_manager,
+                &self.streams[0],
+                events,
+                "db_write",
+                {
+                    for i in 0..self.device_manager.device_count() {
+                        self.device_manager.device(i).bind_to_thread().unwrap();
+                        for insertion_idx in insertion_list[i].clone() {
+                            write_db_at_index(
+                                &self.left_code_db_slices,
+                                &self.left_mask_db_slices,
+                                &self.right_code_db_slices,
+                                &self.right_mask_db_slices,
+                                &compact_device_queries_left,
+                                &compact_device_sums_left,
+                                &compact_device_queries_right,
+                                &compact_device_sums_right,
+                                insertion_idx,
+                                self.current_db_sizes[i],
+                                i,
+                                &self.streams[0],
+                            );
+                            self.current_db_sizes[i] += 1;
+                        }
+
+                        tracing::debug!(
+                            "Updating DB size on device {}: {:?}",
                             i,
-                            &self.streams[0],
+                            self.current_db_sizes[i]
                         );
-                        self.current_db_sizes[i] += 1;
                     }
-
-                    tracing::debug!(
-                        "Updating DB size on device {}: {:?}",
-                        i,
-                        self.current_db_sizes[i]
-                    );
                 }
-            }
-        );
+            );
+        }
 
         // Pass to internal sender thread
         return_channel
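
Note on the actor changes above: DB_CHUNK_SIZE grows from 512 to 1 << 15 (32768), so a pass over the database presumably issues far fewer, larger per-chunk dispatches, and the new disable_persistence flag skips the db_write step entirely, so inserted queries live only in GPU memory and the tracked DB sizes stay fixed. A quick back-of-the-envelope check of the chunk counts (standalone sketch, not the actor's internals):

    const OLD_CHUNK: usize = 512;
    const NEW_CHUNK: usize = 1 << 15; // 32768

    /// Number of chunks needed to cover `db_size` records.
    fn chunk_count(db_size: usize, chunk: usize) -> usize {
        db_size.div_ceil(chunk)
    }

    fn main() {
        let db_size = 6_000_000; // matches SMPC__FAKE_DB_SIZE above
        println!("512: {} chunks", chunk_count(db_size, OLD_CHUNK)); // 11719
        println!("1 << 15: {} chunks", chunk_count(db_size, NEW_CHUNK)); // 184
    }
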
diff --git a/iris-mpc-gpu/tests/e2e.rs b/iris-mpc-gpu/tests/e2e.rs
index 373867bc1..0832d21dd 100644
--- a/iris-mpc-gpu/tests/e2e.rs
+++ b/iris-mpc-gpu/tests/e2e.rs
@@ -119,6 +119,7 @@ mod e2e_test {
             DB_SIZE + DB_BUFFER,
             MAX_BATCH_SIZE,
             true,
+            false,
         ) {
             Ok((mut actor, handle)) => {
                 actor.load_full_db(&(&db0.0, &db0.1), &(&db0.0, &db0.1), DB_SIZE);
@@ -145,6 +146,7 @@ mod e2e_test {
             DB_SIZE + DB_BUFFER,
             MAX_BATCH_SIZE,
             true,
+            false,
         ) {
             Ok((mut actor, handle)) => {
                 actor.load_full_db(&(&db1.0, &db1.1), &(&db1.0, &db1.1), DB_SIZE);
@@ -171,6 +173,7 @@ mod e2e_test {
             DB_SIZE + DB_BUFFER,
             MAX_BATCH_SIZE,
             true,
+            false,
         ) {
             Ok((mut actor, handle)) => {
                 actor.load_full_db(&(&db2.0, &db2.1), &(&db2.0, &db2.1), DB_SIZE);
diff --git a/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs b/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs
index f398403d8..e2a966461 100644
--- a/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs
+++ b/iris-mpc-upgrade/src/bin/tcp_ssl_upgrade_client.rs
@@ -66,6 +66,12 @@ async fn main() -> eyre::Result<()> {
     install_tracing();
     let args = UpgradeClientConfig::parse();
 
+    let batch_timeout = if let Some(batch_timeout) = args.batch_timeout_secs {
+        batch_timeout
+    } else {
+        BATCH_TIMEOUT_SECONDS
+    };
+
     if args.party_id > 1 {
         panic!("Party id must be 0, 1");
     }
@@ -186,6 +192,7 @@ async fn main() -> eyre::Result<()> {
         );
         send_batch_and_wait_for_ack(
             args.party_id,
+            batch_timeout,
             &mut server1,
             &mut server2,
             &mut server3,
@@ -202,6 +209,7 @@ async fn main() -> eyre::Result<()> {
         tracing::info!("Sending final batch of size {}", batch.len());
         send_batch_and_wait_for_ack(
             args.party_id,
+            batch_timeout,
             &mut server1,
             &mut server2,
             &mut server3,
@@ -211,17 +219,18 @@ async fn main() -> eyre::Result<()> {
         batch.clear();
     }
     tracing::info!("Final batch sent, waiting for acks");
-    wait_for_ack(&mut server1).await?;
+    wait_for_ack(&mut server1, batch_timeout).await?;
     tracing::info!("Server 1 ack received");
-    wait_for_ack(&mut server2).await?;
+    wait_for_ack(&mut server2, batch_timeout).await?;
     tracing::info!("Server 2 ack received");
-    wait_for_ack(&mut server3).await?;
+    wait_for_ack(&mut server3, batch_timeout).await?;
     tracing::info!("Server 3 ack received");
     Ok(())
 }
 
 async fn send_batch_and_wait_for_ack(
     party_id: u8,
+    batch_timeout: u64,
     server1: &mut TlsStream<TcpStream>,
     server2: &mut TlsStream<TcpStream>,
     server3: &mut TlsStream<TcpStream>,
@@ -238,9 +247,9 @@ async fn send_batch_and_wait_for_ack(
     let batch_size = batch.len();
     // Send the batch size to all servers
     let (batch_size_result_a, batch_size_result_b, batch_size_result_c) = (
-        server1.write_u8(batch_size as u8),
-        server2.write_u8(batch_size as u8),
-        server3.write_u8(batch_size as u8),
+        server1.write_u64(batch_size as u64),
+        server2.write_u64(batch_size as u64),
+        server3.write_u64(batch_size as u64),
     )
         .join()
         .await;
@@ -312,14 +321,14 @@ async fn send_batch_and_wait_for_ack(
     }
 
     // Handle acknowledgment from all servers
-    wait_for_ack(server1).await?;
-    wait_for_ack(server2).await?;
-    wait_for_ack(server3).await?;
+    wait_for_ack(server1, batch_timeout).await?;
+    wait_for_ack(server2, batch_timeout).await?;
+    wait_for_ack(server3, batch_timeout).await?;
 
     Ok(())
 }
 
-async fn wait_for_ack(server: &mut TlsStream<TcpStream>) -> eyre::Result<()> {
-    match timeout(Duration::from_secs(BATCH_TIMEOUT_SECONDS), server.read_u8()).await {
+async fn wait_for_ack(server: &mut TlsStream<TcpStream>, batch_timeout: u64) -> eyre::Result<()> {
+    match timeout(Duration::from_secs(batch_timeout), server.read_u8()).await {
         Ok(Ok(BATCH_SUCCESSFUL_ACK)) => {
             // Ack received successfully
             tracing::info!("ACK received for batch");
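
Note on the client changes above: the batch-size prefix is widened from write_u8 to write_u64. A u8 prefix silently truncates any batch size above 255 (`batch_size as u8`), and both ends must read and write the same integer width or the streams desynchronize; the server diff below makes the matching read_u8 to read_u64 change. A minimal sketch of the framing contract (assumed shape over a plain TcpStream, not the repo's API verbatim):

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    async fn send_batch(stream: &mut TcpStream, batch: &[u8]) -> std::io::Result<()> {
        // Length prefix: must be the same width as the receiver expects.
        stream.write_u64(batch.len() as u64).await?;
        stream.write_all(batch).await
    }

    async fn recv_batch(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
        let len = stream.read_u64().await?; // was read_u8, capping batches at 255
        let mut buf = vec![0u8; len as usize];
        stream.read_exact(&mut buf).await?;
        Ok(buf)
    }
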
diff --git a/iris-mpc-upgrade/src/bin/tcp_upgrade_server.rs b/iris-mpc-upgrade/src/bin/tcp_upgrade_server.rs
index 923a32ba9..ec30ce02e 100644
--- a/iris-mpc-upgrade/src/bin/tcp_upgrade_server.rs
+++ b/iris-mpc-upgrade/src/bin/tcp_upgrade_server.rs
@@ -128,8 +128,8 @@ async fn main() -> eyre::Result<()> {
     for batch_num in 0..num_batches + 1 {
         tracing::info!("Processing batch {} of size: {}", batch_num, batch_size1);
         let start_time = Instant::now();
-        let batch_size_1_message = client_stream1.read_u8().await?;
-        let batch_size_2_message = client_stream2.read_u8().await?;
+        let batch_size_1_message = client_stream1.read_u64().await?;
+        let batch_size_2_message = client_stream2.read_u64().await?;
 
         if batch_size_1_message != batch_size_2_message {
             bail!(
diff --git a/iris-mpc-upgrade/src/config.rs b/iris-mpc-upgrade/src/config.rs
index cafaf8d71..92034a08d 100644
--- a/iris-mpc-upgrade/src/config.rs
+++ b/iris-mpc-upgrade/src/config.rs
@@ -6,7 +6,7 @@ use std::{
     str::FromStr,
 };
 
-pub const BATCH_TIMEOUT_SECONDS: u64 = 30;
+pub const BATCH_TIMEOUT_SECONDS: u64 = 60;
 pub const BATCH_SUCCESSFUL_ACK: u8 = 1;
 pub const FINAL_BATCH_SUCCESSFUL_ACK: u8 = 42;
 
@@ -98,6 +98,9 @@ pub struct UpgradeClientConfig {
 
     #[clap(long)]
     pub masks_db_url: String,
+
+    #[clap(long)]
+    pub batch_timeout_secs: Option<u64>,
 }
 
 impl fmt::Debug for UpgradeClientConfig {
diff --git a/iris-mpc-upgrade/src/lib.rs b/iris-mpc-upgrade/src/lib.rs
index f9cf93136..73676b66f 100644
--- a/iris-mpc-upgrade/src/lib.rs
+++ b/iris-mpc-upgrade/src/lib.rs
@@ -143,14 +143,14 @@ impl IrisCodeUpgrader {
             *a = a.wrapping_add(*b);
         }
         let duration = start_time.elapsed();
-        tracing::info!("Computed iris codes STEP DURATION: {:.2?}", duration);
+        tracing::debug!("Computed iris codes STEP DURATION: {:.2?}", duration);
 
         let start_time = std::time::Instant::now();
         self.iris_sink
             .store_code_mask_share(id, &result, &mask)
             .await?;
         let duration = start_time.elapsed();
-        tracing::info!("Stored iris codes STEP DURATION: {:.2?}", duration);
+        tracing::debug!("Stored iris codes STEP DURATION: {:.2?}", duration);
         Ok(())
     }
 }
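
Note on the upgrade changes above: the ack timeout becomes configurable via the new --batch-timeout-secs flag, and the compiled-in default doubles from 30 to 60 seconds. The if-let fallback in the client is equivalent to Option::unwrap_or (self-contained sketch of the same behavior):

    const BATCH_TIMEOUT_SECONDS: u64 = 60;

    /// CLI override wins; otherwise use the compiled-in default.
    fn resolve_timeout(cli_override: Option<u64>) -> u64 {
        cli_override.unwrap_or(BATCH_TIMEOUT_SECONDS)
    }

    fn main() {
        assert_eq!(resolve_timeout(None), 60);
        assert_eq!(resolve_timeout(Some(120)), 120);
    }
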
diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs
index 3f809c3e8..e6f77d442 100644
--- a/iris-mpc/src/bin/server.rs
+++ b/iris-mpc/src/bin/server.rs
@@ -675,9 +675,10 @@ async fn server_main(config: Config) -> eyre::Result<()> {
         config.max_db_size,
         config.max_batch_size,
         config.return_partial_results,
+        config.disable_persistence,
     ) {
         Ok((mut actor, handle)) => {
-            if config.fake_db_size > 0 {
+            let res = if config.fake_db_size > 0 {
                 tracing::warn!(
                     "Faking db with {} entries, returned results will be random.",
                     config.fake_db_size
@@ -687,42 +688,42 @@ async fn server_main(config: Config) -> eyre::Result<()> {
                         / actor.current_db_sizes().len();
                     actor.current_db_sizes().len()
                 ]);
-                return Ok(());
-            }
-
-            tracing::info!(
-                "Initialize iris db: Loading from DB (parallelism: {})",
-                parallelism
-            );
-            let res = tokio::runtime::Handle::current().block_on(async {
-                let mut stream = store.stream_irises_par(parallelism).await;
-                let mut record_counter = 0;
-                while let Some(iris) = stream.try_next().await? {
-                    if iris.index() > store_len {
-                        tracing::error!("Inconsistent iris index {}", iris.index());
-                        return Err(eyre!("Inconsistent iris index {}", iris.index()));
+                Ok(())
+            } else {
+                tracing::info!(
+                    "Initialize iris db: Loading from DB (parallelism: {})",
+                    parallelism
+                );
+                tokio::runtime::Handle::current().block_on(async {
+                    let mut stream = store.stream_irises_par(parallelism).await;
+                    let mut record_counter = 0;
+                    while let Some(iris) = stream.try_next().await? {
+                        if iris.index() > store_len {
+                            tracing::error!("Inconsistent iris index {}", iris.index());
+                            return Err(eyre!("Inconsistent iris index {}", iris.index()));
+                        }
+                        actor.load_single_record(
+                            iris.index() - 1,
+                            iris.left_code(),
+                            iris.left_mask(),
+                            iris.right_code(),
+                            iris.right_mask(),
+                        );
+                        record_counter += 1;
                     }
-                    actor.load_single_record(
-                        iris.index() - 1,
-                        iris.left_code(),
-                        iris.left_mask(),
-                        iris.right_code(),
-                        iris.right_mask(),
-                    );
-                    record_counter += 1;
-                }
 
-                tracing::info!("Preprocessing db");
-                actor.preprocess_db();
+                    tracing::info!("Preprocessing db");
+                    actor.preprocess_db();
 
-                tracing::info!(
-                    "Loaded {} records from db into memory [DB sizes: {:?}]",
-                    record_counter,
-                    actor.current_db_sizes()
-                );
+                    tracing::info!(
+                        "Loaded {} records from db into memory [DB sizes: {:?}]",
+                        record_counter,
+                        actor.current_db_sizes()
+                    );
 
-                eyre::Ok(())
-            });
+                    eyre::Ok(())
+                })
+            };
 
             match res {
                 Ok(_) => {
@@ -835,7 +836,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
                         .insert_results(&mut tx, &uniqueness_results)
                         .await?;
 
-                    if !codes_and_masks.is_empty() {
+                    if !codes_and_masks.is_empty() && !config_bg.disable_persistence {
                         let db_serial_ids = store_bg
                             .insert_irises(&mut tx, &codes_and_masks)
                             .await