Skip to content

Commit

Permalink
Flatten internal non-exception replies for #7
Browse files Browse the repository at this point in the history
  • Loading branch information
GothAck committed Mar 13, 2020
1 parent d8ee434 commit 0316fe3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 13 deletions.
27 changes: 14 additions & 13 deletions compiler/flatrpcc_inja/flatrpc.cpp.inja
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,25 @@ void {{ service.nameClient }}::makeRequest(
TResponsePromiseVar &&resProm
) {
thread_local LocalSocket socket(_context, zmqpp::socket_type::dealer);
auto nativeReq = make_shared<TIntNative>();
nativeReq->requestId = nextRequestId(move(resProm), callName);
nativeReq->type = RPCType::CLIENT_REQ;
nativeReq->name = callName;
nativeReq->data = visit(overloaded {

auto buf = packInt(
nextRequestId(move(resProm), callName),
RPCType::CLIENT_REQ,
callName,
visit(overloaded {
## for request in service.requests
[&]({{ request.local }}TPtr &ptr) {
flatbuffers::FlatBufferBuilder fbb;
[&]({{ request.local }}TPtr &ptr) {
flatbuffers::FlatBufferBuilder fbb;

auto rep = {{ request.cpp }}::Pack(fbb, ptr.get());
fbb.Finish(rep);
auto rep = {{ request.cpp }}::Pack(fbb, ptr.get());
fbb.Finish(rep);

return vector<signed char>(fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize());
},
return vector<signed char>(fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize());
},
## endfor
}, req);
}, req)
);

auto buf = packInt(nativeReq);
zmqpp::message msg;
msg << "";
msg << buf;
Expand Down
2 changes: 2 additions & 0 deletions rpc/include/flatrpc/rpcbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class RpcBase : public RpcSockets, public virtual RpcRunnable {
TIntNativePtr unpackInt(std::string &&data);

std::string packInt(TIntNativePtr nativePtr);
std::string packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::vector<signed char> data);
std::string packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::exception &exception);

TIntNativePtr makeReply(TIntNativePtr req);

Expand Down
3 changes: 3 additions & 0 deletions rpc/include/flatrpc/rpcclientbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class RpcClientBase : public RpcBase {
virtual void timeoutRequest(size_t id) = 0;
void workerThread() override;
virtual void handleResponse(TData &&response) = 0;

void makeRequest(uint64_t requestId, flatrpc::rpc::RPCType type, std::string callName, std::vector<signed char> req);

std::unordered_map<size_t, std::chrono::_V2::steady_clock::time_point> _requestTimes;
std::unordered_map<size_t, std::string> _requestCallNames;
};
Expand Down
26 changes: 26 additions & 0 deletions rpc/rpcbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

using namespace std;
using namespace zmqpp;
using namespace flatbuffers;
using namespace flatrpc::rpc;

RpcBase::RpcBase(zmqpp::context &ctx, zmqpp
::socket_type type) :
Expand Down Expand Up @@ -160,3 +162,27 @@ void RpcBase::stop() {
void RpcBase::joinReactor() {
_reactorThread.join();
}

std::string RpcBase::packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::vector<signed char> data) {
FlatBufferBuilder fbb;
auto fbbName = fbb.CreateString(name);
auto fbbData = fbb.CreateVector<int8_t>(data);

auto rpc = CreateRPC(fbb, requestId, type, fbbName, fbbData);
fbb.Finish(rpc);

return {fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()};
}

std::string RpcBase::packInt(uint64_t requestId, flatrpc::rpc::RPCType type, const std::string& name, std::exception &exception) {
FlatBufferBuilder fbb;
auto fbbName = fbb.CreateString(name);
auto fbbWhat = fbb.CreateString(exception.what());
auto fbbException = CreateException(fbb, fbbWhat);


auto rpc = CreateRPC(fbb, requestId, type, fbbName, 0, fbbException);
fbb.Finish(rpc);

return {fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()};
}
11 changes: 11 additions & 0 deletions rpc/rpcclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@ void RpcClientBase::workerThread() {
}
}
}

void RpcClientBase::makeRequest(uint64_t requestId, flatrpc::rpc::RPCType type, string callName, std::vector<signed char> req) {
thread_local LocalSocket socket(_context, zmqpp::socket_type::dealer);

auto buf = packInt(requestId, type, callName, req);

zmqpp::message msg;
msg << "";
msg << buf;
socket.send(msg);
}

0 comments on commit 0316fe3

Please sign in to comment.