Skip to content

Commit

Permalink
[fix] #2963: Queue remove transactions correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Dec 28, 2022
1 parent c567735 commit 868a691
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 92 deletions.
211 changes: 125 additions & 86 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)]

use core::time::Duration;
use std::collections::HashSet;

use crossbeam_queue::ArrayQueue;
use dashmap::{mapref::entry::Entry, DashMap};
Expand Down Expand Up @@ -107,6 +108,7 @@ impl Queue {
)
}

// FIXME: Currently it is impossible to distinguish if signature check condition failed or if there is just not enough signatures (#2595).
fn check_tx(
&self,
tx: &VersionedAcceptedTransaction,
Expand Down Expand Up @@ -166,17 +168,16 @@ impl Queue {
Entry::Vacant(entry) => entry,
};

// Reason for such insertion order is to avoid situation
// when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs`
entry.insert(tx);

if let Err(err_hash) = self.queue.push(hash) {
self.queue.push(hash).map_err(|err_hash| {
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
Err((err_tx, Error::Full))
} else {
Ok(())
}
(err_tx, Error::Full)
})
}
}

Expand Down Expand Up @@ -206,6 +207,7 @@ impl Queue {
continue;
}

// Transactions are not removed from the queue until expired or committed
seen.push(hash);
if *entry
.get()
Expand All @@ -217,64 +219,51 @@ impl Queue {
}
}

/// Pop a single transaction.
///
/// Unlike [`Self::pop`], unsigned transactions are not recorded.
#[allow(
clippy::expect_used,
clippy::unwrap_in_result,
clippy::cognitive_complexity
)]
pub fn pop_without_seen(&self, wsv: &WorldStateView) -> Option<VersionedAcceptedTransaction> {
loop {
let hash = self.queue.pop()?;
let entry = match self.txs.entry(hash) {
Entry::Occupied(entry) => entry,
// As practice shows this code is not `unreachable!()`.
// When transactions are submitted quickly it can be reached.
Entry::Vacant(_) => continue,
};
if self.check_tx(entry.get(), wsv).is_err() {
entry.remove_entry();
continue;
}

if *entry
.get()
.check_signature_condition(wsv)
.expect("Checked in `check_tx` just above")
{
return Some(entry.get().clone());
}
}
}

/// Return the number of transactions in the queue
/// Return the number of transactions in the queue.
pub fn tx_len(&self) -> usize {
self.txs.len()
}

/// Gets transactions till they fill whole block or till the end of queue.
///
/// BEWARE: Shouldn't be called in parallel with itself.
#[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)]
pub fn get_transactions_for_block(
#[cfg(test)]
fn collect_transactions_for_block(
&self,
wsv: &WorldStateView,
) -> Vec<VersionedAcceptedTransaction> {
let mut transactions = Vec::with_capacity(self.txs_in_block);
self.put_transactions_for_block_into(wsv, &mut transactions);
transactions
}

/// Put transactions into provided vector until they fill the whole block or there are no more transactions in the queue.
///
/// BEWARE: Shouldn't be called in parallel with itself.
pub fn put_transactions_for_block_into(
&self,
wsv: &WorldStateView,
transactions: &mut Vec<VersionedAcceptedTransaction>,
) {
if transactions.len() >= self.txs_in_block {
return;
}

let mut seen = Vec::new();

let out = std::iter::repeat_with(|| self.pop(&mut seen, wsv))
.take_while(Option::is_some)
.map(Option::unwrap)
.take(self.txs_in_block)
.collect::<Vec<_>>();
let transactions_hashes: HashSet<HashOf<VersionedSignedTransaction>> = transactions
.iter()
.map(VersionedAcceptedTransaction::hash)
.collect();
let out = std::iter::from_fn(|| self.pop(&mut seen, wsv))
.filter(|tx| !transactions_hashes.contains(&tx.hash()))
.take(self.txs_in_block - transactions.len());
transactions.extend(out);

#[allow(clippy::expect_used)]
seen.into_iter()
.try_for_each(|hash| self.queue.push(hash))
.expect("As we never exceed the number of transactions pending");
out
}
}

