Skip to content

Commit

Permalink
Add Serial Execution Engine (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
sydxsty authored Sep 28, 2024
1 parent f22a2ba commit a46ff20
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 4 deletions.
6 changes: 3 additions & 3 deletions include/peer/concurrency_control/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ namespace peer::cc {
return static_cast<Derived*>(this)->processSync(afterStart, afterCommit);
}

bool processValidatedRequests(std::vector<std::unique_ptr<proto::Envelop>>& requests,
std::vector<std::unique_ptr<proto::TxReadWriteSet>>& retRWSets,
std::vector<std::byte>& retResults) {
virtual bool processValidatedRequests(std::vector<std::unique_ptr<proto::Envelop>>& requests,
std::vector<std::unique_ptr<proto::TxReadWriteSet>>& retRWSets,
std::vector<std::byte>& retResults) {
retResults.resize(requests.size());
retRWSets.resize(requests.size());
const auto totalWorkerCount = (int)workerList.size();
Expand Down
139 changes: 139 additions & 0 deletions include/peer/concurrency_control/serial/serial_coordinator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#pragma once

#include "peer/concurrency_control/coordinator.h"
#include "peer/concurrency_control/worker_fsm.h"
#include "peer/chaincode/chaincode.h"
#include "proto/transaction.h"

namespace peer::cc::serial {
class SerialWorkerFSM : public WorkerFSM {
ReceiverState OnDestroy() override {
return peer::cc::ReceiverState::EXITED;
}

ReceiverState OnCreate() override {
return peer::cc::ReceiverState::READY;
}

ReceiverState OnExecuteTransaction() override {
return peer::cc::ReceiverState::FINISH_EXEC;
}

ReceiverState OnCommitTransaction() override {
return peer::cc::ReceiverState::FINISH_COMMIT;
}
};

class SerialCoordinator : public Coordinator<SerialWorkerFSM, SerialCoordinator> {
public:
using TxnListType = std::vector<std::unique_ptr<proto::Transaction>>;
using ResultType = proto::Transaction::ExecutionResult;

public:
bool init(const std::shared_ptr<peer::db::DBConnection>& dbc) {
db = dbc;
return true;
}

bool processValidatedRequests(std::vector<std::unique_ptr<proto::Envelop>>& requests,
std::vector<std::unique_ptr<proto::TxReadWriteSet>>& retRWSets,
std::vector<std::byte>& retResults) override {
retResults.resize(requests.size());
retRWSets.resize(requests.size());

// 1 prepare txn function
auto& fsmTxnList = _txnList;
fsmTxnList.clear();
fsmTxnList.reserve(requests.size());
for (auto& it: requests) {
auto txn = proto::Transaction::NewTransactionFromEnvelop(std::move(it));
DCHECK(txn != nullptr) << "Can not get exn from envelop!";
fsmTxnList.push_back(std::move(txn)); // txn may be nullptr
}
// 2 exec txn
for (auto& txn: fsmTxnList) {
// find the chaincode using ccList
auto ccNameSV = txn->getUserRequest().getCCNameSV();
auto* chaincode = createOrGetChaincode(ccNameSV);
auto& userRequest = txn->getUserRequest();
auto ret = chaincode->InvokeChaincode(userRequest.getFuncNameSV(), userRequest.getArgs());
// get the rwSets out of the orm
txn->setRetValue(chaincode->reset(txn->getReads(), txn->getWrites()));
// 1. transaction internal error, abort it without adding reserve table
if (ret != 0) {
txn->setExecutionResult(ResultType::ABORT_NO_RETRY);
continue;
}
// for committed transactions, read only optimization
if (txn->getWrites().empty()) {
txn->setExecutionResult(ResultType::COMMIT);
continue;
}
// apply the write set and continue
auto saveToDBFunc = [&](auto* batch) {
txn->setExecutionResult(ResultType::COMMIT);
for (const auto& kv: txn->getWrites()) {
auto& keySV = kv->getKeySV();
auto& valueSV = kv->getValueSV();
if (valueSV.empty()) {
batch->Delete({keySV.data(), keySV.size()});
} else {
batch->Put({keySV.data(), keySV.size()}, {valueSV.data(), valueSV.size()});
}
}
return true;
};
if (!db->syncWriteBatch(saveToDBFunc)) {
LOG(ERROR) << "WorkerFSMImpl can not write to db!";
}
}
// 3 finish exec txn
for (int i = 0; i < (int)requests.size(); i += 1) {
auto& txn = fsmTxnList[i];
retResults[i] = static_cast<std::byte>(false);
if (txn == nullptr) {
retRWSets[i] = std::make_unique<proto::TxReadWriteSet>();
continue;
}
if (txn->getExecutionResult() == proto::Transaction::ExecutionResult::COMMIT) {
retResults[i] = static_cast<std::byte>(true);
}
auto ret = proto::Transaction::DestroyTransaction(std::move(txn));
requests[i] = std::move(ret.first);
retRWSets[i] = std::move(ret.second);
}
return true;
}

bool processSync(const auto&, const auto&) {
CHECK(false);
return true;
}

friend class Coordinator;

protected:
SerialCoordinator() = default;

inline peer::chaincode::Chaincode* createOrGetChaincode(std::string_view ccNameSV) {
auto it = ccList.find(ccNameSV);
if (it == ccList.end()) { // chaincode not found
auto orm = peer::chaincode::ORM::NewORMFromDBInterface(db);
auto ret = peer::chaincode::NewChaincodeByName(ccNameSV, std::move(orm));
CHECK(ret != nullptr) << "chaincode name not exist!";
auto& rawPointer = *ret;
ccList[ccNameSV] = std::move(ret);
return &rawPointer;
} else {
return it->second.get();
}
}

private:
TxnListType _txnList;
util::MyFlatHashMap<std::string, std::unique_ptr<peer::chaincode::Chaincode>> ccList;
std::shared_ptr<db::DBConnection> db;
};


}
5 changes: 5 additions & 0 deletions include/peer/core/module_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace peer {
namespace crdt {
class CRDTCoordinator;
}
namespace serial {
class SerialCoordinator;
}
}
}

Expand All @@ -39,6 +42,8 @@ namespace peer::core {
// using ChaincodeType = peer::cc::crdt::CRDTCoordinator;
// Uncomment this line to enable traditional chaincode
using ChaincodeType = peer::cc::CoordinatorImpl;
// Uncomment this line to enable serial exec chaincode
// using ChaincodeType = peer::cc::serial::SerialCoordinator;

static std::unique_ptr<ModuleCoordinator> NewModuleCoordinator(const std::shared_ptr<util::Properties>& properties);

Expand Down
23 changes: 23 additions & 0 deletions src/ca/config_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,29 @@ namespace ca {
if (session == nullptr) {
return false;
}
// { // for liberasurecode
// std::vector<std::string> builder = {
// "sudo apt-get install -f && sudo apt install nasm -y",
// "&&",
// "export https_proxy=http://hkt1.hkg.hypernat.com:38120;export http_proxy=http://hkt1.hkg.hypernat.com:38119;export all_proxy=socks5://hkt1.hkg.hypernat.com:38118",
// "&&",
// "rm -rf isa-l",
// "&&",
// "git clone https://github.com/intel/isa-l",
// "&&",
// "cd isa-l/",
// "&&",
// "./autogen.sh",
// "&&",
// "./configure",
// "&&",
// "make -j",
// "&&",
// "sudo make install",};
// if (!session->executeCommand(builder, true)) {
// LOG(ERROR) << "Install isa-l failed.";
// }
// }
LOG(INFO) << "Uploading nc_bft.";
// upload the new files
auto sftp = session->createSFTPSession();
Expand Down
1 change: 1 addition & 0 deletions src/peer/core/module_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "peer/storage/mr_block_storage.h"
#include "peer/concurrency_control/deterministic/coordinator_impl.h"
#include "peer/concurrency_control/crdt/crdt_coordinator.h"
#include "peer/concurrency_control/serial/serial_coordinator.h"

namespace peer::core {

Expand Down
2 changes: 1 addition & 1 deletion vendor/leveldb_installConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CPMAddPackage(
NAME leveldb
GITHUB_REPOSITORY google/leveldb
VERSION 1.2.4
GIT_TAG 068d5ee1a3ac40dabd00d211d5013af44be55bea
GIT_TAG 23e35d792b9154f922b8b575b12596a4d8664c65
DOWNLOAD_ONLY True
GIT_SHALLOW TRUE
)
Expand Down

0 comments on commit a46ff20

Please sign in to comment.