Skip to content

Commit

Permalink
Merge bitcoin#31666: multi-peer orphan resolution followups
Browse files Browse the repository at this point in the history
7426afb [p2p] assign just 1 random announcer in AddChildrenToWorkSet (glozow)
4c1fa6b test fix: make peer who sends MSG_TX announcement non-wtxidrelay (glozow)
2da46b8 pass P2PTxInvStore init args to P2PInterface init (glozow)
e3bd51e [doc] how unique_parents can be empty (glozow)
32eb6dc [refactor] assign local variable for wtxid (glozow)
18820cc multi-announcer orphan handling test fixups (glozow)
c4cc61d [fuzz] GetCandidatePeers (glozow)
7704139 [refactor] make GetCandidatePeers take uint256 and in-out vector (glozow)
6e4d392 [refactor] rename to OrphanResolutionCandidate to MaybeAdd* (glozow)
57221ad [refactor] move parent inv-adding to OrphanResolutionCandidate (glozow)

Pull request description:

  Followup to bitcoin#31397.

  Addressing (in order):
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31397 (comment)
  bitcoin#31658 (review)
  bitcoin#31397 (comment)

ACKs for top commit:
  instagibbs:
    reACK 7426afb
  marcofleon:
    reACK 7426afb
  mzumsande:
    Code Review ACK 7426afb
  dergoegge:
    Code review ACK 7426afb

Tree-SHA512: bca8f576873fdaa20b758e1ee9708ce94e618ff14726864b29b50f0f9a4db58136a286d2b654af569b09433a028901fe6bcdda68dcbfea71e2d1271934725503
  • Loading branch information
fanquake committed Feb 4, 2025
2 parents 6f5ae1a + 7426afb commit 94ca99a
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 85 deletions.
68 changes: 33 additions & 35 deletions src/node/txdownloadman_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,21 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid,
// - exists in orphanage
// - peer can be an orphan resolution candidate
if (gtxid.IsWtxid()) {
if (auto orphan_tx{m_orphanage.GetTx(Wtxid::FromUint256(gtxid.GetHash()))}) {
const auto wtxid{Wtxid::FromUint256(gtxid.GetHash())};
if (auto orphan_tx{m_orphanage.GetTx(wtxid)}) {
auto unique_parents{GetUniqueParents(*orphan_tx)};
std::erase_if(unique_parents, [&](const auto& txid){
return AlreadyHaveTx(GenTxid::Txid(txid), /*include_reconsiderable=*/false);
});

if (unique_parents.empty()) return true;

if (auto delay{OrphanResolutionCandidate(peer, Wtxid::FromUint256(gtxid.GetHash()), unique_parents.size())}) {
m_orphanage.AddAnnouncer(Wtxid::FromUint256(gtxid.GetHash()), peer);

const auto& info = m_peer_info.at(peer).m_connection_info;
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(peer, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay);
}
// The missing parents may have all been rejected or accepted since the orphan was added to the orphanage.
// Do not delete from the orphanage, as it may be queued for processing.
if (unique_parents.empty()) {
return true;
}

LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", peer, gtxid.GetHash().ToString());
if (MaybeAddOrphanResolutionCandidate(unique_parents, wtxid, peer, now)) {
m_orphanage.AddAnnouncer(orphan_tx->GetWitnessHash(), peer);
}

// Return even if the peer isn't an orphan resolution candidate. This would be caught by AlreadyHaveTx.
Expand Down Expand Up @@ -231,21 +229,23 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid,
return false;
}

std::optional<std::chrono::seconds> TxDownloadManagerImpl::OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents)
bool TxDownloadManagerImpl::MaybeAddOrphanResolutionCandidate(const std::vector<Txid>& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now)
{
if (m_peer_info.count(nodeid) == 0) return std::nullopt;
if (m_orphanage.HaveTxFromPeer(orphan_wtxid, nodeid)) return std::nullopt;
auto it_peer = m_peer_info.find(nodeid);
if (it_peer == m_peer_info.end()) return false;
if (m_orphanage.HaveTxFromPeer(wtxid, nodeid)) return false;

const auto& peer_entry = m_peer_info.at(nodeid);
const auto& info = peer_entry.m_connection_info;

// TODO: add delays and limits based on the amount of orphan resolution we are already doing
// with this peer, how much they are using the orphanage, etc.
if (!info.m_relay_permissions) {
// This mirrors the delaying and dropping behavior in AddTxAnnouncement in order to preserve
// existing behavior: drop if we are tracking too many invs for this peer already. Each
// orphan resolution involves at least 1 transaction request which may or may not be
// currently tracked in m_txrequest, so we include that in the count.
if (m_txrequest.Count(nodeid) + num_parents > MAX_PEER_TX_ANNOUNCEMENTS) return std::nullopt;
if (m_txrequest.Count(nodeid) + unique_parents.size() > MAX_PEER_TX_ANNOUNCEMENTS) return false;
}

std::chrono::seconds delay{0s};
Expand All @@ -258,7 +258,13 @@ std::optional<std::chrono::seconds> TxDownloadManagerImpl::OrphanResolutionCandi
const bool overloaded = !info.m_relay_permissions && m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT;
if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;

return delay;
// Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents.
// In the future, orphan resolution may include more explicit steps
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + delay);
}
LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString());
return true;
}

