Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] #2963: Queue remove transactions correctly #3035

Merged
merged 2 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 127 additions & 91 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)]

use core::time::Duration;
use std::collections::HashSet;

use crossbeam_queue::ArrayQueue;
use dashmap::{mapref::entry::Entry, DashMap};
Expand All @@ -29,12 +30,9 @@ pub struct Queue {
queue: ArrayQueue<HashOf<VersionedSignedTransaction>>,
/// [`VersionedAcceptedTransaction`]s addressed by `Hash`.
txs: DashMap<HashOf<VersionedSignedTransaction>, VersionedAcceptedTransaction>,
/// Length of [`DashMap`].
///
/// [`DashMap`] right now just iterates over itself and calculates its length like this:
/// self.txs.iter().len()
/// The maximum number of transactions in the block
pub txs_in_block: usize,
/// The maximum number of transactions
/// The maximum number of transactions in the queue
max_txs: usize,
/// Length of time after which transactions are dropped.
pub tx_time_to_live: Duration,
Expand Down Expand Up @@ -110,6 +108,7 @@ impl Queue {
)
}

// FIXME: Currently it is impossible to distinguish if signature check condition failed or if there is just not enough signatures (#2595).
fn check_tx(
&self,
tx: &VersionedAcceptedTransaction,
Expand Down Expand Up @@ -169,17 +168,16 @@ impl Queue {
Entry::Vacant(entry) => entry,
};

// Reason for such insertion order is to avoid situation
// when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs`
entry.insert(tx);

if let Err(err_hash) = self.queue.push(hash) {
self.queue.push(hash).map_err(|err_hash| {
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
Err((err_tx, Error::Full))
} else {
Ok(())
}
(err_tx, Error::Full)
})
}
}

Expand Down Expand Up @@ -209,6 +207,7 @@ impl Queue {
continue;
}

// Transactions are not removed from the queue until expired or committed
seen.push(hash);
if *entry
.get()
Expand All @@ -220,64 +219,51 @@ impl Queue {
}
}

/// Pop a single transaction.
///
/// Unlike [`Self::pop`], unsigned transactions are not recorded.
#[allow(
clippy::expect_used,
clippy::unwrap_in_result,
clippy::cognitive_complexity
)]
pub fn pop_without_seen(&self, wsv: &WorldStateView) -> Option<VersionedAcceptedTransaction> {
Erigara marked this conversation as resolved.
Show resolved Hide resolved
loop {
let hash = self.queue.pop()?;
let entry = match self.txs.entry(hash) {
Entry::Occupied(entry) => entry,
// As practice shows this code is not `unreachable!()`.
// When transactions are submitted quickly it can be reached.
Entry::Vacant(_) => continue,
};
if self.check_tx(entry.get(), wsv).is_err() {
entry.remove_entry();
continue;
}

if *entry
.get()
.check_signature_condition(wsv)
.expect("Checked in `check_tx` just above")
{
return Some(entry.get().clone());
}
}
}

/// Return the number of transactions in the queue
/// Return the number of transactions in the queue.
pub fn tx_len(&self) -> usize {
self.txs.len()
}

/// Gets transactions till they fill whole block or till the end of queue.
///
/// BEWARE: Shouldn't be called in parallel with itself.
#[allow(clippy::missing_panics_doc, clippy::unwrap_in_result)]
pub fn get_transactions_for_block(
#[cfg(test)]
fn collect_transactions_for_block(
&self,
wsv: &WorldStateView,
) -> Vec<VersionedAcceptedTransaction> {
let mut transactions = Vec::with_capacity(self.txs_in_block);
self.get_transactions_for_block(wsv, &mut transactions);
transactions
}

/// Put transactions into provided vector until they fill the whole block or there are no more transactions in the queue.
///
/// BEWARE: Shouldn't be called in parallel with itself.
Arjentix marked this conversation as resolved.
Show resolved Hide resolved
pub fn get_transactions_for_block(
&self,
wsv: &WorldStateView,
transactions: &mut Vec<VersionedAcceptedTransaction>,
) {
if transactions.len() >= self.txs_in_block {
return;
}

let mut seen = Vec::new();

let out = std::iter::repeat_with(|| self.pop(&mut seen, wsv))
.take_while(Option::is_some)
.map(Option::unwrap)
.take(self.txs_in_block)
.collect::<Vec<_>>();
let transactions_hashes: HashSet<HashOf<VersionedSignedTransaction>> = transactions
.iter()
.map(VersionedAcceptedTransaction::hash)
.collect();
let out = std::iter::from_fn(|| self.pop(&mut seen, wsv))
.filter(|tx| !transactions_hashes.contains(&tx.hash()))
Erigara marked this conversation as resolved.
Show resolved Hide resolved
.take(self.txs_in_block - transactions.len());
transactions.extend(out);

#[allow(clippy::expect_used)]
seen.into_iter()
.try_for_each(|hash| self.queue.push(hash))
.expect("As we never exceed the number of transactions pending");
QuentinI marked this conversation as resolved.
Show resolved Hide resolved
out
}
}

