Skip to content

Commit

Permalink
simplify client...
Browse files Browse the repository at this point in the history
dont use context.run() just run two threads
  • Loading branch information
williamhCode committed Dec 26, 2024
1 parent b8407ed commit ba8b256
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 65 deletions.
9 changes: 0 additions & 9 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,6 @@ jobs:
with:
path: artifacts

- name: Debug artifact directory
run: |
echo "Root of artifacts directory:"
ls -al artifacts
echo "Contents of arm64 artifact:"
ls -al artifacts/Neogurt-arm64
echo "Contents of x86 artifact:"
ls -al artifacts/Neogurt-x86
- name: Create release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
71 changes: 30 additions & 41 deletions src/nvim/msgpack_rpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ Client::~Client() {
if (socket->is_open()) socket->close();
}

for (auto& thread : contextThreads) {
thread.join();
for (auto& thread : rwThreads) {
if (thread.joinable()) {
thread.join();
}
}
}

bool Client::ConnectStdio(
const std::string& command, bool interactive, const std::string& dir
) {
clientType = ClientType::Stdio;

readPipe = std::make_unique<bp::async_pipe>(context);
writePipe = std::make_unique<bp::async_pipe>(context);

Expand Down Expand Up @@ -68,11 +71,8 @@ bool Client::ConnectStdio(
}
exit = false;

co_spawn(context, DoRead(), asio::detached);
co_spawn(context, DoWrite(), asio::detached);

contextThreads.emplace_back([this]() { context.run(); });
contextThreads.emplace_back([this]() { context.run(); });
rwThreads.emplace_back([this]() { DoRead(); });
rwThreads.emplace_back([this]() { DoWrite(); });

return true;
}
Expand All @@ -93,18 +93,17 @@ bool Client::ConnectTcp(std::string_view host, uint16_t port) {
}
exit = false;

co_spawn(context, DoRead(), asio::detached);
co_spawn(context, DoWrite(), asio::detached);

contextThreads.emplace_back([this]() { context.run(); });
contextThreads.emplace_back([this]() { context.run(); });
rwThreads.emplace_back([this]() { DoRead(); });
rwThreads.emplace_back([this]() { DoWrite(); });

return true;
}

void Client::Disconnect() {
exit = true;
msgsOut.Exit();
if (!exit) {
msgsOut.Exit();
exit = true;
}
}

