Divide Federator event processing per chain side
This prevents a situation where the locking side has a lot of events to
process and issuing events are blocked behind them. If the TTL ledger
arrives during such a backlog, the witness will assume the attestation did
not happen before the TTL, when in fact it may simply still be waiting in
the events queue.
oleks-rip committed May 29, 2024
1 parent daf2285 commit c4d634c
Showing 2 changed files with 54 additions and 25 deletions.
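The shape of the change, reduced to a minimal self-contained sketch (all names here are illustrative, not the xbwd ones): one event queue and one worker thread per chain side, so a burst of locking-side events can no longer delay an issuing-side event past its TTL.

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

enum Side { locking = 0, issuing = 1, sideCount = 2 };

struct Event
{
    Side side;
    int id;
};

std::mutex mtx;
std::array<std::vector<Event>, sideCount> queues;  // one queue per chain side
std::array<std::condition_variable, sideCount> cvs;
std::atomic<bool> done{false};

void
push(Event e)
{
    auto const s = e.side;
    {
        std::lock_guard l{mtx};
        queues[s].push_back(e);
    }
    cvs[s].notify_all();  // wakes only this side's worker
}

void
worker(Side s)
{
    std::vector<Event> local;
    while (!done)
    {
        {
            std::unique_lock l{mtx};
            cvs[s].wait(l, [&] { return done || !queues[s].empty(); });
            local.swap(queues[s]);  // grab the whole batch, release the lock
        }
        for (auto const& e : local)
            std::printf("side %d processed event %d\n", e.side, e.id);
        local.clear();
    }
}

int
main()
{
    std::thread tl(worker, locking), ti(worker, issuing);
    for (int i = 0; i < 3; ++i)
        push({locking, i});
    push({issuing, 99});  // not queued behind the locking burst anymore
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    {
        std::lock_guard l{mtx};
        done = true;  // set under the lock so no worker misses the wakeup
    }
    cvs[locking].notify_all();
    cvs[issuing].notify_all();
    tl.join();
    ti.join();
}

The real change keeps a single eventsMutex_ over both queues, as this sketch does with mtx; the queues are only touched briefly under the lock.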
67 changes: 45 additions & 22 deletions src/xbwd/federator/Federator.cpp
@@ -83,7 +83,7 @@ Federator::Federator(
chains_[ChainType::locking].txnSubmit_->shouldSubmit,
chains_[ChainType::issuing].txnSubmit_ &&
chains_[ChainType::issuing].txnSubmit_->shouldSubmit}
, maxAttToSend_(config.maxAttToSend)
, maxAttToSend_(config.maxAttToSend)
, signingAccount_(config.signingAccount)
, keyType_{config.keyType}
, signingPK_{derivePublicKey(config.keyType, config.signingKey)}
@@ -97,7 +97,8 @@ Federator::Federator(
config.issuingChainConfig.ignoreSignerList;

std::fill(loopLocked_.begin(), loopLocked_.end(), true);
events_.reserve(16);
events_[ChainType::locking].reserve(16);
events_[ChainType::issuing].reserve(16);
}

void
@@ -501,9 +502,13 @@ Federator::start()
requestStop_ = false;
running_ = true;

threads_[lt_event] = std::thread([this]() {
beast::setCurrentThreadName("FederatorEvents");
this->mainLoop();
threads_[lt_event_locking] = std::thread([this]() {
beast::setCurrentThreadName("FederatorEvents L");
this->mainLoop(ChainType::locking);
});
threads_[lt_event_issuing] = std::thread([this]() {
beast::setCurrentThreadName("FederatorEvents I");
this->mainLoop(ChainType::issuing);
});

threads_[lt_txnSubmit] = std::thread([this]() {
@@ -524,7 +529,7 @@ Federator::stop()
for (int i = 0; i < lt_last; ++i)
{
std::lock_guard l(cvMutexes_[i]);
cvs_[i].notify_one();
cvs_[i].notify_all();
}

for (int i = 0; i < lt_last; ++i)
@@ -537,16 +542,21 @@
void
Federator::push(FederatorEvent&& e)
{
ChainType ct;
std::visit([&ct](auto const& e) { ct = e.chainType_; }, e);
LoopTypes const lt =
ct == ChainType::locking ? lt_event_locking : lt_event_issuing;

bool notify = false;
{
std::lock_guard l{eventsMutex_};
notify = events_.empty();
events_.push_back(std::move(e));
notify = events_[ct].empty();
events_[ct].push_back(std::move(e));
}
if (notify)
{
std::lock_guard l(cvMutexes_[lt_event]);
cvs_[lt_event].notify_one();
std::lock_guard l(cvMutexes_[lt]);
cvs_[lt].notify_all();
}
}

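Two details of the new push() are worth noting. FederatorEvent is a std::variant, so the chain side is recovered with std::visit; this works because every alternative exposes a chainType_ member. A reduced sketch with hypothetical event types:

#include <cassert>
#include <variant>

enum class ChainType { locking, issuing };

// Hypothetical alternatives; the real variant holds xbwd's event structs.
struct DepositEvent { ChainType chainType_; };
struct ClaimEvent { ChainType chainType_; };
using FedEvent = std::variant<DepositEvent, ClaimEvent>;

ChainType
sideOf(FedEvent const& e)
{
    // Compiles for every alternative because all share the member name.
    return std::visit([](auto const& ev) { return ev.chainType_; }, e);
}

int
main()
{
    FedEvent e = ClaimEvent{ChainType::issuing};
    assert(sideOf(e) == ChainType::issuing);
}

Note also that notify is set only when the queue was empty before the push: the worker can only be asleep once it has drained its queue, so pushing onto a non-empty queue never needs a wakeup.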
@@ -1347,7 +1357,7 @@ Federator::checkExpired(ChainType ct, std::uint32_t ledger)
if (notify)
{
std::lock_guard l(cvMutexes_[lt_txnSubmit]);
cvs_[lt_txnSubmit].notify_one();
cvs_[lt_txnSubmit].notify_all();
}
else
{
@@ -1562,7 +1572,7 @@ Federator::pushAttOnSubmitTxn(
if (notify)
{
std::lock_guard l(cvMutexes_[lt_txnSubmit]);
cvs_[lt_txnSubmit].notify_one();
cvs_[lt_txnSubmit].notify_all();
}
}

@@ -1694,27 +1704,29 @@ Federator::unlockMainLoop()
{
std::lock_guard l(loopMutexes_[i]);
loopLocked_[i] = false;
loopCvs_[i].notify_one();
loopCvs_[i].notify_all();
}
}

void
Federator::mainLoop()
Federator::mainLoop(ChainType ct)
{
auto const lt = lt_event;
LoopTypes const lt =
ct == ChainType::locking ? lt_event_locking : lt_event_issuing;
{
std::unique_lock l{loopMutexes_[lt]};
loopCvs_[lt].wait(l, [this, lt] { return !loopLocked_[lt]; });
}

auto& events(events_[ct]);
std::vector<FederatorEvent> localEvents;
localEvents.reserve(16);
while (!requestStop_)
{
{
std::lock_guard l{eventsMutex_};
assert(localEvents.empty());
localEvents.swap(events_);
localEvents.swap(events);
}
if (localEvents.empty())
{
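The wait on loopCvs_ at the top of mainLoop is a start gate: the threads are created in start() but sit idle until unlockMainLoop() clears loopLocked_. A reduced sketch of the gate (collapsed to a single shared gate here, while the real code keeps one mutex/cv/flag per loop):

#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex gateMutex;
std::condition_variable gateCv;
bool gateLocked = true;  // cleared once initialization is finished

void
loopBody()
{
    {
        std::unique_lock l{gateMutex};
        gateCv.wait(l, [] { return !gateLocked; });
    }
    // ... the event loop proper runs only after the gate opens ...
}

void
openGate()
{
    {
        std::lock_guard l{gateMutex};
        gateLocked = false;
    }
    // With several threads parked on one gate, notify_one could strand
    // all but one of them; notify_all is the safe choice.
    gateCv.notify_all();
}

int
main()
{
    std::thread t1(loopBody), t2(loopBody);
    openGate();
    t1.join();
    t2.join();
}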
@@ -1831,17 +1843,20 @@ Federator::txnSubmitLoop()

for (auto const ct : {ChainType::locking, ChainType::issuing})
{
if (accountStrs[ct].empty())
continue;

decltype(txns_)::type localTxns;
decltype(txns_)::type* pLocal = nullptr;
bool checkReady = false;

{
std::lock_guard l{txnsMutex_};

if (accountStrs[ct].empty())
continue;
if (maxAttToSend_ && (submitted_[ct].size() > maxAttToSend_))
{
++skipCtr;
continue;
}

if (errored_[ct].empty())
{
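The maxAttToSend_ test above is per-chain backpressure: when one side already has more than maxAttToSend_ attestations in flight, that side is skipped for this round (and the skip is counted) while the other side proceeds. A hypothetical reduction of just that decision, assuming 0 means "no cap", as the maxAttToSend_ && short-circuit suggests:

#include <cstddef>

// Hypothetical reduction of the throttle test in txnSubmitLoop.
// inFlight: attestations already submitted and not yet confirmed
// maxToSend: configured cap; 0 disables the throttle entirely
bool
shouldSkipSide(std::size_t inFlight, std::size_t maxToSend, unsigned& skipCtr)
{
    if (maxToSend && inFlight > maxToSend)
    {
        ++skipCtr;  // remember we throttled; retry on a later pass
        return true;
    }
    return false;
}

The same hunk also moves the accountStrs[ct].empty() check ahead of the txnsMutex_ critical section, so a side with no configured submit account is skipped without taking the lock at all.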
@@ -1940,11 +1955,19 @@ Federator::getInfo() const
// Pending events
// In most cases, events have been moved by event loop thread
std::lock_guard l{eventsMutex_};
ret["pending_events_size"] = (int)events_.size();
if (events_.size() > 0)
auto const sz = events_[ChainType::locking].size() +
events_[ChainType::issuing].size();
ret["pending_events_size"] = static_cast<unsigned>(sz);
if (sz > 0)
{
Json::Value pendingEvents{Json::arrayValue};
for (auto const& event : events_)
for (auto const& event : events_[ChainType::locking])
{
std::visit(
[&](auto const& e) { pendingEvents.append(e.toJson()); },
event);
}
for (auto const& event : events_[ChainType::issuing])
{
std::visit(
[&](auto const& e) { pendingEvents.append(e.toJson()); },
12 changes: 9 additions & 3 deletions src/xbwd/federator/Federator.h
@@ -291,8 +291,14 @@ struct AttestedHistoryTx

class Federator
{
enum LoopTypes { lt_event, lt_txnSubmit, lt_last };
enum LoopTypes {
lt_event_locking,
lt_event_issuing,
lt_txnSubmit,
lt_last
};
std::array<std::thread, lt_last> threads_;

bool running_ = false;
std::atomic<bool> requestStop_ = false;

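The per-loop threads, mutexes, and condition variables are all arrays sized by lt_last, so introducing lt_event_issuing before lt_last grows every per-loop structure in one place. A reduced sketch of the idiom (member names shortened):

#include <array>
#include <condition_variable>
#include <mutex>
#include <thread>

enum LoopTypes { lt_event_locking, lt_event_issuing, lt_txnSubmit, lt_last };

struct Loops
{
    // Each per-loop resource is indexed by the same enumerator, so the
    // arrays can never get out of sync when a loop type is added.
    std::array<std::thread, lt_last> threads;
    std::array<std::mutex, lt_last> cvMutexes;
    std::array<std::condition_variable, lt_last> cvs;
};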
@@ -317,7 +323,7 @@ class Federator
ChainArray<bool const> const autoSubmit_; // event thread only

mutable std::mutex eventsMutex_;
std::vector<FederatorEvent> GUARDED_BY(eventsMutex_) events_;
ChainArray<std::vector<FederatorEvent>> GUARDED_BY(eventsMutex_) events_;

mutable std::mutex txnsMutex_;
ChainArray<std::vector<SubmissionPtr>> GUARDED_BY(txnsMutex_) txns_;
@@ -469,7 +475,7 @@ class Federator
ripple::Logs& l);

void
mainLoop() EXCLUDES(mainLoopMutex_);
mainLoop(ChainType ct) EXCLUDES(mainLoopMutex_);

void
txnSubmitLoop() EXCLUDES(txnSubmitLoopMutex_);
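GUARDED_BY and EXCLUDES in this header are Clang Thread Safety Analysis annotations: built with -Wthread-safety, the compiler statically checks that events_ is only accessed while eventsMutex_ is held, and that mainLoop() is not called with mainLoopMutex_ already held. A minimal sketch of how such macros are typically defined (an assumption; the project's actual definitions live in its own headers):

// Assumed, typical definitions; no-ops on compilers without the analysis.
#if defined(__clang__)
#define GUARDED_BY(x) __attribute__((guarded_by(x)))
#define EXCLUDES(...) __attribute__((locks_excluded(__VA_ARGS__)))
#else
#define GUARDED_BY(x)
#define EXCLUDES(...)
#endif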
