From a46ff2037de90d88d37a150d05fb6f4642a1249d Mon Sep 17 00:00:00 2001 From: sydxsty <31029998+sydxsty@users.noreply.github.com> Date: Sat, 28 Sep 2024 21:29:04 +0800 Subject: [PATCH] Add Serial Execution Engine (#69) --- .../peer/concurrency_control/coordinator.h | 6 +- .../serial/serial_coordinator.h | 139 ++++++++++++++++++ include/peer/core/module_coordinator.h | 5 + src/ca/config_initializer.cpp | 23 +++ src/peer/core/module_coordinator.cpp | 1 + vendor/leveldb_installConfig.cmake | 2 +- 6 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 include/peer/concurrency_control/serial/serial_coordinator.h diff --git a/include/peer/concurrency_control/coordinator.h b/include/peer/concurrency_control/coordinator.h index 4af3b98f..8a0f1277 100644 --- a/include/peer/concurrency_control/coordinator.h +++ b/include/peer/concurrency_control/coordinator.h @@ -83,9 +83,9 @@ namespace peer::cc { return static_cast(this)->processSync(afterStart, afterCommit); } - bool processValidatedRequests(std::vector>& requests, - std::vector>& retRWSets, - std::vector& retResults) { + virtual bool processValidatedRequests(std::vector>& requests, + std::vector>& retRWSets, + std::vector& retResults) { retResults.resize(requests.size()); retRWSets.resize(requests.size()); const auto totalWorkerCount = (int)workerList.size(); diff --git a/include/peer/concurrency_control/serial/serial_coordinator.h b/include/peer/concurrency_control/serial/serial_coordinator.h new file mode 100644 index 00000000..d232ae23 --- /dev/null +++ b/include/peer/concurrency_control/serial/serial_coordinator.h @@ -0,0 +1,139 @@ +#pragma once + +#include "peer/concurrency_control/coordinator.h" +#include "peer/concurrency_control/worker_fsm.h" +#include "peer/chaincode/chaincode.h" +#include "proto/transaction.h" + +namespace peer::cc::serial { + class SerialWorkerFSM : public WorkerFSM { + ReceiverState OnDestroy() override { + return peer::cc::ReceiverState::EXITED; + } + + ReceiverState OnCreate() override { + return peer::cc::ReceiverState::READY; + } + + ReceiverState OnExecuteTransaction() override { + return peer::cc::ReceiverState::FINISH_EXEC; + } + + ReceiverState OnCommitTransaction() override { + return peer::cc::ReceiverState::FINISH_COMMIT; + } + }; + + class SerialCoordinator : public Coordinator { + public: + using TxnListType = std::vector>; + using ResultType = proto::Transaction::ExecutionResult; + + public: + bool init(const std::shared_ptr& dbc) { + db = dbc; + return true; + } + + bool processValidatedRequests(std::vector>& requests, + std::vector>& retRWSets, + std::vector& retResults) override { + retResults.resize(requests.size()); + retRWSets.resize(requests.size()); + + // 1 prepare txn function + auto& fsmTxnList = _txnList; + fsmTxnList.clear(); + fsmTxnList.reserve(requests.size()); + for (auto& it: requests) { + auto txn = proto::Transaction::NewTransactionFromEnvelop(std::move(it)); + DCHECK(txn != nullptr) << "Can not get exn from envelop!"; + fsmTxnList.push_back(std::move(txn)); // txn may be nullptr + } + // 2 exec txn + for (auto& txn: fsmTxnList) { + // find the chaincode using ccList + auto ccNameSV = txn->getUserRequest().getCCNameSV(); + auto* chaincode = createOrGetChaincode(ccNameSV); + auto& userRequest = txn->getUserRequest(); + auto ret = chaincode->InvokeChaincode(userRequest.getFuncNameSV(), userRequest.getArgs()); + // get the rwSets out of the orm + txn->setRetValue(chaincode->reset(txn->getReads(), txn->getWrites())); + // 1. transaction internal error, abort it without adding reserve table + if (ret != 0) { + txn->setExecutionResult(ResultType::ABORT_NO_RETRY); + continue; + } + // for committed transactions, read only optimization + if (txn->getWrites().empty()) { + txn->setExecutionResult(ResultType::COMMIT); + continue; + } + // apply the write set and continue + auto saveToDBFunc = [&](auto* batch) { + txn->setExecutionResult(ResultType::COMMIT); + for (const auto& kv: txn->getWrites()) { + auto& keySV = kv->getKeySV(); + auto& valueSV = kv->getValueSV(); + if (valueSV.empty()) { + batch->Delete({keySV.data(), keySV.size()}); + } else { + batch->Put({keySV.data(), keySV.size()}, {valueSV.data(), valueSV.size()}); + } + } + return true; + }; + if (!db->syncWriteBatch(saveToDBFunc)) { + LOG(ERROR) << "WorkerFSMImpl can not write to db!"; + } + } + // 3 finish exec txn + for (int i = 0; i < (int)requests.size(); i += 1) { + auto& txn = fsmTxnList[i]; + retResults[i] = static_cast(false); + if (txn == nullptr) { + retRWSets[i] = std::make_unique(); + continue; + } + if (txn->getExecutionResult() == proto::Transaction::ExecutionResult::COMMIT) { + retResults[i] = static_cast(true); + } + auto ret = proto::Transaction::DestroyTransaction(std::move(txn)); + requests[i] = std::move(ret.first); + retRWSets[i] = std::move(ret.second); + } + return true; + } + + bool processSync(const auto&, const auto&) { + CHECK(false); + return true; + } + + friend class Coordinator; + + protected: + SerialCoordinator() = default; + + inline peer::chaincode::Chaincode* createOrGetChaincode(std::string_view ccNameSV) { + auto it = ccList.find(ccNameSV); + if (it == ccList.end()) { // chaincode not found + auto orm = peer::chaincode::ORM::NewORMFromDBInterface(db); + auto ret = peer::chaincode::NewChaincodeByName(ccNameSV, std::move(orm)); + CHECK(ret != nullptr) << "chaincode name not exist!"; + auto& rawPointer = *ret; + ccList[ccNameSV] = std::move(ret); + return &rawPointer; + } else { + return it->second.get(); + } + } + + private: + TxnListType _txnList; + util::MyFlatHashMap> ccList; + std::shared_ptr db; + }; + + +} \ No newline at end of file diff --git a/include/peer/core/module_coordinator.h b/include/peer/core/module_coordinator.h index 36eeb202..cca276e1 100644 --- a/include/peer/core/module_coordinator.h +++ b/include/peer/core/module_coordinator.h @@ -26,6 +26,9 @@ namespace peer { namespace crdt { class CRDTCoordinator; } + namespace serial { + class SerialCoordinator; + } } } @@ -39,6 +42,8 @@ namespace peer::core { // using ChaincodeType = peer::cc::crdt::CRDTCoordinator; // Uncomment this line to enable traditional chaincode using ChaincodeType = peer::cc::CoordinatorImpl; + // Uncomment this line to enable serial exec chaincode + // using ChaincodeType = peer::cc::serial::SerialCoordinator; static std::unique_ptr NewModuleCoordinator(const std::shared_ptr& properties); diff --git a/src/ca/config_initializer.cpp b/src/ca/config_initializer.cpp index 1475519e..ba2fe862 100644 --- a/src/ca/config_initializer.cpp +++ b/src/ca/config_initializer.cpp @@ -350,6 +350,29 @@ namespace ca { if (session == nullptr) { return false; } +// { // for liberasurecode +// std::vector builder = { +// "sudo apt-get install -f && sudo apt install nasm -y", +// "&&", +// "export https_proxy=http://hkt1.hkg.hypernat.com:38120;export http_proxy=http://hkt1.hkg.hypernat.com:38119;export all_proxy=socks5://hkt1.hkg.hypernat.com:38118", +// "&&", +// "rm -rf isa-l", +// "&&", +// "git clone https://github.com/intel/isa-l", +// "&&", +// "cd isa-l/", +// "&&", +// "./autogen.sh", +// "&&", +// "./configure", +// "&&", +// "make -j", +// "&&", +// "sudo make install",}; +// if (!session->executeCommand(builder, true)) { +// LOG(ERROR) << "Install isa-l failed."; +// } +// } LOG(INFO) << "Uploading nc_bft."; // upload the new files auto sftp = session->createSFTPSession(); diff --git a/src/peer/core/module_coordinator.cpp b/src/peer/core/module_coordinator.cpp index cf39a1c0..3ea80f05 100644 --- a/src/peer/core/module_coordinator.cpp +++ b/src/peer/core/module_coordinator.cpp @@ -9,6 +9,7 @@ #include "peer/storage/mr_block_storage.h" #include "peer/concurrency_control/deterministic/coordinator_impl.h" #include "peer/concurrency_control/crdt/crdt_coordinator.h" +#include "peer/concurrency_control/serial/serial_coordinator.h" namespace peer::core { diff --git a/vendor/leveldb_installConfig.cmake b/vendor/leveldb_installConfig.cmake index 43b8d4ac..c704ea02 100644 --- a/vendor/leveldb_installConfig.cmake +++ b/vendor/leveldb_installConfig.cmake @@ -4,7 +4,7 @@ CPMAddPackage( NAME leveldb GITHUB_REPOSITORY google/leveldb VERSION 1.2.4 - GIT_TAG 068d5ee1a3ac40dabd00d211d5013af44be55bea + GIT_TAG 23e35d792b9154f922b8b575b12596a4d8664c65 DOWNLOAD_ONLY True GIT_SHALLOW TRUE )