Skip to content

Commit

Permalink
[refactor] #3039: Introduce waiting buffer for the multisigs
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Jan 12, 2023
1 parent 228c2ec commit dce1888
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 48 deletions.
1 change: 1 addition & 0 deletions config/iroha_test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"QUEUE": {
"MAXIMUM_TRANSACTIONS_IN_BLOCK": 8192,
"MAXIMUM_TRANSACTIONS_IN_QUEUE": 65536,
"MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000,
"FUTURE_THRESHOLD_MS": 1000
},
Expand Down
9 changes: 8 additions & 1 deletion config/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};

const DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK: u32 = 2_u32.pow(9);
const DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE: u32 = 2_u32.pow(16);
const DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER: u32 = 2_u32.pow(16);
// 24 hours
const DEFAULT_TRANSACTION_TIME_TO_LIVE_MS: u64 = 24 * 60 * 60 * 1000;
const DEFAULT_FUTURE_THRESHOLD_MS: u64 = 1000;
Expand All @@ -18,6 +19,8 @@ pub struct Configuration {
pub maximum_transactions_in_block: u32,
/// The upper limit of the number of transactions waiting in the queue.
pub maximum_transactions_in_queue: u32,
/// The upper limit of the number of transactions waiting for more signatures.
pub maximum_transactions_in_waiting_buffer: u32,
/// The transaction will be dropped after this time if it is still in the queue.
pub transaction_time_to_live_ms: u64,
/// The threshold to determine if a transaction has been tampered to have a future timestamp.
Expand All @@ -29,6 +32,9 @@ impl Default for ConfigurationProxy {
Self {
maximum_transactions_in_block: Some(DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK),
maximum_transactions_in_queue: Some(DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE),
maximum_transactions_in_waiting_buffer: Some(
DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER,
),
transaction_time_to_live_ms: Some(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS),
future_threshold_ms: Some(DEFAULT_FUTURE_THRESHOLD_MS),
}
Expand All @@ -46,11 +52,12 @@ pub mod tests {
(
maximum_transactions_in_block in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_BLOCK)),
maximum_transactions_in_queue in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_QUEUE)),
maximum_transactions_in_waiting_buffer in prop::option::of(Just(DEFAULT_MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER)),
transaction_time_to_live_ms in prop::option::of(Just(DEFAULT_TRANSACTION_TIME_TO_LIVE_MS)),
future_threshold_ms in prop::option::of(Just(DEFAULT_FUTURE_THRESHOLD_MS)),
)
-> ConfigurationProxy {
ConfigurationProxy { maximum_transactions_in_block, maximum_transactions_in_queue, transaction_time_to_live_ms, future_threshold_ms }
ConfigurationProxy { maximum_transactions_in_block, maximum_transactions_in_queue, maximum_transactions_in_waiting_buffer, transaction_time_to_live_ms, future_threshold_ms }
}
}
}
1 change: 1 addition & 0 deletions configs/peer/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"QUEUE": {
"MAXIMUM_TRANSACTIONS_IN_BLOCK": 512,
"MAXIMUM_TRANSACTIONS_IN_QUEUE": 65536,
"MAXIMUM_TRANSACTIONS_IN_WAITING_BUFFER": 65536,
"TRANSACTION_TIME_TO_LIVE_MS": 86400000,
"FUTURE_THRESHOLD_MS": 1000
},
Expand Down
243 changes: 196 additions & 47 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::prelude::*;
pub struct Queue {
/// The queue proper
queue: ArrayQueue<HashOf<VersionedSignedTransaction>>,
/// The queue for transactions that are still waiting for signatures
waiting_buffer: ArrayQueue<HashOf<VersionedSignedTransaction>>,
/// [`VersionedAcceptedTransaction`]s addressed by `Hash`.
txs: DashMap<HashOf<VersionedSignedTransaction>, VersionedAcceptedTransaction>,
/// The maximum number of transactions in the block
Expand Down Expand Up @@ -72,8 +74,10 @@ impl Queue {
pub fn from_configuration(cfg: &Configuration) -> Self {
Self {
queue: ArrayQueue::new(cfg.maximum_transactions_in_queue as usize),
waiting_buffer: ArrayQueue::new(cfg.maximum_transactions_in_waiting_buffer as usize),
txs: DashMap::new(),
max_txs: cfg.maximum_transactions_in_queue as usize,
max_txs: (cfg.maximum_transactions_in_queue
+ cfg.maximum_transactions_in_waiting_buffer) as usize,
txs_in_block: cfg.maximum_transactions_in_block as usize,
tx_time_to_live: Duration::from_millis(cfg.transaction_time_to_live_ms),
future_threshold: Duration::from_millis(cfg.future_threshold_ms),
Expand Down Expand Up @@ -114,7 +118,9 @@ impl Queue {
tx: &VersionedAcceptedTransaction,
wsv: &WorldStateView,
) -> Result<MustUse<bool>, Error> {
if tx.is_expired(self.tx_time_to_live) {
if tx.is_in_future(self.future_threshold) {
Err(Error::InFuture)
} else if tx.is_expired(self.tx_time_to_live) {
Err(Error::Expired)
} else if tx.is_in_blockchain(wsv) {
Err(Error::InBlockchain)
Expand All @@ -141,65 +147,64 @@ impl Queue {
tx: VersionedAcceptedTransaction,
wsv: &WorldStateView,
) -> Result<(), (VersionedAcceptedTransaction, Error)> {
if tx.is_in_future(self.future_threshold) {
Err((tx, Error::InFuture))
} else if let Err(e) = self.check_tx(&tx, wsv) {
Err((tx, e))
} else if self.txs.len() >= self.max_txs {
Err((tx, Error::Full))
} else {
let hash = tx.hash();
let entry = match self.txs.entry(hash) {
Entry::Occupied(mut old_tx) => {
// MST case
old_tx
.get_mut()
.as_mut_v1()
.signatures
.extend(tx.as_v1().signatures.clone());
return Ok(());
match self.check_tx(&tx, wsv) {
Err(e) => Err((tx, e)),
Ok(MustUse(signature_check)) => {
// Get `txs_len` before entry to avoid deadlock
let txs_len = self.txs.len();
let hash = tx.hash();
let entry = match self.txs.entry(hash) {
Entry::Occupied(mut old_tx) => {
// MST case
old_tx
.get_mut()
.as_mut_v1()
.signatures
.extend(tx.as_v1().signatures.clone());
return Ok(());
}
Entry::Vacant(entry) => entry,
};
if txs_len >= self.max_txs {
return Err((tx, Error::Full));
}
Entry::Vacant(entry) => entry,
};

// Reason for such insertion order is to avoid situation
// when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs`
entry.insert(tx);
self.queue.push(hash).map_err(|err_hash| {
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
(err_tx, Error::Full)
})
// Reason for such insertion order is to avoid situation
// when poped from the `queue` hash does not yet has corresponding (hash, tx) record in `txs`
entry.insert(tx);
let queue_to_push = if signature_check {
&self.queue
} else {
&self.waiting_buffer
};
queue_to_push.push(hash).map_err(|err_hash| {
let (_, err_tx) = self
.txs
.remove(&err_hash)
.expect("Inserted just before match");
(err_tx, Error::Full)
})
}
}
}

/// Pop single transaction.
///
/// Records unsigned transaction in `seen`.
#[allow(
clippy::expect_used,
clippy::unwrap_in_result,
clippy::cognitive_complexity
)]
fn pop(
&self,
seen: &mut Vec<HashOf<VersionedSignedTransaction>>,
queue_to_pop: &ArrayQueue<HashOf<VersionedSignedTransaction>>,
wsv: &WorldStateView,
) -> Option<VersionedAcceptedTransaction> {
loop {
let hash = self.queue.pop()?;
let hash = queue_to_pop.pop()?;
let entry = match self.txs.entry(hash) {
Entry::Occupied(entry) => entry,
// As practice shows this code is not `unreachable!()`.
// When transactions are submitted quickly it can be reached.
Entry::Vacant(_) => continue,
};
if self.check_tx(entry.get(), wsv).is_err() {
entry.remove_entry();
continue;
}

match self.check_tx(entry.get(), wsv) {
Err(_) => {
Expand Down Expand Up @@ -247,21 +252,115 @@ impl Queue {
return;
}

let mut seen = Vec::new();
let mut seen_queue = Vec::new();
let mut seen_waiting_buffer = Vec::new();

let pop_from_queue = core::iter::from_fn(|| self.pop(&mut seen_queue, &self.queue, wsv));
let pop_from_waiting_buffer =
core::iter::from_fn(|| self.pop(&mut seen_waiting_buffer, &self.waiting_buffer, wsv));

let transactions_hashes: HashSet<HashOf<VersionedSignedTransaction>> = transactions
.iter()
.map(VersionedAcceptedTransaction::hash)
.collect();
let out = std::iter::from_fn(|| self.pop(&mut seen, wsv))
let out = pop_from_queue
.round_robin(pop_from_waiting_buffer)
.filter(|tx| !transactions_hashes.contains(&tx.hash()))
.take(self.txs_in_block - transactions.len());
transactions.extend(out);

#[allow(clippy::expect_used)]
seen.into_iter()
.try_for_each(|hash| self.queue.push(hash))
.expect("As we never exceed the number of transactions pending");
for (seen, queue) in [
(seen_queue, &self.queue),
(seen_waiting_buffer, &self.waiting_buffer),
] {
#[allow(clippy::expect_used)]
seen.into_iter()
.try_for_each(|hash| queue.push(hash))
.expect("As we never exceed the number of transactions pending");
}
}
}

/// Iterator which combine two iterators into the single one in round-robin fashion.
///
/// ```ignore
/// [(a0,a1,a2,..),(b0,b1,b2,..)] -> (a0,b0,a1,b1,a2,b2,..)
/// ```
struct RoundRobinIterator<A, B> {
left_iter: A,
right_iter: B,
state: RoundRobinState,
}

enum RoundRobinState {
CurrentLeft,
CurrentRight,
LeftExhausted,
RightExhausted,
BothExhausted,
}

trait RoundRobinIter: Iterator + Sized {
fn round_robin<I: IntoIterator<Item = Self::Item>>(
self,
iter: I,
) -> RoundRobinIterator<Self, <I as IntoIterator>::IntoIter> {
RoundRobinIterator {
left_iter: self,
right_iter: iter.into_iter(),
state: RoundRobinState::CurrentLeft,
}
}
}

impl<T: Iterator> RoundRobinIter for T {}

impl<A, B, T> Iterator for RoundRobinIterator<A, B>
where
A: Iterator<Item = T>,
B: Iterator<Item = T>,
{
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
use RoundRobinState::*;
loop {
match self.state {
BothExhausted => break None,
LeftExhausted => {
let item = self.right_iter.next();
if item.is_none() {
self.state = BothExhausted;
}
break item;
}
RightExhausted => {
let item = self.left_iter.next();
if item.is_none() {
self.state = BothExhausted;
}
break item;
}
CurrentLeft => {
let item = self.left_iter.next();
if item.is_none() {
self.state = LeftExhausted;
continue;
}
self.state = CurrentRight;
break item;
}
CurrentRight => {
let item = self.right_iter.next();
if item.is_none() {
self.state = RightExhausted;
continue;
}
self.state = CurrentLeft;
break item;
}
}
}
}
}

Expand Down Expand Up @@ -771,4 +870,54 @@ mod tests {
assert!(matches!(queue.push(tx, &wsv), Err((_, Error::InFuture))));
assert_eq!(queue.txs.len(), 1);
}

#[test]
fn round_robin_iter_a_eq_b_size() {
let a = vec![0, 2, 4, 6, 8];
let b = vec![1, 3, 5, 7, 9];
assert_eq!(
vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
a.into_iter().round_robin(b).collect::<Vec<u8>>()
);
}

#[test]
fn round_robin_iter_a_gt_b_size() {
let a = vec![0, 2, 4, 6, 8];
let b = vec![1, 3, 5];
assert_eq!(
vec![0, 1, 2, 3, 4, 5, 6, 8],
a.into_iter().round_robin(b).collect::<Vec<u8>>()
);
}

#[test]
fn round_robin_iter_a_lt_b_size() {
let a = vec![0, 2, 4];
let b = vec![1, 3, 5, 7, 9];
assert_eq!(
vec![0, 1, 2, 3, 4, 5, 7, 9],
a.into_iter().round_robin(b).collect::<Vec<u8>>()
);
}

#[test]
fn round_robin_iter_a_empty() {
let a = vec![0, 2, 4, 6, 8];
let b = Vec::new();
assert_eq!(
vec![0, 2, 4, 6, 8],
a.into_iter().round_robin(b).collect::<Vec<u8>>()
);
}

#[test]
fn round_robin_iter_b_empty() {
let a = Vec::new();
let b = vec![1, 3, 5, 7, 9];
assert_eq!(
vec![1, 3, 5, 7, 9],
a.into_iter().round_robin(b).collect::<Vec<u8>>()
);
}
}
Loading

0 comments on commit dce1888

Please sign in to comment.