Skip to content

Commit

Permalink
Performance (#946)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashamis authored Mar 13, 2020
1 parent 5d94163 commit d0e07d8
Show file tree
Hide file tree
Showing 27 changed files with 443 additions and 227 deletions.
1 change: 0 additions & 1 deletion src/consensus/pbft/libbyz/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ void Node::send(Message* m, int i)

void Node::send(Message* m, Principal* p)
{
PBFT_ASSERT(m->size() <= Max_message_size, "Message is too big");
PBFT_ASSERT(m->tag() < Max_message_tag, "Invalid message tag");
PBFT_ASSERT(p != nullptr, "Must send to a principal");

Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Prepare::Prepare(View v, Seqno s, Digest& d, Principal* dst, bool is_signed) :
{
struct signature
{
uint32_t magic = 0xba55ba11;
uint32_t magic = 0xba5eba11;
NodeId id;
Digest d;

Expand Down
79 changes: 44 additions & 35 deletions src/consensus/pbft/libbyz/Rep_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ char* Rep_info::new_reply(
r->set_size(message_size);
r->trim();
char* ret = r->contents() + sizeof(Reply_rep);
std::lock_guard<SpinLock> mguard(lock);
auto ret_insert = reps.insert({Key{(size_t)pid, rid, n}, std::move(r)});
if (ret_insert.second)
{
return ret;
std::lock_guard<SpinLock> mguard(lock);
auto ret_insert =
reps.insert({Key{static_cast<size_t>(pid), rid, n}, std::move(r)});
if (ret_insert.second)
{
return ret;
}
}

return nullptr;
Expand All @@ -59,22 +62,23 @@ int Rep_info::new_reply_size() const

void Rep_info::end_reply(int pid, Request_id rid, Seqno n, int size)
{
std::lock_guard<SpinLock> mguard(lock);
auto it = reps.find({(size_t)pid, rid, n});
if (it != reps.end())
Reply* r;
{
std::lock_guard<SpinLock> mguard(lock);
auto it = reps.find({static_cast<size_t>(pid), rid, n});
if (it == reps.end())
{
LOG_INFO_FMT(
" Attempt to end reply not in this < {}, {}, {} >", pid, rid, n);
return;
}
Reply* r = it->second.get();
Reply_rep& rr = r->rep();
rr.rid = rid;
rr.reply_size = size;
rr.digest = Digest(r->contents() + sizeof(Reply_rep), size);
int old_size = sizeof(Reply_rep) + rr.reply_size;
r->set_size(old_size + MAC_size);
return;
}

LOG_INFO << " Attempt to end reply not in this < " << pid << "," << rid << ","
<< n << ">" << std::endl;
}

Reply* Rep_info::reply(int pid, Request_id rid, Seqno n)
Expand All @@ -91,34 +95,40 @@ Reply* Rep_info::reply(int pid, Request_id rid, Seqno n)

void Rep_info::send_reply(int pid, Request_id rid, Seqno n, View v, int id)
{
std::lock_guard<SpinLock> mguard(lock);
auto it = reps.find({(size_t)pid, rid, n});
if (it != reps.end())
std::unique_ptr<Reply> r;
{
Reply* r = it->second.get();
Reply_rep& rr = r->rep();
std::lock_guard<SpinLock> mguard(lock);
auto it = reps.find({(size_t)pid, rid, n});

PBFT_ASSERT(rr.reply_size != -1, "Invalid state");
PBFT_ASSERT(rr.extra == 0 && rr.v == 0 && rr.replica == 0, "Invalid state");
if (it == reps.end())
{
LOG_INFO << " Attempt to send reply not in this < " << pid << "," << rid
<< "," << n << ">" << std::endl;
return;
}

int old_size = sizeof(Reply_rep) + rr.reply_size;
r = std::move(it->second);
reps.erase(it);
}

rr.extra = 1;
rr.v = v;
rr.replica = id;
Reply_rep& rr = r->rep();

r->auth_type = Auth_type::out;
r->auth_len = sizeof(Reply_rep);
r->auth_src_offset = 0;
r->auth_dst_offset = old_size;
PBFT_ASSERT(rr.reply_size != -1, "Invalid state");
PBFT_ASSERT(rr.extra == 0 && rr.v == 0 && rr.replica == 0, "Invalid state");

pbft::GlobalState::get_node().send(r, pid);
reps.erase(it);
return;
}
int old_size = sizeof(Reply_rep) + rr.reply_size;

rr.extra = 1;
rr.v = v;
rr.replica = id;

r->auth_type = Auth_type::out;
r->auth_len = sizeof(Reply_rep);
r->auth_src_offset = 0;
r->auth_dst_offset = old_size;

LOG_INFO << " Attempt to send reply not in this < " << pid << "," << rid
<< "," << n << ">" << std::endl;
pbft::GlobalState::get_node().send(r.get(), pid);
return;
}

void Rep_info::clear()
Expand All @@ -133,7 +143,6 @@ void Rep_info::dump_state(std::ostream& os)
for (auto& pair : reps)
{
os << " cid: " << pair.first.cid << " rid: " << pair.first.rid
<< " seqno: " << pair.first.n
<< " digest hash:" << pair.second->digest().hash() << std::endl;
<< " seqno: " << pair.first.n << std::endl;
}
}
2 changes: 0 additions & 2 deletions src/consensus/pbft/libbyz/Rep_info_exactly_once.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ void Rep_info_exactly_once::end_reply(int pid, Request_id rid, int sz)
Reply_rep& rr = r->rep();
rr.rid = rid;
rr.reply_size = sz;
rr.digest = Digest(r->contents() + sizeof(Reply_rep), sz);

int old_size = sizeof(Reply_rep) + rr.reply_size;
r->set_size(old_size + MAC_size);
Expand Down Expand Up @@ -185,7 +184,6 @@ void Rep_info_exactly_once::dump_state(std::ostream& os)
for (int i = 0; i < reps.size(); i++)
{
os << "i: " << i << " rid: " << reps[i]->request_id()
<< " digest hash:" << reps[i]->digest().hash()
<< " tentative:" << ireps[i].tentative << std::endl;
}
}
10 changes: 0 additions & 10 deletions src/consensus/pbft/libbyz/Rep_info_exactly_once.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ class Rep_info_exactly_once
// Effects: Returns the timestamp in the last message sent to
// principal "pid".

Digest& digest(int pid);
// Requires: "pid" is a valid principal identifier.
// Effects: Returns a reference to the digest of the last reply
// value sent to pid.

Reply* reply(int pid);
// Requires: "pid" is a valid principal identifier.
// Effects: Returns a pointer to the last reply value sent to "pid"
Expand Down Expand Up @@ -138,11 +133,6 @@ inline Request_id Rep_info_exactly_once::req_id(int pid)
return reps[pid]->request_id();
}

inline Digest& Rep_info_exactly_once::digest(int pid)
{
return reps[pid]->digest();
}

inline Reply* Rep_info_exactly_once::reply(int pid)
{
return reps[pid];
Expand Down
34 changes: 23 additions & 11 deletions src/consensus/pbft/libbyz/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,13 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
bool Replica::compare_execution_results(
const ByzInfo& info, Pre_prepare* pre_prepare)
{
// We are currently not ordering the execution on the backups correctly.
// This will be resolved in the immediate future.
if (enclave::ThreadMessaging::thread_count > 2)
{
return true;
}

auto& r_pp_root = pre_prepare->get_replicated_state_merkle_root();

auto execution_match = true;
Expand Down Expand Up @@ -441,11 +448,10 @@ void Replica::playback_request(ccf::Store::Tx& tx)

waiting_for_playback_pp = true;

std::vector<std::unique_ptr<ExecCommandMsg>> cmds;
cmds.emplace_back(execute_tentative_request(
vec_exec_cmds[0] = std::move(execute_tentative_request(
*req, playback_max_local_commit_value, true, &tx, true));

exec_command(cmds, playback_byz_info);
exec_command(vec_exec_cmds, playback_byz_info, 1);
}

void Replica::playback_pre_prepare(ccf::Store::Tx& tx)
Expand Down Expand Up @@ -2174,6 +2180,7 @@ std::unique_ptr<ExecCommandMsg> Replica::execute_tentative_request(
last_tentative_execute,
max_local_commit_value,
stash_replier,
request.user_id(),
&Replica::execute_tentative_request_end,
tx);

Expand Down Expand Up @@ -2211,7 +2218,8 @@ void Replica::execute_tentative_request_end(ExecCommandMsg& msg, ByzInfo& info)
bool Replica::create_execute_commands(
Pre_prepare* pp,
int64_t& max_local_commit_value,
std::vector<std::unique_ptr<ExecCommandMsg>>& cmds)
std::array<std::unique_ptr<ExecCommandMsg>, Max_requests_in_batch>& cmds,
uint32_t& num_requests)
{
if (
pp->seqno() == last_tentative_execute + 1 && !state.in_fetch_state() &&
Expand All @@ -2224,6 +2232,7 @@ bool Replica::create_execute_commands(
Pre_prepare::Requests_iter iter(pp);
Request request;

num_requests = 0;
while (iter.get(request))
{
auto cmd = execute_tentative_request(
Expand All @@ -2232,7 +2241,8 @@ bool Replica::create_execute_commands(
!iter.has_more_requests(),
nullptr,
pp->seqno());
cmds.push_back(std::move(cmd));
cmds[num_requests] = std::move(cmd);
++num_requests;
}
return true;
}
Expand All @@ -2246,10 +2256,11 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info)
pp->seqno(),
last_tentative_execute);

std::vector<std::unique_ptr<ExecCommandMsg>> cmds;
if (create_execute_commands(pp, info.max_local_commit_value, cmds))
uint32_t num_requests;
if (create_execute_commands(
pp, info.max_local_commit_value, vec_exec_cmds, num_requests))
{
exec_command(cmds, info);
exec_command(vec_exec_cmds, info, num_requests);
return true;
}
return false;
Expand All @@ -2267,8 +2278,9 @@ bool Replica::execute_tentative(
void(cb)(Pre_prepare*, Replica*, std::unique_ptr<ExecTentativeCbCtx>),
std::unique_ptr<ExecTentativeCbCtx> ctx)
{
std::vector<std::unique_ptr<ExecCommandMsg>> cmds;
if (create_execute_commands(pp, ctx->info.max_local_commit_value, cmds))
uint32_t num_requests;
if (create_execute_commands(
pp, ctx->info.max_local_commit_value, vec_exec_cmds, num_requests))
{
ByzInfo& info = ctx->info;
if (cb != nullptr)
Expand All @@ -2290,7 +2302,7 @@ bool Replica::execute_tentative(
}
}

exec_command(cmds, info);
exec_command(vec_exec_cmds, info, num_requests);
if (!node_info.general_info.support_threading)
{
cb(pp, this, std::move(ctx));
Expand Down
5 changes: 4 additions & 1 deletion src/consensus/pbft/libbyz/Replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ class Replica : public Node, public IMessageReceiveBase

bool is_exec_pending = false;
std::list<Message*> pending_recv_msgs;
std::array<std::unique_ptr<ExecCommandMsg>, Max_requests_in_batch>
vec_exec_cmds;

bool create_execute_commands(
Pre_prepare* pp,
int64_t& max_local_commit_value,
std::vector<std::unique_ptr<ExecCommandMsg>>& cmds);
std::array<std::unique_ptr<ExecCommandMsg>, Max_requests_in_batch>& cmds,
uint32_t& num_requests);

bool execute_tentative(Pre_prepare* pp, ByzInfo& info);

Expand Down
13 changes: 0 additions & 13 deletions src/consensus/pbft/libbyz/Reply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ Reply::Reply(
Request_id req,
Seqno n,
int replica,
Digest& d,
Principal* p,
bool tentative) :
Message(Reply_tag, sizeof(Reply_rep) + MAC_size)
Expand All @@ -49,7 +48,6 @@ Reply::Reply(
rep().n = n;
rep().replica = replica;
rep().reply_size = -1;
rep().digest = d;

INCR_OP(reply_auth);
START_CC(reply_auth_cycles);
Expand Down Expand Up @@ -90,7 +88,6 @@ void Reply::authenticate(Principal* p, int act_len, bool tentative)
}

rep().reply_size = act_len;
rep().digest = Digest(contents() + sizeof(Reply_rep), act_len);
int old_size = sizeof(Reply_rep) + act_len;
set_size(old_size + MAC_size);

Expand Down Expand Up @@ -155,16 +152,6 @@ bool Reply::pre_verify()
return false;
}

// Check reply
if (full())
{
Digest d(contents() + sizeof(Reply_rep), rep_size);
if (d != rep().digest)
{
return false;
}
}

// Check signature.
INCR_OP(reply_auth_ver);
START_CC(reply_auth_ver_cycles);
Expand Down
12 changes: 1 addition & 11 deletions src/consensus/pbft/libbyz/Reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ struct Reply_rep : public Message_rep
View v; // current view
Request_id rid; // unique request identifier
Seqno n; // sequence number when request was executed
Digest digest; // digest of reply.
int replica; // id of replica sending the reply
int reply_size; // if negative, reply is not full.
// Followed by a reply that is "reply_size" bytes long and
Expand Down Expand Up @@ -77,7 +76,6 @@ class Reply : public Message
Request_id req,
Seqno n,
int replica,
Digest& d,
Principal* p,
bool tentative);
// Effects: Creates a new empty Reply message and appends a MAC for principal
Expand All @@ -100,9 +98,6 @@ class Reply : public Message
int id() const;
// Effects: Fetches the replier's identifier from the message.

Digest& digest() const;
// Effects: Fetches the digest from the message.

char* reply(int& len);
// Effects: Returns a pointer to the reply and sets len to the
// reply size.
Expand Down Expand Up @@ -159,11 +154,6 @@ inline int Reply::id() const
return rep().replica;
}

inline Digest& Reply::digest() const
{
return rep().digest;
}

inline char* Reply::reply(int& len)
{
len = rep().reply_size;
Expand All @@ -187,6 +177,6 @@ inline bool Reply::match(Reply* r)
return false;
}

return (rep().digest == r->rep().digest) & (rep().n == r->rep().n) &
return (rep().n == r->rep().n) &
((!is_tentative() & !r->is_tentative()) | (view() == r->view()));
}
8 changes: 6 additions & 2 deletions src/consensus/pbft/libbyz/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,13 @@ static char* service_mem = 0;
static IMessageReceiveBase* message_receive_base;

ExecCommand exec_command =
[](std::vector<std::unique_ptr<ExecCommandMsg>>& msgs, ByzInfo& info) {
for (auto& msg : msgs)
[](
std::array<std::unique_ptr<ExecCommandMsg>, Max_requests_in_batch>& msgs,
ByzInfo& info,
uint32_t num_requests) {
for (uint32_t i = 0; i < num_requests; ++i)
{
std::unique_ptr<ExecCommandMsg>& msg = msgs[i];
Byz_req* inb = &msg->inb;
Byz_rep& outb = msg->outb;
int client = msg->client;
Expand Down
Loading

0 comments on commit d0e07d8

Please sign in to comment.