From 7898114a5545765903108819c66153c95d33894d Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Thu, 22 Dec 2022 10:50:01 +0300 Subject: [PATCH] [fix] #2963: `Queue` remove transactions correctly Signed-off-by: Shanin Roman --- core/src/queue.rs | 235 +++++++++++++++++---------------- core/src/sumeragi/main_loop.rs | 9 +- 2 files changed, 125 insertions(+), 119 deletions(-) diff --git a/core/src/queue.rs b/core/src/queue.rs index 36a4bf4ee47..85beabed29c 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -8,6 +8,7 @@ )] use core::time::Duration; +use std::collections::HashSet; use crossbeam_queue::ArrayQueue; use dashmap::{mapref::entry::Entry, DashMap}; @@ -107,6 +108,7 @@ impl Queue { ) } + // FIXME: Currently it is impossible to distinguish when signature check condition failed or there is just not enough signatures (#2595). fn check_tx( &self, tx: &VersionedAcceptedTransaction, @@ -166,17 +168,16 @@ impl Queue { Entry::Vacant(entry) => entry, }; + // Reason for such insertion order is to avoid situation + // when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs` entry.insert(tx); - - if let Err(err_hash) = self.queue.push(hash) { + self.queue.push(hash).map_err(|err_hash| { let (_, err_tx) = self .txs .remove(&err_hash) .expect("Inserted just before match"); - Err((err_tx, Error::Full)) - } else { - Ok(()) - } + (err_tx, Error::Full) + }) } } @@ -206,50 +207,21 @@ impl Queue { continue; } - seen.push(hash); if *entry .get() .check_signature_condition(wsv) .expect("Checked in `check_tx` just above") { - return Some(entry.get().clone()); - } - } - } - - /// Pop a single transaction. - /// - /// Unlike [`Self::pop`], unsigned transactions are not recorded. - #[allow( - clippy::expect_used, - clippy::unwrap_in_result, - clippy::cognitive_complexity - )] - pub fn pop_without_seen(&self, wsv: &WorldStateView) -> Option { - loop { - let hash = self.queue.pop()?; - let entry = match self.txs.entry(hash) { - Entry::Occupied(entry) => entry, - // As practice shows this code is not `unreachable!()`. - // When transactions are submitted quickly it can be reached. - Entry::Vacant(_) => continue, - }; - if self.check_tx(entry.get(), wsv).is_err() { - entry.remove_entry(); - continue; - } - - if *entry - .get() - .check_signature_condition(wsv) - .expect("Checked in `check_tx` just above") - { - return Some(entry.get().clone()); + let (_, tx) = entry.remove_entry(); + return Some(tx); + } else { + // FIXME: this brunch is currently unreachable (#2595). + seen.push(hash); } } } - /// Return the number of transactions in the queue + /// Return the number of transactions in the queue. pub fn tx_len(&self) -> usize { self.txs.len() } @@ -257,24 +229,45 @@ impl Queue { /// Gets transactions till they fill whole block or till the end of queue. /// /// BEWARE: Shouldn't be called in parallel with itself. - #[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)] pub fn get_transactions_for_block( &self, wsv: &WorldStateView, ) -> Vec { + let mut transactions = Vec::with_capacity(self.txs_in_block); + self.put_transactions_for_block_into(wsv, &mut transactions); + transactions + } + + /// Put transaction into provided vector till they fill whole block or till the end of queue. + /// + /// BEWARE: Shouldn't be called in parallel with itself. + pub fn put_transactions_for_block_into( + &self, + wsv: &WorldStateView, + transactions: &mut Vec, + ) { + if transactions.len() >= self.txs_in_block { + return; + } + let mut seen = Vec::new(); + let mut transactions_hashes: HashSet> = transactions + .iter() + .map(VersionedAcceptedTransaction::hash) + .collect(); let out = std::iter::repeat_with(|| self.pop(&mut seen, wsv)) - .take_while(Option::is_some) - .map(Option::unwrap) - .take(self.txs_in_block) - .collect::>(); + .map_while(core::convert::identity) + // Normally we would not expect to receive transaction duplicate from the queue and `transaction_hashes.contains(tx.hash())` would be enough + // However there might be situation when we poped transaction from the queue and just after that we received its duplicate + .filter(|tx| transactions_hashes.insert(tx.hash())) + .take(self.txs_in_block - transactions.len()); + transactions.extend(out); #[allow(clippy::expect_used)] seen.into_iter() .try_for_each(|hash| self.queue.push(hash)) .expect("As we never exceed the number of transactions pending"); - out } } @@ -285,7 +278,11 @@ mod tests { use std::{str::FromStr, sync::Arc, thread, time::Duration}; use iroha_config::{base::proxy::Builder, queue::ConfigurationProxy}; - use iroha_data_model::prelude::*; + use iroha_data_model::{ + account::{ACCOUNT_SIGNATORIES_VALUE, TRANSACTION_SIGNATORIES_VALUE}, + prelude::*, + }; + use iroha_primitives::must_use::MustUse; use rand::Rng as _; use super::*; @@ -421,18 +418,44 @@ mod tests { } #[test] + #[ignore = "Multisignature is not working for now. See #2595"] fn push_multisignature_tx() { let key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()]; let kura = Kura::blank_kura_for_testing(); - let wsv = Arc::new(WorldStateView::new( - world_with_test_domains( - key_pairs - .iter() - .map(|key_pair| key_pair.public_key()) - .cloned(), - ), - kura.clone(), - )); + let wsv = { + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let mut domain = Domain::new(domain_id.clone()).build(); + let account_id = AccountId::from_str("alice@wonderland").expect("Valid"); + let mut account = Account::new( + account_id, + key_pairs.iter().map(KeyPair::public_key).cloned(), + ) + .build(); + account.set_signature_check_condition(SignatureCheckCondition( + ContainsAll::new( + EvaluatesTo::new_unchecked( + ContextValue::new( + Name::from_str(TRANSACTION_SIGNATORIES_VALUE) + .expect("TRANSACTION_SIGNATORIES_VALUE should be valid."), + ) + .into(), + ), + EvaluatesTo::new_unchecked( + ContextValue::new( + Name::from_str(ACCOUNT_SIGNATORIES_VALUE) + .expect("ACCOUNT_SIGNATORIES_VALUE should be valid."), + ) + .into(), + ), + ) + .into(), + )); + assert!(domain.add_account(account).is_none()); + Arc::new(WorldStateView::new( + World::with([domain], PeersIds::new()), + kura.clone(), + )) + }; let queue = Queue::from_configuration(&Configuration { maximum_transactions_in_block: 2, @@ -447,31 +470,57 @@ mod tests { Vec::::new().into(), 100_000, ); + let tx_limits = TransactionLimits { + max_instruction_number: 4096, + max_wasm_size_bytes: 0, + }; + let fully_signed_tx = { + let mut signed_tx = tx + .clone() + .sign((&key_pairs[0]).clone()) + .expect("Failed to sign."); + for key_pair in &key_pairs[1..] { + signed_tx = signed_tx.sign(key_pair.clone()).expect("Failed to sign"); + } + VersionedAcceptedTransaction::from_transaction(signed_tx, &tx_limits) + .expect("Failed to accept Transaction.") + }; + // Check that fully signed transaction pass signature check + assert!(matches!( + fully_signed_tx.check_signature_condition(&wsv), + Ok(MustUse(true)) + )); + let get_tx = |key_pair| { - let tx_limits = TransactionLimits { - max_instruction_number: 4096, - max_wasm_size_bytes: 0, - }; VersionedAcceptedTransaction::from_transaction( tx.clone().sign(key_pair).expect("Failed to sign."), &tx_limits, ) .expect("Failed to accept Transaction.") }; - for key_pair in key_pairs { - queue.push(get_tx(key_pair), &wsv).unwrap(); + let partially_signed_tx = get_tx(key_pair); + // Check that non of partially signed pass signature check + assert!(matches!( + partially_signed_tx.check_signature_condition(&wsv), + Ok(MustUse(false)) + )); + queue + .push(partially_signed_tx, &wsv) + .expect("Should be possible to put partially signed transaction into the queue"); } - assert_eq!(queue.queue.len(), 1); - let signature_count = queue - .txs - .get(&queue.queue.pop().unwrap()) - .unwrap() - .as_v1() - .signatures - .len(); - assert_eq!(signature_count, 2); + // Check that transactions combined into one instead of duplicating + assert_eq!(queue.tx_len(), 1); + + let mut available = queue.get_transactions_for_block(&wsv); + assert_eq!(available.len(), 1); + let tx_from_queue = available.pop().expect("Checked that have one transactions"); + // Check that transaction from queue pass signature check + assert!(matches!( + tx_from_queue.check_signature_condition(&wsv), + Ok(MustUse(true)) + )); } #[test] @@ -598,42 +647,6 @@ mod tests { assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0); } - // Queue should only drop transactions which are already committed or ttl expired. - // Others should stay in the queue until that moment. - #[test] - fn transactions_available_after_pop() { - let alice_key = KeyPair::generate().expect("Failed to generate keypair."); - let kura = Kura::blank_kura_for_testing(); - let wsv = Arc::new(WorldStateView::new( - world_with_test_domains([alice_key.public_key().clone()]), - kura.clone(), - )); - let queue = Queue::from_configuration(&Configuration { - maximum_transactions_in_block: 2, - transaction_time_to_live_ms: 100_000, - maximum_transactions_in_queue: 100, - ..ConfigurationProxy::default() - .build() - .expect("Default queue config should always build") - }); - queue - .push(accepted_tx("alice@wonderland", 100_000, alice_key), &wsv) - .expect("Failed to push tx into queue"); - - let a = queue - .get_transactions_for_block(&wsv) - .into_iter() - .map(|tx| tx.hash()) - .collect::>(); - let b = queue - .get_transactions_for_block(&wsv) - .into_iter() - .map(|tx| tx.hash()) - .collect::>(); - assert_eq!(a.len(), 1); - assert_eq!(a, b); - } - #[test] fn concurrent_stress_test() { let max_block_tx = 10; @@ -692,13 +705,9 @@ mod tests { push_txs_handle.join().unwrap(); get_txs_handle.join().unwrap(); - // Last update for queue to drop invalid txs. - let _unused = queue.get_transactions_for_block(&wsv); - // Validate the queue state. let array_queue: Vec<_> = core::iter::repeat_with(|| queue.queue.pop()) - .take_while(Option::is_some) - .map(Option::unwrap) + .map_while(core::convert::identity) .collect(); assert_eq!(array_queue.len(), queue.txs.len()); diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index b2b380068da..fda507504e3 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -643,12 +643,9 @@ pub fn run( .retain(|tx| !tx.is_expired(sumeragi.queue.tx_time_to_live)); // Pull in new transactions into the cache. - while state.transaction_cache.len() < sumeragi.queue.txs_in_block { - match sumeragi.queue.pop_without_seen(&state.wsv) { - Some(tx) => state.transaction_cache.push(tx), - None => break, - } - } + sumeragi + .queue + .put_transactions_for_block_into(&state.wsv, &mut state.transaction_cache); }; gossip_transactions(