Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into lexnv/reject-reasons
Browse files Browse the repository at this point in the history
  • Loading branch information
lexnv committed Sep 2, 2024
2 parents 6c48ada + e3a22d5 commit cc62427
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ prost-build = "0.13"

[dependencies]
async-trait = "0.1.81"
bs58 = "0.4.0"
bs58 = "0.5.1"
bytes = "1.6.1"
cid = "0.10.1"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
Expand Down
143 changes: 73 additions & 70 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ pub(crate) struct Kademlia {
impl Kademlia {
/// Create new [`Kademlia`].
pub(crate) fn new(mut service: TransportService, config: Config) -> Self {
let local_peer_id = service.local_peer_id;
let local_key = Key::from(service.local_peer_id);
let local_peer_id = service.local_peer_id();
let local_key = Key::from(service.local_peer_id());
let mut routing_table = RoutingTable::new(local_key.clone());

for (peer, addresses) in config.known_peers {
Expand Down Expand Up @@ -357,7 +357,7 @@ impl Kademlia {
/// the mode was set to manual.
async fn update_routing_table(&mut self, peers: &[KademliaPeer]) {
let peers: Vec<_> =
peers.iter().filter(|peer| peer.peer != self.service.local_peer_id).collect();
peers.iter().filter(|peer| peer.peer != self.service.local_peer_id()).collect();

// inform user about the routing table update, regardless of what the routing table update
// mode is
Expand Down Expand Up @@ -642,50 +642,71 @@ impl Kademlia {
}
}

/// Handle next query action.
async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
match action {
QueryAction::SendMessage { query, peer, .. } => match self.service.open_substream(peer)
{
Err(_) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "dial peer");

match self.service.dial(&peer) {
Ok(_) => match self.pending_dials.entry(peer) {
Entry::Occupied(entry) => {
entry.into_mut().push(PeerAction::SendFindNode(query));
/// Open a substream with a peer or dial the peer.
fn open_substream_or_dial(
&mut self,
peer: PeerId,
action: PeerAction,
query: Option<QueryId>,
) -> Result<(), Error> {
match self.service.open_substream(peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer);
self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);

Ok(())
}
Err(err) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer");

match self.service.dial(&peer) {
Ok(()) => {
self.pending_dials.entry(peer).or_default().push(action);
Ok(())
}

// Already connected is a recoverable error.
Err(Error::AlreadyConnected) => {
// Dial returned `Error::AlreadyConnected`, retry opening the substream.
match self.service.open_substream(peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer);
self.peers
.entry(peer)
.or_default()
.pending_actions
.insert(substream_id, action);
Ok(())
}
Entry::Vacant(entry) => {
entry.insert(vec![PeerAction::SendFindNode(query)]);
Err(err) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
Err(err)
}
},
Err(error) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "failed to dial peer");
self.engine.register_response_failure(query, peer);
}
}

Ok(())
Err(error) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer");
Err(error)
}
}
Ok(substream_id) => {
tracing::trace!(
target: LOG_TARGET,
?query,
?peer,
?substream_id,
"open outbound substream for peer"
);

self.pending_substreams.insert(substream_id, peer);
self.peers
.entry(peer)
.or_default()
.pending_actions
.insert(substream_id, PeerAction::SendFindNode(query));
}
}
}

Ok(())
/// Handle next query action.
async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
match action {
QueryAction::SendMessage { query, peer, .. } => {
if self
.open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
.is_err()
{
// Announce the error to the query engine.
self.engine.register_response_failure(query, peer);
}
},
Ok(())
}
QueryAction::FindNodeQuerySucceeded {
target,
peers,
Expand Down Expand Up @@ -720,36 +741,18 @@ impl Kademlia {
let message = KademliaMessage::put_value(record);

for peer in peers {
match self.service.open_substream(peer.peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer.peer);
self.peers
.entry(peer.peer)
.or_default()
.pending_actions
.insert(substream_id, PeerAction::SendPutValue(message.clone()));
}
Err(_) => match self.service.dial(&peer.peer) {
Ok(_) => match self.pending_dials.entry(peer.peer) {
Entry::Occupied(entry) => {
entry
.into_mut()
.push(PeerAction::SendPutValue(message.clone()));
}
Entry::Vacant(entry) => {
entry.insert(vec![PeerAction::SendPutValue(message.clone())]);
}
},
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?key,
?error,
"failed to dial peer",
);
}
},
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendPutValue(message.clone()),
None,
) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?key,
?error,
"failed to put record to peer",
);
}
}

Expand Down Expand Up @@ -889,7 +892,7 @@ impl Kademlia {

// Put the record to the specified peers.
let peers = peers.into_iter().filter_map(|peer| {
if peer == self.service.local_peer_id {
if peer == self.service.local_peer_id() {
return None;
}

Expand Down
8 changes: 8 additions & 0 deletions src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
// If we cannot make progress, return the final result.
// A query failed when we are not able to identify one single peer.
if self.is_done() {
tracing::trace!(
target: LOG_TARGET,
query = ?self.config.query,
pending = self.pending.len(),
candidates = self.candidates.len(),
"query finished"
);

return if self.responses.is_empty() {
Some(QueryAction::QueryFailed {
query: self.config.query,
Expand Down
7 changes: 6 additions & 1 deletion src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ConnectionContext {
#[derive(Debug)]
pub struct TransportService {
/// Local peer ID.
pub(crate) local_peer_id: PeerId,
local_peer_id: PeerId,

/// Protocol.
protocol: ProtocolName,
Expand Down Expand Up @@ -365,6 +365,11 @@ impl TransportService {

connection.primary.force_close()
}

/// Returns the local node's `PeerId`.
///
/// Returned by value (no borrow), so callers can use it freely without
/// holding a reference to the `TransportService`. Replaces direct access
/// to the now-private `local_peer_id` field.
pub fn local_peer_id(&self) -> PeerId {
self.local_peer_id
}
}

impl Stream for TransportService {
Expand Down

0 comments on commit cc62427

Please sign in to comment.