From 0316fe377ae412ea43a24fce9706a3f1afa6d4a8 Mon Sep 17 00:00:00 2001 From: "Greg \"GothAck\" Miell" Date: Wed, 26 Feb 2020 15:29:19 +0000 Subject: [PATCH] Flatten internal non-exception replies for #7 --- compiler/flatrpcc_inja/flatrpc.cpp.inja | 27 +++++++++++++------------ rpc/include/flatrpc/rpcbase.hpp | 2 ++ rpc/include/flatrpc/rpcclientbase.hpp | 3 +++ rpc/rpcbase.cpp | 26 ++++++++++++++++++++++++ rpc/rpcclient.cpp | 11 ++++++++++ 5 files changed, 56 insertions(+), 13 deletions(-) diff --git a/compiler/flatrpcc_inja/flatrpc.cpp.inja b/compiler/flatrpcc_inja/flatrpc.cpp.inja index 078d132..fb98d16 100644 --- a/compiler/flatrpcc_inja/flatrpc.cpp.inja +++ b/compiler/flatrpcc_inja/flatrpc.cpp.inja @@ -131,24 +131,25 @@ void {{ service.nameClient }}::makeRequest( TResponsePromiseVar &&resProm ) { thread_local LocalSocket socket(_context, zmqpp::socket_type::dealer); - auto nativeReq = make_shared(); - nativeReq->requestId = nextRequestId(move(resProm), callName); - nativeReq->type = RPCType::CLIENT_REQ; - nativeReq->name = callName; - nativeReq->data = visit(overloaded { + + auto buf = packInt( + nextRequestId(move(resProm), callName), + RPCType::CLIENT_REQ, + callName, + visit(overloaded { ## for request in service.requests - [&]({{ request.local }}TPtr &ptr) { - flatbuffers::FlatBufferBuilder fbb; + [&]({{ request.local }}TPtr &ptr) { + flatbuffers::FlatBufferBuilder fbb; - auto rep = {{ request.cpp }}::Pack(fbb, ptr.get()); - fbb.Finish(rep); + auto rep = {{ request.cpp }}::Pack(fbb, ptr.get()); + fbb.Finish(rep); - return vector(fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()); - }, + return vector(fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()); + }, ## endfor - }, req); + }, req) + ); - auto buf = packInt(nativeReq); zmqpp::message msg; msg << ""; msg << buf; diff --git a/rpc/include/flatrpc/rpcbase.hpp b/rpc/include/flatrpc/rpcbase.hpp index 4a96157..6c8f5af 100644 --- a/rpc/include/flatrpc/rpcbase.hpp +++ b/rpc/include/flatrpc/rpcbase.hpp @@ -66,6 +66,8 @@ class RpcBase : public RpcSockets, public virtual RpcRunnable { TIntNativePtr unpackInt(std::string &&data); std::string packInt(TIntNativePtr nativePtr); + std::string packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::vector data); + std::string packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::exception &exception); TIntNativePtr makeReply(TIntNativePtr req); diff --git a/rpc/include/flatrpc/rpcclientbase.hpp b/rpc/include/flatrpc/rpcclientbase.hpp index d3c2761..8607dce 100644 --- a/rpc/include/flatrpc/rpcclientbase.hpp +++ b/rpc/include/flatrpc/rpcclientbase.hpp @@ -24,6 +24,9 @@ class RpcClientBase : public RpcBase { virtual void timeoutRequest(size_t id) = 0; void workerThread() override; virtual void handleResponse(TData &&response) = 0; + + void makeRequest(uint64_t requestId, flatrpc::rpc::RPCType type, std::string callName, std::vector req); + std::unordered_map _requestTimes; std::unordered_map _requestCallNames; }; diff --git a/rpc/rpcbase.cpp b/rpc/rpcbase.cpp index a8a9174..748e8cb 100644 --- a/rpc/rpcbase.cpp +++ b/rpc/rpcbase.cpp @@ -8,6 +8,8 @@ using namespace std; using namespace zmqpp; +using namespace flatbuffers; +using namespace flatrpc::rpc; RpcBase::RpcBase(zmqpp::context &ctx, zmqpp ::socket_type type) : @@ -160,3 +162,27 @@ void RpcBase::stop() { void RpcBase::joinReactor() { _reactorThread.join(); } + +std::string RpcBase::packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::vector data) { + FlatBufferBuilder fbb; + auto fbbName = fbb.CreateString(name); + auto fbbData = fbb.CreateVector(data); + + auto rpc = CreateRPC(fbb, requestId, type, fbbName, fbbData); + fbb.Finish(rpc); + + return {fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()}; +} + +std::string RpcBase::packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::exception &exception) { + FlatBufferBuilder fbb; + auto fbbName = fbb.CreateString(name); + auto fbbWhat = fbb.CreateString(exception.what()); + auto fbbException = CreateException(fbb, fbbWhat); + + + auto rpc = CreateRPC(fbb, requestId, type, fbbName, 0, fbbException); + fbb.Finish(rpc); + + return {fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()}; +} diff --git a/rpc/rpcclient.cpp b/rpc/rpcclient.cpp index be3699f..e3cf564 100644 --- a/rpc/rpcclient.cpp +++ b/rpc/rpcclient.cpp @@ -61,3 +61,14 @@ void RpcClientBase::workerThread() { } } } + +void RpcClientBase::makeRequest(uint64_t requestId, flatrpc::rpc::RPCType type, string callName, std::vector req) { + thread_local LocalSocket socket(_context, zmqpp::socket_type::dealer); + + auto buf = packInt(requestId, type, callName, req); + + zmqpp::message msg; + msg << ""; + msg << buf; + socket.send(msg); +}