Skip to content

Commit

Permalink
Snapshot Term/whole TxID in KV so as to report status correctly in mu…
Browse files Browse the repository at this point in the history
…lti-threaded configuration (#1288)
  • Loading branch information
achamayou authored Jun 15, 2020
1 parent fc1e833 commit aa85cd8
Show file tree
Hide file tree
Showing 21 changed files with 272 additions and 119 deletions.
20 changes: 15 additions & 5 deletions src/consensus/pbft/libbyz/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,8 @@ bool Replica::compare_execution_results(

void Replica::playback_request(kv::Tx& tx)
{
auto view = tx.get_view(pbft_requests_map);
auto req_v = view->get(0);
auto tx_view = tx.get_view(pbft_requests_map);
auto req_v = tx_view->get(0);
CCF_ASSERT(
req_v.has_value(),
"Deserialised request but it was not found in the requests map");
Expand Down Expand Up @@ -499,7 +499,7 @@ void Replica::playback_request(kv::Tx& tx)
vec_exec_cmds[0] = std::move(execute_tentative_request(
*req, playback_max_local_commit_value, true, &tx, -1));

exec_command(vec_exec_cmds, playback_byz_info, 1, 0, false);
exec_command(vec_exec_cmds, playback_byz_info, 1, 0, false, view());
did_exec_gov_req = did_exec_gov_req || playback_byz_info.did_exec_gov_req;

auto owned_req = req.release();
Expand Down Expand Up @@ -2443,7 +2443,12 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info, uint64_t nonce)
pp, info.max_local_commit_value, vec_exec_cmds, num_requests))
{
exec_command(
vec_exec_cmds, info, num_requests, nonce, !pp->should_reorder());
vec_exec_cmds,
info,
num_requests,
nonce,
!pp->should_reorder(),
pp->view());
return true;
}
return false;
Expand Down Expand Up @@ -2488,7 +2493,12 @@ bool Replica::execute_tentative(
}

exec_command(
vec_exec_cmds, info, num_requests, nonce, !pp->should_reorder());
vec_exec_cmds,
info,
num_requests,
nonce,
!pp->should_reorder(),
pp->view());
if (!node_info.general_info.support_threading)
{
cb(pp, this, std::move(ctx));
Expand Down
3 changes: 2 additions & 1 deletion src/consensus/pbft/libbyz/test/replica_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class ExecutionMock
ByzInfo& info,
uint32_t num_requests,
uint64_t nonce,
bool executed_single_threaded) {
bool executed_single_threaded,
View view) {
for (uint32_t i = 0; i < num_requests; ++i)
{
std::unique_ptr<ExecCommandMsg>& msg = msgs[i];
Expand Down
5 changes: 3 additions & 2 deletions src/consensus/pbft/libbyz/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ using ExecCommand = std::function<int(
ByzInfo& info,
uint32_t num_requests,
uint64_t nonce,
bool executed_single_threaded)>;
bool executed_single_threaded,
View view)>;

using VerifyAndParseCommand = std::function<std::unique_ptr<pbft::RequestCtx>(
Byz_req* inb, uint8_t* req_start, size_t req_size)>;
Byz_req* inb, uint8_t* req_start, size_t req_size)>;
2 changes: 1 addition & 1 deletion src/consensus/pbft/pbft.h
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ namespace pbft
return client_proxy->get_statistics();
}

