diff --git a/iris-mpc-common/src/helpers/smpc_request.rs b/iris-mpc-common/src/helpers/smpc_request.rs index 1dc3ccb11..1612106c3 100644 --- a/iris-mpc-common/src/helpers/smpc_request.rs +++ b/iris-mpc-common/src/helpers/smpc_request.rs @@ -107,6 +107,7 @@ where pub const SMPC_MESSAGE_TYPE_ATTRIBUTE: &str = "message_type"; pub const IDENTITY_DELETION_MESSAGE_TYPE: &str = "identity_deletion"; +pub const CIRCUIT_BREAKER_MESSAGE_TYPE: &str = "circuit_breaker"; pub const UNIQUENESS_MESSAGE_TYPE: &str = "uniqueness"; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -117,6 +118,11 @@ pub struct UniquenessRequest { pub iris_shares_file_hashes: [String; 3], } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CircuitBreakerRequest { + pub batch_size: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct IdentityDeletionRequest { pub serial_id: u32, diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs index b04604117..4491a5249 100644 --- a/iris-mpc/src/bin/server.rs +++ b/iris-mpc/src/bin/server.rs @@ -17,9 +17,10 @@ use iris_mpc_common::{ key_pair::SharesEncryptionKeyPairs, kms_dh::derive_shared_secret, smpc_request::{ - create_message_type_attribute_map, IdentityDeletionRequest, IdentityDeletionResult, - ReceiveRequestError, SQSMessage, UniquenessRequest, UniquenessResult, - IDENTITY_DELETION_MESSAGE_TYPE, SMPC_MESSAGE_TYPE_ATTRIBUTE, UNIQUENESS_MESSAGE_TYPE, + create_message_type_attribute_map, CircuitBreakerRequest, IdentityDeletionRequest, + IdentityDeletionResult, ReceiveRequestError, SQSMessage, UniquenessRequest, + UniquenessResult, CIRCUIT_BREAKER_MESSAGE_TYPE, IDENTITY_DELETION_MESSAGE_TYPE, + SMPC_MESSAGE_TYPE_ATTRIBUTE, UNIQUENESS_MESSAGE_TYPE, }, sync::SyncState, task_monitor::TaskMonitor, @@ -159,6 +160,30 @@ async fn receive_batch( .ok_or(ReceiveRequestError::NoMessageTypeAttribute)?; match request_type { + CIRCUIT_BREAKER_MESSAGE_TYPE => { + let circuit_breaker_request: CircuitBreakerRequest = + serde_json::from_str(&message.message).map_err(|e| { + ReceiveRequestError::json_parse_error("circuit_breaker_request", e) + })?; + client + .delete_message() + .queue_url(queue_url) + .receipt_handle(sqs_message.receipt_handle.unwrap()) + .send() + .await + .map_err(ReceiveRequestError::FailedToDeleteFromSQS)?; + if let Some(batch_size) = circuit_breaker_request.batch_size { + // Updating the batch size to ensure we process the messages in the next + // loop + *CURRENT_BATCH_SIZE.lock().unwrap() = + batch_size.clamp(1, max_batch_size); + tracing::info!( + "Updating batch size to {} due to circuit breaker message", + batch_size + ); + } + } + IDENTITY_DELETION_MESSAGE_TYPE => { // If it's a deletion request, we just store the serial_id and continue. // Deletion will take place when batch process starts.