Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix several bugs found in new server scanner code #146

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions src/rpc/scanner/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "server.h"

#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <sodium/utils.h>
Expand Down Expand Up @@ -163,12 +164,16 @@ namespace lws { namespace rpc { namespace scanner

void operator()(const boost::system::error_code& error = {})
{
if (!self_ || error)
if (error)
{
if (error == boost::asio::error::operation_aborted)
return; // exiting
MONERO_THROW(error, "server acceptor failed");
}

if (!self_ || self_->stop_)
return;

assert(self_->strand_.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this)
{
Expand All @@ -192,7 +197,7 @@ namespace lws { namespace rpc { namespace scanner

void operator()(const boost::system::error_code& error = {}) const
{
if (!self_ || error == boost::asio::error::operation_aborted)
if (!self_ || self_->stop_ || error == boost::asio::error::operation_aborted)
return;

assert(self_->strand_.running_in_this_thread());
Expand Down Expand Up @@ -223,7 +228,7 @@ namespace lws { namespace rpc { namespace scanner
return;
}

auto reader = self_->disk_.start_read(std::move(self_->read_txn_));
auto reader = self_->disk_.start_read();
if (!reader)
{
if (reader.matches(std::errc::no_lock_available))
Expand All @@ -240,6 +245,8 @@ namespace lws { namespace rpc { namespace scanner
if (current_users.count() < self_->active_.size())
{
// a shrinking user base, re-shuffle
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand All @@ -254,6 +261,8 @@ namespace lws { namespace rpc { namespace scanner
new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value<db::account>())));
if (replace_threshold < new_accounts.size())
{
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand All @@ -268,6 +277,8 @@ namespace lws { namespace rpc { namespace scanner

if (!active_copy.empty())
{
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
self_->do_replace_users();
return;
}
Expand Down Expand Up @@ -306,7 +317,7 @@ namespace lws { namespace rpc { namespace scanner

self_->next_thread_ %= total_threads;
}
self_->read_txn_ = reader->finish_read();
reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
}
};
Expand Down Expand Up @@ -401,6 +412,28 @@ namespace lws { namespace rpc { namespace scanner
active_ = std::move(active);
}

void server::do_stop()
{
assert(strand_.running_in_this_thread());
if (stop_)
return;

MDEBUG("Stopping rpc::scanner::server async operations");
boost::system::error_code error{};
check_timer_.cancel(error);
acceptor_.cancel(error);
acceptor_.close(error);

for (auto& remote : remote_)
{
const auto conn = remote.lock();
if (conn)
boost::asio::dispatch(conn->strand_, [conn] () { conn->cleanup(); });
}

stop_ = true;
}

boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address)
{
std::string host;
Expand Down Expand Up @@ -432,12 +465,12 @@ namespace lws { namespace rpc { namespace scanner
active_(std::move(active)),
disk_(std::move(disk)),
zclient_(std::move(zclient)),
read_txn_{},
accounts_cur_{},
next_thread_(0),
pass_hashed_(),
pass_salt_(),
webhook_verify_(webhook_verify)
webhook_verify_(webhook_verify),
stop_(false)
{
std::sort(active_.begin(), active_.end());
for (const auto& local : local_)
Expand Down Expand Up @@ -488,6 +521,9 @@ namespace lws { namespace rpc { namespace scanner
{
self->acceptor_.close();
self->acceptor_.open(endpoint.protocol());
#if !defined(_WIN32)
self->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
#endif
self->acceptor_.bind(endpoint);
self->acceptor_.listen();

Expand Down Expand Up @@ -522,7 +558,17 @@ namespace lws { namespace rpc { namespace scanner
{
const lws::scanner_options opts{self->webhook_verify_, false, false};
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
GET_IO_SERVICE(self->check_timer_).stop();
{
self->do_stop();
self->strand_.context().stop();
}
});
}

void server::stop(const std::shared_ptr<server>& self)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
boost::asio::dispatch(self->strand_, [self] () { self->do_stop(); });
}
}}} // lws // rpc // scanner
10 changes: 8 additions & 2 deletions src/rpc/scanner/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ namespace lws { namespace rpc { namespace scanner
std::vector<db::account_id> active_;
db::storage disk_;
rpc::client zclient_;
lmdb::suspended_txn read_txn_;
db::cursor::accounts accounts_cur_;
std::size_t next_thread_;
std::array<unsigned char, 32> pass_hashed_;
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
const ssl_verification_t webhook_verify_;
bool stop_;

//! Async acceptor routine
class acceptor;
Expand All @@ -79,6 +79,9 @@ namespace lws { namespace rpc { namespace scanner
//! Reset `local_` and `remote_` scanners. Must be called in `strand_`.
void do_replace_users();

//! Stop all async operations
void do_stop();

public:
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);

Expand All @@ -105,6 +108,9 @@ namespace lws { namespace rpc { namespace scanner
static void replace_users(const std::shared_ptr<server>& self);

//! Update `users` information on local DB
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);

//! Stop a running instance of all operations
static void stop(const std::shared_ptr<server>& self);
};
}}} // lws // rpc // scanner
3 changes: 2 additions & 1 deletion src/rpc/scanner/write_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#pragma once

#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <memory>
Expand Down Expand Up @@ -167,7 +168,7 @@ namespace lws { namespace rpc { namespace scanner

if (msg.empty())
{
self->cleanup();
boost::asio::dispatch(self->strand_, [self] () { self->cleanup(); });
return;
}

Expand Down
44 changes: 27 additions & 17 deletions src/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "scanner.h"

#include <algorithm>
#include <boost/asio/use_future.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <boost/range/combine.hpp>
#include <boost/thread/condition_variable.hpp>
Expand Down Expand Up @@ -1044,23 +1045,27 @@ namespace lws
users.clear();
users.shrink_to_fit();

{
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);

rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
}
rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));

// Blocks until sigint, local scanner issue, or exception
// Blocks until sigint, local scanner issue, storage issue, or exception
self.io_.run();
self.io_.restart();

// Make sure server stops because we could re-start after blockchain sync
rpc::scanner::server::stop(server);
self.io_.poll();
self.io_.restart();
}

template<typename R, typename Q>
Expand Down Expand Up @@ -1396,14 +1401,19 @@ namespace lws

boost::asio::steady_timer poll{sync_.io_};
poll.expires_from_now(rpc::scanner::account_poll_interval);
poll.async_wait([] (boost::system::error_code) {});
const auto ready = poll.async_wait(boost::asio::use_future);

sync_.io_.run_one();
/* The exchange rates timer could run while waiting, so ensure that
the correct timer was run. */
while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout)
{
sync_.io_.run_one();
sync_.io_.restart();
}
}
else
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);

sync_.io_.reset();
if (has_shutdown())
return;

Expand Down
Loading