Expand All @@ -288,7 +274,11 @@ mod tests {
use std::{str::FromStr, sync::Arc, thread, time::Duration};

use iroha_config::{base::proxy::Builder, queue::ConfigurationProxy};
use iroha_data_model::prelude::*;
use iroha_data_model::{
account::{ACCOUNT_SIGNATORIES_VALUE, TRANSACTION_SIGNATORIES_VALUE},
prelude::*,
};
use iroha_primitives::must_use::MustUse;
use rand::Rng as _;

use super::*;
Expand Down Expand Up @@ -424,18 +414,44 @@ mod tests {
}

#[test]
#[ignore = "Multisignature is not working for now. See #2595"]
Erigara marked this conversation as resolved.
Show resolved Hide resolved
fn push_multisignature_tx() {
Erigara marked this conversation as resolved.
Show resolved Hide resolved
let key_pairs = [KeyPair::generate().unwrap(), KeyPair::generate().unwrap()];
let kura = Kura::blank_kura_for_testing();
let wsv = Arc::new(WorldStateView::new(
world_with_test_domains(
key_pairs
.iter()
.map(|key_pair| key_pair.public_key())
.cloned(),
),
kura.clone(),
));
let wsv = {
let domain_id = DomainId::from_str("wonderland").expect("Valid");
let mut domain = Domain::new(domain_id.clone()).build();
let account_id = AccountId::from_str("alice@wonderland").expect("Valid");
let mut account = Account::new(
account_id,
key_pairs.iter().map(KeyPair::public_key).cloned(),
)
.build();
account.set_signature_check_condition(SignatureCheckCondition(
ContainsAll::new(
EvaluatesTo::new_unchecked(
ContextValue::new(
Name::from_str(TRANSACTION_SIGNATORIES_VALUE)
.expect("TRANSACTION_SIGNATORIES_VALUE should be valid."),
)
.into(),
),
EvaluatesTo::new_unchecked(
ContextValue::new(
Name::from_str(ACCOUNT_SIGNATORIES_VALUE)
.expect("ACCOUNT_SIGNATORIES_VALUE should be valid."),
)
.into(),
),
)
.into(),
));
assert!(domain.add_account(account).is_none());
Arc::new(WorldStateView::new(
World::with([domain], PeersIds::new()),
kura.clone(),
))
};

let queue = Queue::from_configuration(&Configuration {
maximum_transactions_in_block: 2,
Expand All @@ -450,31 +466,57 @@ mod tests {
Vec::<Instruction>::new().into(),
100_000,
);
let tx_limits = TransactionLimits {
max_instruction_number: 4096,
max_wasm_size_bytes: 0,
};
let fully_signed_tx = {
let mut signed_tx = tx
.clone()
.sign((&key_pairs[0]).clone())
.expect("Failed to sign.");
for key_pair in &key_pairs[1..] {
signed_tx = signed_tx.sign(key_pair.clone()).expect("Failed to sign");
}
VersionedAcceptedTransaction::from_transaction(signed_tx, &tx_limits)
.expect("Failed to accept Transaction.")
};
// Check that fully signed transaction pass signature check
assert!(matches!(
fully_signed_tx.check_signature_condition(&wsv),
Ok(MustUse(true))
));

let get_tx = |key_pair| {
let tx_limits = TransactionLimits {
max_instruction_number: 4096,
max_wasm_size_bytes: 0,
};
VersionedAcceptedTransaction::from_transaction(
tx.clone().sign(key_pair).expect("Failed to sign."),
&tx_limits,
)
.expect("Failed to accept Transaction.")
};

for key_pair in key_pairs {
queue.push(get_tx(key_pair), &wsv).unwrap();
let partially_signed_tx = get_tx(key_pair);
// Check that non of partially signed pass signature check
assert!(matches!(
partially_signed_tx.check_signature_condition(&wsv),
Ok(MustUse(false))
));
queue
.push(partially_signed_tx, &wsv)
.expect("Should be possible to put partially signed transaction into the queue");
}

assert_eq!(queue.queue.len(), 1);
let signature_count = queue
.txs
.get(&queue.queue.pop().unwrap())
.unwrap()
.as_v1()
.signatures
.len();
assert_eq!(signature_count, 2);
// Check that transactions combined into one instead of duplicating
assert_eq!(queue.tx_len(), 1);

let mut available = queue.collect_transactions_for_block(&wsv);
assert_eq!(available.len(), 1);
let tx_from_queue = available.pop().expect("Checked that have one transactions");
// Check that transaction from queue pass signature check
assert!(matches!(
tx_from_queue.check_signature_condition(&wsv),
Ok(MustUse(true))
));
}

#[test]
Expand Down Expand Up @@ -504,7 +546,7 @@ mod tests {
thread::sleep(Duration::from_millis(10));
}

let available = queue.get_transactions_for_block(&wsv);
let available = queue.collect_transactions_for_block(&wsv);
assert_eq!(available.len(), max_block_tx as usize);
}

Expand Down Expand Up @@ -554,7 +596,7 @@ mod tests {
});
queue.push(tx.clone(), &wsv).unwrap();
wsv.transactions.insert(tx.hash());
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.txs.len(), 0);
}