std::vector<GenTxid> TxDownloadManagerImpl::GetRequestsToSend(NodeId nodeid, std::chrono::microseconds current_time)
Expand Down Expand Up @@ -327,7 +333,7 @@ void TxDownloadManagerImpl::MempoolAcceptedTx(const CTransactionRef& tx)
m_txrequest.ForgetTxHash(tx->GetHash());
m_txrequest.ForgetTxHash(tx->GetWitnessHash());

m_orphanage.AddChildrenToWorkSet(*tx);
m_orphanage.AddChildrenToWorkSet(*tx, m_opts.m_rng);
// If it came from the orphanage, remove it. No-op if the tx is not in txorphanage.
m_orphanage.EraseTx(tx->GetWitnessHash());
}
Expand Down Expand Up @@ -400,27 +406,19 @@ node::RejectedTxTodo TxDownloadManagerImpl::MempoolRejectedTx(const CTransaction
// means it was already added to vExtraTxnForCompact.
add_extra_compact_tx &= !m_orphanage.HaveTx(wtxid);

auto add_orphan_reso_candidate = [&](const CTransactionRef& orphan_tx, const std::vector<Txid>& unique_parents, NodeId nodeid, std::chrono::microseconds now) {
const auto& wtxid = orphan_tx->GetWitnessHash();
if (auto delay{OrphanResolutionCandidate(nodeid, wtxid, unique_parents.size())}) {
const auto& info = m_peer_info.at(nodeid).m_connection_info;
m_orphanage.AddTx(orphan_tx, nodeid);

// Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents
// In the future, orphan resolution may include more explicit steps
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay);
}
LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString());
}
};

// If there is no candidate for orphan resolution, AddTx will not be called. This means
// that if a peer is overloading us with invs and orphans, they will eventually not be
// able to add any more transactions to the orphanage.
add_orphan_reso_candidate(ptx, unique_parents, nodeid, now);
for (const auto& candidate : m_txrequest.GetCandidatePeers(ptx)) {
add_orphan_reso_candidate(ptx, unique_parents, candidate, now);
//
// Search by txid and, if the tx has a witness, wtxid
std::vector<NodeId> orphan_resolution_candidates{nodeid};
m_txrequest.GetCandidatePeers(ptx->GetHash().ToUint256(), orphan_resolution_candidates);
if (ptx->HasWitness()) m_txrequest.GetCandidatePeers(ptx->GetWitnessHash().ToUint256(), orphan_resolution_candidates);

for (const auto& nodeid : orphan_resolution_candidates) {
if (MaybeAddOrphanResolutionCandidate(unique_parents, ptx->GetWitnessHash(), nodeid, now)) {
m_orphanage.AddTx(ptx, nodeid);
}
}

// Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore.
Expand Down
8 changes: 4 additions & 4 deletions src/node/txdownloadman_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ class TxDownloadManagerImpl {
/** Helper for getting deduplicated vector of Txids in vin. */
std::vector<Txid> GetUniqueParents(const CTransaction& tx);

/** Determine candidacy (and delay) for potential orphan resolution candidate.
* @returns delay for orphan resolution if this peer is a good candidate for orphan resolution,
* std::nullopt if this peer cannot be added because it has reached download/orphanage limits.
/** If this peer is an orphan resolution candidate for this transaction, treat the unique_parents as announced by
* this peer; add them as new invs to m_txrequest.
* @returns whether this transaction was a valid orphan resolution candidate.
* */
std::optional<std::chrono::seconds> OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents);
bool MaybeAddOrphanResolutionCandidate(const std::vector<Txid>& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now);
};
} // namespace node
#endif // BITCOIN_NODE_TXDOWNLOADMAN_IMPL_H
6 changes: 3 additions & 3 deletions src/test/fuzz/txorphan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void initialize_orphanage()
FUZZ_TARGET(txorphan, .init = initialize_orphanage)
{
FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
FastRandomContext limit_orphans_rng{/*fDeterministic=*/true};
FastRandomContext orphanage_rng{/*fDeterministic=*/true};
SetMockTime(ConsumeTime(fuzzed_data_provider));

TxOrphanage orphanage;
Expand Down Expand Up @@ -79,7 +79,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage)
// previous loop and potentially the parent of this tx.
if (ptx_potential_parent) {
// Set up future GetTxToReconsider call.
orphanage.AddChildrenToWorkSet(*ptx_potential_parent);
orphanage.AddChildrenToWorkSet(*ptx_potential_parent, orphanage_rng);

// Check that all txns returned from GetChildrenFrom* are indeed a direct child of this tx.
NodeId peer_id = fuzzed_data_provider.ConsumeIntegral<NodeId>();
Expand Down Expand Up @@ -154,7 +154,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage)
// test mocktime and expiry
SetMockTime(ConsumeTime(fuzzed_data_provider));
auto limit = fuzzed_data_provider.ConsumeIntegral<unsigned int>();
orphanage.LimitOrphans(limit, limit_orphans_rng);
orphanage.LimitOrphans(limit, orphanage_rng);
Assert(orphanage.Size() <= limit);
});

