Skip to content

Commit

Permalink
Optimized broadcast #309 (#405)
Browse files Browse the repository at this point in the history
* Added extra message types

* Add send functions for new message types

* Store original value message received from proposer

* Modify handle_value for optimized broadcast

* Modify handle_echo for optimized broadcast

* Add handle_echo_hash function for optimized broadcast

* Add handle_can_decode function for optimized broadcast

* Fixes handle_ready and send_echo functions:
1) Modify handle_ready function for optimized broadcast
2) Modify send_echo function to send `Echo` messages to different subset of nodes from
handle_value and handle_ready functions

* Remove value_message and fix typos

* Add functions for filtering all_ids

* Separate send_echo to send_echo_left and send_echo_remaining

* Rename pessimism_factor to fault_estimate

* Remove redundant bools from Broadcast struct

* Fix multiple counting of nodes who sent both `Echo` and `EchoHash` by changing
`echos` map structure

* Allow conflicting `CanDecode`s from same node

* Fix left and right iterators for `Echo` and `EchoHash` messages

* Fixes bugs in left and right iterators and adds additional checks in handle
functions

* Change can_decodes to BTreeMap<Digest, BTreeSet<N>> and fix send_can_decode

* Minor fixes

* Modify send_echo_remaining to take a hash parameter

* Fix bug in left and right iterators.

* Excluding proposer in iterator led to infinite loop when our_id == proposer_id

* Fix bug in handle_echo and compute_output
* send_can_decode call in handle_echo returned early
* compute_output needed `N - f` full `Echo`s to decode

* Refactor `echos` map to take an EchoContent Enum for `Echo` and `EchoHash` messages

* Run rustfmt

* Refactor to avoid index access and multiple map lookups

* Fix comments and minor refactorings.

* Add an`AllExcept(BTreeSet<N>)` type to `Target` enum to enable sending messages
to non-validators from Broadcast.
* Use `Target::AllExcept` in Broadcast to send `Echo` messages to all non-validator nodes.
* Add `AllExcept(_)` match arms for `Target` match expressions.

* Rename `AllExcept` parameter from `known` to `exclude`.

* Modify send_can_decode to send to all nodes who haven't sent an `Echo`.

* Update docs for broadcast

* Improve formatting and add comments for broadcast docs.

* Fix formatting.

* Allow for sending multiple `CanDecode` messages with different hashes.

* Fix comments.

* Fix bug in sending `Echo`s when node has not received `CanDecode`.
  • Loading branch information
pawanjay176 authored and vkomenda committed Jun 12, 2019
1 parent 15f7313 commit 61f4ed9
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 40 deletions.
24 changes: 21 additions & 3 deletions examples/network/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crossbeam::thread::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender};
use hbbft::{SourcedMessage, Target, TargetedMessage};
use std::collections::BTreeSet;

/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
Expand Down Expand Up @@ -107,7 +108,7 @@ impl<M: Send> Messaging<M> {
select! {
recv(rx_from_algo) -> tm => {
if let Ok(tm) = tm {
match tm.target {
match &tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at the first
// error.
Expand All @@ -120,9 +121,26 @@ impl<M: Send> Messaging<M> {
}
}).map_err(Error::from);
},
Target::AllExcept(exclude) => {
// Send the message to all remote nodes not in `exclude`, stopping at the first
// error.
let filtered_txs: Vec<_> = (0..txs_to_comms.len())
.collect::<BTreeSet<_>>()
.difference(exclude)
.cloned()
.collect();
result = filtered_txs.iter()
.fold(Ok(()), |result, i| {
if result.is_ok() {
txs_to_comms[*i].send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(tm.message)
result = if *i < txs_to_comms.len() {
txs_to_comms[*i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)
Expand Down
9 changes: 8 additions & 1 deletion examples/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,21 @@ where
Q: IntoIterator<Item = TimestampedMessage<D>>,
{
for ts_msg in msgs {
match ts_msg.target {
match &ts_msg.target {
Target::All => {
for node in self.nodes.values_mut() {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::AllExcept(exclude) => {
for node in self.nodes.values_mut().filter(|n| !exclude.contains(&n.id)) {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::Node(to_id) => {
if let Some(node) = self.nodes.get_mut(&to_id) {
node.add_message(ts_msg);
Expand Down
16 changes: 16 additions & 0 deletions hbbft_testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,22 @@ where
message_count = message_count.saturating_add(1);
}

dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
hbbft::Target::AllExcept(exclude) => {
for to in nodes
.keys()
.filter(|&to| to != &stepped_id && !exclude.contains(to))
{
if !faulty {
message_count = message_count.saturating_add(1);
}

dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
Expand Down
Loading

0 comments on commit 61f4ed9

Please sign in to comment.