bool replicate(const kv::BatchVector& entries) override
bool replicate(const kv::BatchVector& entries, View view) override
{
for (auto& [index, data, globally_committable] : entries)
{
Expand Down
6 changes: 5 additions & 1 deletion src/consensus/pbft/pbft_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,13 @@ namespace pbft
ByzInfo& info,
uint32_t num_requests,
uint64_t nonce,
bool executed_single_threaded) {
bool executed_single_threaded,
View view) {
info.pending_cmd_callbacks = num_requests;
info.version_before_execution_start = store->current_version();
// PBFT views start at 0, where Raft (and therefore CCF, historically)
// starts at 2
store->set_view(view + 2);
for (uint32_t i = 0; i < num_requests; ++i)
{
std::unique_ptr<ExecCommandMsg>& msg = msgs[i];
Expand Down
45 changes: 31 additions & 14 deletions src/consensus/pbft/pbft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ namespace pbft
virtual void commit_new_view(
const pbft::NewView& new_view, pbft::NewViewsMap& pbft_new_views_map) = 0;
virtual std::shared_ptr<kv::AbstractTxEncryptor> get_encryptor() = 0;
virtual void set_view(Term t) = 0;

virtual kv::TxID next_txid() = 0;
};

template <typename T, typename S>
Expand Down Expand Up @@ -93,16 +96,16 @@ namespace pbft
auto p = x.lock();
if (p)
{
auto version = p->next_version();
auto txid = p->next_txid();
LOG_TRACE_FMT("Storing pre prepare at seqno {}", pp.seqno);
auto success = p->commit(
version,
[version,
txid,
[txid,
&pbft_pre_prepares_map,
&signatures,
pp,
root = std::vector<uint8_t>(root.p, root.p + root.n)]() {
kv::Tx tx(version);
kv::Tx tx(txid.version);
auto pp_view = tx.get_view(pbft_pre_prepares_map);
pp_view->put(0, pp);
auto sig_view = tx.get_view(signatures);
Expand All @@ -113,7 +116,7 @@ namespace pbft
false);
if (success == kv::CommitSuccess::OK)
{
return version;
return txid.version;
}
}
return kv::NoVersion;
Expand Down Expand Up @@ -147,25 +150,21 @@ namespace pbft
auto p = x.lock();
if (p)
{
auto version = p->next_version();
auto txid = p->next_txid();
LOG_TRACE_FMT(
"Storing new view message at view {} for node {}",
new_view.view,
new_view.node_id);

auto success = p->commit(
version,
[version, &pbft_new_views_map, new_view]() {
kv::Tx tx(version);
txid,
[txid, &pbft_new_views_map, new_view]() {
kv::Tx tx(txid.version);
auto vc_view = tx.get_view(pbft_new_views_map);
vc_view->put(0, new_view);
return tx.commit_reserved();
},
false);
if (success == kv::CommitSuccess::OK)
{
return;
}
}
}

Expand All @@ -175,7 +174,6 @@ namespace pbft
if (p)
{
p->compact(v);
return;
}
}

Expand All @@ -198,6 +196,25 @@ namespace pbft
return kv::NoVersion;
}

kv::TxID next_txid()
{
auto p = x.lock();
if (p)
{
return p->next_txid();
}
return {0, kv::NoVersion};
}

void set_view(Term view)
{
auto p = x.lock();
if (p)
{
p->set_term(view);
}
}

std::shared_ptr<kv::AbstractTxEncryptor> get_encryptor()
{
auto p = x.lock();
Expand Down
21 changes: 19 additions & 2 deletions src/consensus/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ namespace raft
}

template <typename T>
bool replicate(const std::vector<std::tuple<Index, T, bool>>& entries)
bool replicate(
const std::vector<std::tuple<Index, T, bool>>& entries, Term term)
{
std::lock_guard<SpinLock> guard(lock);

Expand All @@ -338,6 +339,16 @@ namespace raft
return false;
}

if (term != current_term)
{
LOG_FAIL_FMT(
"Failed to replicate {} items at term {}, current term is {}",
entries.size(),
term,
current_term);
return false;
}

LOG_DEBUG_FMT("Replicating {} entries", entries.size());

for (auto& [index, data, globally_committable] : entries)
Expand Down Expand Up @@ -1031,6 +1042,11 @@ namespace raft
{
rollback(commit_idx);
}
else
{
// but we still want the KV to know which term we're in
store->set_term(current_term);
}

committable_indices.clear();
state = Leader;
Expand Down Expand Up @@ -1197,7 +1213,8 @@ namespace raft

void rollback(Index idx)
{
store->rollback(idx);
store->rollback(idx, current_term);
LOG_DEBUG_FMT("Setting term in store to: {}", current_term);
ledger->truncate(idx);
last_idx = idx;
LOG_DEBUG_FMT("Rolled back at {}", idx);
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/raft/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ namespace raft
raft->force_become_leader(seqno, view, terms, commit_seqno);
}

bool replicate(const kv::BatchVector& entries) override
bool replicate(const kv::BatchVector& entries, View view) override
{
return raft->replicate(entries);
return raft->replicate(entries, view);
}

std::pair<View, SeqNo> get_committed_txid() override
Expand Down
16 changes: 11 additions & 5 deletions src/consensus/raft/raft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ namespace raft
bool public_only = false,
Term* term = nullptr) = 0;
virtual void compact(Index v) = 0;
virtual void rollback(Index v) = 0;
virtual void rollback(Index v, std::optional<Term> t = std::nullopt) = 0;
virtual void set_term(Term t) = 0;
};

template <typename T, typename S>
Expand Down Expand Up @@ -60,13 +61,18 @@ namespace raft
}
}

void rollback(Index v)
void rollback(Index v, std::optional<Term> t = std::nullopt)
{
auto p = x.lock();
if (p)
{
p->rollback(v);
}
p->rollback(v, t);
}

void set_term(Term t)
{
auto p = x.lock();
if (p)
p->set_term(t);
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/consensus/raft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class RaftDriver
{
std::cout << " KV" << node_id << "->>Node" << node_id
<< ": replicate idx: " << idx << std::endl;
_nodes.at(node_id).raft->replicate(kv::BatchVector{{idx, data, true}});
_nodes.at(node_id).raft->replicate(kv::BatchVector{{idx, data, true}}, 1);
}

void disconnect(raft::NodeId left, raft::NodeId right)
Expand Down
14 changes: 12 additions & 2 deletions src/consensus/raft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,20 @@ namespace raft
#endif
}

virtual void rollback(Index i)
virtual void rollback(Index i, std::optional<Term> t = std::nullopt)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>KV" << _id << ": rollback i: " << i
std::cout << " Node" << _id << "->>KV" << _id << ": rollback i: " << i;
if (t.has_value())
std::cout << " term: " << t.value();
std::cout << std::endl;
#endif
}

virtual void set_term(Term t)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>KV" << _id << ": set_term t: " << t
<< std::endl;
#endif
}
Expand Down
Loading

0 comments on commit aa85cd8

Please sign in to comment.