Expand Down
13 changes: 13 additions & 0 deletions src/test/fuzz/txrequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ class Tester
tracked += m_announcements[txhash][peer].m_state != State::NOTHING;
inflight += m_announcements[txhash][peer].m_state == State::REQUESTED;
candidates += m_announcements[txhash][peer].m_state == State::CANDIDATE;

std::bitset<MAX_PEERS> expected_announcers;
for (int peer = 0; peer < MAX_PEERS; ++peer) {
if (m_announcements[txhash][peer].m_state == State::CANDIDATE || m_announcements[txhash][peer].m_state == State::REQUESTED) {
expected_announcers[peer] = true;
}
}
std::vector<NodeId> candidate_peers;
m_tracker.GetCandidatePeers(TXHASHES[txhash], candidate_peers);
assert(expected_announcers.count() == candidate_peers.size());
for (const auto& peer : candidate_peers) {
assert(expected_announcers[peer]);
}
}
assert(m_tracker.Count(peer) == tracked);
assert(m_tracker.CountInFlight(peer) == inflight);
Expand Down
26 changes: 17 additions & 9 deletions src/test/orphanage_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,19 +532,27 @@ BOOST_AUTO_TEST_CASE(peer_worksets)
BOOST_CHECK(orphanage.HaveTxFromPeer(orphan_wtxid, node));
}

// Parent accepted: add child to all 3 worksets.
orphanage.AddChildrenToWorkSet(*tx_missing_parent);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), tx_orphan);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node1), tx_orphan);
// Don't call GetTxToReconsider(node2) yet because it mutates the workset.
// Parent accepted: child is added to 1 of 3 worksets.
orphanage.AddChildrenToWorkSet(*tx_missing_parent, det_rand);
int node0_reconsider = orphanage.HaveTxToReconsider(node0);
int node1_reconsider = orphanage.HaveTxToReconsider(node1);
int node2_reconsider = orphanage.HaveTxToReconsider(node2);
BOOST_CHECK_EQUAL(node0_reconsider + node1_reconsider + node2_reconsider, 1);

NodeId assigned_peer;
if (node0_reconsider) {
assigned_peer = node0;
} else if (node1_reconsider) {
assigned_peer = node1;
} else {
BOOST_CHECK(node2_reconsider);
assigned_peer = node2;
}

// EraseForPeer also removes that tx from the workset.
orphanage.EraseForPeer(node0);
orphanage.EraseForPeer(assigned_peer);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), nullptr);

// However, the other peers' worksets are not touched.
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node2), tx_orphan);

// Delete this tx, clearing the orphanage.
BOOST_CHECK_EQUAL(orphanage.EraseTx(orphan_wtxid), 1);
BOOST_CHECK_EQUAL(orphanage.Size(), 0);
Expand Down
26 changes: 16 additions & 10 deletions src/txorphanage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,29 @@ void TxOrphanage::LimitOrphans(unsigned int max_orphans, FastRandomContext& rng)
if (nEvicted > 0) LogDebug(BCLog::TXPACKAGES, "orphanage overflow, removed %u tx\n", nEvicted);
}

