Skip to content

Commit

Permalink
Broadcast confirm_req in batches (#1243)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergiySW authored and rkeene committed Nov 19, 2018
1 parent c99328f commit 03f5cd3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
53 changes: 43 additions & 10 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ void rai::network::republish_block_batch (std::deque<std::shared_ptr<rai::block>
if (!blocks_a.empty ())
{
std::weak_ptr<rai::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, blocks_a, delay_a]() {
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.republish_block_batch (blocks_a, delay_a);
Expand Down Expand Up @@ -367,7 +367,7 @@ void rai::network::broadcast_confirm_req (std::shared_ptr<rai::block> block_a)
if (list->empty () || node.peers.total_weight () < node.config.online_weight_minimum.number ())
{
// broadcast request to all peers
list = std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector ());
list = std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector (100));
}

/*
Expand Down Expand Up @@ -416,6 +416,31 @@ void rai::network::broadcast_confirm_req_base (std::shared_ptr<rai::block> block
}
}

void rai::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>> deque_a, unsigned delay_a)
{
auto pair (deque_a.front ());
deque_a.pop_front ();
auto block (pair.first);
// confirm_req to representatives
auto endpoints (pair.second);
if (!endpoints->empty ())
{
broadcast_confirm_req_base (block, endpoints, delay_a);
}
/* Continue while blocks remain
Broadcast with random delay between delay_a & 2*delay_a */
if (!deque_a.empty ())
{
std::weak_ptr<rai::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_batch (deque_a, delay_a);
}
});
}
}

void rai::network::send_confirm_req (rai::endpoint const & endpoint_a, std::shared_ptr<rai::block> block)
{
rai::confirm_req message (block);
Expand Down Expand Up @@ -2296,7 +2321,7 @@ std::map<rai::endpoint, unsigned> rai::peer_container::list_version ()
return result;
}

std::vector<rai::peer_information> rai::peer_container::list_vector ()
std::vector<rai::peer_information> rai::peer_container::list_vector (size_t count_a)
{
std::vector<peer_information> result;
std::lock_guard<std::mutex> lock (mutex);
Expand All @@ -2305,6 +2330,10 @@ std::vector<rai::peer_information> rai::peer_container::list_vector ()
result.push_back (*i);
}
std::random_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{
result.resize (count_a, rai::peer_information (rai::endpoint{}, 0));
}
return result;
}

Expand Down Expand Up @@ -3929,10 +3958,10 @@ void rai::active_transactions::announce_votes ()
rai::transaction transaction (node.store.environment, false);
unsigned unconfirmed_count (0);
unsigned unconfirmed_announcements (0);
unsigned mass_request_count (0);
std::vector<rai::block_hash> blocks_bundle;
std::vector<std::shared_ptr<rai::election>> blocks_bundle_elections;
std::deque<std::shared_ptr<rai::block>> rebroadcast_bundle;
std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>> confirm_req_bundle;

for (auto i (roots.begin ()), n (roots.end ()); i != n; ++i)
{
Expand Down Expand Up @@ -4027,16 +4056,14 @@ void rai::active_transactions::announce_votes ()
}
}
}
if (!reps->empty () && (total_weight > node.config.online_weight_minimum.number () || mass_request_count > 20))
if ((!reps->empty () && total_weight > node.config.online_weight_minimum.number ()) || roots.size () > 5)
{
// broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
node.network.broadcast_confirm_req_base (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (*reps), 0);
confirm_req_bundle.push_back (std::make_pair (i->confirm_req_options.first, reps));
}
else
{
// broadcast request to all peers
node.network.broadcast_confirm_req_base (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector ()), 0);
++mass_request_count;
confirm_req_bundle.push_back (std::make_pair (i->confirm_req_options.first, std::make_shared<std::vector<rai::peer_information>> (node.peers.list_vector (100))));
}
}
}
Expand All @@ -4061,6 +4088,12 @@ void rai::active_transactions::announce_votes ()
this->node.vote_processor.vote (vote, this->node.network.endpoint ());
});
}

//confirm_req broadcast
if (!confirm_req_bundle.empty ())
{
node.network.broadcast_confirm_req_batch (confirm_req_bundle);
}
for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i)
{
auto root_it (roots.find (*i));
Expand Down Expand Up @@ -4094,7 +4127,7 @@ void rai::active_transactions::announce_loop ()
while (!stopped)
{
announce_votes ();
condition.wait_for (lock, std::chrono::milliseconds (announce_interval_ms + roots.size () * node.network.broadcast_interval_ms));
condition.wait_for (lock, std::chrono::milliseconds (announce_interval_ms + roots.size () * node.network.broadcast_interval_ms * 3 / 2));
}
}

Expand Down
3 changes: 2 additions & 1 deletion rai/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class peer_container
// List of all peers
std::deque<rai::endpoint> list ();
std::map<rai::endpoint, unsigned> list_version ();
std::vector<peer_information> list_vector ();
std::vector<peer_information> list_vector (size_t);
// A list of random peers sized for the configured rebroadcast fanout
std::deque<rai::endpoint> list_fanout ();
// Get the next peer for attempting bootstrap
Expand Down Expand Up @@ -454,6 +454,7 @@ class network
void send_node_id_handshake (rai::endpoint const &, boost::optional<rai::uint256_union> const & query, boost::optional<rai::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<rai::block>);
void broadcast_confirm_req_base (std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>>, unsigned = broadcast_interval_ms);
void send_confirm_req (rai::endpoint const &, std::shared_ptr<rai::block>);
void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>);
rai::endpoint endpoint ();
Expand Down

0 comments on commit 03f5cd3

Please sign in to comment.