
Optimized broadcast #309 #405

Merged — 35 commits merged into poanetwork:master on Jun 12, 2019

Conversation

@pawanjay176 (Contributor) commented Apr 29, 2019

WIP for #309

  • Add the extra CanDecode and EchoHash messages in the Messages enum (a rough sketch of the enum follows this list).
  • Have pessimism factor g as a configurable parameter in the Broadcast struct.
  • Write handle_echo_hash and handle_can_decode functions and modify the other handle functions accordingly.
  • Change handle_value to prevent echoing a message back to the proposer. This would save (N-1) messages regardless of g (the original idea of the issue).
  • Test the normal flow of messages.
  • Test for the expected reduction in the number of messages with different values of g.
  • Additional testing.
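
For reference, a rough sketch of what the extended message enum could look like, based on the items above; the payload types and the generic parameter are illustrative stand-ins, not the crate's exact definitions:

// Illustrative stand-in for the crate's Merkle root hash type.
type Digest = [u8; 32];

// Sketch of the extended message enum; `P` stands in for the crate's Merkle `Proof` type.
#[derive(Clone, Debug, PartialEq)]
pub enum Message<P> {
    // A shard of the value with its Merkle proof, sent only by the proposer.
    Value(P),
    // A full proof, echoed to peers that may still need our shard to decode.
    Echo(P),
    // Only the Merkle root hash, for peers expected to decode without our shard.
    EchoHash(Digest),
    // Announces that we hold enough shards to decode the value with this root hash.
    CanDecode(Digest),
    // Indicates readiness to output the value with this root hash.
    Ready(Digest),
}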

@afck (Collaborator) left a comment:

Also make sure you apply rustfmt; the formatting is off in some places. (And Clippy in the end: Both are required by our CI.)

let p = self.echos.get(self.our_id()).unwrap().clone();
// Could also have another error type if it panics.
let (_, p) = self.echos.get(self.our_id()).unwrap();
let p = p.clone().unwrap();
Collaborator:

We should not assume that we have our own Echo here. (See my earlier comment, which probably became outdated.)

Contributor Author:

Will pass p as an argument here.

@pawanjay176 (Contributor Author):

@afck the dynamic_honey_badger and queueing_honey_badger tests fail right after a node restarts. I'm trying to figure out why by looking into the Subset code to check the lifecycle of a Broadcast instance.

@afck (Collaborator) commented May 9, 2019

That's weird: Do the Broadcast tests pass? If they do, but the higher-level tests fail, that shows that we need to extend the Broadcast tests (#396).

@afck (Collaborator) left a comment:

(I also couldn't find the cause for the failing tests so far.)

@@ -28,14 +29,23 @@ pub struct Broadcast<N> {
coding: Coding,
/// If we are the proposer: whether we have already sent the `Value` messages with the shards.
value_sent: bool,
/// Whether we have already multicast `Echo`.
/// Whether we have already send `Echo` left nodes and right nodes who haven't sent `CanDecode`.
Collaborator:

Typo: "sent"
Maybe: "Whether we have already sent `Echo` to all nodes who haven't sent `CanDecode`."

Contributor:

The comment is still missing a "to". How do you visualise "left" and "right" on a circular list, I wonder? Can you have a one-dimensional counterpart of these terms instead? Possibly predecessor and successor?

/// Whether we have already output a value.
decided: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<N, Proof<Vec<u8>>>,
/// /// Estimate as to how many nodes we think are faulty.
Collaborator:

Duplicate "///"
I'd also clarify that this is only for performance, not for liveness and correctness.
Maybe: "Number of faulty nodes to optimize performance for."?

@@ -81,6 +91,7 @@ impl<N: NodeIdT> Broadcast<N> {
let data_shard_num = netinfo.num_nodes() - parity_shard_num;
let coding =
Coding::new(data_shard_num, parity_shard_num).map_err(|_| Error::InvalidNodeCount)?;
let g = netinfo.num_faulty();
Collaborator:

Let's call this fault_estimate, too, instead of g.

if self.echos.get(self.our_id()) == Some(&p) {

if let Some(EchoContent::Full(proof)) = self.echos.get(self.our_id()) {
if *proof == p {
Collaborator:

I guess we should also check whether the hash matches, if we only have an EchoContent::Hash. So I'd actually just compare the hashes whenever this is Some(echo_content).

@pawanjay176 (Contributor Author) commented May 10, 2019:

If we only have an EchoContent::Hash, then we can't return from the if let statement, right? We need to update the echos map with the actual full proof. So it would have to be

match self.echos.get(self.our_id()) {
    Some(val) if val.hash() != p.root_hash() => {
        return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into())
    }
    Some(EchoContent::Full(proof)) if *proof == p => {
        warn!(
            "Node {:?} received Value({:?}) multiple times from {:?}.",
            self.our_id(),
            HexProof(&p),
            sender_id
        );
        return Ok(Step::default());
    },
    _ => ()
};

i.e. the match returns () if we have a Some(EchoContent::Hash) that matches p.root_hash(), or None, so that the rest of the function continues.

.unwrap_or(&BTreeSet::new())
.clone();
values.insert(sender_id.clone());
self.can_decodes.insert(*hash, values);
Collaborator:

self.can_decodes
    .entry(*hash)
    .or_default()
    .insert(sender_id.clone());

if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let echo_msg = Message::Echo(p.clone());
let step: Step<_> = Target::All.message(echo_msg).into();
let mut step = Step::default();
// `N - 2f + g` node ids to the left of our_id (excluding our_id and proposer_id)
Collaborator:

The proposer isn't currently excluded, is it?
(I think it's better that way: That causes additional complications and should probably be a separate PR, if at all.)

Contributor Author:

No, it's not 😅 Forgot to change the comment.
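
For illustration, a minimal sketch of how the `N - 2f + g` "left" neighbours could be selected on a circular list of node IDs, mirroring the `cycle`/`skip_while` pattern used for the "right" set in the fix further below; the helper and its parameter names are illustrative, not the PR's actual code:

// Sketch only: arrange all node IDs in a circular list, take `n - 2f + g` of them
// starting at `our_id`, then drop `our_id` itself. Assumes `all_ids` yields IDs in
// a fixed order and that `our_id` occurs in it.
fn left_nodes<'a, N: Eq>(
    all_ids: impl Iterator<Item = &'a N> + Clone,
    our_id: &'a N,
    n: usize, // total number of nodes, N
    f: usize, // number of tolerated faulty nodes
    g: usize, // fault estimate used by the optimization
) -> impl Iterator<Item = &'a N> {
    all_ids
        .cycle()
        .skip_while(move |x| *x != our_id)
        .take(n - 2 * f + g)
        .skip(1)
}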

@pawanjay176 (Contributor Author) commented May 9, 2019

@afck I think I found the problem. The current implementation uses Target::All to send the Echo and Ready messages, and Target::All sends to both observer and validator nodes. As soon as we restart a node in the DynamicHoneyBadger tests, the restarted node becomes an observer and is not sent any messages except Target::All ones. In my implementation, the nodes to send the messages to are derived from self.netinfo.all_ids(), which doesn't include observer nodes, so they don't receive any of the Echo, EchoHash and CanDecode messages from the validating nodes.

Since Target::All doesn't get resolved at the Broadcast level, I cannot access observer nodes from any of the broadcast functions. Should there be an observers map in the NetworkInfo struct? That would involve updating the NetworkInfo instance for the non-deleted nodes once the observers join the network. Is there a simpler way?

@afck (Collaborator) commented May 9, 2019

Right, I hadn't thought of that!
I'm almost thinking that for now, the easiest solution would be a Target::Observers variant, and just always sending a full Echo there.

Slightly more optimized: Only send a full Echo to the observers if you are among the 2 f + 1 nodes to the left of the proposer.

What do you think, @vkomenda?

.take(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.skip(1);
for id in left {
let msg = Target::Node(id.clone()).message(echo_msg.clone());
Contributor:

This cloning of message content makes me wish for a different Target. At least a target that allows sending the same content to a list of nodes.

Collaborator:

I agree; maybe Target should simply have the two variants AllExcept (blacklist) and Nodes (whitelist). That should cover all our cases.
Anyway, let's do that later, in a separate issue: #408

@vkomenda (Contributor) commented May 10, 2019

I'm almost thinking that for now, the easiest solution would be a Target::Observers variant, and just always sending a full Echo there.

What if the new variant allowed sending the same message content to a given list of nodes plus any observers not on that list? This would allow us to reduce the cloning of message content inside Broadcast, although later that content would have to be cloned anyway when sending it to each of those nodes. But at least we would have eliminated for-loops over lists of nodes by replacing them with pushing a single message with that list of recipients.

With such a new message variant, we would change the sender queue slightly to marshal the content to the correct list of nodes that includes any observers not known to Broadcast.

@afck (Collaborator) commented May 11, 2019

Yes, that's true, especially for broadcast, where some of the messages are relatively large. And I don't even think they necessarily have to be cloned later: The application logic can just serialize it once (instead of the current duplicated serialization), and then send those same bytes to all the recipients. No need to clone anything.

On the other hand, maybe it's not Target that should contain a list of nodes, but TargetedMessage that should contain a list of targets?

Anyway, I think that's beyond the scope of this PR and needs some design and discussion first. I'd just add Target::Observers for now.

@vkomenda (Contributor):

Anyway, I think that's beyond the scope of this PR and needs some design and discussion first. I'd just add Target::Observers for now.

Take care to filter the non-observers from Target::Node messages in that case to avoid overlap with Target::Observers.

@pawanjay176 (Contributor Author) commented May 12, 2019

So just to clarify, what you guys are suggesting is having an additional Observers(Vec<N>) variant in the Target enum. After doing that, we would have to add a match arm to defer_messages in traits.rs. In there, we would have to filter only the nodes that we have passed in the Observers list and all observer nodes from the peer_epochs map. But how do we check whether a node in the peer_epochs map is an observer or a validator in that function?

@afck (Collaborator) commented May 12, 2019

No, just Observers, without parameters. The Broadcast protocol should not have to know about specific observers.

But that's a good point: I'm not sure whether we have the information in traits.rs about who is or isn't an observer…
We could add a target AllExcept(Vec<N>) instead, so Broadcast could put into the target the nodes that it does know about. Not a nice solution, I know… 😬

@pawanjay176 (Contributor Author):

My mistake. I thought you meant something like NodesPlusObservers(Vec<N>) where the SenderQueue would send it to all the passed nodes + all observer nodes. So instead of having the loop for the Echo and EchoHash messages, we just do Target::NodesPlusObservers(nodes).

But again, I don't think traits.rs has info on who is or isn't a validator. AllExcept seems to work but is kind of hacky 😅

@vkomenda (Contributor) commented May 13, 2019

My mistake. I thought you meant something like NodesPlusObservers(Vec<N>) where the SenderQueue would send it to all the passed nodes + all observer nodes. So instead of having the loop for the Echo and EchoHash messages, we just do Target::NodesPlusObservers(nodes).

Right, that sounds like my initial reading of the new Target variant, which @afck said could be a different PR where we could make TargetedMessage contain a list of recipients. That also has some points that are unclear to me:

  • Do we have to ensure that that list of recipients is non-empty? E.g.,
pub struct TargetedMessage<M, N> {
    pub target: Target<N>,
    pub more_targets: Vec<Target<N>>,
    pub message: M,
}
  • If the intended list of targets may include extra nodes as defined by the Target variant (e.g., extra observer nodes as decided by the sender queue), wouldn't it be clearer to actually record that extra hint in TargetedMessage as well? Or maybe not. I'm not so sure about this one.

Anyhow, if AllExcept repairs the tests then let's go ahead with it. The resulting set operation seems to be the same, but with a set difference added inside. However, maybe I'm complicating things and no difference is needed? Otherwise we would be filtering all_ids quite a bit.

@pawanjay176 (Contributor Author):

Another thing we could possibly do would be to have an additional function in the ConsensusProtocol trait.

fn is_node_validator(&self) -> bool;

Every implementation of the ConsensusProtocol trait ought to have an Arc<NetworkInfo<N>> from which we can query whether the passed node is a validator or not. Though I'm not super convinced that having an is_node_validator in the ConsensusProtocol trait makes sense.

@vkomenda (Contributor) commented May 13, 2019

Another thing we could possibly do would be to have an additional function in the ConsensusProtocol trait.

Sounds dubious. ConsensusProtocol instances don't represent nodes. Can you use NetworkInfo::is_validator() or NetworkInfo::is_node_validator(id) instead?

@afck (Collaborator) commented May 13, 2019

Do we have to ensure that that list of recipients is non-empty?

I don't think so. I'd prefer if Broadcast simply didn't have to know whether there are observers and who they are. It should just be able to occasionally say "send this to the observers, too, if there are any", so that it's guaranteed that they can also verify the protocol.
So e.g. in the situation where it already sent full Echos to all its fellow validators and is now ready to decode and terminate, it would want to send an additional Echo to any observers, to guarantee that they can decode, too. (Not sure whether we'd want to do that in the beginning or in the end.)
In that case, it would return an Echo with target AllExcept(list_of_all_the_validators). If there is no observer, that message would go nowhere, which is fine.

AllExcept is not elegant, but I think it's the simplest solution for now.

@vkomenda (Contributor):

Do we have to ensure that that list of recipients is non-empty?

I don't think so. I'd prefer if Broadcast simply didn't have to know whether there are observers and who they are.

You've answered a slightly different question but I'm with you on this one :)

AllExcept(Vec<N>) does fit into the current TargetedMessage well.
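
To make the discussion concrete, a sketch of the Target shape being proposed here; this is illustrative only and the crate's actual enum may differ in details:

// Sketch only, not the crate's exact definition.
#[derive(Clone, Debug, PartialEq)]
pub enum Target<N> {
    // Send to every peer, validators and observers alike.
    All,
    // Send to a single node.
    Node(N),
    // Send to every peer except the listed ones. `AllExcept(Vec::new())` is
    // equivalent to `All`; listing all validators reaches only the observers.
    AllExcept(Vec<N>),
}

With such a variant, the extra Echo for observers that @afck describes would carry the target AllExcept(list_of_all_the_validators), so it simply goes nowhere if there are no observers.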

@afck (Collaborator) left a comment:

Looks good!
I think the only things that are left to do now are rebasing on master, running the simulation with and without this optimization to verify that it's worth the added complexity, and updating the module documentation.

I'm wondering whether we should remove Target::All now, since it's equivalent to AllExcept(Vec::new()). (But probably not in this PR; it's already huge!)

@pawanjay176 (Contributor Author) commented May 18, 2019

@afck I ran the simulation with the new code. I'm getting a ~20% improvement in the average message size handled by a node for message sizes > 100 bytes. I think the reason for this suboptimal improvement is that in many instances, a node receives 2f + 1 Readys before it receives CanDecode from its 'right' nodes. Hence, it ends up sending the Echos to those nodes before terminating, even if those nodes have enough shards to decode. Does that make sense or am I missing something?

@afck (Collaborator) commented May 20, 2019

Awesome, thanks! That makes sense and I think 20% already means it's worth making this optimization! 🎉
I imagine that if you simulate a larger batch size, lower bandwidth and more nodes, the number might go up further? But maybe not. Of course the 66% improvement is only in an unrealistic best case scenario where everything apart from the payload is negligible, and everyone manages to send their CanDecode before they get their Echo.

@pawanjay176 (Contributor Author) commented May 20, 2019

Added a few minor optimizations to ensure more CanDecode messages are delivered.

  1. Send EchoHash messages before Echo messages: We send a CanDecode message to all nodes who have sent us an EchoHash message once we have the minimum number of shards to decode. This is to ensure that we have as many EchoHash messages as possible before we start sending out CanDecodes. If EchoHash messages are delivered later than Echos, it is possible that a node sends Ready and terminates before it has sent out the necessary CanDecodes. That would lead to more of the heavy Echo messages being sent.
  2. Modify send_can_decode to set can_decode_sent = true only after receiving all EchoHashes, and modify handle_echo_hash to call send_can_decode every time we receive a new EchoHash: Earlier, we set can_decode_sent = true once we had the minimum number of shards and any number of EchoHashes. This is to ensure that we send out as many EchoHashes as possible before terminating.

Note: This could lead to possible retransmissions of CanDecodes to the same node. Just wanted to check if the idea is right before adding another map to keep track of all nodes we have sent CanDecodes to.

These are the message size improvements that I got with and without the optimization:

[chart: message sizes with and without the optimization]

@afck (Collaborator) commented May 21, 2019

I don't agree with those two changes:
We should just send CanDecode as soon as we have N - 2f Echos, and we should send it to every validator who hasn't sent us an Echo.
Whether they've sent us an EchoHash or not doesn't matter.
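
A minimal sketch of that rule, with illustrative names and types rather than the PR's actual function:

use std::collections::BTreeSet;

// Sketch only: once we hold `N - 2f` full `Echo`s (enough shards to decode),
// return the validators that have not sent us an `Echo`; those are the ones
// that should receive our `CanDecode`.
fn can_decode_targets<'a, N: Ord>(
    all_validators: impl Iterator<Item = &'a N>,
    echo_senders: &BTreeSet<N>,
    full_echo_count: usize,
    data_shard_count: usize, // N - 2f, the minimum number of shards needed to decode
) -> Vec<&'a N> {
    if full_echo_count < data_shard_count {
        return Vec::new(); // not enough `Echo`s yet; nothing to announce
    }
    all_validators
        .filter(|id| !echo_senders.contains(*id))
        .collect()
}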

@pawanjay176 (Contributor Author):

You are right. I overcomplicated the issue for no reason. I just needed to send CanDecode to everyone who didn't send us an Echo, instead of sending it to everyone who sent us an EchoHash.

Should I reset the changes and force push the fix or revert and apply the fix on top of the commit?

@afck (Collaborator) commented May 21, 2019

Sure, feel free to force-push to remove the latest commit.

@pawanjay176 force-pushed the optimized_broadcast branch from b469f9e to 9281f40 on May 28, 2019 at 13:01
@afck (Collaborator) left a comment:

Looks great! 👍
Most of my comments are just nit-picks and no blockers for merging.
But I think can_decode_sent does need to be stored by hash for the algorithm to be correct.

let mut step = Step::default();

// Upon receiving `N - 2f` `Echo`s with this root hash, send `CanDecode`
if !self.can_decode_sent && self.count_echos_full(&hash) >= self.coding.data_shard_count() {
Collaborator:

Do we need to store can_decode_sent by hash?
Because we might have to send more than one of them, e.g. if the proposer is faulty, it might send a value v1 with hash h1 to N - f nodes and value v2 with hash h2 to f + 1 nodes, with the overlapping nodes all malicious. So you may end up receiving f + 1 Echos with h2, even though in the end v1 will get accepted. In that case, you should send CanDecode with both hashes eventually.
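
A sketch of what storing it by hash might look like, with illustrative types and names rather than the PR's code:

use std::collections::BTreeSet;

// Illustrative stand-in for the crate's Merkle root hash type.
type Digest = [u8; 32];

// Sketch only: remember the hashes for which `CanDecode` was already sent, so a
// faulty proposer distributing two different values (hashes h1 and h2) still
// triggers a `CanDecode` for each hash once enough `Echo`s for it arrive.
struct CanDecodeState {
    sent_for: BTreeSet<Digest>,
}

impl CanDecodeState {
    // Returns `true` exactly once per hash, as soon as enough full `Echo`s are held.
    fn should_send(&mut self, hash: Digest, full_echo_count: usize, needed: usize) -> bool {
        full_echo_count >= needed && self.sent_for.insert(hash)
    }
}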

Contributor Author:

Yeah you are right. Will fix this.

@afck (Collaborator) left a comment:

Looks good, thank you! 🎉
@vkomenda: Please also take a final look, and merge if you're happy with it.

@afck changed the title from "WIP: Optimized broadcast #309" to "Optimized broadcast #309" on Jun 6, 2019
@afck (Collaborator) left a comment:

The tests failed for me! tests/dynamic_honey_badger.proptest-regressions contains:

cc 53b9134103d3f553062b32b38cdd359c8d5e3c68ab1311afcc888f2f9edff3ba # shrinks to cfg = TestConfig { dimension: NetworkDimension { size: 14, faulty: 0 }, total_txs: 20, batch_size: 10, contribution_size: 1, seed: [241, 107, 204, 130, 53, 111, 85, 7, 89, 3, 240, 65, 183, 26, 167, 233] }

We need to find out whether this is due to the Broadcast optimization before merging.

@vkomenda (Contributor) commented Jun 6, 2019

Yet another failing Broadcast seed (possibly related to the one above):

cc c7c972d3cefc07093bdde048ff8a005dc72f6afca9911131bda6172511a0976c # shrinks to seed = [15, 50, 223, 151, 165, 51, 17, 57, 238, 60, 25, 199, 137, 121, 37, 120]

@afck (Collaborator) commented Jun 7, 2019

Great, thanks! Yes, we should focus on the failing broadcast test first then, since it's probably the reason for the failing DHB test. 👍

@afck (Collaborator) commented Jun 7, 2019

@vkomenda: If I put that line in tests/broadcast.proptest-regressions, the tests still pass. Or is that seed for a different test?

@vkomenda (Contributor) commented Jun 7, 2019

Strange... That is my broadcast.proptest-regressions.

@pawanjay176 (Contributor Author):

@vkomenda the tests pass for me too with the seed that you provided.
Looking into why the seed @afck provided is failing.

@pawanjay176 (Contributor Author):

@afck the test is failing because the time_limit was exceeded. Removed the time_limit and the tests passed.
Checking why that particular test is taking so freakishly long now.

@afck (Collaborator) commented Jun 10, 2019

It makes sense that it takes a bit longer: we are sending a few more messages now.
But if there is a huge difference to master, we should find out the cause, yes.

@vkomenda: Are you using @pawanjay176's current optimized_broadcast branch?

@vkomenda (Contributor):

@vkomenda: Are you using @pawanjay176's current optimized_broadcast branch?

Yes. I'm also running the tests with use-insecure-test-only-mock-crypto. test_broadcast_random_delivery_adv_propose fails quickly, without a timeout.

@vkomenda (Contributor):

That failing test actually passes without use-insecure-test-only-mock-crypto.

@afck (Collaborator) commented Jun 11, 2019

You're right, thanks! I can reproduce it now.

@afck (Collaborator) commented Jun 11, 2019

OK, I found the bug: When sending the full Echos at the end we were filtering out everyone instead of no one if we hadn't received any CanDecodes yet. This fixes the tests for me:

--- a/src/broadcast/broadcast.rs
+++ b/src/broadcast/broadcast.rs
@@ -134,6 +134,7 @@ impl<N: NodeIdT> Broadcast<N> {
         if !self.netinfo.is_node_validator(sender_id) {
             return Err(Error::UnknownSender);
         }
+        debug!("{}: Handling {:?} from {:?}", self, &message, sender_id);
         match message {
             Message::Value(p) => self.handle_value(sender_id, p),
             Message::Echo(p) => self.handle_echo(sender_id, p),
@@ -443,21 +444,20 @@ impl<N: NodeIdT> Broadcast<N> {
         let echo_msg = Message::Echo(p);
         let mut step = Step::default();
 
-        if let Some(senders) = self.can_decodes.get(hash) {
-            // Remaining node ids to the right of our_id
-            // after arranging all node ids in a circular list.
-            let right = self
-                .netinfo
-                .all_ids()
-                .cycle()
-                .skip_while(|x| *x != self.our_id())
-                .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
-                .take_while(|x| *x != self.our_id());
-            let msgs = right
-                .filter(|id| !senders.contains(id))
-                .map(|id| Target::Node(id.clone()).message(echo_msg.clone()));
-            step.messages.extend(msgs);
-        }
+        let senders = self.can_decodes.get(hash);
+        // Remaining node ids to the right of our_id
+        // after arranging all node ids in a circular list.
+        let right = self
+            .netinfo
+            .all_ids()
+            .cycle()
+            .skip_while(|x| *x != self.our_id())
+            .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
+            .take_while(|x| *x != self.our_id());
+        let msgs = right
+            .filter(|id| senders.map_or(true, |s| !s.contains(id)))
+            .map(|id| Target::Node(id.clone()).message(echo_msg.clone()));
+        step.messages.extend(msgs);
         Ok(step)
     }

@afck (Collaborator) commented Jun 11, 2019

(With this change, it passed one hour of the broadcast tests in a loop. 🎉)
(Edit: And another hour with the full test suite! 🎉 🎉)

@pawanjay176 (Contributor Author):

Awesome! Will push the change.
Why did the broadcast test fail only for mock-crypto and not for the normal case though?

@afck (Collaborator) commented Jun 11, 2019

It would eventually have failed with normal crypto, too, just not with the same seed. The crypto implementations create different keys, so the order of the keys is different, which changes iteration order in some places, I think.

@pawanjay176 (Contributor Author):

Ohh, I thought it would be the same for the same seeds. Makes sense now.

@vkomenda merged commit 61f4ed9 into poanetwork:master on Jun 12, 2019