bool Client::IsConnected() {
Expand Down Expand Up @@ -135,7 +134,7 @@ uint32_t Client::Msgid() {
return currId++;
}

asio::awaitable<void> Client::DoRead() {
void Client::DoRead() {
unpacker.reserve_buffer(readSize);

while (IsConnected()) {
Expand All @@ -145,25 +144,19 @@ asio::awaitable<void> Client::DoRead() {
std::size_t length;

if (clientType == ClientType::Stdio) {
length = co_await readPipe->async_read_some(
buffer, asio::redirect_error(asio::use_awaitable, ec)
);
length = readPipe->read_some(buffer, ec);
} else if (clientType == ClientType::Tcp) {
length = co_await socket->async_read_some(
buffer, asio::redirect_error(asio::use_awaitable, ec)
);
length = socket->read_some(buffer, ec);
}

if (ec) {
if (ec == asio::error::eof) {
LOG_INFO("Client::DoRead: The server closed the connection");
Disconnect();
co_return;

} else {
LOG_ERR("Client::DoRead: Error - {}", ec.message());
continue;
}
return;
}
LOG_ERR("Client::DoRead: Error - {}", ec.message());
continue;
}

unpacker.buffer_consumed(length);
Expand All @@ -190,7 +183,7 @@ asio::awaitable<void> Client::DoRead() {
.promise = std::move(promise),
});

std::thread([this, msgid = request.msgid, future = std::move(future)] mutable {
std::thread([weak_self = weak_from_this(), msgid = request.msgid, future = std::move(future)] mutable {
ResponseOut msg;

auto result = future.get();
Expand All @@ -204,7 +197,10 @@ asio::awaitable<void> Client::DoRead() {

msgpack::sbuffer buffer;
msgpack::pack(buffer, msg);
Write(std::move(buffer));

if (auto self = weak_self.lock()) {
self->Write(std::move(buffer));
}
}).detach();

} else if (type == MessageType::Response) {
Expand Down Expand Up @@ -248,29 +244,24 @@ asio::awaitable<void> Client::DoRead() {
unpacker.reserve_buffer(readSize);
}
}

co_return;
}

void Client::Write(msgpack::sbuffer&& buffer) {
msgsOut.Push(std::move(buffer));
}

asio::awaitable<void> Client::DoWrite() {
while (msgsOut.Wait()) {
void Client::DoWrite() {
while (IsConnected() && msgsOut.Wait()) {

auto& msgBuffer = msgsOut.Front();
auto buffer = asio::buffer(msgBuffer.data(), msgBuffer.size());

boost::system::error_code ec;

if (clientType == ClientType::Stdio) {
co_await writePipe->async_write_some(
buffer, asio::redirect_error(asio::use_awaitable, ec)
);
writePipe->write_some(buffer, ec);
} else if (clientType == ClientType::Tcp) {
co_await socket->async_write_some(
buffer, asio::redirect_error(asio::use_awaitable, ec)
);
socket->write_some(buffer, ec);
}

if (ec) {
Expand All @@ -279,8 +270,6 @@ asio::awaitable<void> Client::DoWrite() {

msgsOut.Pop();
}

co_return;
}

} // namespace rpc
8 changes: 4 additions & 4 deletions src/nvim/msgpack_rpc/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ enum class ClientType {
Tcp,
};

struct Client {
struct Client : std::enable_shared_from_this<Client> {
private:
asio::io_context context;

Expand All @@ -72,7 +72,7 @@ struct Client {
// tcp
std::unique_ptr<asio::ip::tcp::socket> socket;

std::vector<std::jthread> contextThreads;
std::vector<std::jthread> rwThreads;
std::atomic_bool exit;

std::unordered_map<u_int32_t, std::promise<msgpack::object_handle>> responses;
Expand Down Expand Up @@ -113,9 +113,9 @@ struct Client {
std::atomic_uint32_t currId = 0;

uint32_t Msgid();
asio::awaitable<void> DoRead();
void DoRead();
void Write(msgpack::sbuffer&& buffer);
asio::awaitable<void> DoWrite();
void DoWrite();
};

std::future<msgpack::object_handle>
Expand Down
4 changes: 2 additions & 2 deletions src/nvim/nvim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using namespace std::chrono_literals;

bool Nvim::ConnectStdio(bool interactive, const std::string& dir) {
client = std::make_unique<rpc::Client>();
client = std::make_shared<rpc::Client>();

// std::string luaInitPath = ROOT_DIR "/lua/init.lua";
// std::string cmd = "nvim --embed --headless "
Expand All @@ -21,7 +21,7 @@ bool Nvim::ConnectStdio(bool interactive, const std::string& dir) {
}

std::future<bool> Nvim::ConnectTcp(std::string_view host, uint16_t port) {
client = std::make_unique<rpc::Client>();
client = std::make_shared<rpc::Client>();

auto timeout = 500ms;
auto elapsed = 0ms;
Expand Down
2 changes: 1 addition & 1 deletion src/nvim/nvim.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace rpc { struct Client; }

// Nvim client that wraps the rpc client.
struct Nvim {
std::unique_ptr<rpc::Client> client;
std::shared_ptr<rpc::Client> client;
UiEvents uiEvents;
// int channelId;

Expand Down
10 changes: 2 additions & 8 deletions src/session/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,10 @@ bool SessionManager::ShouldQuit() {
int prevId = CurrSession()->id;

// remove disconnected sessions
std::vector<int> toEraseIds;
std::erase_if(sessionsOrder, [&toEraseIds](auto* sessionPtr) {
std::erase_if(sessionsOrder, [this](auto* sessionPtr) {
auto& session = *sessionPtr;
bool toErase = !session->nvim.client->IsConnected();
if (toErase) toEraseIds.push_back(session->id);
if (toErase) sessions.erase(session->id);
return toErase;
});

Expand All @@ -244,11 +243,6 @@ bool SessionManager::ShouldQuit() {
SessionSwitchInternal(currSession);
}

// erase after session switch, else main thread might access removed session
for (auto id : toEraseIds) {
sessions.erase(id);
}

return false;
}

Expand Down

0 comments on commit ba8b256

Please sign in to comment.