Expand All @@ -285,7 +274,11 @@ mod tests {
use std::{str::FromStr, sync::Arc, thread, time::Duration};

use iroha_config::{base::proxy::Builder, queue::ConfigurationProxy};
use iroha_data_model::prelude::*;
use iroha_data_model::{
account::{ACCOUNT_SIGNATORIES_VALUE, TRANSACTION_SIGNATORIES_VALUE},
prelude::*,
};
use iroha_primitives::must_use::MustUse;
use rand::Rng as _;

use super::*;
Expand Down Expand Up @@ -421,18 +414,44 @@ mod tests {
}

#[test]
#[ignore = "Multisignature is not working for now. See #2595"]
fn push_multisignature_tx() {
let key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()];
let kura = Kura::blank_kura_for_testing();
let wsv = Arc::new(WorldStateView::new(
world_with_test_domains(
key_pairs
.iter()
.map(|key_pair| key_pair.public_key())
.cloned(),
),
kura.clone(),
));
let wsv = {
let domain_id = DomainId::from_str("wonderland").expect("Valid");
let mut domain = Domain::new(domain_id.clone()).build();
let account_id = AccountId::from_str("alice@wonderland").expect("Valid");
let mut account = Account::new(
account_id,
key_pairs.iter().map(KeyPair::public_key).cloned(),
)
.build();
account.set_signature_check_condition(SignatureCheckCondition(
ContainsAll::new(
EvaluatesTo::new_unchecked(
ContextValue::new(
Name::from_str(TRANSACTION_SIGNATORIES_VALUE)
.expect("TRANSACTION_SIGNATORIES_VALUE should be valid."),
)
.into(),
),
EvaluatesTo::new_unchecked(
ContextValue::new(
Name::from_str(ACCOUNT_SIGNATORIES_VALUE)
.expect("ACCOUNT_SIGNATORIES_VALUE should be valid."),
)
.into(),
),
)
.into(),
));
assert!(domain.add_account(account).is_none());
Arc::new(WorldStateView::new(
World::with([domain], PeersIds::new()),
kura.clone(),
))
};

let queue = Queue::from_configuration(&Configuration {
maximum_transactions_in_block: 2,
Expand All @@ -447,31 +466,57 @@ mod tests {
Vec::<Instruction>::new().into(),
100_000,
);
let tx_limits = TransactionLimits {
max_instruction_number: 4096,
max_wasm_size_bytes: 0,
};
let fully_signed_tx = {
let mut signed_tx = tx
.clone()
.sign((&key_pairs[0]).clone())
.expect("Failed to sign.");
for key_pair in &key_pairs[1..] {
signed_tx = signed_tx.sign(key_pair.clone()).expect("Failed to sign");
}
VersionedAcceptedTransaction::from_transaction(signed_tx, &tx_limits)
.expect("Failed to accept Transaction.")
};
// Check that fully signed transaction pass signature check
assert!(matches!(
fully_signed_tx.check_signature_condition(&wsv),
Ok(MustUse(true))
));

let get_tx = |key_pair| {
let tx_limits = TransactionLimits {
max_instruction_number: 4096,
max_wasm_size_bytes: 0,
};
VersionedAcceptedTransaction::from_transaction(
tx.clone().sign(key_pair).expect("Failed to sign."),
&tx_limits,
)
.expect("Failed to accept Transaction.")
};

for key_pair in key_pairs {
queue.push(get_tx(key_pair), &wsv).unwrap();
let partially_signed_tx = get_tx(key_pair);
// Check that non of partially signed pass signature check
assert!(matches!(
partially_signed_tx.check_signature_condition(&wsv),
Ok(MustUse(false))
));
queue
.push(partially_signed_tx, &wsv)
.expect("Should be possible to put partially signed transaction into the queue");
}

assert_eq!(queue.queue.len(), 1);
let signature_count = queue
.txs
.get(&queue.queue.pop().unwrap())
.unwrap()
.as_v1()
.signatures
.len();
assert_eq!(signature_count, 2);
// Check that transactions combined into one instead of duplicating
assert_eq!(queue.tx_len(), 1);

let mut available = queue.collect_transactions_for_block(&wsv);
assert_eq!(available.len(), 1);
let tx_from_queue = available.pop().expect("Checked that have one transactions");
// Check that transaction from queue pass signature check
assert!(matches!(
tx_from_queue.check_signature_condition(&wsv),
Ok(MustUse(true))
));
}

#[test]
Expand Down Expand Up @@ -501,7 +546,7 @@ mod tests {
thread::sleep(Duration::from_millis(10));
}

let available = queue.get_transactions_for_block(&wsv);
let available = queue.collect_transactions_for_block(&wsv);
assert_eq!(available.len(), max_block_tx as usize);
}

Expand Down Expand Up @@ -551,7 +596,7 @@ mod tests {
});
queue.push(tx.clone(), &wsv).unwrap();
wsv.transactions.insert(tx.hash());
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.txs.len(), 0);
}

Expand Down Expand Up @@ -589,13 +634,13 @@ mod tests {
)
.expect("Failed to push tx into queue");
std::thread::sleep(Duration::from_millis(101));
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 1);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 1);

queue
.push(accepted_tx("alice@wonderland", 300, alice_key), &wsv)
.expect("Failed to push tx into queue");
std::thread::sleep(Duration::from_millis(210));
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0);
}

// Queue should only drop transactions which are already committed or ttl expired.
Expand All @@ -621,12 +666,12 @@ mod tests {
.expect("Failed to push tx into queue");

let a = queue
.get_transactions_for_block(&wsv)
.collect_transactions_for_block(&wsv)
.into_iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
let b = queue
.get_transactions_for_block(&wsv)
.collect_transactions_for_block(&wsv)
.into_iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
Expand Down Expand Up @@ -680,7 +725,7 @@ mod tests {

thread::spawn(move || {
while start_time.elapsed() < run_for {
for tx in queue_arc_clone.get_transactions_for_block(&wsv_clone) {
for tx in queue_arc_clone.collect_transactions_for_block(&wsv_clone) {
wsv_clone.transactions.insert(tx.hash());
}
// Simulate random small delays
Expand All @@ -692,14 +737,8 @@ mod tests {
push_txs_handle.join().unwrap();
get_txs_handle.join().unwrap();

// Last update for queue to drop invalid txs.
let _unused = queue.get_transactions_for_block(&wsv);

// Validate the queue state.
let array_queue: Vec<_> = core::iter::repeat_with(|| queue.queue.pop())
.take_while(Option::is_some)
.map(Option::unwrap)
.collect();
let array_queue: Vec<_> = core::iter::from_fn(|| queue.queue.pop()).collect();

assert_eq!(array_queue.len(), queue.txs.len());
for tx in array_queue {
Expand Down
9 changes: 3 additions & 6 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,12 +643,9 @@ pub fn run<F>(
.retain(|tx| !tx.is_expired(sumeragi.queue.tx_time_to_live));

// Pull in new transactions into the cache.
while state.transaction_cache.len() < sumeragi.queue.txs_in_block {
match sumeragi.queue.pop_without_seen(&state.wsv) {
Some(tx) => state.transaction_cache.push(tx),
None => break,
}
}
sumeragi
.queue
.put_transactions_for_block_into(&state.wsv, &mut state.transaction_cache);
};

gossip_transactions(
Expand Down

0 comments on commit 868a691

Please sign in to comment.