Skip to content

Commit

Permalink
feat: Add DHT and RequestResponse notifications (#461)
Browse files Browse the repository at this point in the history
# Description

This PR implements the following changes:

- [x] Add DHT notifications
  - [x] Put receipt and workflow info to DHT notifications
  - [x] Got receipt and workflow info from DHT notifications
  - [x] Receipt and workflow info quorum success notifications
  - [x] Receipt and workflow info quorum failure notifications
- [x] Add request-response notifications
  - [x] Sent workflow info to peer
  - [x] Received workflow info from peer
- [x] Update notifications to accept `Ipld` data, not just strings
- [x] Refactor swarm `FoundEvent` into `DecodedRecord` and `FoundEvent`
(to include extra information on `FoundEvent` without overloading
decoding mechanism)
- [x] Add better error for timed out records (records with the code
"Timeout")
- [x] Fix `IndexedResources` decoding and add/update unit tests to check
it
- [x] Fix flaky `test_libp2p_receipt_gossip_serial` integration test
- [x] Remove no connected peer checks (we dial up a peer if they are not
connected but stored in the DHT)
- [x] Rename `check_lines_for` test utility to `check_for_line_with`
- [x] Remove unused `handler_timeout_fn`s
- [x] Move `defaults.toml` from `homestar-runtime/fixtures` to
`homestar-runtime/config`
- [x] Update `ReceivedReceiptPubsub` notification "peerId" field to
"publisher" to better reflect the role of the peer (breaking change for
clients).
- [x] Remove receipt persistence on receiving workflow info (to be
replaced with another mechanism)
- [x] Break `retrieve_from_query` into `retrieve_from_dht` and
`retrieve_from_provider`
- [x] Update retrieve workflow info from provider mechanism to trigger
on `retrieve_from_dht` error or timeout
- [x] Update DHT behavior to only add addresses manually with the
`kad::BucketInserts::Manual` configuration
- [x] Handle swarm `RequestResponse` `ResponseSent` event with a debug
log (instead of leaving it uncaught)
- [x] Increase threads allocated per test

## Link to issue

Closes #131
Closes #475

## Type of change

- [x] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [x] Refactor (non-breaking change that updates existing functionality)
- [x] Breaking change (fix or feature that would cause existing
functionality to not work as expected)

The breaking change is the update to the `ReceivedReceiptPubsub`
notification.

## Test plan (required)

We've added tests to check:
  - [x] Put receipt and workflow info to DHT
  - [x] Got workflow info from DHT notifications
  - [x] Receipt and workflow info quorum success
  - [x] Receipt and workflow info quorum failure
  - [x] Provider sent workflow info to peer
  - [x] Peer received workflow info from a provider
  - [x] Peer received workflow info indirectly through a recursive request
(Test written, but ignored until we can isolate nodes)

We also have the beginnings of a test for getting a receipt from the DHT, which will be
used with future updates to receipt retrieval (it lives in the
`test_libp2p_dht_records_serial` test, but is currently commented out)

---------

Co-authored-by: Quinn Wilton <[email protected]>
  • Loading branch information
bgins and QuinnWilton authored Jan 11, 2024
1 parent b1cf4da commit 5c876c5
Show file tree
Hide file tree
Showing 32 changed files with 2,317 additions and 383 deletions.
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ serial = { max-threads = 1 }
# Override for the default profile: serial integration tests (names ending
# in `_serial`) run in the `serial` test group, which is capped at one
# concurrent test; each is allotted 4 threads to reduce flakiness.
[[profile.default.overrides]]
filter = 'test(/_serial$/)'
test-group = 'serial'
threads-required = 4

# Override for the CI profile: mirrors the default-profile override so that
# serial tests get the same group and thread allocation in CI runs.
[[profile.ci.overrides]]
filter = 'test(/_serial$/)'
test-group = 'serial'
threads-required = 4
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ mesh_n = 2
mesh_outbound_min = 1

[node.network.libp2p.dht]
p2p_provider_timeout = 30
p2p_receipt_timeout = 500
p2p_workflow_info_timeout = 500
p2p_provider_timeout = 10000
receipt_quorum = 2
workflow_quorum = 3

Expand Down
10 changes: 8 additions & 2 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
/// Timeout for p2p provider requests.
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
p2p_provider_timeout: Duration,
/// Accessible database instance.
db: DB,
Expand Down Expand Up @@ -101,7 +103,9 @@ pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
/// Timeout for p2p provider requests.
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
p2p_provider_timeout: Duration,
/// Accessible database instance.
db: DB,
Expand Down Expand Up @@ -174,6 +178,7 @@ where
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
swarm,
Expand Down Expand Up @@ -214,6 +219,7 @@ where
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
swarm,
Expand Down
156 changes: 115 additions & 41 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! Internal [Event] type and [Handler] implementation.

#[cfg(feature = "websocket-notify")]
use super::swarm_event::FoundEvent;
use super::EventHandler;
#[cfg(feature = "websocket-notify")]
use crate::event_handler::notification::{
self, emit_receipt, EventNotificationTyp, SwarmNotification,
use crate::event_handler::{
notification::{self, emit_receipt, EventNotificationTyp, SwarmNotification},
swarm_event::{ReceiptEvent, WorkflowInfoEvent},
};
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
db::Database,
event_handler::{channel::AsyncChannelSender, Handler, P2PSender, ResponseEvent},
event_handler::{channel::AsyncChannelSender, Handler, P2PSender},
network::{
pubsub,
swarm::{CapsuleTag, RequestResponseKey, TopicMessage},
Expand All @@ -32,7 +35,7 @@ use maplit::btreemap;
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};
#[cfg(all(feature = "ipfs", not(feature = "test-utils")))]
use tokio::runtime::Handle;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

const RENDEZVOUS_NAMESPACE: &str = "homestar";

Expand Down Expand Up @@ -105,6 +108,9 @@ pub(crate) enum Event {
FindRecord(QueryRecord),
/// Remove a given record from the DHT, e.g. a [Receipt].
RemoveRecord(QueryRecord),
/// [Receipt] or [workflow::Info] stored event.
#[cfg(feature = "websocket-notify")]
StoredRecord(FoundEvent),
/// Outbound request event to pull data from peers.
OutboundRequest(PeerRequest),
/// Get providers for a record in the DHT, e.g. workflow information.
Expand Down Expand Up @@ -146,6 +152,42 @@ impl Event {
}
Event::FindRecord(record) => record.find(event_handler).await,
Event::RemoveRecord(record) => record.remove(event_handler).await,
#[cfg(feature = "websocket-notify")]
Event::StoredRecord(event) => match event {
FoundEvent::Receipt(ReceiptEvent {
peer_id,
receipt,
notification_type,
}) => notification::emit_event(
event_handler.ws_evt_sender(),
notification_type,
btreemap! {
"publisher" => peer_id.map_or(Ipld::Null, |peer_id| Ipld::String(peer_id.to_string())),
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
),
FoundEvent::Workflow(WorkflowInfoEvent {
peer_id,
workflow_info,
notification_type,
}) => {
if let Some(peer_label) = notification_type.workflow_info_source_label() {
notification::emit_event(
event_handler.ws_evt_sender(),
notification_type,
btreemap! {
peer_label => peer_id.map_or(Ipld::Null, |peer_id| Ipld::String(peer_id.to_string())),
"cid" => Ipld::String(workflow_info.cid().to_string()),
"name" => workflow_info.name.map_or(Ipld::Null, |name| Ipld::String(name.to_string())),
"numTasks" => Ipld::Integer(workflow_info.num_tasks as i128),
"progress" => Ipld::List(workflow_info.progress.iter().map(|cid| Ipld::String(cid.to_string())).collect()),
"progressCount" => Ipld::Integer(workflow_info.progress_count as i128),
},
)
}
}
},
Event::OutboundRequest(PeerRequest {
peer,
request,
Expand All @@ -171,7 +213,6 @@ impl Event {
.map_err(anyhow::Error::new)?;

let key = RequestResponseKey::new(cid.to_string().into(), capsule_tag);

event_handler.query_senders.insert(query_id, (key, sender));
}
Event::Providers(Ok((providers, key, sender))) => {
Expand Down Expand Up @@ -306,8 +347,8 @@ impl Captured {
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
);
}
Expand Down Expand Up @@ -337,38 +378,95 @@ impl Captured {
};

if let Ok(receipt_bytes) = Receipt::invocation_capsule(&invocation_receipt) {
let _id = event_handler
event_handler
.swarm
.behaviour_mut()
.kademlia
.put_record(
Record::new(instruction_bytes, receipt_bytes.to_vec()),
receipt_quorum,
)
.map_err(|err| {
warn!(subject = "libp2p.put_record.err",
.map_or_else(
|err| {
warn!(subject = "libp2p.put_record.err",
category = "publish_event",
err=?err,
"receipt not PUT onto DHT")
});
},
|query_id| {
let key = RequestResponseKey::new(
receipt.cid().to_string().into(),
CapsuleTag::Receipt,
);
event_handler.query_senders.insert(query_id, (key, None));

debug!(
subject = "libp2p.put_record",
category = "publish_event",
"receipt PUT onto DHT"
);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PutReceiptDht,
),
btreemap! {
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
);
},
);

Arc::make_mut(&mut self.workflow).increment_progress(receipt_cid);
let workflow_cid_bytes = self.workflow.cid_as_bytes();
if let Ok(workflow_bytes) = self.workflow.capsule() {
let _id = event_handler
event_handler
.swarm
.behaviour_mut()
.kademlia
.put_record(
Record::new(workflow_cid_bytes, workflow_bytes),
workflow_quorum,
)
.map_err(|err| {
warn!(subject = "libp2p.put_record.err",
.map_or_else(
|err| {
warn!(subject = "libp2p.put_record.err",
category = "publish_event",
err=?err,
"workflow information not PUT onto DHT")
});
},
|query_id| {
let key = RequestResponseKey::new(
self.workflow.cid().to_string().into(),
CapsuleTag::Workflow,
);
event_handler.query_senders.insert(query_id, (key, None));

debug!(
subject = "libp2p.put_record",
category = "publish_event",
"workflow info PUT onto DHT"
);

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
EventNotificationTyp::SwarmNotification(
SwarmNotification::PutWorkflowInfoDht,
),
btreemap! {
"cid" => Ipld::String(self.workflow.cid().to_string()),
"name" => self.workflow.name.as_ref().map_or(Ipld::Null, |name| Ipld::String(name.to_string())),
"numTasks" => Ipld::Integer(self.workflow.num_tasks as i128),
"progress" => Ipld::List(self.workflow.progress.iter().map(|cid| Ipld::String(cid.to_string())).collect()),
"progressCount" => Ipld::Integer(self.workflow.progress_count as i128),
},
)
},
);
} else {
error!(
subject = "libp2p.put_record.err",
Expand Down Expand Up @@ -457,8 +555,8 @@ impl Replay {
SwarmNotification::PublishedReceiptPubsub,
),
btreemap! {
"cid" => receipt.cid().to_string(),
"ran" => receipt.ran().to_string()
"cid" => Ipld::String(receipt.cid().to_string()),
"ran" => Ipld::String(receipt.ran().to_string())
},
);
})
Expand Down Expand Up @@ -490,14 +588,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand All @@ -512,14 +602,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

event_handler
.swarm
.behaviour_mut()
Expand All @@ -537,14 +619,6 @@ impl QueryRecord {
where
DB: Database,
{
if event_handler.connections.peers.is_empty() {
if let Some(sender) = self.sender {
let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await;
}

return;
}

let id = event_handler
.swarm
.behaviour_mut()
Expand Down
Loading

0 comments on commit 5c876c5

Please sign in to comment.