From aa85cd8e2f3bee31c3736b1dfe4a8c96ab9bcf78 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 15 Jun 2020 17:39:58 +0100 Subject: [PATCH] Snapshot Term/whole TxID in KV so as to report status correctly in multi-threaded configuration (#1288) --- src/consensus/pbft/libbyz/replica.cpp | 20 +++-- .../pbft/libbyz/test/replica_unit_tests.cpp | 3 +- src/consensus/pbft/libbyz/types.h | 5 +- src/consensus/pbft/pbft.h | 2 +- src/consensus/pbft/pbft_config.h | 6 +- src/consensus/pbft/pbft_types.h | 45 +++++++---- src/consensus/raft/raft.h | 21 ++++- src/consensus/raft/raft_consensus.h | 4 +- src/consensus/raft/raft_types.h | 16 ++-- src/consensus/raft/test/driver.h | 2 +- src/consensus/raft/test/logging_stub.h | 14 +++- src/consensus/raft/test/main.cpp | 30 +++---- src/kv/kv_types.h | 23 +++++- src/kv/store.h | 80 +++++++++++++++---- src/kv/test/stub_consensus.h | 4 +- src/kv/tx.h | 51 ++++++++---- src/kv/view_containers.h | 2 +- src/node/history.h | 30 ++++--- src/node/node_state.h | 12 +-- src/node/rpc/frontend.h | 11 +-- src/node/test/history.cpp | 10 +-- 21 files changed, 272 insertions(+), 119 deletions(-) diff --git a/src/consensus/pbft/libbyz/replica.cpp b/src/consensus/pbft/libbyz/replica.cpp index 116bb24f0804..cfb4dbe4abe8 100644 --- a/src/consensus/pbft/libbyz/replica.cpp +++ b/src/consensus/pbft/libbyz/replica.cpp @@ -463,8 +463,8 @@ bool Replica::compare_execution_results( void Replica::playback_request(kv::Tx& tx) { - auto view = tx.get_view(pbft_requests_map); - auto req_v = view->get(0); + auto tx_view = tx.get_view(pbft_requests_map); + auto req_v = tx_view->get(0); CCF_ASSERT( req_v.has_value(), "Deserialised request but it was not found in the requests map"); @@ -499,7 +499,7 @@ void Replica::playback_request(kv::Tx& tx) vec_exec_cmds[0] = std::move(execute_tentative_request( *req, playback_max_local_commit_value, true, &tx, -1)); - exec_command(vec_exec_cmds, playback_byz_info, 1, 0, false); + exec_command(vec_exec_cmds, playback_byz_info, 1, 0, false, view()); did_exec_gov_req = did_exec_gov_req || playback_byz_info.did_exec_gov_req; auto owned_req = req.release(); @@ -2443,7 +2443,12 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info, uint64_t nonce) pp, info.max_local_commit_value, vec_exec_cmds, num_requests)) { exec_command( - vec_exec_cmds, info, num_requests, nonce, !pp->should_reorder()); + vec_exec_cmds, + info, + num_requests, + nonce, + !pp->should_reorder(), + pp->view()); return true; } return false; @@ -2488,7 +2493,12 @@ bool Replica::execute_tentative( } exec_command( - vec_exec_cmds, info, num_requests, nonce, !pp->should_reorder()); + vec_exec_cmds, + info, + num_requests, + nonce, + !pp->should_reorder(), + pp->view()); if (!node_info.general_info.support_threading) { cb(pp, this, std::move(ctx)); diff --git a/src/consensus/pbft/libbyz/test/replica_unit_tests.cpp b/src/consensus/pbft/libbyz/test/replica_unit_tests.cpp index 382686a0b082..af3f09fd00ef 100644 --- a/src/consensus/pbft/libbyz/test/replica_unit_tests.cpp +++ b/src/consensus/pbft/libbyz/test/replica_unit_tests.cpp @@ -43,7 +43,8 @@ class ExecutionMock ByzInfo& info, uint32_t num_requests, uint64_t nonce, - bool executed_single_threaded) { + bool executed_single_threaded, + View view) { for (uint32_t i = 0; i < num_requests; ++i) { std::unique_ptr& msg = msgs[i]; diff --git a/src/consensus/pbft/libbyz/types.h b/src/consensus/pbft/libbyz/types.h index 9e3c1c46f6b6..8b67d73abe09 100644 --- a/src/consensus/pbft/libbyz/types.h +++ b/src/consensus/pbft/libbyz/types.h @@ -141,7 +141,8 @@ using ExecCommand = std::function; + bool executed_single_threaded, + View view)>; using VerifyAndParseCommand = std::function( - Byz_req* inb, uint8_t* req_start, size_t req_size)>; \ No newline at end of file + Byz_req* inb, uint8_t* req_start, size_t req_size)>; diff --git a/src/consensus/pbft/pbft.h b/src/consensus/pbft/pbft.h index 93af00d2cf96..c3e62005774f 100644 --- a/src/consensus/pbft/pbft.h +++ b/src/consensus/pbft/pbft.h @@ -623,7 +623,7 @@ namespace pbft return client_proxy->get_statistics(); } - bool replicate(const kv::BatchVector& entries) override + bool replicate(const kv::BatchVector& entries, View view) override { for (auto& [index, data, globally_committable] : entries) { diff --git a/src/consensus/pbft/pbft_config.h b/src/consensus/pbft/pbft_config.h index 90a25b5691da..fb4e1a6945ab 100644 --- a/src/consensus/pbft/pbft_config.h +++ b/src/consensus/pbft/pbft_config.h @@ -259,9 +259,13 @@ namespace pbft ByzInfo& info, uint32_t num_requests, uint64_t nonce, - bool executed_single_threaded) { + bool executed_single_threaded, + View view) { info.pending_cmd_callbacks = num_requests; info.version_before_execution_start = store->current_version(); + // PBFT views start at 0, where Raft (and therefore CCF, historically) + // starts at 2 + store->set_view(view + 2); for (uint32_t i = 0; i < num_requests; ++i) { std::unique_ptr& msg = msgs[i]; diff --git a/src/consensus/pbft/pbft_types.h b/src/consensus/pbft/pbft_types.h index 08acc5d60c81..7425f11be04c 100644 --- a/src/consensus/pbft/pbft_types.h +++ b/src/consensus/pbft/pbft_types.h @@ -61,6 +61,9 @@ namespace pbft virtual void commit_new_view( const pbft::NewView& new_view, pbft::NewViewsMap& pbft_new_views_map) = 0; virtual std::shared_ptr get_encryptor() = 0; + virtual void set_view(Term t) = 0; + + virtual kv::TxID next_txid() = 0; }; template @@ -93,16 +96,16 @@ namespace pbft auto p = x.lock(); if (p) { - auto version = p->next_version(); + auto txid = p->next_txid(); LOG_TRACE_FMT("Storing pre prepare at seqno {}", pp.seqno); auto success = p->commit( - version, - [version, + txid, + [txid, &pbft_pre_prepares_map, &signatures, pp, root = std::vector(root.p, root.p + root.n)]() { - kv::Tx tx(version); + kv::Tx tx(txid.version); auto pp_view = tx.get_view(pbft_pre_prepares_map); pp_view->put(0, pp); auto sig_view = tx.get_view(signatures); @@ -113,7 +116,7 @@ namespace pbft false); if (success == kv::CommitSuccess::OK) { - return version; + return txid.version; } } return kv::NoVersion; @@ -147,25 +150,21 @@ namespace pbft auto p = x.lock(); if (p) { - auto version = p->next_version(); + auto txid = p->next_txid(); LOG_TRACE_FMT( "Storing new view message at view {} for node {}", new_view.view, new_view.node_id); auto success = p->commit( - version, - [version, &pbft_new_views_map, new_view]() { - kv::Tx tx(version); + txid, + [txid, &pbft_new_views_map, new_view]() { + kv::Tx tx(txid.version); auto vc_view = tx.get_view(pbft_new_views_map); vc_view->put(0, new_view); return tx.commit_reserved(); }, false); - if (success == kv::CommitSuccess::OK) - { - return; - } } } @@ -175,7 +174,6 @@ namespace pbft if (p) { p->compact(v); - return; } } @@ -198,6 +196,25 @@ namespace pbft return kv::NoVersion; } + kv::TxID next_txid() + { + auto p = x.lock(); + if (p) + { + return p->next_txid(); + } + return {0, kv::NoVersion}; + } + + void set_view(Term view) + { + auto p = x.lock(); + if (p) + { + p->set_term(view); + } + } + std::shared_ptr get_encryptor() { auto p = x.lock(); diff --git a/src/consensus/raft/raft.h b/src/consensus/raft/raft.h index 26a6e75eee9c..4daa3c842bb0 100644 --- a/src/consensus/raft/raft.h +++ b/src/consensus/raft/raft.h @@ -326,7 +326,8 @@ namespace raft } template - bool replicate(const std::vector>& entries) + bool replicate( + const std::vector>& entries, Term term) { std::lock_guard guard(lock); @@ -338,6 +339,16 @@ namespace raft return false; } + if (term != current_term) + { + LOG_FAIL_FMT( + "Failed to replicate {} items at term {}, current term is {}", + entries.size(), + term, + current_term); + return false; + } + LOG_DEBUG_FMT("Replicating {} entries", entries.size()); for (auto& [index, data, globally_committable] : entries) @@ -1031,6 +1042,11 @@ namespace raft { rollback(commit_idx); } + else + { + // but we still want the KV to know which term we're in + store->set_term(current_term); + } committable_indices.clear(); state = Leader; @@ -1197,7 +1213,8 @@ namespace raft void rollback(Index idx) { - store->rollback(idx); + store->rollback(idx, current_term); + LOG_DEBUG_FMT("Setting term in store to: {}", current_term); ledger->truncate(idx); last_idx = idx; LOG_DEBUG_FMT("Rolled back at {}", idx); diff --git a/src/consensus/raft/raft_consensus.h b/src/consensus/raft/raft_consensus.h index 7ce6d38f7516..dd1e63503b03 100644 --- a/src/consensus/raft/raft_consensus.h +++ b/src/consensus/raft/raft_consensus.h @@ -49,9 +49,9 @@ namespace raft raft->force_become_leader(seqno, view, terms, commit_seqno); } - bool replicate(const kv::BatchVector& entries) override + bool replicate(const kv::BatchVector& entries, View view) override { - return raft->replicate(entries); + return raft->replicate(entries, view); } std::pair get_committed_txid() override diff --git a/src/consensus/raft/raft_types.h b/src/consensus/raft/raft_types.h index 4bca81b787b8..77902ee8a9b6 100644 --- a/src/consensus/raft/raft_types.h +++ b/src/consensus/raft/raft_types.h @@ -28,7 +28,8 @@ namespace raft bool public_only = false, Term* term = nullptr) = 0; virtual void compact(Index v) = 0; - virtual void rollback(Index v) = 0; + virtual void rollback(Index v, std::optional t = std::nullopt) = 0; + virtual void set_term(Term t) = 0; }; template @@ -60,13 +61,18 @@ namespace raft } } - void rollback(Index v) + void rollback(Index v, std::optional t = std::nullopt) { auto p = x.lock(); if (p) - { - p->rollback(v); - } + p->rollback(v, t); + } + + void set_term(Term t) + { + auto p = x.lock(); + if (p) + p->set_term(t); } }; diff --git a/src/consensus/raft/test/driver.h b/src/consensus/raft/test/driver.h index 70ec4c02a13d..cf64e79ff0da 100644 --- a/src/consensus/raft/test/driver.h +++ b/src/consensus/raft/test/driver.h @@ -223,7 +223,7 @@ class RaftDriver { std::cout << " KV" << node_id << "->>Node" << node_id << ": replicate idx: " << idx << std::endl; - _nodes.at(node_id).raft->replicate(kv::BatchVector{{idx, data, true}}); + _nodes.at(node_id).raft->replicate(kv::BatchVector{{idx, data, true}}, 1); } void disconnect(raft::NodeId left, raft::NodeId right) diff --git a/src/consensus/raft/test/logging_stub.h b/src/consensus/raft/test/logging_stub.h index 9965173b9e56..9a353369ebc9 100644 --- a/src/consensus/raft/test/logging_stub.h +++ b/src/consensus/raft/test/logging_stub.h @@ -134,10 +134,20 @@ namespace raft #endif } - virtual void rollback(Index i) + virtual void rollback(Index i, std::optional t = std::nullopt) { #ifdef STUB_LOG - std::cout << " Node" << _id << "->>KV" << _id << ": rollback i: " << i + std::cout << " Node" << _id << "->>KV" << _id << ": rollback i: " << i; + if (t.has_value()) + std::cout << " term: " << t.value(); + std::cout << std::endl; +#endif + } + + virtual void set_term(Term t) + { +#ifdef STUB_LOG + std::cout << " Node" << _id << "->>KV" << _id << ": set_term t: " << t << std::endl; #endif } diff --git a/src/consensus/raft/test/main.cpp b/src/consensus/raft/test/main.cpp index 1693d233c2fd..beb9da8e56ca 100644 --- a/src/consensus/raft/test/main.cpp +++ b/src/consensus/raft/test/main.cpp @@ -85,7 +85,7 @@ DOCTEST_TEST_CASE("Single node commit" * doctest::test_suite("single")) entry->push_back(2); entry->push_back(3); - r0.replicate(kv::BatchVector{{i, entry, true}}); + r0.replicate(kv::BatchVector{{i, entry, true}}, 1); DOCTEST_REQUIRE(r0.get_last_idx() == i); DOCTEST_REQUIRE(r0.get_commit_idx() == i); } @@ -338,10 +338,10 @@ DOCTEST_TEST_CASE( DOCTEST_INFO("Try to replicate on a follower, and fail"); std::vector entry = {1, 2, 3}; auto data = std::make_shared>(entry); - DOCTEST_REQUIRE_FALSE(r1.replicate(kv::BatchVector{{1, data, true}})); + DOCTEST_REQUIRE_FALSE(r1.replicate(kv::BatchVector{{1, data, true}}, 1)); DOCTEST_INFO("Tell the leader to replicate a message"); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1); DOCTEST_REQUIRE(*r0.ledger->ledger.front() == entry); DOCTEST_INFO("The other nodes are not told about this yet"); @@ -440,7 +440,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple")) std::vector first_entry = {1, 2, 3}; auto data = std::make_shared>(first_entry); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true}}, 1)); r0.periodic(ms(10)); DOCTEST_REQUIRE( @@ -543,8 +543,8 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple")) std::vector second_entry = {2, 2, 2}; auto data_2 = std::make_shared>(second_entry); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data_1, true}})); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, data_2, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data_1, true}}, 1)); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, data_2, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2); r0.periodic(ms(10)); DOCTEST_REQUIRE(r0.channels->sent_append_entries.size() == 1); @@ -565,7 +565,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple")) { std::vector third_entry = {3, 3, 3}; auto data = std::make_shared>(third_entry); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{3, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{3, data, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 3); // Simulate that the append entries was not deserialised successfully @@ -594,7 +594,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple")) { std::vector fourth_entry = {4, 4, 4}; auto data = std::make_shared>(fourth_entry); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{4, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{4, data, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 4); r0.periodic(ms(10)); DOCTEST_REQUIRE(r0.channels->sent_append_entries.size() == 1); @@ -607,7 +607,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple")) { std::vector fifth_entry = {5, 5, 5}; auto data = std::make_shared>(fifth_entry); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{5, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{5, data, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 5); r0.periodic(ms(10)); DOCTEST_REQUIRE(r0.channels->sent_append_entries.size() == 1); @@ -706,7 +706,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit") for (size_t i = 1; i <= num_big_entries; ++i) { - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{i, data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{i, data, true}}, 1)); DOCTEST_REQUIRE( msg_response == dispatch_all_and_DOCTEST_CHECK( @@ -724,7 +724,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit") for (size_t i = num_big_entries + 1; i <= individual_entries; ++i) { - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{i, smaller_data, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{i, smaller_data, true}}, 1)); dispatch_all(nodes, r0.channels->sent_append_entries); } @@ -856,7 +856,7 @@ DOCTEST_TEST_CASE( DOCTEST_INFO("Node 0 compacts twice but Nodes 1 and 2 only once"); { - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, first_entry, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, first_entry, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1); r0.periodic(ms(10)); DOCTEST_REQUIRE(r0.channels->sent_append_entries.size() == 2); @@ -870,7 +870,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE( 1 == dispatch_all(nodes, r2.channels->sent_append_entries_response)); - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}})); + DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2); r0.periodic(ms(10)); DOCTEST_REQUIRE(r0.channels->sent_append_entries.size() == 2); @@ -963,7 +963,7 @@ DOCTEST_TEST_CASE( "Node 1 and Node 2 proceed to compact at idx 2, where Node 0 has " "compacted for a previous term"); { - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}})); + DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}}, 2)); DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2); r1.periodic(ms(10)); DOCTEST_REQUIRE(r1.channels->sent_append_entries.size() == 2); @@ -980,7 +980,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(r0.channels->sent_append_entries_response.size() == 0); DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact"); - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}})); + DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); r1.periodic(ms(10)); DOCTEST_REQUIRE(r1.channels->sent_append_entries.size() == 2); diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index f9f6572c6af9..a6c2bfc06a26 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -28,11 +28,17 @@ namespace kv // Term describes an epoch of Versions. It is incremented when global kv's // writer(s) changes. Term and Version combined give a unique identifier for - // all accepted kv modifications. Terms are handled by Raft via the + // all accepted kv modifications. Terms are handled by Consensus via the // TermHistory using Term = uint64_t; using NodeId = uint64_t; + struct TxID + { + Term term = 0; + Version version = 0; + }; + using BatchVector = std::vector< std::tuple>, bool>>; @@ -222,7 +228,7 @@ namespace kv state = Primary; } - virtual bool replicate(const BatchVector& entries) = 0; + virtual bool replicate(const BatchVector& entries, View view) = 0; virtual std::pair get_committed_txid() = 0; virtual View get_view(SeqNo seqno) = 0; @@ -343,9 +349,15 @@ namespace kv { public: virtual ~AbstractStore() {} + virtual Version next_version() = 0; + virtual TxID next_txid() = 0; + virtual Version current_version() = 0; + virtual TxID current_txid() = 0; + virtual Version commit_version() = 0; + virtual std::shared_ptr get_consensus() = 0; virtual std::shared_ptr get_history() = 0; virtual std::shared_ptr get_encryptor() = 0; @@ -354,9 +366,12 @@ namespace kv bool public_only = false, Term* term = nullptr) = 0; virtual void compact(Version v) = 0; - virtual void rollback(Version v) = 0; + virtual void rollback(Version v, std::optional t = std::nullopt) = 0; + virtual void set_term(Term t) = 0; + virtual CommitSuccess commit( - Version v, PendingTx pt, bool globally_committable) = 0; + const TxID& txid, PendingTx pt, bool globally_committable) = 0; + virtual size_t commit_gap() = 0; }; diff --git a/src/kv/store.h b/src/kv/store.h index 128050bb892e..5c16346cfa01 100644 --- a/src/kv/store.h +++ b/src/kv/store.h @@ -7,6 +7,8 @@ #include "map.h" #include "view_containers.h" +#include + namespace kv { class Store : public AbstractStore @@ -22,6 +24,7 @@ namespace kv std::shared_ptr encryptor = nullptr; Version version = 0; Version compacted = 0; + Term term = 0; SpinLock maps_lock; SpinLock version_lock; @@ -251,18 +254,28 @@ namespace kv map.second->post_compact(); } - void rollback(Version v) override + void rollback(Version v, std::optional t = std::nullopt) override { // This is called to roll the store back to the state it was in // at the specified version. // No transactions can be prepared or committed during rollback. std::lock_guard mguard(maps_lock); - if (v >= current_version()) - return; + { + std::lock_guard vguard(version_lock); + // The term should always be updated on rollback() when passed + // regardless of whether version needs to be updated or not + if (t.has_value()) + term = t.value(); + if (v >= version) + return; + } if (v < commit_version()) - return; + throw std::logic_error(fmt::format( + "Attempting rollback to {}, earlier than commit version {}", + v, + commit_version())); for (auto& map : maps) map.second->lock(); @@ -287,10 +300,16 @@ namespace kv e->rollback(v); } + void set_term(Term t) override + { + std::lock_guard vguard(version_lock); + term = t; + } + DeserialiseSuccess deserialise_views( const std::vector& data, bool public_only = false, - Term* term = nullptr, + Term* term_ = nullptr, ViewContainer* tx = nullptr) { // If we pass in a transaction we don't want to commit, just deserialise @@ -419,7 +438,7 @@ namespace kv if (h) { - if (!h->verify(term)) + if (!h->verify(term_)) { LOG_FAIL_FMT("Failed to deserialize"); LOG_DEBUG_FMT("Signature in transaction {} failed to verify", v); @@ -463,7 +482,7 @@ namespace kv if (tx) { - tx->set_view_list(views); + tx->set_view_list(views, term); } return success; @@ -508,11 +527,18 @@ namespace kv Version current_version() override { - // Must lock in case the version is being incremented. + // Must lock in case the version or term is being incremented. std::lock_guard vguard(version_lock); return version; } + TxID current_txid() override + { + // Must lock in case the version is being incremented. + std::lock_guard vguard(version_lock); + return {term, version}; + } + Version commit_version() override { // Must lock in case the store is being compacted. @@ -521,7 +547,9 @@ namespace kv } CommitSuccess commit( - Version version, PendingTx pending_tx, bool globally_committable) override + const TxID& txid, + PendingTx pending_tx, + bool globally_committable) override { auto r = get_consensus(); if (!r) @@ -529,21 +557,29 @@ namespace kv LOG_DEBUG_FMT( "Store::commit {}{}", - version, + txid.version, (globally_committable ? " globally_committable" : "")); BatchVector batch; Version previous_last_replicated = 0; Version next_last_replicated = 0; Version previous_rollback_count = 0; + kv::Consensus::View replication_view = 0; { std::lock_guard vguard(version_lock); - if (globally_committable && version > last_committable) - last_committable = version; + // This can happen when a transaction started before a view change, + // but tries to commit after the view change is complete. + LOG_DEBUG_FMT( + "Want to commit for term {}, term is {}", txid.term, term); + if (txid.term != term) + return CommitSuccess::NO_REPLICATE; + + if (globally_committable && txid.version > last_committable) + last_committable = txid.version; pending_txs.insert( - {version, + {txid.version, std::make_pair(std::move(pending_tx), globally_committable)}); auto h = get_history(); @@ -569,7 +605,7 @@ namespace kv if (h) { - h->add_pending(reqid, version, data_shared); + h->add_pending(reqid, txid.version, data_shared); } LOG_DEBUG_FMT( @@ -585,9 +621,11 @@ namespace kv previous_rollback_count = rollback_count; previous_last_replicated = last_replicated; next_last_replicated = last_replicated + batch.size(); + + replication_view = term; } - if (r->replicate(batch)) + if (r->replicate(batch, replication_view)) { std::lock_guard vguard(version_lock); if ( @@ -618,6 +656,18 @@ namespace kv return version; } + TxID next_txid() override + { + std::lock_guard vguard(version_lock); + + // Get the next global version. If we would go negative, wrap to 0. + ++version; + if (version < 0) + version = 0; + + return {term, version}; + } + size_t commit_gap() override { std::lock_guard vguard(version_lock); diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index a1a420f608a5..779b330fab7e 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -23,7 +23,7 @@ namespace kv consensus_type(consensus_type_) {} - bool replicate(const BatchVector& entries) override + bool replicate(const BatchVector& entries, View view) override { for (const auto& entry : entries) { @@ -153,7 +153,7 @@ namespace kv return false; } - bool replicate(const BatchVector& entries) override + bool replicate(const BatchVector& entries, View view) override { return false; } diff --git a/src/kv/tx.h b/src/kv/tx.h index 2f15aab90e38..203522c82d79 100644 --- a/src/kv/tx.h +++ b/src/kv/tx.h @@ -13,10 +13,11 @@ namespace kv { private: OrderedViews view_list; - bool committed; - bool success; - Version read_version; - Version version; + bool committed = false; + bool success = false; + Version read_version = NoVersion; + Version version = NoVersion; + Term term = 0; kv::TxHistory::RequestID req_id; @@ -25,7 +26,7 @@ namespace kv { using MapView = typename M::TxView; - // If the M is present, its AbtractTxView should be an M::TxView. This + // If the M is present, its AbstractTxView should be an M::TxView. This // invariant could be broken by set_view_list, which will produce an error // here auto search = view_list.find(m.get_name()); @@ -54,7 +55,9 @@ namespace kv if (read_version == NoVersion) { // Grab opacity version that all Maps should be queried at. - read_version = m.get_store()->current_version(); + auto txid = m.get_store()->current_txid(); + term = txid.term; + read_version = txid.version; } MapView* typed_view = m.template create_view(read_version); @@ -83,24 +86,20 @@ namespace kv success = false; read_version = NoVersion; version = NoVersion; + term = 0; } public: - Tx() : - view_list(), - committed(false), - success(false), - read_version(NoVersion), - version(NoVersion) - {} + Tx() : view_list() {} Tx(const Tx& that) = delete; - void set_view_list(OrderedViews& view_list_) override + void set_view_list(OrderedViews& view_list_, Term term_) override { // if view list is not empty then any coinciding keys will not be // overwritten view_list.merge(view_list_); + term = term_; } void set_req_id(const kv::TxHistory::RequestID& req_id_) @@ -122,6 +121,11 @@ namespace kv return read_version; } + Version get_term() + { + return term; + } + /** Get a transaction view on a map. * * This adds the map to the transaction set if it is not yet present. @@ -209,7 +213,9 @@ namespace kv } return store->commit( - version, MovePendingTx(std::move(data), std::move(req_id)), false); + {term, version}, + MovePendingTx(std::move(data), std::move(req_id)), + false); } catch (const std::exception& e) { @@ -240,6 +246,21 @@ namespace kv return version; } + /** Commit term if committed + * + * @return Commit term + */ + Version commit_term() + { + if (!committed) + throw std::logic_error("Transaction not yet committed"); + + if (!success) + throw std::logic_error("Transaction aborted"); + + return term; + } + std::vector serialise(bool include_reads = false) { if (!committed) diff --git a/src/kv/view_containers.h b/src/kv/view_containers.h index 50b24c298eee..54e91dd39a6d 100644 --- a/src/kv/view_containers.h +++ b/src/kv/view_containers.h @@ -25,7 +25,7 @@ namespace kv struct ViewContainer { virtual ~ViewContainer() = default; - virtual void set_view_list(OrderedViews& view_list) = 0; + virtual void set_view_list(OrderedViews& view_list, Term term) = 0; }; // Atomically checks for conflicts then applies the writes in a set of views diff --git a/src/node/history.h b/src/node/history.h index c4fc4c937da6..21595693369f 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -121,14 +121,14 @@ namespace ccf void emit_signature() override { - auto version = store.next_version(); - LOG_INFO_FMT("Issuing signature at {}", version); + auto txid = store.next_txid(); + LOG_INFO_FMT("Issuing signature at {}.{}", txid.term, txid.version); store.commit( - version, - [version, this]() { - kv::Tx sig(version); + txid, + [txid, this]() { + kv::Tx sig(txid.version); auto sig_view = sig.get_view(signatures); - Signature sig_value(id, version); + Signature sig_value(id, txid.version); sig_view->put(0, sig_value); return sig.commit_reserved(); }, @@ -591,26 +591,24 @@ namespace ccf if (consensus->type() == ConsensusType::RAFT) { - auto version = store.next_version(); - auto view = consensus->get_view(); + auto txid = store.next_txid(); auto commit_txid = consensus->get_committed_txid(); - LOG_DEBUG_FMT("Issuing signature at {}", version); LOG_DEBUG_FMT( "Signed at {} in view: {} commit was: {}.{}", - version, - view, + txid.version, + txid.term, commit_txid.first, commit_txid.second); store.commit( - version, - [version, view, commit_txid, this]() { - kv::Tx sig(version); + txid, + [txid, commit_txid, this]() { + kv::Tx sig(txid.version); auto sig_view = sig.get_view(signatures); crypto::Sha256Hash root = replicated_state_tree.get_root(); Signature sig_value( id, - version, - view, + txid.version, + txid.term, commit_txid.second, commit_txid.first, root, diff --git a/src/node/node_state.h b/src/node/node_state.h index 6f466132b43b..3bb999ad7a16 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -602,9 +602,6 @@ namespace ccf // When reaching the end of the public ledger, truncate to last signed // index and promote network secrets to this index - kv::Tx tx; - GenesisGenerator g(network, tx); - network.tables->rollback(last_recovered_commit_idx); ledger_truncate(last_recovered_commit_idx); LOG_INFO_FMT( @@ -614,9 +611,14 @@ namespace ccf network.ledger_secrets->init(last_recovered_commit_idx + 1); setup_encryptor(network.consensus_type); + // KV term must be set before the first Tx is committed + LOG_INFO_FMT( + "Setting term on public recovery KV to {}", term_history.size() + 2); + network.tables->set_term(term_history.size() + 2); + kv::Tx tx; + GenesisGenerator g(network, tx); g.create_service(network.identity->cert, last_recovered_commit_idx + 1); - g.retire_active_nodes(); self = g.add_node({node_info_network, @@ -1726,4 +1728,4 @@ namespace ccf setup_basic_hooks(); } }; -} \ No newline at end of file +} diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index ba9b7e1d4766..9527625305b4 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -366,12 +366,14 @@ namespace ccf auto cv = tx.commit_version(); if (cv == 0) cv = tx.get_read_version(); - if (cv == kv::NoVersion) - cv = tables.current_version(); - ctx->set_seqno(cv); if (consensus != nullptr) { - ctx->set_view(consensus->get_view(cv)); + if (cv != kv::NoVersion) + { + ctx->set_seqno(cv); + ctx->set_view(tx.commit_term()); + } + // Deprecated, this will be removed in future releases ctx->set_global_commit(consensus->get_committed_seqno()); if ( @@ -582,7 +584,6 @@ namespace ccf process_command(ctx, tx, ctx->session->original_caller->caller_id, fn); version = tx.get_version(); - return {std::move(rep.value()), version}; } diff --git a/src/node/test/history.cpp b/src/node/test/history.cpp index 6ded41b680ce..c7ed65d06ff7 100644 --- a/src/node/test/history.cpp +++ b/src/node/test/history.cpp @@ -26,7 +26,7 @@ class DummyConsensus : public kv::StubConsensus DummyConsensus(kv::Store* store_) : store(store_) {} - bool replicate(const kv::BatchVector& entries) override + bool replicate(const kv::BatchVector& entries, View view) override { if (store) { @@ -213,7 +213,7 @@ class CompactingConsensus : public kv::StubConsensus CompactingConsensus(kv::Store* store_) : store(store_) {} - bool replicate(const kv::BatchVector& entries) override + bool replicate(const kv::BatchVector& entries, View view) override { for (auto& [version, data, committable] : entries) { @@ -275,7 +275,7 @@ TEST_CASE( INFO("Batch of two, starting with a commitable"); { - auto rv = store.next_version(); + auto rv = store.next_txid(); kv::Tx tx; auto txv = tx.get_view(table); @@ -286,7 +286,7 @@ TEST_CASE( store.commit( rv, [rv, &other_table]() { - kv::Tx txr(rv); + kv::Tx txr(rv.version); auto txrv = txr.get_view(other_table); txrv->put(0, 1); return txr.commit_reserved(); @@ -320,7 +320,7 @@ class RollbackConsensus : public kv::StubConsensus rollback_to(rollback_to_) {} - bool replicate(const kv::BatchVector& entries) override + bool replicate(const kv::BatchVector& entries, View view) override { for (auto& [version, data, committable] : entries) {