Expand Down Expand Up @@ -592,13 +634,13 @@ mod tests {
)
.expect("Failed to push tx into queue");
std::thread::sleep(Duration::from_millis(101));
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 1);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 1);

queue
.push(accepted_tx("alice@wonderland", 300, alice_key), &wsv)
.expect("Failed to push tx into queue");
std::thread::sleep(Duration::from_millis(210));
assert_eq!(queue.get_transactions_for_block(&wsv).len(), 0);
assert_eq!(queue.collect_transactions_for_block(&wsv).len(), 0);
}

// Queue should only drop transactions which are already committed or ttl expired.
Expand All @@ -624,12 +666,12 @@ mod tests {
.expect("Failed to push tx into queue");

let a = queue
.get_transactions_for_block(&wsv)
.collect_transactions_for_block(&wsv)
.into_iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
let b = queue
.get_transactions_for_block(&wsv)
.collect_transactions_for_block(&wsv)
.into_iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
Expand Down Expand Up @@ -683,7 +725,7 @@ mod tests {

thread::spawn(move || {
while start_time.elapsed() < run_for {
for tx in queue_arc_clone.get_transactions_for_block(&wsv_clone) {
for tx in queue_arc_clone.collect_transactions_for_block(&wsv_clone) {
wsv_clone.transactions.insert(tx.hash());
}
// Simulate random small delays
Expand All @@ -695,14 +737,8 @@ mod tests {
push_txs_handle.join().unwrap();
get_txs_handle.join().unwrap();

// Last update for queue to drop invalid txs.
let _unused = queue.get_transactions_for_block(&wsv);

// Validate the queue state.
let array_queue: Vec<_> = core::iter::repeat_with(|| queue.queue.pop())
.take_while(Option::is_some)
.map(Option::unwrap)
.collect();
let array_queue: Vec<_> = core::iter::from_fn(|| queue.queue.pop()).collect();

assert_eq!(array_queue.len(), queue.txs.len());
for tx in array_queue {
Expand Down
9 changes: 3 additions & 6 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,12 +643,9 @@ pub fn run<F>(
.retain(|tx| !tx.is_expired(sumeragi.queue.tx_time_to_live));

// Pull in new transactions into the cache.
while state.transaction_cache.len() < sumeragi.queue.txs_in_block {
match sumeragi.queue.pop_without_seen(&state.wsv) {
Some(tx) => state.transaction_cache.push(tx),
None => break,
}
}
sumeragi
.queue
.get_transactions_for_block(&state.wsv, &mut state.transaction_cache);
};

gossip_transactions(
Expand Down
Loading