diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index d109999afc5..c3e37e741cd 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -53,7 +53,7 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/config/src/queue.rs b/config/src/queue.rs index a1d925b1ed9..2b10dcd1730 100644 --- a/config/src/queue.rs +++ b/config/src/queue.rs @@ -4,7 +4,7 @@ use iroha_config_base::derive::{Documented, Proxy}; use serde::{Deserialize, Serialize}; const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16); -const DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER: u32 = 2_u32.pow(16); +const DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER: u32 = 2_u32.pow(16); // 24 hours const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000; const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; @@ -16,8 +16,9 @@ const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000; pub struct Configuration { /// The upper limit of the number of transactions waiting in the queue. pub max_transactions_in_queue: u32, - /// The upper limit of the number of transactions waiting for more signatures. - pub max_transactions_in_signature_buffer: u32, + /// The upper limit of the number of transactions waiting in the queue for single user. + /// Use this option to apply throttling. + pub max_transactions_in_queue_per_user: u32, /// The transaction will be dropped after this time if it is still in the queue. pub transaction_time_to_live_ms: u64, /// The threshold to determine if a transaction has been tampered to have a future timestamp. @@ -28,9 +29,7 @@ impl Default for ConfigurationProxy { fn default() -> Self { Self { max_transactions_in_queue: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE), - max_transactions_in_signature_buffer: Some( - DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER, - ), + max_transactions_in_queue_per_user: Some(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER), transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS), future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS), } @@ -47,12 +46,12 @@ pub mod tests { pub fn arb_proxy() ( max_transactions_in_queue in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE)), - max_transactions_in_signature_buffer in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER)), + max_transactions_in_queue_per_user in prop::option::of(Just(DEFAULT_MAX_TRANSACTIONS_IN_QUEUE_PER_USER)), transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)), future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)), ) -> ConfigurationProxy { - ConfigurationProxy { max_transactions_in_queue, max_transactions_in_signature_buffer, transaction_time_to_live_ms, future_threshold_ms } + ConfigurationProxy { max_transactions_in_queue, max_transactions_in_queue_per_user, transaction_time_to_live_ms, future_threshold_ms } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index a9d0087b4c3..1737604b1b3 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -34,7 +34,7 @@ }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, diff --git a/configs/peer/validator.wasm b/configs/peer/validator.wasm index 72feaff4287..fe05da72d67 100644 Binary files a/configs/peer/validator.wasm and b/configs/peer/validator.wasm differ diff --git a/core/src/queue.rs b/core/src/queue.rs index 67dea12dbda..c51159a8d4d 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -14,9 +14,9 @@ use dashmap::{mapref::entry::Entry, DashMap}; use eyre::{Report, Result}; use iroha_config::queue::Configuration; use iroha_crypto::HashOf; -use iroha_data_model::transaction::prelude::*; +use iroha_data_model::{account::AccountId, transaction::prelude::*}; use iroha_logger::{debug, info, trace, warn}; -use iroha_primitives::{must_use::MustUse, riffle_iter::RiffleIter}; +use iroha_primitives::must_use::MustUse; use rand::seq::IteratorRandom; use thiserror::Error; @@ -27,16 +27,16 @@ use crate::{prelude::*, tx::CheckSignatureCondition as _}; /// Multiple producers, single consumer #[derive(Debug)] pub struct Queue { - /// The queue for transactions that passed signature check + /// The queue for transactions queue: ArrayQueue>, - /// The queue for transactions that didn't pass signature check and are waiting for additional signatures - /// - /// Second queue is needed to prevent situation when multisig transactions prevent ordinary transactions from being added into the queue - signature_buffer: ArrayQueue>, - /// [`VersionedAcceptedTransaction`]s addressed by `Hash`. + /// [`VersionedAcceptedTransaction`]s addressed by `Hash` txs: DashMap, VersionedAcceptedTransaction>, + /// Amount of transactions per user in the queue + txs_per_user: DashMap, /// The maximum number of transactions in the queue max_txs: usize, + /// The maximum number of transactions in the queue per user. Used to apply throttling + max_txs_per_user: usize, /// Length of time after which transactions are dropped. pub tx_time_to_live: Duration, /// A point in time that is considered `Future` we cannot use @@ -65,6 +65,9 @@ pub enum Error { /// Transaction is already in blockchain #[error("Transaction is already applied")] InBlockchain, + /// User reached maximum number of transactions in the queue + #[error("User reached maximum number of trnasctions in the queue")] + MaximumTransactionsPerUser, /// Signature condition check failed #[error("Failure during signature condition execution, tx hash: {tx_hash}, reason: {reason}")] SignatureCondition { @@ -89,10 +92,10 @@ impl Queue { pub fn from_configuration(cfg: &Configuration) -> Self { Self { queue: ArrayQueue::new(cfg.max_transactions_in_queue as usize), - signature_buffer: ArrayQueue::new(cfg.max_transactions_in_signature_buffer as usize), txs: DashMap::new(), - max_txs: (cfg.max_transactions_in_queue + cfg.max_transactions_in_signature_buffer) - as usize, + txs_per_user: DashMap::new(), + max_txs: cfg.max_transactions_in_queue as usize, + max_txs_per_user: cfg.max_transactions_in_queue_per_user as usize, tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms), future_threshold: Duration::from_millis(cfg.future_threshold_ms), } @@ -159,13 +162,10 @@ impl Queue { wsv: &WorldStateView, ) -> Result<(), Failure> { trace!(?tx, "Pushing to the queue"); - let signature_check_succeed = match self.check_tx(&tx, wsv) { - Err(err) => { - warn!("Failed to evaluate signature check"); - return Err(Failure { tx, err }); - } - Ok(MustUse(signature_check)) => signature_check, - }; + if let Err(err) = self.check_tx(&tx, wsv) { + warn!("Failed to evaluate signature check"); + return Err(Failure { tx, err }); + } // Get `txs_len` before entry to avoid deadlock let txs_len = self.txs.len(); @@ -194,42 +194,26 @@ impl Queue { }); } + if let Err(err) = self.check_and_increase_per_user_tx_count(&tx.payload().account_id) { + return Err(Failure { tx, err }); + } + // Insert entry first so that the `tx` popped from `queue` will always have a `(hash, tx)` record in `txs`. entry.insert(tx); - let queue_to_push = if signature_check_succeed { - &self.queue - } else { - info!("New multisignature transaction detected"); - &self.signature_buffer - }; - let res = queue_to_push.push(hash).map_err(|err_hash| { - warn!("Concrete sub-queue to push is full"); + self.queue.push(hash).map_err(|err_hash| { + warn!("Queue is full"); let (_, err_tx) = self .txs .remove(&err_hash) .expect("Inserted just before match"); + self.decrease_per_user_tx_count(&err_tx.payload().account_id); Failure { tx: err_tx, err: Error::Full, } - }); - trace!( - "Transaction queue length = {}, multisig transaction queue length = {}", - self.queue.len(), - self.signature_buffer.len() - ); - res - } - - /// Pop single transaction from the signature buffer. Record all visited and not removed transactions in `seen`. - fn pop_from_signature_buffer( - &self, - seen: &mut Vec>, - wsv: &WorldStateView, - expired_transactions: &mut Vec, - ) -> Option { - // NOTE: `SKIP_SIGNATURE_CHECK=false` because `signature_buffer` contains transaction which signature check can be either `true` or `false`. - self.pop_from::(&self.signature_buffer, seen, wsv, expired_transactions) + })?; + trace!("Transaction queue length = {}", self.queue.len(),); + Ok(()) } /// Pop single transaction from the queue. Record all visited and not removed transactions in `seen`. @@ -238,22 +222,9 @@ impl Queue { seen: &mut Vec>, wsv: &WorldStateView, expired_transactions: &mut Vec, - ) -> Option { - // NOTE: `SKIP_SIGNATURE_CHECK=true` because `queue` contains only transactions for which signature check is `true`. - self.pop_from::(&self.queue, seen, wsv, expired_transactions) - } - - /// Pop single transaction either from the queue or waiting buffer - #[inline] - fn pop_from( - &self, - queue: &ArrayQueue>, - seen: &mut Vec>, - wsv: &WorldStateView, - expired_transactions: &mut Vec, ) -> Option { loop { - let Some(hash) = queue.pop() else { + let Some(hash) = self.queue.pop() else { return None; }; let entry = match self.txs.entry(hash) { @@ -270,18 +241,19 @@ impl Queue { let tx = entry.get(); if tx.is_in_blockchain(wsv) { debug!("Transaction is already in blockchain"); - entry.remove_entry(); + let (_, tx) = entry.remove_entry(); + self.decrease_per_user_tx_count(&tx.payload().account_id); continue; } if tx.is_expired(self.tx_time_to_live) { debug!("Transaction is expired"); let (_, tx) = entry.remove_entry(); + self.decrease_per_user_tx_count(&tx.payload().account_id); expired_transactions.push(tx); continue; } seen.push(hash); - if SKIP_SIGNATURE_CHECK || *tx.check_signature_condition(wsv).unwrap_or(MustUse(false)) - { + if *tx.check_signature_condition(wsv).unwrap_or(MustUse(false)) { // Transactions are not removed from the queue until expired or committed return Some(entry.get().clone()); } @@ -322,46 +294,61 @@ impl Queue { } let mut seen_queue = Vec::new(); - let mut seen_waiting_buffer = Vec::new(); let mut expired_transactions_queue = Vec::new(); - let mut expired_transactions_waiting_buffer = Vec::new(); let txs_from_queue = core::iter::from_fn(|| { self.pop_from_queue(&mut seen_queue, wsv, &mut expired_transactions_queue) }); - let txs_from_waiting_buffer = core::iter::from_fn(|| { - self.pop_from_signature_buffer( - &mut seen_waiting_buffer, - wsv, - &mut expired_transactions_waiting_buffer, - ) - }); let transactions_hashes: HashSet> = transactions .iter() .map(VersionedAcceptedTransaction::hash) .collect(); let txs = txs_from_queue - .riffle(txs_from_waiting_buffer) .filter(|tx| !transactions_hashes.contains(&tx.hash())) .take(max_txs_in_block - transactions.len()); transactions.extend(txs); - [ - (seen_queue, &self.queue, expired_transactions_queue), - ( - seen_waiting_buffer, - &self.signature_buffer, - expired_transactions_waiting_buffer, - ), - ] - .into_iter() - .for_each(|(seen, queue, expired_txs)| { - seen.into_iter() - .try_for_each(|hash| queue.push(hash)) - .expect("Exceeded the number of transactions pending"); - expired_transactions.extend(expired_txs); - }) + seen_queue + .into_iter() + .try_for_each(|hash| self.queue.push(hash)) + .expect("Exceeded the number of transactions pending"); + expired_transactions.extend(expired_transactions_queue); + } + + /// Check that the user adhered to the maximum transaction per user limit and increment their transaction count. + fn check_and_increase_per_user_tx_count(&self, account_id: &AccountId) -> Result<(), Error> { + match self.txs_per_user.entry(account_id.clone()) { + Entry::Vacant(vacant) => { + vacant.insert(1); + } + Entry::Occupied(mut occupied) => { + let txs = *occupied.get(); + if txs >= self.max_txs_per_user { + warn!( + max_txs_per_user = self.max_txs_per_user, + %account_id, + "Account reached maximum allowed number of transactions in the queue per user" + ); + return Err(Error::MaximumTransactionsPerUser); + } + *occupied.get_mut() += 1; + } + } + + Ok(()) + } + + fn decrease_per_user_tx_count(&self, account_id: &AccountId) { + let Entry::Occupied(mut occupied) = self.txs_per_user + .entry(account_id.clone()) else { panic!("Call to decrease always should be paired with increase count. This is a bug.") }; + + let count = occupied.get_mut(); + if *count > 1 { + *count -= 1; + } else { + occupied.remove_entry(); + } } } @@ -479,166 +466,6 @@ mod tests { )); } - #[test] - fn push_tx_when_signature_buffer_is_full() { - let max_txs_in_waiting_buffer = 10; - - let alice_key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; - let bob_key_pair = KeyPair::generate().unwrap(); - let kura = Kura::blank_kura_for_testing(); - let wsv = { - let domain_id = DomainId::from_str("wonderland").expect("Valid"); - let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); - let mut domain = Domain::new(domain_id.clone()).build(&alice_id); - let bob_id = AccountId::from_str("bob@wonderland").expect("Valid"); - let mut alice = Account::new( - alice_id.clone(), - alice_key_pairs.iter().map(KeyPair::public_key).cloned(), - ) - .build(&alice_id); - alice.signature_check_condition = SignatureCheckCondition( - ContainsAll::new( - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(TRANSACTION_SIGNATORIES_VALUE) - .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), - )), - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(ACCOUNT_SIGNATORIES_VALUE) - .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), - )), - ) - .into(), - ); - let bob = - Account::new(bob_id.clone(), [bob_key_pair.public_key().clone()]).build(&bob_id); - assert!(domain.add_account(alice).is_none()); - assert!(domain.add_account(bob).is_none()); - Arc::new(WorldStateView::new( - World::with([domain], PeersIds::new()), - kura.clone(), - )) - }; - - let queue = Queue::from_configuration(&Configuration { - transaction_time_to_live_ms: 100_000, - max_transactions_in_signature_buffer: max_txs_in_waiting_buffer, - ..ConfigurationProxy::default() - .build() - .expect("Default queue config should always build") - }); - - // Fill waiting buffer with multisig transactions - for _ in 0..max_txs_in_waiting_buffer { - queue - .push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv, - ) - .expect("Failed to push tx into queue"); - thread::sleep(Duration::from_millis(10)); - } - - // Check that signature buffer is full - assert!(matches!( - queue.push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv - ), - Err(Failure { - err: Error::Full, - .. - }) - )); - - // Check that ordinary transactions can still be pushed into the queue - assert!(queue - .push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv, - ) - .is_ok()) - } - - #[test] - fn push_multisig_tx_when_queue_is_full() { - let max_txs_in_queue = 10; - - let alice_key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; - let bob_key_pair = KeyPair::generate().unwrap(); - let kura = Kura::blank_kura_for_testing(); - let wsv = { - let domain_id = DomainId::from_str("wonderland").expect("Valid"); - let alice_id = AccountId::from_str("alice@wonderland").expect("Valid"); - let mut domain = Domain::new(domain_id.clone()).build(&alice_id); - let bob_id = AccountId::from_str("bob@wonderland").expect("Valid"); - let mut alice = Account::new( - alice_id.clone(), - alice_key_pairs.iter().map(KeyPair::public_key).cloned(), - ) - .build(&alice_id); - alice.signature_check_condition = SignatureCheckCondition( - ContainsAll::new( - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(TRANSACTION_SIGNATORIES_VALUE) - .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), - )), - EvaluatesTo::new_unchecked(ContextValue::new( - Name::from_str(ACCOUNT_SIGNATORIES_VALUE) - .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), - )), - ) - .into(), - ); - let bob = - Account::new(bob_id.clone(), [bob_key_pair.public_key().clone()]).build(&bob_id); - assert!(domain.add_account(alice).is_none()); - assert!(domain.add_account(bob).is_none()); - Arc::new(WorldStateView::new( - World::with([domain], PeersIds::new()), - kura.clone(), - )) - }; - - let queue = Queue::from_configuration(&Configuration { - transaction_time_to_live_ms: 100_000, - max_transactions_in_queue: max_txs_in_queue, - ..ConfigurationProxy::default() - .build() - .expect("Default queue config should always build") - }); - - // Fill queue with ordinary transactions - for _ in 0..max_txs_in_queue { - queue - .push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv, - ) - .expect("Failed to push tx into queue"); - thread::sleep(Duration::from_millis(10)); - } - - // Check that queue is full - assert!(matches!( - queue.push( - accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), - &wsv - ), - Err(Failure { - err: Error::Full, - .. - }) - )); - - // Check that multisig transactions can still be pushed into the queue - assert!(queue - .push( - accepted_tx("alice@wonderland", 100_000, alice_key_pairs[0].clone()), - &wsv, - ) - .is_ok()) - } - #[test] fn push_tx_signature_condition_failure() { let max_txs_in_queue = 10; @@ -987,6 +814,10 @@ mod tests { Err(Failure { err: Error::Full, .. }) => (), + Err(Failure { + err: Error::MaximumTransactionsPerUser, + .. + }) => (), Err(Failure { err, .. }) => panic!("{err}"), } } @@ -1054,4 +885,96 @@ mod tests { )); assert_eq!(queue.txs.len(), 1); } + + #[test] + fn queue_throttling() { + let alice_key_pair = KeyPair::generate().unwrap(); + let bob_key_pair = KeyPair::generate().unwrap(); + let kura = Kura::blank_kura_for_testing(); + let world = { + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let alice_account_id = AccountId::from_str("alice@wonderland").expect("Valid"); + let bob_account_id = AccountId::from_str("bob@wonderland").expect("Valid"); + let mut domain = Domain::new(domain_id).build(&alice_account_id); + let alice_account = Account::new( + alice_account_id.clone(), + [alice_key_pair.public_key().clone()], + ) + .build(&alice_account_id); + let bob_account = + Account::new(bob_account_id.clone(), [bob_key_pair.public_key().clone()]) + .build(&bob_account_id); + assert!(domain.add_account(alice_account).is_none()); + assert!(domain.add_account(bob_account).is_none()); + World::with([domain], PeersIds::new()) + }; + let mut wsv = WorldStateView::new(world, kura.clone()); + + let queue = Queue::from_configuration(&Configuration { + transaction_time_to_live_ms: 100_000, + max_transactions_in_queue: 100, + max_transactions_in_queue_per_user: 1, + ..ConfigurationProxy::default() + .build() + .expect("Default queue config should always build") + }); + + // First push by Alice should be fine + queue + .push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + // Second push by Alice excide limit and will be rejected + let result = queue.push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ); + assert!( + matches!( + result, + Err(Failure { + tx: _, + err: Error::MaximumTransactionsPerUser + }), + ), + "Failed to match: {:?}", + result, + ); + + // First push by Bob should be fine despite previous Alice error + queue + .push( + accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + let transactions = queue.collect_transactions_for_block(&wsv, 10); + assert_eq!(transactions.len(), 2); + for transaction in transactions { + // Put transaction hashes into wsv as if they were in the blockchain + wsv.transactions.insert(transaction.hash()); + } + // Cleanup transactions + let transactions = queue.collect_transactions_for_block(&wsv, 10); + assert!(transactions.is_empty()); + + // After cleanup Alice and Bob pushes should work fine + queue + .push( + accepted_tx("alice@wonderland", 100_000, alice_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + + queue + .push( + accepted_tx("bob@wonderland", 100_000, bob_key_pair.clone()), + &wsv, + ) + .expect("Failed to push tx into queue"); + } } diff --git a/docs/source/references/config.md b/docs/source/references/config.md index c130363d5ba..4b781ccfbde 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -65,7 +65,7 @@ The following is the default configuration used by Iroha. }, "QUEUE": { "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000, "FUTURE_THRESHOLD_MS": 1000 }, @@ -394,7 +394,7 @@ Has type `Option`[^1]. Can be configured via environm { "FUTURE_THRESHOLD_MS": 1000, "MAX_TRANSACTIONS_IN_QUEUE": 65536, - "MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER": 65536, + "MAX_TRANSACTIONS_IN_QUEUE_PER_USER": 65536, "TRANSACTION_TIME_TO_LIVE_MS": 86400000 } ``` @@ -419,11 +419,11 @@ Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MA 65536 ``` -### `queue.max_transactions_in_signature_buffer` +### `queue.max_transactions_in_queue_per_user` -The upper limit of the number of transactions waiting for more signatures. +The upper limit of the number of transactions waiting in the queue for single user. -Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MAX_TRANSACTIONS_IN_SIGNATURE_BUFFER` +Has type `Option`[^1]. Can be configured via environment variable `QUEUE_MAX_TRANSACTIONS_IN_QUEUE_PER_USER` ```json 65536