void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx)
void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng)
{
for (unsigned int i = 0; i < tx.vout.size(); i++) {
const auto it_by_prev = m_outpoint_to_orphan_it.find(COutPoint(tx.GetHash(), i));
if (it_by_prev != m_outpoint_to_orphan_it.end()) {
for (const auto& elem : it_by_prev->second) {
// Belt and suspenders, each orphan should always have at least 1 announcer.
if (!Assume(!elem->second.announcers.empty())) continue;
for (const auto announcer: elem->second.announcers) {
// Get this source peer's work set, emplacing an empty set if it didn't exist
// (note: if this peer wasn't still connected, we would have removed the orphan tx already)
std::set<Wtxid>& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second;
// Add this tx to the work set
orphan_work_set.insert(elem->first);
LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n",
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer);
}

// Select a random peer to assign orphan processing, reducing wasted work if the orphan is still missing
// inputs. However, we don't want to create an issue in which the assigned peer can purposefully stop us
// from processing the orphan by disconnecting.
auto announcer_iter = std::begin(elem->second.announcers);
std::advance(announcer_iter, rng.randrange(elem->second.announcers.size()));
auto announcer = *(announcer_iter);

// Get this source peer's work set, emplacing an empty set if it didn't exist
// (note: if this peer wasn't still connected, we would have removed the orphan tx already)
std::set<Wtxid>& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second;
// Add this tx to the work set
orphan_work_set.insert(elem->first);
LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n",
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/txorphanage.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TxOrphanage {
void LimitOrphans(unsigned int max_orphans, FastRandomContext& rng);

/** Add any orphans that list a particular tx as a parent into the from peer's work set */
void AddChildrenToWorkSet(const CTransaction& tx);
void AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng);

/** Does this peer have any work to do? */
bool HaveTxToReconsider(NodeId peer);
Expand Down
20 changes: 6 additions & 14 deletions src/txrequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,21 +574,13 @@ class TxRequestTracker::Impl {
}
}

std::vector<NodeId> GetCandidatePeers(const CTransactionRef& tx) const
void GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const
{
// Search by txid and, if the tx has a witness, wtxid
std::vector<uint256> hashes{tx->GetHash().ToUint256()};
if (tx->HasWitness()) hashes.emplace_back(tx->GetWitnessHash().ToUint256());

std::vector<NodeId> result_peers;
for (const uint256& txhash : hashes) {
auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) {
result_peers.push_back(it->m_peer);
++it;
}
auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) {
result_peers.push_back(it->m_peer);
++it;
}
return result_peers;
}

void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred,
Expand Down Expand Up @@ -738,7 +730,7 @@ size_t TxRequestTracker::CountInFlight(NodeId peer) const { return m_impl->Count
size_t TxRequestTracker::CountCandidates(NodeId peer) const { return m_impl->CountCandidates(peer); }
size_t TxRequestTracker::Count(NodeId peer) const { return m_impl->Count(peer); }
size_t TxRequestTracker::Size() const { return m_impl->Size(); }
std::vector<NodeId> TxRequestTracker::GetCandidatePeers(const CTransactionRef& tx) const { return m_impl->GetCandidatePeers(tx); }
void TxRequestTracker::GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const { return m_impl->GetCandidatePeers(txhash, result_peers); }
void TxRequestTracker::SanityCheck() const { m_impl->SanityCheck(); }

void TxRequestTracker::PostGetRequestableSanityCheck(std::chrono::microseconds now) const
Expand Down
5 changes: 3 additions & 2 deletions src/txrequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ class TxRequestTracker {
/** Count how many announcements are being tracked in total across all peers and transaction hashes. */
size_t Size() const;

/** For some tx return all peers with non-COMPLETED announcements for its txid or wtxid. The resulting vector may contain duplicate NodeIds. */
std::vector<NodeId> GetCandidatePeers(const CTransactionRef& tx) const;
/** For some txhash (txid or wtxid), finds all peers with non-COMPLETED announcements and appends them to
* result_peers. Does not try to ensure that result_peers contains no duplicates. */
void GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const;

/** Access to the internal priority computation (testing only) */
uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const;
Expand Down
Loading

0 comments on commit 94ca99a

Please sign in to comment.